I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 78203aac authored by zzz's avatar zzz
Browse files

* i2psnark:

    - Implement blacklist for unreachable DHT peers
    - Reduce threshold for unreachable
    - Log tweaks
parent 3c95f0b6
No related branches found
No related tags found
No related merge requests found
...@@ -38,6 +38,7 @@ import net.i2p.data.DataHelper; ...@@ -38,6 +38,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.SimpleDataStructure; import net.i2p.data.SimpleDataStructure;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
...@@ -97,6 +98,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -97,6 +98,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final ConcurrentHashMap<Token, NodeInfo> _outgoingTokens; private final ConcurrentHashMap<Token, NodeInfo> _outgoingTokens;
/** index to incoming opaque tokens, received in a peers or nodes reply */ /** index to incoming opaque tokens, received in a peers or nodes reply */
private final ConcurrentHashMap<NID, Token> _incomingTokens; private final ConcurrentHashMap<NID, Token> _incomingTokens;
/** recently unreachable, with lastSeen() as the added-to-blacklist time */
private final Set<NID> _blacklist;
/** hook to inject and receive datagrams */ /** hook to inject and receive datagrams */
private final I2PSession _session; private final I2PSession _session;
...@@ -147,6 +150,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -147,6 +150,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
/** stagger with other cleaners */ /** stagger with other cleaners */
private static final long CLEAN_TIME = 63*1000; private static final long CLEAN_TIME = 63*1000;
private static final long EXPLORE_TIME = 877*1000; private static final long EXPLORE_TIME = 877*1000;
private static final long BLACKLIST_CLEAN_TIME = 17*60*1000;
private static final String DHT_FILE = "i2psnark.dht.dat"; private static final String DHT_FILE = "i2psnark.dht.dat";
private static final int SEND_CRYPTO_TAGS = 8; private static final int SEND_CRYPTO_TAGS = 8;
...@@ -161,6 +165,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -161,6 +165,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_sentQueries = new ConcurrentHashMap(); _sentQueries = new ConcurrentHashMap();
_outgoingTokens = new ConcurrentHashMap(); _outgoingTokens = new ConcurrentHashMap();
_incomingTokens = new ConcurrentHashMap(); _incomingTokens = new ConcurrentHashMap();
_blacklist = new ConcurrentHashSet();
// Construct my NodeInfo // Construct my NodeInfo
// Pick ports over a big range to marginally increase security // Pick ports over a big range to marginally increase security
...@@ -262,13 +267,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -262,13 +267,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
int replyType = waiter.getReplyCode(); int replyType = waiter.getReplyCode();
if (replyType == REPLY_NONE) { if (replyType == REPLY_NONE) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got no reply"); _log.debug("Got no reply");
} else if (replyType == REPLY_NODES) { } else if (replyType == REPLY_NODES) {
List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
// It seems like we are just going to get back ourselves all the time // It seems like we are just going to get back ourselves all the time
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got " + reply.size() + " nodes"); _log.debug("Got " + reply.size() + " nodes");
for (NodeInfo ni : reply) { for (NodeInfo ni : reply) {
if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni)))) if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni))))
toTry.add(ni); toTry.add(ni);
...@@ -348,14 +353,14 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -348,14 +353,14 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
int replyType = waiter.getReplyCode(); int replyType = waiter.getReplyCode();
if (replyType == REPLY_NONE) { if (replyType == REPLY_NONE) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got no reply"); _log.debug("Got no reply");
} else if (replyType == REPLY_PONG) { } else if (replyType == REPLY_PONG) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got pong"); _log.debug("Got pong");
} else if (replyType == REPLY_PEERS) { } else if (replyType == REPLY_PEERS) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got peers"); _log.debug("Got peers");
List<Hash> reply = (List<Hash>) waiter.getReplyObject(); List<Hash> reply = (List<Hash>) waiter.getReplyObject();
if (!reply.isEmpty()) { if (!reply.isEmpty()) {
for (int j = 0; j < reply.size() && rv.size() < max; j++) { for (int j = 0; j < reply.size() && rv.size() < max; j++) {
...@@ -367,8 +372,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -367,8 +372,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
} else if (replyType == REPLY_NODES) { } else if (replyType == REPLY_NODES) {
List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject(); List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got " + reply.size() + " nodes"); _log.debug("Got " + reply.size() + " nodes");
for (NodeInfo ni : reply) { for (NodeInfo ni : reply) {
if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni))) if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni)))
toTry.add(ni); toTry.add(ni);
...@@ -576,6 +581,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -576,6 +581,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
} }
_outgoingTokens.clear(); _outgoingTokens.clear();
_incomingTokens.clear(); _incomingTokens.clear();
_blacklist.clear();
} }
/** /**
...@@ -592,7 +598,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -592,7 +598,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
*/ */
public String renderStatusHTML() { public String renderStatusHTML() {
long uptime = Math.max(1000, _context.clock().now() - _started); long uptime = Math.max(1000, _context.clock().now() - _started);
StringBuilder buf = new StringBuilder(); StringBuilder buf = new StringBuilder(256);
buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ") buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ")
.append(DataHelper.formatSize2(_txBytes.get())).append("B / ") .append(DataHelper.formatSize2(_txBytes.get())).append("B / ")
.append(DataHelper.formatSize2(_txBytes.get() * 1000 / uptime)).append("Bps<br>" + .append(DataHelper.formatSize2(_txBytes.get() * 1000 / uptime)).append("Bps<br>" +
...@@ -600,6 +606,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -600,6 +606,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
.append(DataHelper.formatSize2(_rxBytes.get())).append("B / ") .append(DataHelper.formatSize2(_rxBytes.get())).append("B / ")
.append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" + .append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" +
"DHT Peers: ").append( _knownNodes.size()).append("<br>" + "DHT Peers: ").append( _knownNodes.size()).append("<br>" +
"Blacklisted: ").append(_blacklist.size()).append("<br>" +
"Sent tokens: ").append(_outgoingTokens.size()).append("<br>" + "Sent tokens: ").append(_outgoingTokens.size()).append("<br>" +
"Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" + "Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" +
"Pending queries: ").append(_sentQueries.size()).append("<br>"); "Pending queries: ").append(_sentQueries.size()).append("<br>");
...@@ -1079,7 +1086,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1079,7 +1086,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (oldInfo.getDestination() == null && nInfo.getDestination() != null) if (oldInfo.getDestination() == null && nInfo.getDestination() != null)
oldInfo.setDestination(nInfo.getDestination()); oldInfo.setDestination(nInfo.getDestination());
} }
oldInfo.getNID().setLastSeen(); nID = oldInfo.getNID();
nID.setLastSeen();
if (_blacklist.remove(nID)) {
if (_log.shouldLog(Log.INFO))
_log.info("UN-blacklisted: " + nID);
}
return oldInfo; return oldInfo;
} }
...@@ -1109,6 +1121,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1109,6 +1121,13 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Removed after consecutive timeouts: " + nInfo); _log.info("Removed after consecutive timeouts: " + nInfo);
} }
if (!_blacklist.contains(nid)) {
// used as when-added time
nid.setLastSeen();
_blacklist.add(nid);
if (_log.shouldLog(Log.INFO))
_log.info("Blacklisted: " + nid);
}
} }
} }
...@@ -1223,11 +1242,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1223,11 +1242,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
byte[] tok = btok.getBytes(); byte[] tok = btok.getBytes();
Token token = new Token(_context, tok); Token token = new Token(_context, tok);
_incomingTokens.put(nInfo.getNID(), token); _incomingTokens.put(nInfo.getNID(), token);
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("Got token: " + token + ", must be a response to get_peers"); _log.debug("Got token: " + token + ", must be a response to get_peers");
} else { } else {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.DEBUG))
_log.info("No token and saved infohash, must be a response to find_node"); _log.debug("No token and saved infohash, must be a response to find_node");
} }
} }
...@@ -1259,6 +1278,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1259,6 +1278,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
List<NodeInfo> rv = new ArrayList(ids.length / NodeInfo.LENGTH); List<NodeInfo> rv = new ArrayList(ids.length / NodeInfo.LENGTH);
for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) { for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) {
NodeInfo nInf = new NodeInfo(ids, off); NodeInfo nInf = new NodeInfo(ids, off);
if (_blacklist.contains(nInf.getNID())) {
if (_log.shouldLog(Log.INFO))
_log.info("Ignoring blacklisted " + nInf.getNID() + " from: " + nInfo);
continue;
}
nInf = heardAbout(nInf); nInf = heardAbout(nInf);
rv.add(nInf); rv.add(nInf);
} }
...@@ -1501,6 +1525,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1501,6 +1525,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
long now = _context.clock().now(); long now = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("KRPC cleaner starting with " + _log.debug("KRPC cleaner starting with " +
_blacklist.size() + " in blacklist, " +
_outgoingTokens.size() + " sent Tokens, " + _outgoingTokens.size() + " sent Tokens, " +
_incomingTokens.size() + " rcvd Tokens"); _incomingTokens.size() + " rcvd Tokens");
for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) { for (Iterator<Token> iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) {
...@@ -1513,9 +1538,16 @@ public class KRPC implements I2PSessionMuxedListener, DHT { ...@@ -1513,9 +1538,16 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (tok.lastSeen() < now - MAX_INBOUND_TOKEN_AGE) if (tok.lastSeen() < now - MAX_INBOUND_TOKEN_AGE)
iter.remove(); iter.remove();
} }
for (Iterator<NID> iter = _blacklist.iterator(); iter.hasNext(); ) {
NID nid = iter.next();
// lastSeen() is actually when-added
if (now > nid.lastSeen() + BLACKLIST_CLEAN_TIME)
iter.remove();
}
// TODO sent queries? // TODO sent queries?
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("KRPC cleaner done, now with " + _log.debug("KRPC cleaner done, now with " +
_blacklist.size() + " in blacklist, " +
_outgoingTokens.size() + " sent Tokens, " + _outgoingTokens.size() + " sent Tokens, " +
_incomingTokens.size() + " rcvd Tokens, " + _incomingTokens.size() + " rcvd Tokens, " +
_knownNodes.size() + " known peers, " + _knownNodes.size() + " known peers, " +
......
...@@ -18,7 +18,7 @@ public class NID extends SHA1Hash { ...@@ -18,7 +18,7 @@ public class NID extends SHA1Hash {
private long lastSeen; private long lastSeen;
private int fails; private int fails;
private static final int MAX_FAILS = 3; private static final int MAX_FAILS = 2;
public NID() { public NID() {
super(null); super(null);
...@@ -41,6 +41,6 @@ public class NID extends SHA1Hash { ...@@ -41,6 +41,6 @@ public class NID extends SHA1Hash {
* @return if more than max timeouts * @return if more than max timeouts
*/ */
public boolean timeout() { public boolean timeout() {
return fails++ > MAX_FAILS; return ++fails > MAX_FAILS;
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment