From bcbda3cd2732cb379ceded26d194dcb487200120 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Tue, 21 Oct 2014 18:37:11 +0000 Subject: [PATCH] SSU: Don't resend ACKS that are too old (ticket #772) untested --- .../i2p/router/transport/udp/PeerState.java | 103 ++++++++++++------ 1 file changed, 68 insertions(+), 35 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 22a92f22eb..37cde0c8e6 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -106,7 +106,7 @@ class PeerState { * an ACK for. We keep a few of these around to retransmit with _currentACKs, * hopefully saving some spurious retransmissions */ - private final Queue<Long> _currentACKsResend; + private final Queue<ResendACK> _currentACKsResend; /** when did we last send ACKs to the peer? */ private volatile long _lastACKSend; @@ -306,6 +306,22 @@ class PeerState { private static final int MAX_RTO = 15*1000; private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3; + /** + * The max number of acks we save to send as duplicates + */ + private static final int MAX_RESEND_ACKS = 64; + /** + * The max number of duplicate acks sent in each ack-only messge. + * Doesn't really matter, we have plenty of room... + * @since 0.7.13 + */ + private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3; + /** for small MTU */ + private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5; + + private static final long RESEND_ACK_TIMEOUT = 5*60*1000; + + public PeerState(RouterContext ctx, UDPTransport transport, byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound) { _context = ctx; @@ -317,7 +333,7 @@ class PeerState { _lastSendTime = now; _lastReceiveTime = now; _currentACKs = new ConcurrentHashSet<Long>(); - _currentACKsResend = new LinkedBlockingQueue<Long>(); + _currentACKsResend = new LinkedBlockingQueue<ResendACK>(); _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES; _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2; @@ -877,10 +893,25 @@ class PeerState { * @since 0.8.12 was included in getCurrentFullACKs() */ public List<Long> getCurrentResendACKs() { - List<Long> randomResends = new ArrayList<Long>(_currentACKsResend); - Collections.shuffle(randomResends, _context.random()); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Returning " + randomResends.size() + " resend acks"); + int sz = _currentACKsResend.size(); + List<Long> randomResends = new ArrayList<Long>(sz); + if (sz > 0) { + long cutoff = _context.clock().now() - RESEND_ACK_TIMEOUT; + int i = 0; + for (Iterator<ResendACK> iter = _currentACKsResend.iterator(); iter.hasNext(); ) { + ResendACK rack = iter.next(); + if (rack.time > cutoff && i++ < MAX_RESEND_ACKS) { + randomResends.add(rack.id); + } else { + iter.remove(); + if (_log.shouldLog(Log.INFO)) + _log.info("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) + + " ago, now " + i + " resend acks"); + } + } + if (i > 1) + Collections.shuffle(randomResends, _context.random()); + } return randomResends; } @@ -891,12 +922,10 @@ class PeerState { public void removeACKMessage(Long messageId) { boolean removed = _currentACKs.remove(messageId); if (removed) { - // only add if reoved from current, as this may be called for + // only add if removed from current, as this may be called for // acks already in _currentACKsResend. - _currentACKsResend.offer(messageId); - // trim down the resends - while (_currentACKsResend.size() > MAX_RESEND_ACKS) - _currentACKsResend.poll(); + _currentACKsResend.offer(new ResendACK(messageId, _context.clock().now())); + // trim happens in getCurrentResendACKs above if (_log.shouldLog(Log.INFO)) _log.info("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " + _currentACKsResend.size() + " resend acks"); @@ -905,19 +934,6 @@ class PeerState { _lastACKSend = _context.clock().now(); } - /** - * The max number of acks we save to send as duplicates - */ - private static final int MAX_RESEND_ACKS = 64; - /** - * The max number of duplicate acks sent in each ack-only messge. - * Doesn't really matter, we have plenty of room... - * @since 0.7.13 - */ - private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3; - /** for small MTU */ - private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5; - /** * grab a list of ACKBitfield instances, some of which may fully * ACK a message while others may only partially ACK a message. @@ -970,13 +986,11 @@ class PeerState { if (_currentACKs.isEmpty()) _wantACKSendSince = -1; if (alwaysIncludeRetransmissions || !rv.isEmpty()) { - List<Long> randomResends = new ArrayList<Long>(_currentACKsResend); + List<Long> randomResends = getCurrentResendACKs(); // now repeat by putting in some old ACKs // randomly selected from the Resend queue. // Maybe we should only resend each one a certain number of times... int oldIndex = Math.min(resendSize, maxResendAcks); - if (oldIndex > 0 && oldIndex < resendSize) - Collections.shuffle(randomResends, _context.random()); iter = randomResends.iterator(); while (bytesRemaining >= 4 && oldIndex-- > 0 && iter.hasNext()) { Long cur = iter.next(); @@ -991,13 +1005,18 @@ class PeerState { bytesRemaining -= 4; //} } - for (Long val : currentACKsRemoved) { - _currentACKsResend.offer(val); + if (!currentACKsRemoved.isEmpty()) { + long now = _context.clock().now(); + for (Long val : currentACKsRemoved) { + _currentACKsResend.offer(new ResendACK(val, now)); + } + // trim happens in getCurrentResendACKs above } } - // trim down the resends - while (_currentACKsResend.size() > MAX_RESEND_ACKS) - _currentACKsResend.poll(); + + + + int partialIncluded = 0; if (bytesRemaining > 4) { @@ -1956,13 +1975,13 @@ class PeerState { if (!_dead) { _currentACKs.addAll(tmp); } - tmp.clear(); - tmp.addAll(oldPeer._currentACKsResend); + List<ResendACK> tmp3 = new ArrayList<ResendACK>(); + tmp3.addAll(oldPeer._currentACKsResend); oldPeer._currentACKsResend.clear(); if (!_dead) { - _currentACKsResend.addAll(tmp); + _currentACKsResend.addAll(tmp3); } Map<Long, InboundMessageState> msgs = new HashMap<Long, InboundMessageState>(); @@ -1999,6 +2018,20 @@ class PeerState { return _transport; } + /** + * A message ID and a timestamp. Used for the resend ACKS. + * @since 0.9.17 + */ + private static class ResendACK { + public final Long id; + public final long time; + + public ResendACK(Long id, long time) { + this.id = id; + this.time = time; + } + } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? /* -- GitLab