From 567ce84e1e6bb3bf563ef1bc4bf456d696279b3c Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 25 Apr 2005 16:29:48 +0000 Subject: [PATCH] * randomized the shitlist duration (still with exponential backoff though) * fail UDP sessions after two consecutive failed messages in different minutes * honor UDP reconnections --- router/java/src/net/i2p/router/Shitlist.java | 2 +- .../udp/OutboundMessageFragments.java | 8 ++-- .../i2p/router/transport/udp/PeerState.java | 41 ++++++++--------- .../i2p/router/transport/udp/UDPFlooder.java | 6 +++ .../router/transport/udp/UDPTransport.java | 44 +++++++++++++++++-- 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java index 827daec96..7276984c9 100644 --- a/router/java/src/net/i2p/router/Shitlist.java +++ b/router/java/src/net/i2p/router/Shitlist.java @@ -63,7 +63,7 @@ public class Shitlist { if (_log.shouldLog(Log.INFO)) _log.info("Shitlisting router " + peer.toBase64(), new Exception("Shitlist cause")); - long period = SHITLIST_DURATION_MS; + long period = SHITLIST_DURATION_MS + _context.random().nextLong(SHITLIST_DURATION_MS); PeerProfile prof = _context.profileOrganizer().getProfile(peer); if (prof != null) period = SHITLIST_DURATION_MS << prof.incrementShitlists(); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 32619896a..0f9167988 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -136,7 +136,7 @@ public class OutboundMessageFragments { _context.statManager().addRateData("udp.sendFailed", state.getFragmentCount(), state.getLifetime()); if (state.getMessage() != null) { - _transport.failed(state.getMessage()); + _transport.failed(state); } else { // it can not have an OutNetMessage if the source is the // final after establishment message @@ -152,7 +152,7 @@ public class OutboundMessageFragments { // state.getPeer().congestionOccurred(); if (state.getMessage() != null) { - _transport.failed(state.getMessage()); + _transport.failed(state); } else { // it can not have an OutNetMessage if the source is the // final after establishment message @@ -192,9 +192,9 @@ public class OutboundMessageFragments { peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); if (peer == null) { - // peer disconnected (whatever that means) + // peer disconnected _activeMessages.remove(cur); - _transport.failed(state.getMessage()); + _transport.failed(state); if (_log.shouldLog(Log.WARN)) _log.warn("Peer disconnected for " + state); state.releaseResources(); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 7c6b14d44..d84c0f2f3 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -63,8 +63,10 @@ public class PeerState { private long _lastSendTime; /** when did we last receive a packet from them? */ private long _lastReceiveTime; - /** how many seconds have we sent packets without any ACKs received? */ - private int _consecutiveSendingSecondsWithoutACKs; + /** how many consecutive messages have we sent and not received an ACK to */ + private int _consecutiveFailedSends; + /** when did we last have a failed send */ + private long _lastFailedSendMinute; /** list of messageIds (Long) that we have received but not yet sent */ private List _currentACKs; /** when did we last send ACKs to the peer? */ @@ -212,7 +214,7 @@ public class PeerState { /** when did we last receive a packet from them? */ public long getLastReceiveTime() { return _lastReceiveTime; } /** how many seconds have we sent packets without any ACKs received? */ - public int getConsecutiveSendingSecondsWithoutACKS() { return _consecutiveSendingSecondsWithoutACKs; } + public int getConsecutiveFailedSends() { return _consecutiveFailedSends; } /** have we received a packet with the ECN bit set in the current second? */ public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; } /** @@ -303,22 +305,16 @@ public class PeerState { public void setLastSendTime(long when) { _lastSendTime = when; } /** when did we last receive a packet from them? */ public void setLastReceiveTime(long when) { _lastReceiveTime = when; } - public void incrementConsecutiveSendingSecondsWithoutACKS() { _consecutiveSendingSecondsWithoutACKs++; } - public void resetConsecutiveSendingSecondsWithoutACKS() { _consecutiveSendingSecondsWithoutACKs = 0; } - - /* - public void migrateACKs(List NACKs, long newSecond) { - _previousSecondACKs = _currentSecondACKs; - if (_currentSecondECNReceived) - _sendWindowBytes /= 2; - if (_sendWindowBytes < MINIMUM_WINDOW_BYTES) - _sendWindowBytes = MINIMUM_WINDOW_BYTES; - _sendWindowBytesRemaining = _sendWindowBytes; - _currentSecondECNReceived = false; - _remoteWantsPreviousACKs = true; - _currentReceiveSecond = newSecond; + public int incrementConsecutiveFailedSends() { + long now = _context.clock().now()/60*1000; + if (_lastFailedSendMinute == now) { + // ignore... too fast + } else { + _lastFailedSendMinute = now; + _consecutiveFailedSends++; + } + return _consecutiveFailedSends; } - */ /** * have all of the packets received in the current second requested that @@ -419,7 +415,8 @@ public class PeerState { /** we sent a message which was ACKed containing the given # of bytes */ public void messageACKed(int bytesACKed, long lifetime, int numSends) { - _consecutiveSendingSecondsWithoutACKs = 0; + _consecutiveFailedSends = 0; + _lastFailedSendMinute = -1; if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; } else { @@ -442,9 +439,9 @@ public class PeerState { _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); _rtt = (int)((float)_rtt*(0.9f) + (0.1f)*(float)lifetime); _rto = _rtt + (_rttDeviation<<2); - if (_log.shouldLog(Log.WARN)) - _log.warn("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt - + " rttDev=" + _rttDeviation + " rto=" + _rto); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt + + " rttDev=" + _rttDeviation + " rto=" + _rto); if (_rto < 1000) _rto = 1000; if (_rto > 5000) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java index 8daaaa351..1e259b2b6 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java @@ -35,6 +35,12 @@ class UDPFlooder implements Runnable { _peers.notifyAll(); } } + public void removePeer(PeerState peer) { + synchronized (_peers) { + _peers.remove(peer); + _peers.notifyAll(); + } + } public void startup() { _alive = true; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 7b30df480..040795275 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -88,6 +88,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** should we flood all UDP peers with the configured rate? */ private static final boolean SHOULD_FLOOD_PEERS = true; + private static final int MAX_CONSECUTIVE_FAILED = 2; + public UDPTransport(RouterContext ctx) { super(ctx); _context = ctx; @@ -290,8 +292,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority synchronized (_peersByIdent) { PeerState oldPeer = (PeerState)_peersByIdent.put(peer.getRemotePeer(), peer); if ( (oldPeer != null) && (oldPeer != peer) ) { - _peersByIdent.put(oldPeer.getRemotePeer(), oldPeer); - return false; + // should we transfer the oldPeer's RTT/RTO/etc? nah + // or perhaps reject the new session? nah, + // using the new one allow easier reconnect } } } @@ -302,8 +305,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority synchronized (_peersByRemoteHost) { PeerState oldPeer = (PeerState)_peersByRemoteHost.put(remoteString, peer); if ( (oldPeer != null) && (oldPeer != peer) ) { - _peersByRemoteHost.put(remoteString, oldPeer); - return false; + //_peersByRemoteHost.put(remoteString, oldPeer); + //return false; } } @@ -315,6 +318,27 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return true; } + private void dropPeer(PeerState peer) { + if (_log.shouldLog(Log.WARN)) + _log.debug("Dropping remote peer: " + peer); + if (peer.getRemotePeer() != null) { + _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries"); + synchronized (_peersByIdent) { + _peersByIdent.remove(peer.getRemotePeer()); + } + } + + String remoteString = peer.getRemoteHostString(); + if (remoteString != null) { + synchronized (_peersByRemoteHost) { + _peersByRemoteHost.remove(remoteString); + } + } + + if (SHOULD_FLOOD_PEERS) + _flooder.removePeer(peer); + } + int send(UDPPacket packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending packet " + packet); @@ -451,6 +475,18 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority replaceAddress(addr); } + public void failed(OutboundMessageState msg) { + if (msg == null) return; + int consecutive = 0; + if (msg.getPeer() != null) + consecutive = msg.getPeer().incrementConsecutiveFailedSends(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer()); + if (consecutive > MAX_CONSECUTIVE_FAILED) + dropPeer(msg.getPeer()); + failed(msg.getMessage()); + } + public void failed(OutNetMessage msg) { if (msg == null) return; if (_log.shouldLog(Log.WARN))