diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
index 2b8659c697a9cfd7a12e9fbb865bd75238ab503c..bb34f8dc6bb528813463deccae634a0052435064 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
@@ -94,9 +94,22 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
         }
     }

+    private static final int MAX_TO_FLOOD = 9;
+
+    /**
+     * Send to a subset of all floodfill peers.
+     * We do this to implement Kademlia within the floodfills, i.e.
+     * we flood to those closest to the key.
+     */
     public void flood(DataStructure ds) {
+        Hash key;
+        if (ds instanceof LeaseSet)
+            key = ((LeaseSet)ds).getDestination().calculateHash();
+        else
+            key = ((RouterInfo)ds).getIdentity().calculateHash();
+        Hash rkey = _context.routingKeyGenerator().getRoutingKey(key);
         FloodfillPeerSelector sel = (FloodfillPeerSelector)getPeerSelector();
-        List peers = sel.selectFloodfillParticipants(getKBuckets());
+        List peers = sel.selectFloodfillParticipants(rkey, MAX_TO_FLOOD, getKBuckets());
         int flooded = 0;
         for (int i = 0; i < peers.size(); i++) {
             Hash peer = (Hash)peers.get(i);
@@ -107,12 +120,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
                 continue;
             DatabaseStoreMessage msg = new DatabaseStoreMessage(_context);
             if (ds instanceof LeaseSet) {
-                msg.setKey(((LeaseSet)ds).getDestination().calculateHash());
                 msg.setLeaseSet((LeaseSet)ds);
             } else {
-                msg.setKey(((RouterInfo)ds).getIdentity().calculateHash());
                 msg.setRouterInfo((RouterInfo)ds);
             }
+            msg.setKey(key);
             msg.setReplyGateway(null);
             msg.setReplyToken(0);
             msg.setReplyTunnel(null);
@@ -125,11 +137,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
             _context.commSystem().processMessage(m);
             flooded++;
             if (_log.shouldLog(Log.INFO))
-                _log.info("Flooding the entry for " + msg.getKey().toBase64() + " to " + peer.toBase64());
+                _log.info("Flooding the entry for " + key.toBase64() + " to " + peer.toBase64());
         }
         if (_log.shouldLog(Log.INFO))
-            _log.info("Flooded the to " + flooded + " peers");
+            _log.info("Flooded the data to " + flooded + " of " + peers.size() + " peers");
     }

     private static final int FLOOD_PRIORITY = 200;
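
Note: the core of this change is "flood to those closest to the key", so the selection rule deserves a standalone illustration. The sketch below is illustrative only, with byte arrays standing in for Hash and a local xorDistance() standing in for PeerSelector.getDistance(); none of these names are the router API.

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    public class ClosestFloodfillSketch {
        // XOR metric: interpret the XOR of the two hashes as a non-negative integer.
        static BigInteger xorDistance(byte[] key, byte[] peer) {
            byte[] d = new byte[key.length];
            for (int i = 0; i < d.length; i++)
                d[i] = (byte) (key[i] ^ peer[i]);
            return new BigInteger(1, d); // signum 1 keeps the distance non-negative
        }

        // Keep only the 'max' peers closest to rkey, mirroring the patched
        // selectFloodfillParticipants(rkey, MAX_TO_FLOOD, kbuckets):
        // if there are already few enough peers, return them unsorted.
        static List<byte[]> closest(byte[] rkey, List<byte[]> floodfills, int max) {
            if (floodfills.size() <= max)
                return floodfills; // unsorted, as in the patch
            TreeMap<BigInteger, byte[]> sorted = new TreeMap<BigInteger, byte[]>();
            for (byte[] peer : floodfills)
                sorted.put(xorDistance(rkey, peer), peer);
            List<byte[]> rv = new ArrayList<byte[]>(max);
            for (int i = 0; i < max; i++)
                rv.add(sorted.remove(sorted.firstKey()));
            return rv;
        }
    }

As in the patch, two peers at exactly the same distance would collide on the TreeMap key, but with 256-bit hashes that can only happen for identical peers.
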
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
index d051460a29d349138b3d6862f2ac1ec49b6804e4..0aa6725879787e15d423a75930bcd95a55efe116 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
@@ -32,16 +32,16 @@ class FloodfillPeerSelector extends PeerSelector {
      * @return List of Hash for the peers selected
      */
     @Override
-    public List selectMostReliablePeers(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) {
+    public List<Hash> selectMostReliablePeers(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) {
         return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, true);
     }

     @Override
-    public List selectNearestExplicitThin(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) {
+    public List<Hash> selectNearestExplicitThin(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) {
         return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets, false);
     }

-    public List selectNearestExplicitThin(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets, boolean preferConnected) {
+    public List<Hash> selectNearestExplicitThin(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets, boolean preferConnected) {
         if (peersToIgnore == null)
             peersToIgnore = new HashSet(1);
         peersToIgnore.add(_context.routerHash());
@@ -56,30 +56,55 @@ class FloodfillPeerSelector extends PeerSelector {
         return rv;
     }

-    /** Returned list will not include our own hash */
-    public List selectFloodfillParticipants(KBucketSet kbuckets) {
+    /**
+     * @return all floodfills not shitlisted forever. The list will not include our own hash.
+     *
+     */
+    public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) {
         if (kbuckets == null) return new ArrayList();
         FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, null, 0);
         kbuckets.getAll(matches);
         return matches.getFloodfillParticipants();
     }

+    /**
+     * @param maxNumRouters max to return
+     * @return all floodfills not shitlisted forever, sorted by closest to the key
+     *         if there are more than maxNumRouters, otherwise unsorted
+     */
+    public List<Hash> selectFloodfillParticipants(Hash key, int maxNumRouters, KBucketSet kbuckets) {
+        List<Hash> ffs = selectFloodfillParticipants(kbuckets);
+        if (ffs.size() <= maxNumRouters)
+            return ffs; // unsorted
+        TreeMap<BigInteger, Hash> sorted = new TreeMap();
+        for (int i = 0; i < ffs.size(); i++) {
+            Hash h = ffs.get(i);
+            BigInteger diff = getDistance(key, h);
+            sorted.put(diff, h);
+        }
+        List<Hash> rv = new ArrayList(maxNumRouters);
+        for (int i = 0; i < maxNumRouters; i++) {
+            rv.add(sorted.remove(sorted.firstKey()));
+        }
+        return rv;
+    }
+
     private class FloodfillSelectionCollector implements SelectionCollector {
-        private TreeMap _sorted;
-        private List _floodfillMatches;
+        private TreeMap<BigInteger, Hash> _sorted;
+        private List<Hash> _floodfillMatches;
         private Hash _key;
-        private Set _toIgnore;
+        private Set<Hash> _toIgnore;
         private int _matches;
         private int _wanted;
-        public FloodfillSelectionCollector(Hash key, Set toIgnore, int wanted) {
+        public FloodfillSelectionCollector(Hash key, Set<Hash> toIgnore, int wanted) {
             _key = key;
             _sorted = new TreeMap();
-            _floodfillMatches = new ArrayList(1);
+            _floodfillMatches = new ArrayList(8);
             _toIgnore = toIgnore;
             _matches = 0;
             _wanted = wanted;
         }
-        public List getFloodfillParticipants() { return _floodfillMatches; }
+        public List<Hash> getFloodfillParticipants() { return _floodfillMatches; }
         private static final int EXTRA_MATCHES = 100;
         public void add(Hash entry) {
             //if (_context.profileOrganizer().isFailing(entry))
@@ -115,15 +140,15 @@ class FloodfillPeerSelector extends PeerSelector {
             _matches++;
         }
         /** get the first $howMany entries matching */
-        public List get(int howMany) {
+        public List<Hash> get(int howMany) {
             return get(howMany, false);
         }

-        public List get(int howMany, boolean preferConnected) {
+        public List<Hash> get(int howMany, boolean preferConnected) {
             Collections.shuffle(_floodfillMatches, _context.random());
-            List rv = new ArrayList(howMany);
-            List badff = new ArrayList(howMany);
-            List unconnectedff = new ArrayList(howMany);
+            List<Hash> rv = new ArrayList(howMany);
+            List<Hash> badff = new ArrayList(howMany);
+            List<Hash> unconnectedff = new ArrayList(howMany);
             int found = 0;
             long now = _context.clock().now();
             // Only add in "good" floodfills here...
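
Note: the new selectFloodfillParticipants() pops the smallest TreeMap entry with sorted.remove(sorted.firstKey()), which is two lookups per iteration. If the build may assume Java 6 or later (the patch does not say), TreeMap.pollFirstEntry() does the same work in one call. A tiny self-contained demonstration:

    import java.math.BigInteger;
    import java.util.TreeMap;

    public class PollFirstDemo {
        public static void main(String[] args) {
            TreeMap<BigInteger, String> sorted = new TreeMap<BigInteger, String>();
            sorted.put(BigInteger.valueOf(5), "farther");
            sorted.put(BigInteger.valueOf(2), "closer");
            // One call retrieves and removes the smallest entry (Java 6+),
            // equivalent to sorted.remove(sorted.firstKey()) in the patch.
            System.out.println(sorted.pollFirstEntry().getValue()); // closer
            System.out.println(sorted.pollFirstEntry().getValue()); // farther
        }
    }
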
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
index 3279f437cf019ca1780e2e48d3fc06f4f1b99ba3..2c9eeb5b62a19e40603a4c08642e91eb54130d40 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
@@ -17,10 +17,14 @@ import net.i2p.data.RouterInfo;
 import net.i2p.router.Job;
 import net.i2p.router.RouterContext;

+/**
+ * This extends StoreJob to fire off a FloodfillVerifyStoreJob after success.
+ *
+ */
 class FloodfillStoreJob extends StoreJob {
     private FloodfillNetworkDatabaseFacade _facade;
     /**
-     * Create a new search for the routingKey specified
+     * Send a data structure to the floodfills
      *
      */
     public FloodfillStoreJob(RouterContext context, FloodfillNetworkDatabaseFacade facade, Hash key, DataStructure data, Job onSuccess, Job onFailure, long timeoutMs) {
@@ -31,7 +35,7 @@ class FloodfillStoreJob extends StoreJob {
      * @param toSkip set of peer hashes of people we dont want to send the data to (e.g. we
      *               already know they have it). This can be null.
      */
-    public FloodfillStoreJob(RouterContext context, FloodfillNetworkDatabaseFacade facade, Hash key, DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set toSkip) {
+    public FloodfillStoreJob(RouterContext context, FloodfillNetworkDatabaseFacade facade, Hash key, DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set<Hash> toSkip) {
         super(context, facade, key, data, onSuccess, onFailure, timeoutMs, toSkip);
         _facade = facade;
     }
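
Note: the new class javadoc is the whole story of this file, FloodfillStoreJob only adds a verification step once the store succeeds. The override that actually schedules the verify job is outside this hunk, so the sketch below shows only the general shape of such a success hook; every name in it is hypothetical, not the router's Job API.

    // Hypothetical shape of a store job that verifies after success.
    abstract class StoreTask {
        final void complete(boolean succeeded) {
            if (succeeded)
                onSuccess(); // subclass hook
        }
        abstract void onSuccess();
    }

    public class StoreHookDemo extends StoreTask {
        @Override
        void onSuccess() {
            // FloodfillStoreJob queues a FloodfillVerifyStoreJob at this point
            System.out.println("queue verify job for the stored key");
        }

        public static void main(String[] args) {
            new StoreHookDemo().complete(true);
        }
    }
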
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
index 328691b99a7ec6b640956005dd19d11509da0746..918465391798679fb935dfce06b2a209763ded03 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/PeerSelector.java
@@ -43,10 +43,9 @@ public class PeerSelector {
      * @return ordered list of Hash objects
      */
     /* FIXME Exporting non-public type through public API FIXME */
-    public List selectMostReliablePeers(Hash key, int numClosest, Set alreadyChecked, KBucketSet kbuckets) {// LINT -- Exporting non-public type through public API
+    public List<Hash> selectMostReliablePeers(Hash key, int numClosest, Set<Hash> alreadyChecked, KBucketSet kbuckets) {// LINT -- Exporting non-public type through public API
         // get the peers closest to the key
-        List nearest = selectNearestExplicit(key, numClosest, alreadyChecked, kbuckets);
-        return nearest;
+        return selectNearestExplicit(key, numClosest, alreadyChecked, kbuckets);
     }

     /**
@@ -57,10 +56,11 @@ public class PeerSelector {
      * @return List of Hash for the peers selected, ordered by bucket (but intra bucket order is not defined)
      */
     /* FIXME Exporting non-public type through public API FIXME */
-    public List selectNearestExplicit(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) {// LINT -- Exporting non-public type through public API
-        if (true)
+    public List<Hash> selectNearestExplicit(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) {// LINT -- Exporting non-public type through public API
+        //if (true)
             return selectNearestExplicitThin(key, maxNumRouters, peersToIgnore, kbuckets);

+/******
         if (peersToIgnore == null)
             peersToIgnore = new HashSet(1);
         peersToIgnore.add(_context.routerHash());
@@ -84,6 +84,7 @@ public class PeerSelector {
                        + peerHashes + " (not including " + peersToIgnore + ") [allHashes.size = " + allHashes.size() + "]");
         return peerHashes;
+******/
     }

     /**
@@ -94,7 +95,7 @@ public class PeerSelector {
      * @return List of Hash for the peers selected, ordered by bucket (but intra bucket order is not defined)
      */
     /* FIXME Exporting non-public type through public API FIXME */
-    public List selectNearestExplicitThin(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) { // LINT -- Exporting non-public type through public API
+    public List<Hash> selectNearestExplicitThin(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) { // LINT -- Exporting non-public type through public API
         if (peersToIgnore == null)
             peersToIgnore = new HashSet(1);
         peersToIgnore.add(_context.routerHash());
@@ -109,11 +110,11 @@ public class PeerSelector {
     }

     private class MatchSelectionCollector implements SelectionCollector {
-        private TreeMap _sorted;
+        private TreeMap<BigInteger, Hash> _sorted;
         private Hash _key;
-        private Set _toIgnore;
+        private Set<Hash> _toIgnore;
         private int _matches;
-        public MatchSelectionCollector(Hash key, Set toIgnore) {
+        public MatchSelectionCollector(Hash key, Set<Hash> toIgnore) {
             _key = key;
             _sorted = new TreeMap();
             _toIgnore = toIgnore;
@@ -135,8 +136,8 @@ public class PeerSelector {
             _matches++;
         }
         /** get the first $howMany entries matching */
-        public List get(int howMany) {
-            List rv = new ArrayList(howMany);
+        public List<Hash> get(int howMany) {
+            List<Hash> rv = new ArrayList(howMany);
             for (int i = 0; i < howMany; i++) {
                 if (_sorted.size() <= 0)
                     break;
@@ -151,6 +152,7 @@ public class PeerSelector {
      * strip out all of the peers that are failing
      *
      */
+/********
     private void removeFailingPeers(Set peerHashes) {
         List failing = null;
         for (Iterator iter = peerHashes.iterator(); iter.hasNext(); ) {
@@ -184,6 +186,7 @@ public class PeerSelector {
         if (failing != null)
             peerHashes.removeAll(failing);
     }
+**********/

     public static BigInteger getDistance(Hash targetKey, Hash routerInQuestion) {
         // plain XOR of the key and router
@@ -199,7 +202,7 @@ public class PeerSelector {
      * @return List of Hash for the peers selected, ordered by bucket (but intra bucket order is not defined)
      */
     /* FIXME Exporting non-public type through public API FIXME */
-    public List selectNearest(Hash key, int maxNumRouters, Set peersToIgnore, KBucketSet kbuckets) { // LINT -- Exporting non-public type through public API
+    public List<Hash> selectNearest(Hash key, int maxNumRouters, Set<Hash> peersToIgnore, KBucketSet kbuckets) { // LINT -- Exporting non-public type through public API
         // sure, this may not be exactly correct per kademlia (peers on the border of a kbucket in strict kademlia
         // would behave differently) but I can see no reason to keep around an /additional/ more complicated algorithm.
         // later if/when selectNearestExplicit gets costly, we may revisit this (since kbuckets let us cache the distance()
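
Note: getDistance() above is documented as a plain XOR of the two hashes. A quick worked example of why that is a sensible ordering: identity is zero, the metric is symmetric, and a difference in a high-order bit outweighs any number of low-order differences (toy 4-byte values below; the router uses 32-byte SHA-256 hashes, and these names are illustrative only).

    import java.math.BigInteger;

    public class XorMetricDemo {
        static BigInteger xorDistance(byte[] x, byte[] y) {
            byte[] d = new byte[x.length];
            for (int i = 0; i < d.length; i++)
                d[i] = (byte) (x[i] ^ y[i]);
            return new BigInteger(1, d);
        }

        public static void main(String[] args) {
            byte[] a = { 0x0F, 0x00, 0x00, 0x01 };
            byte[] b = { 0x0F, 0x00, 0x00, 0x03 };
            byte[] c = { (byte) 0x8F, 0x00, 0x00, 0x01 };
            System.out.println(xorDistance(a, a)); // 0: identity
            System.out.println(xorDistance(a, b)); // 2: low bits differ
            System.out.println(xorDistance(a, c)); // 2147483648: high bit dominates
            System.out.println(xorDistance(a, b).equals(xorDistance(b, a))); // true: symmetric
        }
    }
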
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
index cc4c51329c550e8d4f2be2c98777c91e5206e652..af3ba2b277624a51c4e7a8cfcbb8aace6b51439a 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
@@ -56,7 +56,7 @@ class StoreJob extends JobImpl {
     private final static int STORE_PRIORITY = 100;

     /**
-     * Create a new search for the routingKey specified
+     * Send a data structure to the floodfills
      *
      */
     public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
@@ -69,7 +69,7 @@ class StoreJob extends JobImpl {
      *               already know they have it). This can be null.
      */
     public StoreJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key,
-                    DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set toSkip) {
+                    DataStructure data, Job onSuccess, Job onFailure, long timeoutMs, Set<Hash> toSkip) {
         super(context);
         _log = context.logManager().getLog(StoreJob.class);
         getContext().statManager().createRateStat("netDb.storeRouterInfoSent", "How many routerInfo store messages have we sent?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
@@ -146,7 +146,7 @@ class StoreJob extends JobImpl {
         // This will help minimize active connections for floodfill peers and allow
         // the network to scale.
         // Perhaps the ultimate solution is to send RouterInfos through a lease also.
-        List closestHashes;
+        List<Hash> closestHashes;
         if (_state.getData() instanceof RouterInfo)
             closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
         else
@@ -165,8 +165,8 @@ class StoreJob extends JobImpl {
         //_state.addPending(closestHashes);
         if (_log.shouldLog(Log.INFO))
             _log.info(getJobId() + ": Continue sending key " + _state.getTarget() + " after " + _state.getAttempted().size() + " tries to " + closestHashes);
-        for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) {
-            Hash peer = (Hash)iter.next();
+        for (Iterator<Hash> iter = closestHashes.iterator(); iter.hasNext(); ) {
+            Hash peer = iter.next();
             DataStructure ds = _facade.getDataStore().get(peer);
             if ( (ds == null) || !(ds instanceof RouterInfo) ) {
                 if (_log.shouldLog(Log.INFO))
@@ -215,7 +215,7 @@ class StoreJob extends JobImpl {
      *
      * @return ordered list of Hash objects
      */
-    private List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) {
+    private List<Hash> getClosestRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         //if (_log.shouldLog(Log.DEBUG))
         //    _log.debug(getJobId() + ": Current routing key for " + key + ": " + rkey);
@@ -225,7 +225,7 @@ class StoreJob extends JobImpl {
         return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, ks);
     }

-    private List getMostReliableRouters(Hash key, int numClosest, Set alreadyChecked) {
+    private List<Hash> getMostReliableRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         KBucketSet ks = _facade.getKBuckets();
         if (ks == null) return new ArrayList();
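
Note: a pure-Java side observation on the generified loops, not something this patch changes. Once the iterator is declared as Iterator<Hash> and the cast disappears, the explicit loop can usually be collapsed to an enhanced for statement, available since Java 5:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ForEachDemo {
        public static void main(String[] args) {
            List<String> peers = Arrays.asList("peer1", "peer2");

            // Style the patch migrates to: typed iterator, no cast needed.
            for (Iterator<String> iter = peers.iterator(); iter.hasNext(); ) {
                String peer = iter.next();
                System.out.println(peer);
            }

            // Equivalent enhanced for loop.
            for (String peer : peers)
                System.out.println(peer);
        }
    }
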
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java
index 91e7687ba3c020d5b1846acb3c89c110aee5f4e8..eaa51edb0b5783224c84a9cf12461a12e6df0b64 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java
@@ -15,12 +15,12 @@ class StoreState {
     private RouterContext _context;
     private Hash _key;
     private DataStructure _data;
-    private final HashSet _pendingPeers;
-    private HashMap _pendingPeerTimes;
-    private final HashSet _successfulPeers;
-    private final HashSet _successfulExploratoryPeers;
-    private final HashSet _failedPeers;
-    private final HashSet _attemptedPeers;
+    private final HashSet<Hash> _pendingPeers;
+    private HashMap<Hash, Long> _pendingPeerTimes;
+    private final HashSet<Hash> _successfulPeers;
+    private final HashSet<Hash> _successfulExploratoryPeers;
+    private final HashSet<Hash> _failedPeers;
+    private final HashSet<Hash> _attemptedPeers;
     private int _completeCount;
     private volatile long _completed;
     private volatile long _started;
@@ -28,7 +28,7 @@ class StoreState {
     public StoreState(RouterContext ctx, Hash key, DataStructure data) {
         this(ctx, key, data, null);
     }
-    public StoreState(RouterContext ctx, Hash key, DataStructure data, Set toSkip) {
+    public StoreState(RouterContext ctx, Hash key, DataStructure data, Set<Hash> toSkip) {
         _context = ctx;
         _key = key;
         _data = data;
@@ -48,29 +48,29 @@ class StoreState {

     public Hash getTarget() { return _key; }
     public DataStructure getData() { return _data; }
-    public Set getPending() {
+    public Set<Hash> getPending() {
         synchronized (_pendingPeers) {
-            return (Set)_pendingPeers.clone();
+            return (Set<Hash>)_pendingPeers.clone();
         }
     }
-    public Set getAttempted() {
+    public Set<Hash> getAttempted() {
         synchronized (_attemptedPeers) {
-            return (Set)_attemptedPeers.clone();
+            return (Set<Hash>)_attemptedPeers.clone();
         }
     }
-    public Set getSuccessful() {
+    public Set<Hash> getSuccessful() {
         synchronized (_successfulPeers) {
-            return (Set)_successfulPeers.clone();
+            return (Set<Hash>)_successfulPeers.clone();
         }
     }
-    public Set getSuccessfulExploratory() {
+    public Set<Hash> getSuccessfulExploratory() {
         synchronized (_successfulExploratoryPeers) {
-            return (Set)_successfulExploratoryPeers.clone();
+            return (Set<Hash>)_successfulExploratoryPeers.clone();
         }
     }
-    public Set getFailed() {
+    public Set<Hash> getFailed() {
         synchronized (_failedPeers) {
-            return (Set)_failedPeers.clone();
+            return (Set<Hash>)_failedPeers.clone();
         }
     }
     public boolean completed() { return _completed != -1; }
@@ -92,10 +92,10 @@ class StoreState {
             _attemptedPeers.add(peer);
         }
     }
-    public void addPending(Collection pending) {
+    public void addPending(Collection<Hash> pending) {
         synchronized (_pendingPeers) {
             _pendingPeers.addAll(pending);
-            for (Iterator iter = pending.iterator(); iter.hasNext(); )
+            for (Iterator<Hash> iter = pending.iterator(); iter.hasNext(); )
                 _pendingPeerTimes.put(iter.next(), new Long(_context.clock().now()));
         }
         synchronized (_attemptedPeers) {
@@ -113,7 +113,7 @@ class StoreState {
         long rv = -1;
         synchronized (_pendingPeers) {
             _pendingPeers.remove(peer);
-            Long when = (Long)_pendingPeerTimes.remove(peer);
+            Long when = _pendingPeerTimes.remove(peer);
             if (when != null)
                 rv = _context.clock().now() - when.longValue();
         }
@@ -128,7 +128,7 @@ class StoreState {
         long rv = -1;
         synchronized (_pendingPeers) {
             _pendingPeers.remove(peer);
-            Long when = (Long)_pendingPeerTimes.remove(peer);
+            Long when = _pendingPeerTimes.remove(peer);
             if (when != null)
                 rv = _context.clock().now() - when.longValue();
         }
@@ -159,43 +159,43 @@ class StoreState {
         buf.append(" Attempted: ");
         synchronized (_attemptedPeers) {
             buf.append(_attemptedPeers.size()).append(' ');
-            for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
-                Hash peer = (Hash)iter.next();
+            for (Iterator<Hash> iter = _attemptedPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
         buf.append(" Pending: ");
         synchronized (_pendingPeers) {
             buf.append(_pendingPeers.size()).append(' ');
-            for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) {
-                Hash peer = (Hash)iter.next();
+            for (Iterator<Hash> iter = _pendingPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
         buf.append(" Failed: ");
         synchronized (_failedPeers) {
             buf.append(_failedPeers.size()).append(' ');
-            for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) {
-                Hash peer = (Hash)iter.next();
+            for (Iterator<Hash> iter = _failedPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
         buf.append(" Successful: ");
         synchronized (_successfulPeers) {
             buf.append(_successfulPeers.size()).append(' ');
-            for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) {
-                Hash peer = (Hash)iter.next();
+            for (Iterator<Hash> iter = _successfulPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
         buf.append(" Successful Exploratory: ");
         synchronized (_successfulExploratoryPeers) {
             buf.append(_successfulExploratoryPeers.size()).append(' ');
-            for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
-                Hash peer = (Hash)iter.next();
+            for (Iterator<Hash> iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
+                Hash peer = iter.next();
                 buf.append(peer.toBase64()).append(" ");
             }
         }
         return buf.toString();
     }
-}
\ No newline at end of file
+}
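
Note: the generified StoreState getters keep two pre-existing subtleties worth spelling out. HashSet.clone() returns Object, so after generification each getter carries an unchecked cast, and the copy is taken while holding the monitor so callers receive a stable snapshot they can iterate without further locking. A minimal sketch of that pattern, with hypothetical names:

    import java.util.HashSet;
    import java.util.Set;

    public class SnapshotSketch {
        private final HashSet<String> _peers = new HashSet<String>();

        public void add(String peer) {
            synchronized (_peers) {
                _peers.add(peer);
            }
        }

        /** Copy under the lock; callers may iterate without synchronizing. */
        @SuppressWarnings("unchecked")
        public Set<String> getPeers() {
            synchronized (_peers) {
                return (Set<String>) _peers.clone();
            }
        }

        public static void main(String[] args) {
            SnapshotSketch s = new SnapshotSketch();
            s.add("peer1");
            Set<String> snap = s.getPeers();
            s.add("peer2");
            System.out.println(snap.size()); // 1: the snapshot is unaffected
        }
    }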