From dcc1861399fe9499399cd0cf4086aa989a37f8d3 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jul 2011 20:43:52 +0000 Subject: [PATCH] * Netdb Search: - Follow all DSRM's, not just the last one, by moving the code from the match job to the selector - Update peer profile after SingleSearchJob - Cleanups, javadocs, log tweaks, final --- history.txt | 8 ++++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../kademlia/FloodOnlyLookupMatchJob.java | 22 ++--------- .../kademlia/FloodOnlyLookupSelector.java | 30 ++++++++++++--- .../kademlia/FloodOnlyLookupTimeoutJob.java | 7 ++-- .../kademlia/FloodOnlySearchJob.java | 37 +++++++++--------- .../networkdb/kademlia/FloodSearchJob.java | 38 +++++++++++++------ .../FloodfillNetworkDatabaseFacade.java | 5 ++- .../kademlia/FloodfillPeerSelector.java | 8 +++- .../networkdb/kademlia/SingleLookupJob.java | 6 +-- .../networkdb/kademlia/SingleSearchJob.java | 5 ++- 11 files changed, 102 insertions(+), 66 deletions(-) diff --git a/history.txt b/history.txt index d46297ab9..a5e3c0e35 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,11 @@ +2011-07-29 zzz + * Netdb Search: + - Follow all DSRM's, not just the last one, by moving the code + from the match job to the selector + - Update peer profile after SingleSearchJob + - Cleanups, javadocs, log tweaks, final + * ProfileOrganizer: Tweak fast tier size + 2011-07-28 zzz * Context: Split up big lock to avoid deadlocks * Streaming: Avoid a rare exception on race diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e1f87e077..8ab24107c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 16; + public final static long BUILD = 17; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupMatchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupMatchJob.java index e925f07db..d8c2f57da 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupMatchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupMatchJob.java @@ -14,7 +14,6 @@ import net.i2p.util.Log; class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob { private final Log _log; private final FloodOnlySearchJob _search; - private DatabaseSearchReplyMessage _dsrm; public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) { super(ctx); @@ -28,19 +27,8 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob { _log.info(_search.getJobId() + ": search match and found locally"); _search.success(); } else { - int remaining = _search.getLookupsRemaining(); - if (_log.shouldLog(Log.INFO)) - _log.info(_search.getJobId() + ": got a DatabaseSearchReply when we were looking for " - + _search.getKey().toBase64() + ", with " + remaining + " outstanding searches"); - // netDb reply pointing us at other people - // Only process if we don't know enough floodfills - // This only works if both reply, otherwise we aren't called - should be fixed - if (_search.shouldProcessDSRM() && _dsrm != null) { - if (_log.shouldLog(Log.INFO)) - _log.info(_search.getJobId() + ": Processing DatabaseSearchReply"); - // Chase the hashes from the reply - getContext().jobQueue().addJob(new SingleLookupJob(getContext(), _dsrm)); - } + // In practice, we always have zero remaining when this is called, + // because the selector only returns true when there is zero remaining _search.failed(); } } @@ -49,11 +37,9 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob { public void setMessage(I2NPMessage message) { if (message instanceof DatabaseSearchReplyMessage) { + // DSRM processing now in FloodOnlyLookupSelector instead of here, // a dsrm is only passed in when there are no more lookups remaining - // If more than one peer sent one, we only process the last one - // And sadly if the first peer sends a DRSM and the second one times out, - // this won't get called... - _dsrm = (DatabaseSearchReplyMessage) message; + // so that all DSRM's are processed, not just the last one. _search.failed(); return; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupSelector.java index 425dc2ee5..bba1749d1 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupSelector.java @@ -38,14 +38,32 @@ class FloodOnlyLookupSelector implements MessageSelector { } else if (message instanceof DatabaseSearchReplyMessage) { DatabaseSearchReplyMessage dsrm = (DatabaseSearchReplyMessage)message; if (_search.getKey().equals(dsrm.getSearchKey())) { - _search.decrementRemaining(dsrm.getFromHash()); - // assume 0 old, all new, 0 invalid, 0 dup + + // TODO - dsrm.getFromHash() can't be trusted - check against the list of + // those we sent the search to in _search ? + + // assume 0 new, all old, 0 invalid, 0 dup _context.profileManager().dbLookupReply(dsrm.getFromHash(), 0, dsrm.getNumReplies(), 0, 0, System.currentTimeMillis()-_search.getCreated()); - if (_search.getLookupsRemaining() <= 0) - return true; // ok, no more left, so time to fail - else - return false; + + // Moved from FloodOnlyLookupMatchJob so it is called for all replies + // rather than just the last one + // Got a netDb reply pointing us at other floodfills... + // Only process if we don't know enough floodfills or are starting up + if (_search.shouldProcessDSRM()) { + if (_log.shouldLog(Log.INFO)) + _log.info(_search.getJobId() + ": Processing DSRM via SingleLookupJob, apparently from " + dsrm.getFromHash()); + // Chase the hashes from the reply + _context.jobQueue().addJob(new SingleLookupJob(_context, dsrm)); + } else if (_log.shouldLog(Log.INFO)) { + int remaining = _search.getLookupsRemaining(); + _log.info(_search.getJobId() + ": got a DSRM apparently from " + dsrm.getFromHash() + " when we were looking for " + + _search.getKey() + ", with " + remaining + " outstanding searches"); + } + + // if no more left, time to fail + int remaining = _search.decrementRemaining(dsrm.getFromHash()); + return remaining <= 0; } } return false; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupTimeoutJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupTimeoutJob.java index da4783cb5..f58d3da3b 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupTimeoutJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupTimeoutJob.java @@ -6,17 +6,16 @@ import net.i2p.util.Log; class FloodOnlyLookupTimeoutJob extends JobImpl { private final FloodSearchJob _search; - private final Log _log; public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) { super(ctx); _search = job; - _log = ctx.logManager().getLog(getClass()); } public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info(_search.getJobId() + ": search timed out"); + Log log = getContext().logManager().getLog(getClass()); + if (log.shouldLog(Log.INFO)) + log.info(_search.getJobId() + ": search timed out"); _search.failed(); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java index 302f6d324..d7d7c54f0 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java @@ -38,24 +38,24 @@ import net.i2p.util.Log; */ class FloodOnlySearchJob extends FloodSearchJob { private volatile boolean _dead; - private final long _created; + protected final long _created; private boolean _shouldProcessDSRM; private final HashSet _unheardFrom; - private final List _out; + /** this is a marker to register with the MessageRegistry, it is never sent */ + private OutNetMessage _out; protected final MessageSelector _replySelector; protected final ReplyJob _onReply; protected final Job _onTimeout; + private static final int MIN_FOR_NO_DSRM = 4; + public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) { super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease); // these override the settings in super - _log = ctx.logManager().getLog(FloodOnlySearchJob.class); _timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT); _expiration = _timeoutMs + ctx.clock().now(); _origExpiration = _timeoutMs + ctx.clock().now(); - // do we need a synchronizedList, since we synch on _out everywhere below... - _out = Collections.synchronizedList(new ArrayList(2)); _unheardFrom = new HashSet(CONCURRENT_SEARCHES); _replySelector = new FloodOnlyLookupSelector(getContext(), this); _onReply = new FloodOnlyLookupMatchJob(getContext(), this); @@ -63,10 +63,10 @@ class FloodOnlySearchJob extends FloodSearchJob { _created = System.currentTimeMillis(); } + /** System time, NOT context time */ public long getCreated() { return _created; } + public boolean shouldProcessDSRM() { return _shouldProcessDSRM; } - private static final int CONCURRENT_SEARCHES = 2; - private static final int MIN_FOR_NO_DSRM = 4; @Override public void runJob() { @@ -105,8 +105,10 @@ class FloodOnlySearchJob extends FloodSearchJob { } Collections.shuffle(floodfillPeers, getContext().random()); } - OutNetMessage out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs); - synchronized (_out) { _out.add(out); } + + // This OutNetMessage is never used or sent (setMessage() is never called), it's only + // so we can register a reply selector. + _out = getContext().messageRegistry().registerPending(_replySelector, _onReply, _onTimeout, _timeoutMs); /******** // We need to randomize our ff selection, else we stay with the same ones since @@ -194,11 +196,15 @@ class FloodOnlySearchJob extends FloodSearchJob { @Override public String getName() { return "NetDb flood search (phase 1)"; } - /** Note that we heard from the peer */ - void decrementRemaining(Hash peer) { - decrementRemaining(); + /** + * Note that we heard from the peer + * + * @return number remaining after decrementing + */ + int decrementRemaining(Hash peer) { synchronized(_unheardFrom) { _unheardFrom.remove(peer); + return decrementRemaining(); } } @@ -208,12 +214,7 @@ class FloodOnlySearchJob extends FloodSearchJob { if (_dead) return; _dead = true; } - List outBuf = null; - synchronized (_out) { outBuf = new ArrayList(_out); } - for (int i = 0; i < outBuf.size(); i++) { - OutNetMessage out = (OutNetMessage)outBuf.get(i); - getContext().messageRegistry().unregisterPending(out); - } + getContext().messageRegistry().unregisterPending(_out); int timeRemaining = (int)(_origExpiration - getContext().clock().now()); if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created)); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java index b9aab934c..74e525990 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java @@ -29,7 +29,7 @@ import net.i2p.util.Log; * Note that this does NOT extend SearchJob. */ public class FloodSearchJob extends JobImpl { - protected Log _log; + protected final Log _log; protected final FloodfillNetworkDatabaseFacade _facade; protected final Hash _key; protected final List _onFind; @@ -43,24 +43,22 @@ public class FloodSearchJob extends JobImpl { public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) { super(ctx); - _log = ctx.logManager().getLog(FloodSearchJob.class); + _log = ctx.logManager().getLog(getClass()); _facade = facade; _key = key; - _onFind = new ArrayList(); + _onFind = new ArrayList(4); _onFind.add(onFind); - _onFailed = new ArrayList(); + _onFailed = new ArrayList(4); _onFailed.add(onFailed); - int timeout = -1; - timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR; + int timeout = timeoutMs / FLOOD_SEARCH_TIME_FACTOR; if (timeout < timeoutMs) timeout = timeoutMs; _timeoutMs = timeout; _expiration = timeout + ctx.clock().now(); _origExpiration = timeoutMs + ctx.clock().now(); _isLease = isLease; - _lookupsRemaining = 0; - _dead = false; } + void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) { if (_dead) { getContext().jobQueue().addJob(onFailed); @@ -69,10 +67,14 @@ public class FloodSearchJob extends JobImpl { if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); } } } + + /** using context clock */ public long getExpiration() { return _expiration; } - private static final int CONCURRENT_SEARCHES = 2; + + protected static final int CONCURRENT_SEARCHES = 2; private static final int FLOOD_SEARCH_TIME_FACTOR = 2; private static final int FLOOD_SEARCH_TIME_MIN = 30*1000; + public void runJob() { // pick some floodfill peers and send out the searches List floodfillPeers = _facade.getFloodfillPeers(); @@ -120,10 +122,21 @@ public class FloodSearchJob extends JobImpl { _facade.searchFull(_key, _onFind, _onFailed, _timeoutMs*FLOOD_SEARCH_TIME_FACTOR, _isLease); } } + public String getName() { return "NetDb search (phase 1)"; } protected Hash getKey() { return _key; } - protected void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; } + + /** + * TODO AtomicInteger? + * @return number remaining after decrementing + */ + protected int decrementRemaining() { + if (_lookupsRemaining > 0) + return (--_lookupsRemaining); + return 0; + } + protected int getLookupsRemaining() { return _lookupsRemaining; } void failed() { @@ -144,6 +157,7 @@ public class FloodSearchJob extends JobImpl { getContext().jobQueue().addJob(removed.remove(0)); } } + void success() { if (_dead) return; if (_log.shouldLog(Log.INFO)) @@ -166,8 +180,8 @@ public class FloodSearchJob extends JobImpl { _search = job; } public void runJob() { - _search.decrementRemaining(); - if (_search.getLookupsRemaining() <= 0) + int remaining = _search.decrementRemaining(); + if (remaining <= 0) _search.failed(); } public String getName() { return "NetDb search (phase 1) timeout"; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java index e85537054..2039b8db5 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java @@ -33,7 +33,7 @@ import net.i2p.util.Log; */ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacade { public static final char CAPABILITY_FLOODFILL = 'f'; - private final Map _activeFloodQueries; + private final Map _activeFloodQueries; private boolean _floodfillEnabled; /** for testing, see isFloodfill() below */ private static String _alwaysQuery; @@ -250,7 +250,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad boolean isNew = false; FloodSearchJob searchJob = null; synchronized (_activeFloodQueries) { - searchJob = (FloodSearchJob)_activeFloodQueries.get(key); + searchJob = _activeFloodQueries.get(key); if (searchJob == null) { //if (SearchJob.onlyQueryFloodfillPeers(_context)) { searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease); @@ -325,6 +325,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad } } } + void complete(Hash key) { synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java index 1b0fa1478..2312049d6 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java @@ -43,6 +43,7 @@ class FloodfillPeerSelector extends PeerSelector { * Puts the floodfill peers that are directly connected first in the list. * List will not include our own hash. * + * @param key the ROUTING key (NOT the original key) * @param peersToIgnore can be null * @return List of Hash for the peers selected */ @@ -57,6 +58,7 @@ class FloodfillPeerSelector extends PeerSelector { * Does not prefer the floodfill peers that are directly connected. * List will not include our own hash. * + * @param key the ROUTING key (NOT the original key) * @param peersToIgnore can be null * @return List of Hash for the peers selected */ @@ -70,6 +72,7 @@ class FloodfillPeerSelector extends PeerSelector { * after they're complete, sort via kademlia. * List will not include our own hash. * + * @param key the ROUTING key (NOT the original key) * @param peersToIgnore can be null * @return List of Hash for the peers selected */ @@ -133,7 +136,7 @@ class FloodfillPeerSelector extends PeerSelector { * List will not include our own hash. * * @return floodfills closest to the key that are not shitlisted forever - * @param key the routing key + * @param key the ROUTING key (NOT the original key) * @param maxNumRouters max to return * Sorted by closest to the key if > maxNumRouters, otherwise not * The list is in 3 groups - sorted by routing key within each group. @@ -159,6 +162,7 @@ class FloodfillPeerSelector extends PeerSelector { /** * See above for description * List will not include our own hash + * @param key the ROUTING key (NOT the original key) * @param toIgnore can be null */ List selectFloodfillParticipants(Hash key, int howMany, Set toIgnore, KBucketSet kbuckets) { @@ -175,6 +179,7 @@ class FloodfillPeerSelector extends PeerSelector { /** * See above for description * List MAY CONTAIN our own hash unless included in toIgnore + * @param key the ROUTING key (NOT the original key) * @param toIgnore can be null */ private List selectFloodfillParticipantsIncludingUs(Hash key, int howMany, Set toIgnore, KBucketSet kbuckets) { @@ -271,6 +276,7 @@ class FloodfillPeerSelector extends PeerSelector { /** * Warning - may return our router hash - add to toIgnore if necessary + * @param key the ROUTING key (NOT the original key) * @param toIgnore can be null */ public FloodfillSelectionCollector(Hash key, Set toIgnore, int wanted) { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SingleLookupJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SingleLookupJob.java index 93567c463..1d5f2f0ed 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SingleLookupJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SingleLookupJob.java @@ -5,7 +5,7 @@ import net.i2p.data.RouterInfo; import net.i2p.data.i2np.DatabaseSearchReplyMessage; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; -import net.i2p.util.Log; +//import net.i2p.util.Log; /** * Ask the peer who sent us the DSRM for the RouterInfos. @@ -20,12 +20,12 @@ import net.i2p.util.Log; * */ class SingleLookupJob extends JobImpl { - private final Log _log; + //private final Log _log; private final DatabaseSearchReplyMessage _dsrm; public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) { super(ctx); - _log = ctx.logManager().getLog(getClass()); + //_log = ctx.logManager().getLog(getClass()); _dsrm = dsrm; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java index 1a7755d50..9458321fe 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java @@ -56,8 +56,11 @@ class SingleSearchJob extends FloodOnlySearchJob { @Override void failed() { getContext().messageRegistry().unregisterPending(_onm); + getContext().profileManager().dbLookupFailed(_to); } @Override - void success() {} + void success() { + getContext().profileManager().dbLookupSuccessful(_to, System.currentTimeMillis()-_created); + } }