diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index d6e742b8f861d4d229db55a8d4ba4d59ad56ebe7..47ef5fddb7b4c8e71a7791187a760d0a99b5190e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -34,6 +34,7 @@ import net.i2p.data.RouterInfo; import net.i2p.data.i2np.DatabaseLookupMessage; import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.router.Job; +import net.i2p.router.JobImpl; import net.i2p.router.NetworkDatabaseFacade; import net.i2p.router.Router; import net.i2p.router.RouterContext; @@ -71,7 +72,88 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { * removed when the job decides to stop running. * */ - private Set _publishingLeaseSets; + private Set _publishingLeaseSets; + + /** + * Hash of the key currently being searched for, pointing at a List of + * DeferredSearchJob elements for each additional request waiting for that + * search to complete. + * + */ + private Map _activeRequests; + + /** + * The search for the given key is no longer active + * + */ + void searchComplete(Hash key) { + List deferred = null; + synchronized (_activeRequests) { + deferred = (List)_activeRequests.remove(key); + } + if (deferred != null) { + for (int i = 0; i < deferred.size(); i++) { + DeferredSearchJob j = (DeferredSearchJob)deferred.get(i); + _context.jobQueue().addJob(j); + } + } + } + + /** + * We want to search for a given key, but since there is already a job + * out searching for it, we can just sit back and wait for them to finish. + * Perhaps we should also queue up a 'wakeup' job, in case that already + * active search won't expire/complete until after we time out? Though in + * practice, pretty much all of the searches are the same duration... + * + * Anyway, this job is fired when that already active search completes - + * successfully or not - and either fires off the success task (or the fail + * task if we have expired), or it runs up its own search. + * + */ + private class DeferredSearchJob extends JobImpl { + private Hash _key; + private Job _onFind; + private Job _onFailed; + private long _expiration; + private boolean _isLease; + + public DeferredSearchJob(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) { + super(KademliaNetworkDatabaseFacade.this._context); + _key = key; + _onFind = onFindJob; + _onFailed = onFailedLookupJob; + _isLease = isLease; + _expiration = getContext().clock().now() + timeoutMs; + } + public String getName() { return "Execute deferred search"; } + public void runJob() { + long remaining = getContext().clock().now() - _expiration; + if (remaining <= 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Deferred search for " + _key.toBase64() + " expired prior to sending"); + if (_onFailed != null) + getContext().jobQueue().addJob(_onFailed); + } else { + // ok, didn't time out - either we have the key or we can search + // for it + LeaseSet ls = lookupLeaseSetLocally(_key); + if (ls == null) { + RouterInfo ri = lookupRouterInfoLocally(_key); + if (ri == null) { + search(_key, _onFind, _onFailed, remaining, _isLease); + } else { + if (_onFind != null) + getContext().jobQueue().addJob(_onFind); + } + } else { + if (_onFind != null) + getContext().jobQueue().addJob(_onFind); + } + } + } + } + /** * for the 10 minutes after startup, don't fail db entries so that if we were @@ -96,6 +178,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _peerSelector = new PeerSelector(_context); _publishingLeaseSets = new HashSet(8); _lastExploreNew = 0; + _activeRequests = new HashMap(8); } KBucketSet getKBuckets() { return _kb; } @@ -664,8 +747,27 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { */ private void search(Hash key, Job onFindJob, Job onFailedLookupJob, long timeoutMs, boolean isLease) { if (!_initialized) return; - // all searching is indirect (through tunnels) now - _context.jobQueue().addJob(new SearchJob(_context, this, key, onFindJob, onFailedLookupJob, timeoutMs, true, isLease)); + int pendingRequests = 0; + boolean allowSearch = false; + synchronized (_activeRequests) { + List pending = (List)_activeRequests.get(key); + if (pending == null) { + _activeRequests.put(key, new ArrayList(0)); + allowSearch = true; + } else { + pending.add(new DeferredSearchJob(key, onFindJob, onFailedLookupJob, timeoutMs, isLease)); + pendingRequests = pending.size(); + allowSearch = false; + } + } + if (allowSearch) { + _context.jobQueue().addJob(new SearchJob(_context, this, key, onFindJob, onFailedLookupJob, + timeoutMs, true, isLease)); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Deferring search for " + key.toBase64() + ": there are " + pendingRequests + + " other concurrent requests for it"); + } } private Set getLeases() { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java index baaa7a811144c6953b3175aa2692c2fdc3f85f70..62042d68a0afe927272faf547002211c77a56923 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java @@ -95,17 +95,21 @@ class PeerSelector { failing = new ArrayList(4); failing.add(cur); } else if (_context.profileOrganizer().peerSendsBadReplies(cur)) { - if (failing == null) - failing = new ArrayList(4); - failing.add(cur); - if (_log.shouldLog(Log.WARN)) { - PeerProfile profile = _context.profileOrganizer().getProfile(cur); - if (profile != null) { - RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate(); - Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l); - _log.warn("Peer " + cur.toBase64() + " sends us bad replies: current hour: " - + invalidReplyRate.getCurrentEventCount() + " and last hour: " - + invalidReplyRate.getLastEventCount() + ":\n" + invalidReplyRate.toString()); + if (true) { + _log.warn("Peer " + cur.toBase64() + " sends us bad replies (but we still query them)"); + } else { + if (failing == null) + failing = new ArrayList(4); + failing.add(cur); + if (_log.shouldLog(Log.WARN)) { + PeerProfile profile = _context.profileOrganizer().getProfile(cur); + if (profile != null) { + RateStat invalidReplyRateStat = profile.getDBHistory().getInvalidReplyRate(); + Rate invalidReplyRate = invalidReplyRateStat.getRate(60*60*1000l); + _log.warn("Peer " + cur.toBase64() + " sends us bad replies: current hour: " + + invalidReplyRate.getCurrentEventCount() + " and last hour: " + + invalidReplyRate.getLastEventCount() + ":\n" + invalidReplyRate.toString()); + } } } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 45f37cfafb23d0760f4cf3a7c4e66095f60e8746..cbe838823cb97ba473d72ca865320ab8124d115a 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -566,6 +566,8 @@ class SearchJob extends JobImpl { if (_onSuccess != null) getContext().jobQueue().addJob(_onSuccess); + _facade.searchComplete(_state.getTarget()); + resend(); } @@ -600,6 +602,8 @@ class SearchJob extends JobImpl { } if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); + + _facade.searchComplete(_state.getTarget()); } public String getName() { return "Kademlia NetDb Search"; }