From 380783c1ba163f9619b906ece14c00e0c75a29b1 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 9 Oct 2013 21:09:34 +0000 Subject: [PATCH] Snark DHT: - Combine getPeers and announce into a single method, as we must announce to the closest from the getPeers, not the closest from the kbuckets - Stop getPeers when nothing closer is found --- .../src/org/klomp/snark/TrackerClient.java | 17 ++--- .../java/src/org/klomp/snark/dht/DHT.java | 6 +- .../java/src/org/klomp/snark/dht/KRPC.java | 67 ++++++++++++++++--- 3 files changed, 65 insertions(+), 25 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index a41dd3de28..ace18c25de 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -634,25 +634,16 @@ public class TrackerClient implements Runnable { numwant = 1; else numwant = _util.getMaxConnections(); - Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 5*60*1000); + Collection<Hash> hashes = dht.getPeersAndAnnounce(snark.getInfoHash(), numwant, 5*60*1000, 1, 3*60*1000); if (!hashes.isEmpty()) { runStarted = true; + lastDHTAnnounce = _util.getContext().clock().now(); rv = hashes.size(); + } else { + lastDHTAnnounce = 0; } if (_log.shouldLog(Log.INFO)) _log.info("Got " + hashes + " from DHT"); - // announce ourselves while the token is still good - // FIXME this needs to be in its own thread - if (!stop) { - // announce only to the 1 closest - int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000); - if (_log.shouldLog(Log.INFO)) - _log.info("Sent " + good + " good announces to DHT"); - if (good > 0) - lastDHTAnnounce = _util.getContext().clock().now(); - else - lastDHTAnnounce = 0; - } // now try these peers if ((!stop) && !hashes.isEmpty()) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java index 3a32e1fa37..5dc1c0a625 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java @@ -36,16 +36,18 @@ public interface DHT { public void ping(Destination dest, int port); /** - * Get peers for a torrent. + * Get peers for a torrent, and announce to the closest node we find. * Blocking! * Caller should run in a thread. * * @param ih the Info Hash (torrent) * @param max maximum number of peers to return * @param maxWait the maximum time to wait (ms) must be > 0 + * @param annMax the number of peers to announce to + * @param maxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks * @return possibly empty (never null) */ - public Collection<Hash> getPeers(byte[] ih, int max, long maxWait); + public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait, int annMax, long annMaxWait); /** * Announce to ourselves. diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index 72ea891567..bfaf808f88 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -308,7 +308,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } /** - * Get peers for a torrent. + * Get peers for a torrent, and announce to the closest node we find. * This is an iterative lookup in the DHT. * Blocking! * Caller should run in a thread. @@ -316,9 +316,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { * @param ih the Info Hash (torrent) * @param max maximum number of peers to return * @param maxWait the maximum time to wait (ms) must be > 0 + * @param annMax the number of peers to announce to + * @param maxWait the maximum total time to wait for announces, may be 0 to return immediately without waiting for acks * @return possibly empty (never null) */ - public Collection<Hash> getPeers(byte[] ih, int max, long maxWait) { + public Collection<Hash> getPeersAndAnnounce(byte[] ih, int max, long maxWait, int annMax, long annMaxWait) { // check local tracker first InfoHash iHash = new InfoHash(ih); Collection<Hash> rv = _tracker.getPeers(iHash, max); @@ -333,9 +335,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { int maxNodes = 30; // Initial set to try, will get added to as we go List<NodeInfo> nodes = _knownNodes.findClosest(iHash, maxNodes); - SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(iHash)); + NodeInfoComparator comp = new NodeInfoComparator(iHash); + SortedSet<NodeInfo> toTry = new TreeSet(comp); + SortedSet<NodeInfo> heardFrom = new TreeSet(comp); toTry.addAll(nodes); - Set<NodeInfo> tried = new HashSet(); + SortedSet<NodeInfo> tried = new TreeSet(comp); if (_log.shouldLog(Log.INFO)) _log.info("Starting getPeers for " + iHash + " (b64: " + new NID(ih) + ") " + " with " + nodes.size() + " to try"); @@ -372,20 +376,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (_log.shouldLog(Log.DEBUG)) _log.debug("Got pong"); } else if (replyType == REPLY_PEERS) { + heardFrom.add(waiter.getSentTo()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Got peers"); List<Hash> reply = (List<Hash>) waiter.getReplyObject(); + // shouldn't send us an empty peers list but through + // 0.9.8.1 it will if (!reply.isEmpty()) { for (int j = 0; j < reply.size() && rv.size() < max; j++) { Hash h = reply.get(j); if (!h.equals(_myNodeInfo.getHash())) rv.add(h); } - if (_log.shouldLog(Log.INFO)) - _log.info("Finished get Peers, got " + reply.size() + " from DHT, returning " + rv.size()); - return rv; } + if (_log.shouldLog(Log.INFO)) + _log.info("Finished get Peers, got " + reply.size() + " from DHT, returning " + rv.size()); + break; } else if (replyType == REPLY_NODES) { + heardFrom.add(waiter.getSentTo()); List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Got " + reply.size() + " nodes"); @@ -401,9 +409,45 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } if (_context.clock().now() > endTime) break; + if (!toTry.isEmpty() && !heardFrom.isEmpty() && + comp.compare(toTry.first(), heardFrom.first()) >= 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("Finished get Peers, nothing closer to try after " + (i+1)); + break; + } + } + // now announce + if (!heardFrom.isEmpty()) { + announce(ih); + // announce to the closest we've heard from + int annCnt = 0; + long start = _context.clock().now(); + for (Iterator<NodeInfo> iter = heardFrom.iterator(); iter.hasNext() && annCnt < annMax && _isRunning; ) { + NodeInfo annTo = iter.next(); + if (_log.shouldLog(Log.INFO)) + _log.info("Announcing to closest from get peers: " + annTo); + long toWait = annMaxWait > 0 ? Math.min(annMaxWait, 60*1000) : 0; + if (announce(ih, annTo, toWait)) + annCnt++; + if (annMaxWait > 0) { + annMaxWait -= _context.clock().now() - start; + if (annMaxWait < 1000) + break; + } + } + } else { + // spray it, but unlikely to work, we just went through the kbuckets, + // so this is essentially just a retry + if (_log.shouldLog(Log.INFO)) + _log.info("Announcing to closest in kbuckets after get peers failed"); + announce(ih, annMax, annMaxWait); + } + if (_log.shouldLog(Log.INFO)) { + _log.info("Finished get Peers, returning " + rv.size()); + _log.info("Tried: " + tried); + _log.info("Heard from: " + heardFrom); + _log.info("Not tried: " + toTry); } - if (_log.shouldLog(Log.INFO)) - _log.info("Finished get Peers, " + rv.size() + " from local and none from DHT"); return rv; } @@ -445,13 +489,16 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } /** + * Not recommended - use getPeersAndAnnounce(). + * * Announce to the closest peers in the local DHT. * This is NOT iterative - call getPeers() first to get the closest * peers into the local DHT. * Blocking unless maxWait <= 0 * Caller should run in a thread. * This also automatically announces ourself to our local tracker. - * For best results do a getPeers() first so we have tokens. + * For best results do a getPeersAndAnnounce() instead, as this announces to + * the closest in the kbuckets, it does NOT sort through the known nodes hashmap. * * @param ih the Info Hash (torrent) * @param max maximum number of peers to announce to -- GitLab