diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java index b6fec178cfeed2e0ed16a9a91bb3bd479d496c73..0aa8d5099f4721310028df423ecdc08a298f99d0 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java @@ -37,40 +37,25 @@ import net.i2p.util.Log; * */ class FloodOnlySearchJob extends FloodSearchJob { - protected Log _log; - private FloodfillNetworkDatabaseFacade _facade; - protected Hash _key; - private final List _onFind; - private final List _onFailed; - private long _expiration; - protected int _timeoutMs; - private long _origExpiration; - private boolean _isLease; protected volatile int _lookupsRemaining; private volatile boolean _dead; private long _created; private boolean _shouldProcessDSRM; - private final HashSet _unheardFrom; + private final HashSet<Hash> _unheardFrom; - protected final List _out; + private final List<OutNetMessage> _out; protected MessageSelector _replySelector; protected ReplyJob _onReply; protected Job _onTimeout; + public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) { super(ctx, facade, key, onFind, onFailed, timeoutMs, isLease); + // these override the settings in super _log = ctx.logManager().getLog(FloodOnlySearchJob.class); - _facade = facade; - _key = key; - _onFind = new ArrayList(); - _onFind.add(onFind); - _onFailed = new ArrayList(); - _onFailed.add(onFailed); _timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT); _expiration = _timeoutMs + ctx.clock().now(); _origExpiration = _timeoutMs + ctx.clock().now(); - _isLease = isLease; - _lookupsRemaining = 0; - _dead = false; + // do we need a synchronizedList, since we synch on _out everywhere below... _out = Collections.synchronizedList(new ArrayList(2)); _unheardFrom = new HashSet(CONCURRENT_SEARCHES); _replySelector = new FloodOnlyLookupSelector(getContext(), this); @@ -79,17 +64,7 @@ class FloodOnlySearchJob extends FloodSearchJob { _created = System.currentTimeMillis(); _shouldProcessDSRM = false; } - @Override - void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) { - if (_dead) { - getContext().jobQueue().addJob(onFailed); - } else { - if (onFind != null) synchronized (_onFind) { _onFind.add(onFind); } - if (onFailed != null) synchronized (_onFailed) { _onFailed.add(onFailed); } - } - } - @Override - public long getExpiration() { return _expiration; } + public long getCreated() { return _created; } public boolean shouldProcessDSRM() { return _shouldProcessDSRM; } private static final int CONCURRENT_SEARCHES = 2; @@ -188,12 +163,6 @@ class FloodOnlySearchJob extends FloodSearchJob { @Override public String getName() { return "NetDb flood search (phase 1)"; } - @Override - Hash getKey() { return _key; } - @Override - void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; } - @Override - int getLookupsRemaining() { return _lookupsRemaining; } /** Note that we heard from the peer */ void decrementRemaining(Hash peer) { decrementRemaining(); @@ -218,8 +187,8 @@ class FloodOnlySearchJob extends FloodSearchJob { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created)); synchronized(_unheardFrom) { - for (Iterator iter = _unheardFrom.iterator(); iter.hasNext(); ) - getContext().profileManager().dbLookupFailed((Hash) iter.next()); + for (Iterator<Hash> iter = _unheardFrom.iterator(); iter.hasNext(); ) + getContext().profileManager().dbLookupFailed(iter.next()); } _facade.complete(_key); getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created); @@ -248,7 +217,7 @@ class FloodOnlySearchJob extends FloodSearchJob { // StoreJob also calls dbStoreSent() which updates the lastHeardFrom timer - this also helps. synchronized(_unheardFrom) { if (_unheardFrom.size() == 1) { - Hash peer = (Hash) _unheardFrom.iterator().next(); + Hash peer = _unheardFrom.iterator().next(); getContext().profileManager().dbLookupSuccessful(peer, System.currentTimeMillis()-_created); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java index 810b29c4fa51308ce3700809f5ad3993f8725a84..f3a78b67edf06914efa8154f55b444f64a9f7d03 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java @@ -23,19 +23,22 @@ import net.i2p.util.Log; * the normal (kademlia) channels. This should cut down on spurious lookups caused * by simple delays in responses from floodfill peers * + * NOTE: Unused directly - see FloodOnlySearchJob extension which overrides almost everything. + * TODO: Comment out or delete what we don't use here. */ public class FloodSearchJob extends JobImpl { - private Log _log; - private FloodfillNetworkDatabaseFacade _facade; - private Hash _key; - private final List _onFind; - private final List _onFailed; - private long _expiration; - private int _timeoutMs; - private long _origExpiration; - private boolean _isLease; - private volatile int _lookupsRemaining; - private volatile boolean _dead; + protected Log _log; + protected FloodfillNetworkDatabaseFacade _facade; + protected Hash _key; + protected final List<Job> _onFind; + protected final List<Job> _onFailed; + protected long _expiration; + protected int _timeoutMs; + protected long _origExpiration; + protected boolean _isLease; + protected volatile int _lookupsRemaining; + protected volatile boolean _dead; + public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) { super(ctx); _log = ctx.logManager().getLog(FloodSearchJob.class); @@ -86,13 +89,13 @@ public class FloodSearchJob extends JobImpl { TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundTunnel(); if ( (replyTunnel == null) || (outTunnel == null) ) { _dead = true; - List removed = null; + List<Job> removed = null; synchronized (_onFailed) { removed = new ArrayList(_onFailed); _onFailed.clear(); } while (removed.size() > 0) - getContext().jobQueue().addJob((Job)removed.remove(0)); + getContext().jobQueue().addJob(removed.remove(0)); getContext().messageRegistry().unregisterPending(out); return; } @@ -117,9 +120,9 @@ public class FloodSearchJob extends JobImpl { } public String getName() { return "NetDb search (phase 1)"; } - Hash getKey() { return _key; } - void decrementRemaining() { _lookupsRemaining--; } - int getLookupsRemaining() { return _lookupsRemaining; } + protected Hash getKey() { return _key; } + protected void decrementRemaining() { if (_lookupsRemaining > 0) _lookupsRemaining--; } + protected int getLookupsRemaining() { return _lookupsRemaining; } void failed() { if (_dead) return; @@ -130,13 +133,13 @@ public class FloodSearchJob extends JobImpl { if (timeRemaining > 0) { _facade.searchFull(_key, _onFind, _onFailed, timeRemaining, _isLease); } else { - List removed = null; + List<Job> removed = null; synchronized (_onFailed) { removed = new ArrayList(_onFailed); _onFailed.clear(); } while (removed.size() > 0) - getContext().jobQueue().addJob((Job)removed.remove(0)); + getContext().jobQueue().addJob(removed.remove(0)); } } void success() { @@ -145,13 +148,13 @@ public class FloodSearchJob extends JobImpl { _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful"); _dead = true; _facade.complete(_key); - List removed = null; + List<Job> removed = null; synchronized (_onFind) { removed = new ArrayList(_onFind); _onFind.clear(); } while (removed.size() > 0) - getContext().jobQueue().addJob((Job)removed.remove(0)); + getContext().jobQueue().addJob(removed.remove(0)); } private static class FloodLookupTimeoutJob extends JobImpl { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java index 767e9f7c9760a08965c3b02656099c94d79a6812..d9d2eaa144e65b1c69fff6c896369b3093aa67a6 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java @@ -179,8 +179,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad return false; } - public List getKnownRouterData() { - List rv = new ArrayList(); + public List<RouterInfo> getKnownRouterData() { + List<RouterInfo> rv = new ArrayList(); DataStore ds = getDataStore(); if (ds != null) { Set keys = ds.getKeys(); @@ -188,7 +188,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad for (Iterator iter = keys.iterator(); iter.hasNext(); ) { Object o = getDataStore().get((Hash)iter.next()); if (o instanceof RouterInfo) - rv.add(o); + rv.add((RouterInfo)o); } } } @@ -237,7 +237,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad * Ok, the initial set of searches to the floodfill peers timed out, lets fall back on the * wider kademlia-style searches */ - void searchFull(Hash key, List onFind, List onFailed, long timeoutMs, boolean isLease) { + void searchFull(Hash key, List<Job> onFind, List<Job> onFailed, long timeoutMs, boolean isLease) { synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); } Job find = null; @@ -245,13 +245,13 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad if (onFind != null) { synchronized (onFind) { if (onFind.size() > 0) - find = (Job)onFind.remove(0); + find = onFind.remove(0); } } if (onFailed != null) { synchronized (onFailed) { if (onFailed.size() > 0) - fail = (Job)onFailed.remove(0); + fail = onFailed.remove(0); } } SearchJob job = super.search(key, find, fail, timeoutMs, isLease); @@ -260,14 +260,14 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad _log.info("Floodfill search timed out for " + key.toBase64() + ", falling back on normal search (#" + job.getJobId() + ") with " + timeoutMs + " remaining"); long expiration = timeoutMs + _context.clock().now(); - List removed = null; + List<Job> removed = null; if (onFind != null) { synchronized (onFind) { removed = new ArrayList(onFind); onFind.clear(); } for (int i = 0; i < removed.size(); i++) - job.addDeferred((Job)removed.get(i), null, expiration, isLease); + job.addDeferred(removed.get(i), null, expiration, isLease); removed = null; } if (onFailed != null) { @@ -276,7 +276,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad onFailed.clear(); } for (int i = 0; i < removed.size(); i++) - job.addDeferred(null, (Job)removed.get(i), expiration, isLease); + job.addDeferred(null, removed.get(i), expiration, isLease); removed = null; } } @@ -287,8 +287,9 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad /** list of the Hashes of currently known floodfill peers; * Returned list will not include our own hash. + * List is not sorted and not shuffled. */ - public List getFloodfillPeers() { + public List<Hash> getFloodfillPeers() { FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector(); return sel.selectFloodfillParticipants(getKBuckets()); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java index 0aa6725879787e15d423a75930bcd95a55efe116..eec1908a8f042eed8dfccfdac9ff203faec98989 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java @@ -22,12 +22,19 @@ import net.i2p.router.RouterContext; import net.i2p.router.peermanager.PeerProfile; import net.i2p.util.Log; +/** + * This is where we implement semi-Kademlia with the floodfills, by + * selecting floodfills closest to a given key for + * searches and stores. + * + */ class FloodfillPeerSelector extends PeerSelector { public FloodfillPeerSelector(RouterContext ctx) { super(ctx); } /** * Pick out peers with the floodfill capacity set, returning them first, but then * after they're complete, sort via kademlia. + * Puts the floodfill peers that are directly connected first in the list. * * @return List of Hash for the peers selected */ @@ -36,6 +43,13 @@ class FloodfillPeerSelector extends PeerSelector { return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, true); } + /** + * Pick out peers with the floodfill capacity set, returning them first, but then + * after they're complete, sort via kademlia. + * Does not prefer the floodfill peers that are directly connected. + * + * @return List of Hash for the peers selected + */ @Override public List<Hash> selectNearestExplicitThin(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) { return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, false); @@ -58,7 +72,7 @@ class FloodfillPeerSelector extends PeerSelector { /** * @return all floodfills not shitlisted forever. list will not include our own hash - * + * List is not sorted and not shuffled. */ public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) { if (kbuckets == null) return new ArrayList(); @@ -69,6 +83,7 @@ class FloodfillPeerSelector extends PeerSelector { /** * @return all floodfills not shitlisted foreverx + * @param key the routing key * @param maxNumRouters max to return * Sorted by closest to the key if > maxNumRouters, otherwise not */ @@ -104,7 +119,13 @@ class FloodfillPeerSelector extends PeerSelector { _matches = 0; _wanted = wanted; } + + /** + * @return unsorted list of all with the 'f' mark in their netdb + * except for shitlisted ones. + */ public List<Hash> getFloodfillParticipants() { return _floodfillMatches; } + private static final int EXTRA_MATCHES = 100; public void add(Hash entry) { //if (_context.profileOrganizer().isFailing(entry)) @@ -144,6 +165,14 @@ class FloodfillPeerSelector extends PeerSelector { return get(howMany, false); } + /** + * @return list of all with the 'f' mark in their netdb except for shitlisted ones. + * The list is in 3 groups - unsorted (shuffled) within each group. + * Group 1: If preferConnected = true, the peers we are directly + * connected to, that meet the group 2 criteria + * Group 2: Netdb published less than 3h ago, no bad send in last 30m. + * Group 3: All others + */ public List<Hash> get(int howMany, boolean preferConnected) { Collections.shuffle(_floodfillMatches, _context.random()); List<Hash> rv = new ArrayList(howMany);