From 7513d42e9ee3eb206eced48e40e6ec8852f68adb Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 9 Oct 2013 13:06:47 +0000 Subject: [PATCH] Snark DHT: - Increase dest lookup and search timeouts - Increase max search depth - Loop tracker client faster when in magnet mode - Loop tracker client faster if DHT announce fails - Don't return an empty peers list if we only know about the requestor - volatiles, log tweaks - Major fixes of getPeers() to follow --- .../src/org/klomp/snark/ExtensionHandler.java | 6 ++-- .../src/org/klomp/snark/I2PSnarkUtil.java | 2 +- .../java/src/org/klomp/snark/Snark.java | 6 ++-- .../src/org/klomp/snark/TrackerClient.java | 26 +++++++++++++---- .../java/src/org/klomp/snark/dht/KRPC.java | 28 ++++++++++++------- 5 files changed, 46 insertions(+), 22 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java index 8a49fc4284..fd79e82527 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java @@ -110,7 +110,8 @@ abstract class ExtensionHandler { // drop if we need metainfo and we haven't found anybody yet synchronized(state) { if (!state.isInitialized()) { - log.debug("Dropping peer, we need metadata! " + peer); + if (log.shouldLog(Log.DEBUG)) + log.debug("Dropping peer, we need metadata! " + peer); peer.disconnect(); } } @@ -124,7 +125,8 @@ abstract class ExtensionHandler { // drop if we need metainfo and we haven't found anybody yet synchronized(state) { if (!state.isInitialized()) { - log.debug("Dropping peer, we need metadata! " + peer); + if (log.shouldLog(Log.DEBUG)) + log.debug("Dropping peer, we need metadata! " + peer); peer.disconnect(); } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 4d34cbd47c..8e41722654 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -513,7 +513,7 @@ public class I2PSnarkUtil { if (_log.shouldLog(Log.INFO)) _log.info("Using existing session for lookup of " + ip); try { - return sess.lookupDest(h); + return sess.lookupDest(h, 15*1000); } catch (I2PSessionException ise) { } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index 6e04d29adc..fb4de137a9 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -235,9 +235,9 @@ public class Snark private final I2PSnarkUtil _util; private final Log _log; private final PeerCoordinatorSet _peerCoordinatorSet; - private String trackerProblems; - private int trackerSeenPeers; - private boolean _autoStoppable; + private volatile String trackerProblems; + private volatile int trackerSeenPeers; + private volatile boolean _autoStoppable; /** from main() via parseArguments() single torrent */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index 91f68ce0b2..a41dd3de28 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -379,15 +379,24 @@ public class TrackerClient implements Runnable { if (dht != null && (meta == null || !meta.isPrivate())) dht.announce(snark.getInfoHash()); + int oldSeenPeers = snark.getTrackerSeenPeers(); int maxSeenPeers = 0; - if (!trackers.isEmpty()) + if (!trackers.isEmpty()) { maxSeenPeers = getPeersFromTrackers(trackers); + // fast update for UI at startup + if (maxSeenPeers > oldSeenPeers) + snark.setTrackerSeenPeers(maxSeenPeers); + } int p = getPeersFromPEX(); if (p > maxSeenPeers) maxSeenPeers = p; p = getPeersFromDHT(); - if (p > maxSeenPeers) + if (p > maxSeenPeers) { maxSeenPeers = p; + // fast update for UI at startup + if (maxSeenPeers > oldSeenPeers) + snark.setTrackerSeenPeers(maxSeenPeers); + } // backup if DHT needs bootstrapping if (trackers.isEmpty() && !backupTrackers.isEmpty() && dht != null && dht.size() < 16) { p = getPeersFromTrackers(backupTrackers); @@ -616,17 +625,18 @@ public class TrackerClient implements Runnable { // FIXME this needs to be in its own thread int rv = 0; DHT dht = _util.getDHT(); - if (dht != null && (meta == null || !meta.isPrivate()) && (!stop) && - _util.getContext().clock().now() > lastDHTAnnounce + MIN_DHT_ANNOUNCE_INTERVAL) { + if (dht != null && + (meta == null || !meta.isPrivate()) && + (!stop) && + (meta == null || _util.getContext().clock().now() > lastDHTAnnounce + MIN_DHT_ANNOUNCE_INTERVAL)) { int numwant; if (!coordinator.needOutboundPeers()) numwant = 1; else numwant = _util.getMaxConnections(); - Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000); + Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 5*60*1000); if (!hashes.isEmpty()) { runStarted = true; - lastDHTAnnounce = _util.getContext().clock().now(); rv = hashes.size(); } if (_log.shouldLog(Log.INFO)) @@ -638,6 +648,10 @@ public class TrackerClient implements Runnable { int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000); if (_log.shouldLog(Log.INFO)) _log.info("Sent " + good + " good announces to DHT"); + if (good > 0) + lastDHTAnnounce = _util.getContext().clock().now(); + else + lastDHTAnnounce = 0; } // now try these peers diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index 90ae9a7561..72ea891567 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -149,6 +149,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private static final long MAX_MSGID_AGE = 2*60*1000; /** how long since sent do we wait for a reply */ private static final long DEFAULT_QUERY_TIMEOUT = 75*1000; + private static final long DEST_LOOKUP_TIMEOUT = 10*1000; /** stagger with other cleaners */ private static final long CLEAN_TIME = 63*1000; private static final long EXPLORE_TIME = 877*1000; @@ -327,18 +328,22 @@ public class KRPC implements I2PSessionMuxedListener, DHT { rv = new HashSet(rv); long endTime = _context.clock().now() + maxWait; + // needs to be much higher than log(size) since many lookups will fail + // at first and we will give up too early + int maxNodes = 30; // Initial set to try, will get added to as we go - int maxNodes = 12; List<NodeInfo> nodes = _knownNodes.findClosest(iHash, maxNodes); SortedSet<NodeInfo> toTry = new TreeSet(new NodeInfoComparator(iHash)); toTry.addAll(nodes); Set<NodeInfo> tried = new HashSet(); if (_log.shouldLog(Log.INFO)) - _log.info("Starting getPeers for " + iHash + " with " + nodes.size() + " to try"); + _log.info("Starting getPeers for " + iHash + " (b64: " + new NID(ih) + ") " + " with " + nodes.size() + " to try"); for (int i = 0; i < maxNodes; i++) { if (!_isRunning) break; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Now to try: " + toTry); NodeInfo nInfo; try { nInfo = toTry.first(); @@ -347,13 +352,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } toTry.remove(nInfo); tried.add(nInfo); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Try " + i + ": " + nInfo); ReplyWaiter waiter = sendGetPeers(nInfo, iHash); if (waiter == null) continue; synchronized(waiter) { try { - waiter.wait(Math.max(20*1000, (Math.min(40*1000, endTime - _context.clock().now())))); + waiter.wait(Math.max(30*1000, (Math.min(45*1000, endTime - _context.clock().now())))); } catch (InterruptedException ie) {} } @@ -370,10 +377,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT { List<Hash> reply = (List<Hash>) waiter.getReplyObject(); if (!reply.isEmpty()) { for (int j = 0; j < reply.size() && rv.size() < max; j++) { - rv.add(reply.get(j)); + Hash h = reply.get(j); + if (!h.equals(_myNodeInfo.getHash())) + rv.add(h); } if (_log.shouldLog(Log.INFO)) - _log.info("Finished get Peers, got " + rv.size() + " from DHT, returning " + reply.size()); + _log.info("Finished get Peers, got " + reply.size() + " from DHT, returning " + rv.size()); return rv; } } else if (replyType == REPLY_NODES) { @@ -891,7 +900,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _log.info("looking up dest for " + nInfo); try { // use a short timeout for now - Destination dest = _session.lookupDest(nInfo.getHash(), 5*1000); + Destination dest = _session.lookupDest(nInfo.getHash(), DEST_LOOKUP_TIMEOUT); if (dest != null) { nInfo.setDestination(dest); if (_log.shouldLog(Log.INFO)) @@ -1190,6 +1199,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _log.info("Stored new OB token: " + token + " for: " + nInfo); List<Hash> peers = _tracker.getPeers(ih, MAX_WANT); + peers.remove(nInfo.getHash()); // him if (peers.isEmpty()) { // similar to find node, but with token // get closest from DHT @@ -1203,10 +1213,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { sendNodes(nInfo, msgID, token, nodeArray); } else { List<byte[]> hashes = new ArrayList(peers.size()); - Hash him = nInfo.getHash(); for (Hash peer : peers) { - if (!peer.equals(him)) - hashes.add(peer.getData()); + hashes.add(peer.getData()); } sendPeers(nInfo, msgID, token, hashes); } @@ -1324,7 +1332,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { break; } if (_log.shouldLog(Log.INFO)) - _log.info("Rcvd peers from: " + nInfo + ": " + DataHelper.toString(rv)); + _log.info("Rcvd " + peers.size() + " peers from: " + nInfo + ": " + DataHelper.toString(rv)); return rv; } -- GitLab