From 1219dadbd54f33098674ae613e73c93fdce90bfa Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Fri, 12 Aug 2005 23:54:46 +0000 Subject: [PATCH] 2005-08-12 jrandom * Keep detailed stats on the peer testing, publishing the results in the netDb. * Don't overwrite the status with 'unknown' unless we haven't had a valid status in a while. * Make sure to avoid shitlisted peers for peer testing. * When we get an unknown result to a peer test, try again soon afterwards. * When a peer tells us that our address is different from what we expect, if we've done a recent peer test with a result of OK, fire off a peer test to make sure our IP/port is still valid. If our test is old or the result was not OK, accept their suggestion, but queue up a peer test for later. * Don't try to do a netDb store to a shitlisted peer, and adjust the way we monitor netDb store progress (to clear up the high netDb.storePeers stat) --- history.txt | 18 ++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../src/net/i2p/router/StatisticsManager.java | 7 + .../router/networkdb/kademlia/StoreJob.java | 12 +- .../router/networkdb/kademlia/StoreState.java | 6 + .../i2p/router/peermanager/PeerTestJob.java | 13 +- .../i2p/router/transport/udp/PeerState.java | 2 +- .../router/transport/udp/UDPTransport.java | 121 +++++++++++++++--- 8 files changed, 153 insertions(+), 30 deletions(-) diff --git a/history.txt b/history.txt index eda862f00a..afce019646 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,20 @@ -$Id: history.txt,v 1.224 2005/08/08 15:35:51 jrandom Exp $ +$Id: history.txt,v 1.225 2005/08/10 18:55:41 jrandom Exp $ + +2005-08-12 jrandom + * Keep detailed stats on the peer testing, publishing the results in the + netDb. + * Don't overwrite the status with 'unknown' unless we haven't had a valid + status in a while. + * Make sure to avoid shitlisted peers for peer testing. + * When we get an unknown result to a peer test, try again soon afterwards. + * When a peer tells us that our address is different from what we expect, + if we've done a recent peer test with a result of OK, fire off a peer + test to make sure our IP/port is still valid. If our test is old or the + result was not OK, accept their suggestion, but queue up a peer test for + later. + * Don't try to do a netDb store to a shitlisted peer, and adjust the way + we monitor netDb store progress (to clear up the high netDb.storePeers + stat) 2005-08-10 jrandom * Deployed the peer testing implementation to be run every few minutes on diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 2759afd4ac..6b9b86bbb6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.213 $ $Date: 2005/08/08 15:35:50 $"; + public final static String ID = "$Revision: 1.214 $ $Date: 2005/08/10 18:55:41 $"; public final static String VERSION = "0.6.0.2"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index ff49f8e0d8..5fc76612e7 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -122,6 +122,13 @@ public class StatisticsManager implements Service { //includeRate("router.throttleTunnelProcessingTime1m", stats, new long[] { 60*60*1000 }); includeRate("router.fastPeers", stats, new long[] { 60*60*1000 }); + + includeRate("udp.statusOK", stats, new long[] { 20*60*1000 }); + includeRate("udp.statusDifferent", stats, new long[] { 20*60*1000 }); + includeRate("udp.statusReject", stats, new long[] { 20*60*1000 }); + includeRate("udp.statusUnknown", stats, new long[] { 20*60*1000 }); + includeRate("udp.addressUpdated", stats, new long[] { 1*60*1000 }); + includeRate("udp.addressTestInsteadOfUpdate", stats, new long[] { 1*60*1000 }); includeRate("clock.skew", stats, new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*1000 }); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 5b9309c129..cce346bbb5 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -38,7 +38,7 @@ class StoreJob extends JobImpl { private long _expiration; private PeerSelector _peerSelector; - private final static int PARALLELIZATION = 3; // how many sent at a time + private final static int PARALLELIZATION = 6; // how many sent at a time private final static int REDUNDANCY = 6; // we want the data sent to 6 peers /** * additionally send to 1 outlier(s), in case all of the routers chosen in our @@ -146,7 +146,7 @@ class StoreJob extends JobImpl { return; } } else { - _state.addPending(closestHashes); + //_state.addPending(closestHashes); if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Continue sending key " + _state.getTarget() + " after " + _state.getAttempted().size() + " tries to " + closestHashes); for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { @@ -155,8 +155,14 @@ class StoreJob extends JobImpl { if ( (ds == null) || !(ds instanceof RouterInfo) ) { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds); + _state.addSkipped(peer); } else { - sendStore((RouterInfo)ds); + if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) { + _state.addSkipped(peer); + } else { + _state.addPending(peer); + sendStore((RouterInfo)ds); + } } } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java index 9f76105da3..e7bfe32926 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java @@ -102,6 +102,12 @@ class StoreState { _attemptedPeers.addAll(pending); } } + /** we aren't even going to try to contact this peer */ + public void addSkipped(Hash peer) { + synchronized (_attemptedPeers) { + _attemptedPeers.add(peer); + } + } public long confirmed(Hash peer) { long rv = -1; diff --git a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java index 9005f739d1..7d425d21e2 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java +++ b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java @@ -147,7 +147,7 @@ public class PeerTestJob extends JobImpl { ReplySelector sel = new ReplySelector(peer.getIdentity().getHash(), nonce, expiration); PeerReplyFoundJob reply = new PeerReplyFoundJob(getContext(), peer, inTunnel, outTunnel); - PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel); + PeerReplyTimeoutJob timeoutJob = new PeerReplyTimeoutJob(getContext(), peer, inTunnel, outTunnel, sel); getContext().messageRegistry().registerPending(sel, reply, timeoutJob, timeoutMs); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, null, peer.getIdentity().getHash()); @@ -193,10 +193,12 @@ public class PeerTestJob extends JobImpl { private long _expiration; private long _nonce; private Hash _peer; + private boolean _matchFound; public ReplySelector(Hash peer, long nonce, long expiration) { _nonce = nonce; _expiration = expiration; _peer = peer; + _matchFound = false; } public boolean continueMatching() { return false; } public long getExpiration() { return _expiration; } @@ -213,11 +215,13 @@ public class PeerTestJob extends JobImpl { } else { getContext().statManager().addRateData("peer.testOK", getTestTimeout() - timeLeft, 0); } + _matchFound = true; return true; } } return false; } + public boolean matchFound() { return _matchFound; } public String toString() { StringBuffer buf = new StringBuffer(64); buf.append("Test peer ").append(_peer.toBase64().substring(0,4)); @@ -268,15 +272,20 @@ public class PeerTestJob extends JobImpl { private RouterInfo _peer; private TunnelInfo _replyTunnel; private TunnelInfo _sendTunnel; - public PeerReplyTimeoutJob(RouterContext context, RouterInfo peer, TunnelInfo replyTunnel, TunnelInfo sendTunnel) { + private ReplySelector _selector; + public PeerReplyTimeoutJob(RouterContext context, RouterInfo peer, TunnelInfo replyTunnel, TunnelInfo sendTunnel, ReplySelector sel) { super(context); _peer = peer; _replyTunnel = replyTunnel; _sendTunnel = sendTunnel; + _selector = sel; } public String getName() { return "Peer test failed"; } private boolean getShouldFailPeer() { return true; } public void runJob() { + if (_selector.matchFound()) + return; + if (getShouldFailPeer()) getContext().profileManager().dbLookupFailed(_peer.getIdentity().getHash()); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 4e3765e3e2..7aea8613f0 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -164,7 +164,7 @@ public class PeerState { */ private static final int DEFAULT_MTU = 608;//600; //1500; private static final int MIN_RTO = 1000 + ACKSender.ACK_FREQUENCY; - private static final int MAX_RTO = 2000; // 5000; + private static final int MAX_RTO = 3000; // 5000; public PeerState(I2PAppContext ctx) { _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 1325c779cc..6a20e31266 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -67,6 +67,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private ExpirePeerEvent _expireEvent; private PeerTestEvent _testEvent; private short _reachabilityStatus; + private long _reachabilityStatusLastUpdated; /** list of RelayPeer objects for people who will relay to us */ private List _relayPeers; @@ -152,6 +153,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.droppedPeerInactive", "How long ago did we receive from a dropped peer (duration == session lifetime)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.peersByCapacity", "How many peers of the given capacity were available to pick between? (duration == (int)capacity)", "udp", new long[] { 1*60*1000, 5*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.statusOK", "How many times the peer test returned OK", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.statusDifferent", "How many times the peer test returned different IP/ports", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.statusReject", "How many times the peer test returned reject unsolicited", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.statusUnknown", "How many times the peer test returned an unknown result", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.addressTestInsteadOfUpdate", "How many times we fire off a peer test of ourselves instead of adjusting our own reachable address?", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.addressUpdated", "How many times we adjust our own reachable IP address", "udp", new long[] { 1*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 }); } public void startup() { @@ -290,28 +298,46 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority boolean fixedPort = getIsPortFixed(); boolean updated = false; + boolean fireTest = false; synchronized (this) { if ( (_externalListenHost == null) || (!eq(_externalListenHost.getAddress(), _externalListenPort, ourIP, ourPort)) ) { - try { - _externalListenHost = InetAddress.getByAddress(ourIP); - if (!fixedPort) - _externalListenPort = ourPort; - rebuildExternalAddress(); - replaceAddress(_externalAddress); - updated = true; - } catch (UnknownHostException uhe) { - _externalListenHost = null; + if ( (_reachabilityStatus != CommSystemFacade.STATUS_OK) || + (_context.clock().now() - _reachabilityStatusLastUpdated > 2*TEST_FREQUENCY) ) { + // they told us something different and our tests are either old or failing + try { + _externalListenHost = InetAddress.getByAddress(ourIP); + if (!fixedPort) + _externalListenPort = ourPort; + rebuildExternalAddress(); + replaceAddress(_externalAddress); + updated = true; + } catch (UnknownHostException uhe) { + _externalListenHost = null; + } + } else { + // they told us something different, but our tests are recent and positive, + // so lets test again + fireTest = true; } + } else { + // matched what we expect } } - if (!fixedPort) - _context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+""); - _context.router().saveConfig(); - - if (updated) + if (fireTest) { + _context.statManager().addRateData("udp.addressTestInsteadOfUpdate", 1, 0); + _testEvent.forceRun(); + SimpleTimer.getInstance().addEvent(_testEvent, 5*1000); + } else if (updated) { + _context.statManager().addRateData("udp.addressUpdated", 1, 0); + if (!fixedPort) + _context.router().setConfigSetting(PROP_EXTERNAL_PORT, ourPort+""); + _context.router().saveConfig(); _context.router().rebuildRouterInfo(); + _testEvent.forceRun(); + SimpleTimer.getInstance().addEvent(_testEvent, 5*1000); + } } private static final boolean eq(byte laddr[], int lport, byte raddr[], int rport) { @@ -355,12 +381,25 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public PeerState getPeerState(char capacity) { int index = _context.random().nextInt(1024); List peers = _peersByCapacity[capacity-'A']; - synchronized (peers) { - int size = peers.size(); - if (size <= 0) return null; - index = index % size; - return (PeerState)peers.get(index); + int size = 0; + PeerState rv = null; + for (int i = 0; i < 5; i++) { + synchronized (peers) { + size = peers.size(); + if (size > 0) { + index = (index + i) % size; + rv = (PeerState)peers.get(index); + } + } + if (rv == null) + break; + if (_context.shitlist().isShitlisted(rv.getRemotePeer())) + rv = null; + else + break; } + _context.statManager().addRateData("udp.peersByCapacity", size, capacity); + return rv; } private static final int MAX_PEERS_PER_CAPACITY = 16; @@ -991,7 +1030,44 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } - void setReachabilityStatus(short status) { _reachabilityStatus = status; } + /** + * If we haven't had a non-unknown test result in 5 minutes, we really dont know. Otherwise, + * when we receive an unknown we should ignore that value and try again (with different peers) + * + */ + private static final long STATUS_GRACE_PERIOD = 5*60*1000; + + void setReachabilityStatus(short status) { + long now = _context.clock().now(); + switch (status) { + case CommSystemFacade.STATUS_OK: + _context.statManager().addRateData("udp.statusOK", 1, 0); + _reachabilityStatus = status; + _reachabilityStatusLastUpdated = now; + break; + case CommSystemFacade.STATUS_DIFFERENT: + _context.statManager().addRateData("udp.statusDifferent", 1, 0); + _reachabilityStatus = status; + _reachabilityStatusLastUpdated = now; + break; + case CommSystemFacade.STATUS_REJECT_UNSOLICITED: + _context.statManager().addRateData("udp.statusReject", 1, 0); + _reachabilityStatus = status; + _reachabilityStatusLastUpdated = now; + break; + case CommSystemFacade.STATUS_UNKNOWN: + default: + _context.statManager().addRateData("udp.statusUnknown", 1, 0); + if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) { + _testEvent.forceRun(); + SimpleTimer.getInstance().addEvent(_testEvent, 5*1000); + } else { + _reachabilityStatus = status; + _reachabilityStatusLastUpdated = now; + } + break; + } + } public short getReachabilityStatus() { return _reachabilityStatus; } public void recheckReachability() { _testEvent.runTest(); @@ -1009,11 +1085,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private boolean _alive; /** when did we last test our reachability */ private long _lastTested; + private boolean _forceRun; public void timeReached() { if (shouldTest()) { long now = _context.clock().now(); - if (now - _lastTested >= TEST_FREQUENCY) { + if ( (_forceRun) || (now - _lastTested >= TEST_FREQUENCY) ) { runTest(); } } @@ -1036,6 +1113,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _lastTested = _context.clock().now(); } + private void forceRun() { _forceRun = true; } + public void setIsAlive(boolean isAlive) { _alive = isAlive; if (isAlive) { -- GitLab