diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index bdf0891fc3ffea628aa1a51a6924de05661032dd..6dd9572d63354b6dd7fb4283316e41d7c29e537c 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -38,6 +38,7 @@ import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.Hash; import net.i2p.data.SimpleDataStructure; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; @@ -97,6 +98,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private final ConcurrentHashMap<Token, NodeInfo> _outgoingTokens; /** index to incoming opaque tokens, received in a peers or nodes reply */ private final ConcurrentHashMap<NID, Token> _incomingTokens; + /** recently unreachable, with lastSeen() as the added-to-blacklist time */ + private final Set<NID> _blacklist; /** hook to inject and receive datagrams */ private final I2PSession _session; @@ -147,6 +150,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { /** stagger with other cleaners */ private static final long CLEAN_TIME = 63*1000; private static final long EXPLORE_TIME = 877*1000; + private static final long BLACKLIST_CLEAN_TIME = 17*60*1000; private static final String DHT_FILE = "i2psnark.dht.dat"; private static final int SEND_CRYPTO_TAGS = 8; @@ -161,6 +165,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _sentQueries = new ConcurrentHashMap(); _outgoingTokens = new ConcurrentHashMap(); _incomingTokens = new ConcurrentHashMap(); + _blacklist = new ConcurrentHashSet(); // Construct my NodeInfo // Pick ports over a big range to marginally increase security @@ -262,13 +267,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT { int replyType = waiter.getReplyCode(); if (replyType == REPLY_NONE) { - if (_log.shouldLog(Log.INFO)) - _log.info("Got no reply"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got no reply"); } else if (replyType == REPLY_NODES) { List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); // It seems like we are just going to get back ourselves all the time - if (_log.shouldLog(Log.INFO)) - _log.info("Got " + reply.size() + " nodes"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got " + reply.size() + " nodes"); for (NodeInfo ni : reply) { if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni)))) toTry.add(ni); @@ -348,14 +353,14 @@ public class KRPC implements I2PSessionMuxedListener, DHT { int replyType = waiter.getReplyCode(); if (replyType == REPLY_NONE) { - if (_log.shouldLog(Log.INFO)) - _log.info("Got no reply"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got no reply"); } else if (replyType == REPLY_PONG) { - if (_log.shouldLog(Log.INFO)) - _log.info("Got pong"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got pong"); } else if (replyType == REPLY_PEERS) { - if (_log.shouldLog(Log.INFO)) - _log.info("Got peers"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got peers"); List<Hash> reply = (List<Hash>) waiter.getReplyObject(); if (!reply.isEmpty()) { for (int j = 0; j < reply.size() && rv.size() < max; j++) { @@ -367,8 +372,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } } else if (replyType == REPLY_NODES) { List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); - if (_log.shouldLog(Log.INFO)) - _log.info("Got " + reply.size() + " nodes"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got " + reply.size() + " nodes"); for (NodeInfo ni : reply) { if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni))) toTry.add(ni); @@ -576,6 +581,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { } _outgoingTokens.clear(); _incomingTokens.clear(); + _blacklist.clear(); } /** @@ -592,7 +598,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { */ public String renderStatusHTML() { long uptime = Math.max(1000, _context.clock().now() - _started); - StringBuilder buf = new StringBuilder(); + StringBuilder buf = new StringBuilder(256); buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ") .append(DataHelper.formatSize2(_txBytes.get())).append("B / ") .append(DataHelper.formatSize2(_txBytes.get() * 1000 / uptime)).append("Bps<br>" + @@ -600,6 +606,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { .append(DataHelper.formatSize2(_rxBytes.get())).append("B / ") .append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" + "DHT Peers: ").append( _knownNodes.size()).append("<br>" + + "Blacklisted: ").append(_blacklist.size()).append("<br>" + "Sent tokens: ").append(_outgoingTokens.size()).append("<br>" + "Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" + "Pending queries: ").append(_sentQueries.size()).append("<br>"); @@ -1079,7 +1086,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (oldInfo.getDestination() == null && nInfo.getDestination() != null) oldInfo.setDestination(nInfo.getDestination()); } - oldInfo.getNID().setLastSeen(); + nID = oldInfo.getNID(); + nID.setLastSeen(); + if (_blacklist.remove(nID)) { + if (_log.shouldLog(Log.INFO)) + _log.info("UN-blacklisted: " + nID); + } return oldInfo; } @@ -1109,6 +1121,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (_log.shouldLog(Log.INFO)) _log.info("Removed after consecutive timeouts: " + nInfo); } + if (!_blacklist.contains(nid)) { + // used as when-added time + nid.setLastSeen(); + _blacklist.add(nid); + if (_log.shouldLog(Log.INFO)) + _log.info("Blacklisted: " + nid); + } } } @@ -1223,11 +1242,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { byte[] tok = btok.getBytes(); Token token = new Token(_context, tok); _incomingTokens.put(nInfo.getNID(), token); - if (_log.shouldLog(Log.INFO)) - _log.info("Got token: " + token + ", must be a response to get_peers"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got token: " + token + ", must be a response to get_peers"); } else { - if (_log.shouldLog(Log.INFO)) - _log.info("No token and saved infohash, must be a response to find_node"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No token and saved infohash, must be a response to find_node"); } } @@ -1259,6 +1278,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { List<NodeInfo> rv = new ArrayList(ids.length / NodeInfo.LENGTH); for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) { NodeInfo nInf = new NodeInfo(ids, off); + if (_blacklist.contains(nInf.getNID())) { + if (_log.shouldLog(Log.INFO)) + _log.info("Ignoring blacklisted " + nInf.getNID() + " from: " + nInfo); + continue; + } nInf = heardAbout(nInf); rv.add(nInf); } @@ -1501,6 +1525,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { long now = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug("KRPC cleaner starting with " + + _blacklist.size() + " in blacklist, " + _outgoingTokens.size() + " sent Tokens, " + _incomingTokens.size() + " rcvd Tokens"); for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) { @@ -1513,9 +1538,16 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (tok.lastSeen() < now - MAX_INBOUND_TOKEN_AGE) iter.remove(); } + for (Iterator<NID> iter = _blacklist.iterator(); iter.hasNext(); ) { + NID nid = iter.next(); + // lastSeen() is actually when-added + if (now > nid.lastSeen() + BLACKLIST_CLEAN_TIME) + iter.remove(); + } // TODO sent queries? if (_log.shouldLog(Log.DEBUG)) _log.debug("KRPC cleaner done, now with " + + _blacklist.size() + " in blacklist, " + _outgoingTokens.size() + " sent Tokens, " + _incomingTokens.size() + " rcvd Tokens, " + _knownNodes.size() + " known peers, " + diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/NID.java b/apps/i2psnark/java/src/org/klomp/snark/dht/NID.java index 44aa29c8560a31a32f352dcd34feaef803a06c82..f1b7b221255eb1ec37af1031089eb367b47fb594 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/NID.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/NID.java @@ -18,7 +18,7 @@ public class NID extends SHA1Hash { private long lastSeen; private int fails; - private static final int MAX_FAILS = 3; + private static final int MAX_FAILS = 2; public NID() { super(null); @@ -41,6 +41,6 @@ public class NID extends SHA1Hash { * @return if more than max timeouts */ public boolean timeout() { - return fails++ > MAX_FAILS; + return ++fails > MAX_FAILS; } }