From 54f52d37ca22a14bdb733d16f44e144e3ca57349 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Wed, 18 Aug 2004 00:20:59 +0000
Subject: [PATCH] * don't allow concurrent requests for the same key - add them
 to a list of deferred requests that are notified when the search completes
 * still query peers who are sending us bad references, just don't follow
 their suggestions. This is necessary since the peer may actually have the
 data (and other peers may not be getting bad references from them)

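The deferral mechanism, as a rough standalone sketch (the class and method
names below are illustrative only, not the actual facade API; the real code
defers DeferredSearchJob instances on the JobQueue rather than plain callbacks):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Allows one in-flight lookup per key and queues the rest. */
    class DeferredLookups {
        /** key -> callbacks waiting for the already active lookup to finish */
        private final Map<String, List<Runnable>> _active = new HashMap<String, List<Runnable>>();

        /**
         * @return true if the caller should start a real lookup for the key,
         *         false if one is already running and the callback was deferred
         */
        boolean startOrDefer(String key, Runnable onComplete) {
            synchronized (_active) {
                List<Runnable> pending = _active.get(key);
                if (pending == null) {
                    // first request for this key - the caller launches the search
                    _active.put(key, new ArrayList<Runnable>());
                    return true;
                }
                // a search is already out - just wait for it to complete
                pending.add(onComplete);
                return false;
            }
        }

        /** Called when the active lookup finishes, successfully or not. */
        void complete(String key) {
            List<Runnable> deferred;
            synchronized (_active) {
                deferred = _active.remove(key);
            }
            if (deferred == null) return;
            // run the deferred callbacks outside the lock
            for (Runnable r : deferred)
                r.run();
        }
    }

In the patch itself the deferred entries are DeferredSearchJob instances that
re-check the local store (and the remaining timeout) before starting their own
search, and SearchJob reports back via searchComplete() on both the success
and failure paths.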
---
 .../KademliaNetworkDatabaseFacade.java        | 108 +++++++++++++++++-
 .../networkdb/kademlia/PeerSelector.java      |  26 +++--
 .../router/networkdb/kademlia/SearchJob.java  |   4 +
 3 files changed, 124 insertions(+), 14 deletions(-)

diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index d6e742b8f8..47ef5fddb7 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -34,6 +34,7 @@ import net.i2p.data.RouterInfo;
 import net.i2p.data.i2np.DatabaseLookupMessage;
 import net.i2p.data.i2np.DatabaseStoreMessage;
 import net.i2p.router.Job;
+import net.i2p.router.JobImpl;
 import net.i2p.router.NetworkDatabaseFacade;
 import net.i2p.router.Router;
 import net.i2p.router.RouterContext;
@@ -71,7 +72,88 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
      * removed when the job decides to stop running.
      *
      */
-    private Set _publishingLeaseSets;
+    private Set _publishingLeaseSets;   
+    
+    /** 
+     * Maps the Hash of each key currently being searched for to a List of
+     * DeferredSearchJob elements, one for each additional request waiting for
+     * that search to complete.
+     *
+     */
+    private Map _activeRequests;
+    
+    /**
+     * The search for the given key is no longer active
+     *
+     */
+    void searchComplete(Hash key) {
+        List deferred = null;
+        synchronized (_activeRequests) {
+            deferred = (List)_activeRequests.remove(key);
+        }
+        if (deferred != null) {
+            for (int i = 0; i < deferred.size(); i++) {
+                DeferredSearchJob j = (DeferredSearchJob)deferred.get(i);
+                _context.jobQueue().addJob(j);
+            }
+        }
+    }
+    
+    /**
+     * We want to search for a given key, but since there is already a job 
+     * out searching for it, we can just sit back and wait for it to finish.
+     * Perhaps we should also queue up a 'wakeup' job, in case that already
+     * active search won't expire/complete until after we time out?  Though in
+     * practice, pretty much all of the searches are the same duration...
+     *
+     * Anyway, this job is fired when that already active search completes -
+     * successfully or not - and either fires the success task (or the failure
+     * task if we have expired), or launches its own search.
+     *
+     */
+    private class DeferredSearchJob extends JobImpl {
+        private Hash _key;
+        private Job _onFind;
+        private Job _onFailed;
+        private long _expiration;
+        private boolean _isLease;
+        
+        public DeferredSearchJob(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
+            super(KademliaNetworkDatabaseFacade.this._context);
+            _key = key;
+            _onFind = onFindJob;
+            _onFailed = onFailedLookupJob;
+            _isLease = isLease;
+            _expiration = getContext().clock().now() + timeoutMs;
+        }
+        public String getName() { return "Execute deferred search"; }
+        public void runJob() {
+            long remaining = _expiration - getContext().clock().now();
+            if (remaining <= 0) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Deferred search for " + _key.toBase64() + " expired prior to sending");
+                if (_onFailed != null)
+                    getContext().jobQueue().addJob(_onFailed);                
+            } else {
+                // ok, didn't time out - either we have the key or we can search
+                // for it
+                LeaseSet ls = lookupLeaseSetLocally(_key);
+                if (ls == null) {
+                    RouterInfo ri = lookupRouterInfoLocally(_key);
+                    if (ri == null) {
+                        search(_key, _onFind, _onFailed, remaining, _isLease);
+                    } else {
+                        if (_onFind != null)
+                            getContext().jobQueue().addJob(_onFind);
+                    }
+                } else {
+                    if (_onFind != null)
+                        getContext().jobQueue().addJob(_onFind);
+                }
+            }
+        }
+    }
+
     
     /**
      * for the 10 minutes after startup, don't fail db entries so that if we were
@@ -96,6 +178,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         _peerSelector = new PeerSelector(_context);
         _publishingLeaseSets = new HashSet(8);
         _lastExploreNew = 0;
+        _activeRequests = new HashMap(8);
     }
     
     KBucketSet getKBuckets() { return _kb; }
@@ -664,8 +747,27 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
      */
     private void search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) {
         if (!_initialized) return;
-        // all searching is indirect (through tunnels) now
-        _context.jobQueue().addJob(new SearchJob(_context, this, key, onFindJob, onFailedLookupJob, timeoutMs, true, isLease));
+        int pendingRequests = 0;
+        boolean allowSearch = false;
+        synchronized (_activeRequests) {
+            List pending = (List)_activeRequests.get(key);
+            if (pending == null) {
+                _activeRequests.put(key, new ArrayList(0));
+                allowSearch = true;
+            } else {
+                pending.add(new DeferredSearchJob(key, onFindJob, onFailedLookupJob, timeoutMs, isLease));
+                pendingRequests = pending.size();
+                allowSearch = false;
+            }
+        }
+        if (allowSearch) {
+            _context.jobQueue().addJob(new SearchJob(_context, this, key, onFindJob, onFailedLookupJob, 
+                                                     timeoutMs, true, isLease));
+        } else {
+            if (_log.shouldLog(Log.INFO))
+                _log.info("Deferring search for " + key.toBase64() + ": there are " + pendingRequests
+                          + " other concurrent requests for it");
+        }
     }
     
     private Set getLeases() {
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
index baaa7a8111..62042d68a0 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
@@ -95,17 +95,21 @@ class PeerSelector {
                     failing = new ArrayList(4);
                 failing.add(cur);
             } else if (_context.profileOrganizer().peerSendsBadReplies(cur)) {
-                if (failing == null)
-                    failing = new ArrayList(4);
-                failing.add(cur);
-                if (_log.shouldLog(Log.WARN)) {
-                    PeerProfile profile = _context.profileOrganizer().getProfile(cur);
-                    if (profile != null) {
-                        RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate();
-                        Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l);
-                        _log.warn("Peer " + cur.toBase64() + " sends us bad replies: current hour: " 
-                                  + invalidReplyRate.getCurrentEventCount() + " and last hour: " 
-                                  + invalidReplyRate.getLastEventCount() + ":\n" + invalidReplyRate.toString());
+                if (true) {
+                    _log.warn("Peer " + cur.toBase64() + " sends us bad replies (but we still query them)");
+                } else {
+                    if (failing == null)
+                        failing = new ArrayList(4);
+                    failing.add(cur);
+                    if (_log.shouldLog(Log.WARN)) {
+                        PeerProfile profile = _context.profileOrganizer().getProfile(cur);
+                        if (profile != null) {
+                            RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate();
+                            Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l);
+                            _log.warn("Peer " + cur.toBase64() + " sends us bad replies: current hour: " 
+                                      + invalidReplyRate.getCurrentEventCount() + " and last hour: " 
+                                      + invalidReplyRate.getLastEventCount() + ":\n" + invalidReplyRate.toString());
+                        }
                     }
                 }
             }
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
index 45f37cfafb..cbe838823c 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
@@ -566,6 +566,8 @@ class SearchJob extends JobImpl {
         if (_onSuccess != null)
             getContext().jobQueue().addJob(_onSuccess);
         
+        _facade.searchComplete(_state.getTarget());
+        
         resend();
     }
     
@@ -600,6 +602,8 @@ class SearchJob extends JobImpl {
         }
         if (_onFailure != null)
             getContext().jobQueue().addJob(_onFailure);
+        
+        _facade.searchComplete(_state.getTarget());
     }
 
     public String getName() { return "Kademlia NetDb Search"; }
-- 
GitLab