diff --git a/history.txt b/history.txt index d2cfbd24b..522741a41 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,22 @@ +2021-01-07 zzz + * SSU: Implement fast retransmit (ticket #2427) + +2021-01-05 zzz + * Console: Reduce limit of concurrent graph generation on slow devices + * i2psnark: Add ability to remove I2CP options + * SusiDNS: Hide last-modified on details page if empty + +2021-01-04 zzz + * Build: + - Gradle build fixes + - Update external javadoc links + +2021-01-02 zzz + * Sybil: Reduce default threshold + * Tunnels: Improve error handling at OBEP + 2020-12-31 zzz + * Console: Use local time on graphs by default * NetDB: - Drop lookups with replies going to us - Extend lookup expire time diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5a58b19bc..e36cd3b14 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 11; + public final static long BUILD = 12; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 27819055d..8360c4011 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -213,6 +213,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) throws DataFormatException { int rv = 0; boolean newAck = false; + ModifiableLong highestSeqNumAcked = new ModifiableLong(-1); if (data.readACKsIncluded()) { int ackCount = data.readACKCount(); if (ackCount > 0) { @@ -222,7 +223,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ for (int i = 0; i < ackCount; i++) { long id = data.readACK(i); - if (from.acked(id)) { + if (from.acked(id, highestSeqNumAcked)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer()); newAck = true; @@ -241,12 +242,10 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0); for (int i = 0; i < bitfields.length; i++) { - if (from.acked(bitfields[i])) { + if (from.acked(bitfields[i], highestSeqNumAcked)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); + _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); newAck = true; - } else if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); } } } @@ -256,12 +255,30 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ else from.dataReceived(); + long highest = highestSeqNumAcked.value; + if (highest >= 0) { + boolean retx = from.highestSeqNumAcked(highest); + if (retx) + newAck = true; + } + // Wake up the packet pusher if it is sleeping. // By calling add(), this also is a failsafe against possible // races in OutboundMessageFragments. +/* if (newAck && from.getOutboundMessageCount() > 0) _outbound.add(from, 0); +*/ return rv; } + + /** + * Modifiable Long, no locking + * @since 0.9.49 + */ + public static class ModifiableLong { + public long value; + public ModifiableLong(long val) { value = val; } + } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 31f6832d5..5db5dc526 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -1,6 +1,7 @@ package net.i2p.router.transport.udp; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.data.Base64; @@ -38,9 +39,10 @@ class OutboundMessageState implements CDPQEntry { private int _maxSends; // we can't use the ones in _message since it is null for injections private long _enqueueTime; - private long _seqNum; + private volatile long _seqNum; /** how many bytes push() is allowed to send */ private int _allowedSendBytes; + private final AtomicInteger _nacks = new AtomicInteger(); public static final int MAX_MSG_SIZE = 32 * 1024; @@ -114,6 +116,22 @@ class OutboundMessageState implements CDPQEntry { public long getMessageId() { return _i2npMessage.getUniqueId(); } + /** + * @return new value + * @since 0.9.49 + */ + public int incrementNACKs() { return _nacks.incrementAndGet(); } + + /** + * @since 0.9.49 + */ + public int getNACKs() { return _nacks.get(); } + + /** + * @since 0.9.49 + */ + public void clearNACKs() { _nacks.set(0); } + public PeerState getPeer() { return _peer; } public boolean isExpired() { @@ -489,6 +507,7 @@ class OutboundMessageState implements CDPQEntry { public String toString() { StringBuilder buf = new StringBuilder(256); buf.append("OB Message ").append(_i2npMessage.getUniqueId()); + buf.append(" seq ").append(_seqNum); buf.append(" type ").append(_i2npMessage.getType()); buf.append(" size ").append(_messageBuf.length); if (_numFragments > 1) @@ -496,6 +515,8 @@ class OutboundMessageState implements CDPQEntry { buf.append(" volleys: ").append(_maxSends); buf.append(" lifetime: ").append(getLifetime()); if (!isComplete()) { + if (_nacks.get() > 0) + buf.append(" NACKs: ").append(_nacks); if (_fragmentSends != null) { buf.append(" unacked fragments: "); for (int i = 0; i < _numFragments; i++) { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index e521c3f97..fa0e9bc07 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -6,11 +6,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import net.i2p.data.DataHelper; @@ -18,6 +20,7 @@ import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.router.transport.udp.InboundMessageFragments.ModifiableLong; import net.i2p.router.util.CachedIteratorCollection; import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.router.util.PriBlockingQueue; @@ -176,6 +179,8 @@ public class PeerState { private int _packetsTransmitted; /** how many packets were retransmitted within the last RETRANSMISSION_PERIOD_WIDTH packets */ private int _packetsRetransmitted; + private long _nextSequenceNumber; + private final AtomicBoolean _fastRetransmit = new AtomicBoolean(); /** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */ private int _packetsReceivedDuplicate; @@ -197,6 +202,8 @@ public class PeerState { */ //private final CoDelPriorityBlockingQueue _outboundQueue; private final PriBlockingQueue _outboundQueue; + /** Message ID to sequence number */ + private final Map _ackedMessages; /** when the retransmit timer is about to trigger */ private long _retransmitTimer; @@ -297,18 +304,20 @@ public class PeerState { /** * The max number of acks we save to send as duplicates */ - private static final int MAX_RESEND_ACKS = 64; + private static final int MAX_RESEND_ACKS = 32; /** * The max number of duplicate acks sent in each ack-only messge. * Doesn't really matter, we have plenty of room... * @since 0.7.13 */ - private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3; + private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS * 2 / 3; /** for small MTU */ - private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5; + private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS * 2 / 5; - private static final long RESEND_ACK_TIMEOUT = 5*60*1000; + private static final long RESEND_ACK_TIMEOUT = 60*1000; + /** if this many acks arrive out of order, fast rtx */ + private static final int FAST_RTX_ACKS = 3; /** * @param rtt from the EstablishState, or 0 if not available @@ -356,6 +365,7 @@ public class PeerState { _outboundMessages = new CachedIteratorCollection(); //_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32); _outboundQueue = new PriBlockingQueue(ctx, "UDP-PeerState", 32); + _ackedMessages = new AckedMessages(); // all createRateStat() moved to EstablishmentManager _remoteIP = remoteIP; _remotePeer = remotePeer; @@ -760,19 +770,26 @@ public class PeerState { // If we reduced the MTU, then we won't be able to send any previously-fragmented messages, // so set to the max MTU. This is the easiest fix, although it violates the RFC. //_sendWindowBytes = _mtu; - _sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU; int oldsst = _slowStartThreshold; - float bwe = _bwEstimator.getBandwidthEstimate(); - _slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu); + float bwe; + if (_fastRetransmit.get()) { + // window and SST set in highestSeqNumAcked() + bwe = -1; // for log below + } else { + _sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU; + bwe = _bwEstimator.getBandwidthEstimate(); + _slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu); + } int oldRto = _rto; long oldTimer = _retransmitTimer - now; _rto = Math.min(MAX_RTO, Math.max(MIN_RTO, _rto << 1 )); _retransmitTimer = now + _rto; if (_log.shouldInfo()) - _log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now) + + _log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + _rto + " window: " + congestionAt + " -> " + _sendWindowBytes + " SST: " + oldsst + " -> " + _slowStartThreshold + + " FRTX? " + _fastRetransmit + " BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps"); } @@ -1070,6 +1087,7 @@ public class PeerState { if (_log.shouldLog(Log.DEBUG)) _log.debug(_remotePeer + " nothing pending, cancelling timer"); _retransmitTimer = 0; + exitFastRetransmit(); } else { // any time new data gets acked, push out the timer long now = _context.clock().now(); @@ -1313,6 +1331,8 @@ public class PeerState { boolean fail; synchronized (_outboundQueue) { fail = !_outboundQueue.offer(state); + // reuse of CDPQ value, don't do both + state.setSeqNum(_nextSequenceNumber++); } if (fail) { if (_log.shouldLog(Log.WARN)) @@ -1447,6 +1467,14 @@ public class PeerState { } // no need to nudge(), this is called from OMF loop before allocateSend() } + if (rv <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " nothing pending, cancelling timer"); + synchronized(this) { + _retransmitTimer = 0; + exitFastRetransmit(); + } + } } return rv + _outboundQueue.size(); @@ -1475,6 +1503,9 @@ public class PeerState { _retransmitTimer = now + getRTO(); if (_log.shouldLog(Log.DEBUG)) _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer); + } else if (_fastRetransmit.get()) { + // right? + _retransmitTimer = now + getRTO(); } } } else if (canSendOld) { @@ -1484,10 +1515,12 @@ public class PeerState { isEmpty = _outboundMessages.isEmpty(); } synchronized(this) { - if (isEmpty) + if (isEmpty) { _retransmitTimer = 0; - else + exitFastRetransmit(); + } else { _retransmitTimer = now + 250; + } } } return rv; @@ -1505,8 +1538,16 @@ public class PeerState { synchronized (_outboundMessages) { if (canSendOld) { for (OutboundMessageState state : _outboundMessages) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); + if (_fastRetransmit.get()) { + // If fast retx flag set, just add those + if (state.getNACKs() < FAST_RTX_ACKS) + continue; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending (FAST) to " + _remotePeer + ": " + state); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); + } if (rv == null) { rv = new ArrayList((1 + _outboundMessages.size()) / 2); _lastSendTime = now; @@ -1514,7 +1555,7 @@ public class PeerState { rv.add(state); // Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3) // TODO this is fragments from half the messages... OK as is? - if (rv.size() >= _outboundMessages.size() / 2) + if (rv.size() >= _outboundMessages.size() / 2 && !_fastRetransmit.get()) return rv; } return rv; @@ -1577,7 +1618,7 @@ public class PeerState { } if ( rv == null && _log.shouldLog(Log.DEBUG)) _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + - " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now())); + " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - now)); return rv; } @@ -1589,14 +1630,14 @@ public class PeerState { * * @param now what time it is now * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. - * If ready now, will return 0 or a negative value. + * If ready now, will return 0. */ int getNextDelay(long now) { int rv = Integer.MAX_VALUE; if (_dead) return rv; synchronized(this) { - if (_retransmitTimer >= now) - return (int) (_retransmitTimer - now); + if (_retransmitTimer > 0) + rv = Math.max(0, (int) (_retransmitTimer - now)); } return rv; } @@ -1671,9 +1712,10 @@ public class PeerState { * A full ACK was received. * TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster. * + * @param highestSeqNumAcked in/out param, will modify if this seq. number is higher * @return true if the message was acked for the first time */ - boolean acked(long messageId) { + boolean acked(long messageId, ModifiableLong highestSeqNumAcked) { if (_dead) return false; OutboundMessageState state = null; boolean anyPending; @@ -1715,10 +1757,25 @@ public class PeerState { anyQueued = !_outboundQueue.isEmpty(); } } + long sn = state.getSeqNum(); + if (sn > highestSeqNumAcked.value) + highestSeqNumAcked.value = sn; + synchronized(_ackedMessages) { + _ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn)); + } // this adjusts the rtt/rto/window/etc messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued); } else { // dupack, likely + Long seq; + synchronized(_ackedMessages) { + seq = _ackedMessages.get(Integer.valueOf((int) messageId)); + } + if (seq != null) { + long sn = seq.longValue(); + if (sn > highestSeqNumAcked.value) + highestSeqNumAcked.value = sn; + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Received an ACK for a message not pending: " + messageId); } @@ -1728,15 +1785,16 @@ public class PeerState { /** * A partial ACK was received. This is much less common than full ACKs. * - * @return true if the message was completely acked for the first time + * @param highestSeqNumAcked in/out param, will modify if this seq. number is higher + * @return true if any fragment of the message was completely acked for the first time */ - boolean acked(ACKBitfield bitfield) { + boolean acked(ACKBitfield bitfield, ModifiableLong highestSeqNumAcked) { if (_dead) return false; final long messageId = bitfield.getMessageId(); if (bitfield.receivedComplete()) { - return acked(messageId); + return acked(messageId, highestSeqNumAcked); } OutboundMessageState state = null; @@ -1795,6 +1853,7 @@ public class PeerState { + " for: " + state); } if (ackedSize > 0) { + state.clearNACKs(); boolean anyQueued; if (anyPending) { // locked_messageACKed will nudge() @@ -1807,15 +1866,119 @@ public class PeerState { // this adjusts the rtt/rto/window/etc messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued); } - return isComplete; + // we do this even if only partial + long sn = state.getSeqNum(); + if (sn > highestSeqNumAcked.value) + highestSeqNumAcked.value = sn; + if (isComplete) { + synchronized(_ackedMessages) { + _ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn)); + } + } + return ackedSize > 0; } else { // dupack + Long seq; + synchronized(_ackedMessages) { + seq = _ackedMessages.get(Integer.valueOf((int) messageId)); + } + if (seq != null) { + long sn = seq.longValue(); + if (sn > highestSeqNumAcked.value) + highestSeqNumAcked.value = sn; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Received an ACK for a message not pending: " + bitfield); return false; } } + /** + * Enter or leave fast retransmit mode, and adjust + * SST and window variables accordingly. + * See RFC 5681 sec. 2.4 + * + * @param highest the highest sequence number that was acked + * @return true if we have something to fast-retransmit + * @since 0.9.49 + */ + boolean highestSeqNumAcked(long highest) { + boolean rv = false; + boolean startFast = false; + boolean continueFast = false; + synchronized(_outboundMessages) { + for (Iterator iter = _outboundMessages.iterator(); iter.hasNext(); ) { + OutboundMessageState state = iter.next(); + long sn = state.getSeqNum(); + if (sn >= highest) + break; + if (sn < highest) { + // this will also increment NACKs for a state that was just partially acked... ok? + int nacks = state.incrementNACKs(); + if (nacks == FAST_RTX_ACKS) { + startFast = true; + rv = true; + } else if (nacks > FAST_RTX_ACKS) { + continueFast = true; + rv = true; + } + if (_log.shouldDebug()) + _log.debug("Message NACKed: " + state); + } + } + if (rv) { + // set the variables for fast retransmit + // timer will be reset below + _fastRetransmit.set(true); + // caller (IMF) will wakeup OMF + if (continueFast) { + // RFC 5681 sec. 3.2 #4 increase cwnd + _sendWindowBytes += _mtu; + _sendWindowBytesRemaining += _mtu; + if (_log.shouldDebug()) + _log.debug("Continue FAST RTX, inflated window: " + this); + } else if (startFast) { + // RFC 5681 sec. 3.2 #2 set SST (equation 4) + // But use W+ BWE instead + float bwe = _bwEstimator.getBandwidthEstimate(); + _slowStartThreshold = Math.max((int)(bwe * _rtt), 2 * _mtu); + // RFC 5681 sec. 3.2 #3 set cwnd + _sendWindowBytes = _slowStartThreshold + (3 * _mtu); + _sendWindowBytesRemaining = _sendWindowBytes; + if (_log.shouldDebug()) + _log.debug("Start of FAST RTX, inflated window: " + this); + } + } else { + exitFastRetransmit(); + } + } + if (rv) { + synchronized(this) { + _retransmitTimer = _context.clock().now(); + } + } + return rv; + } + + /** + * Leave fast retransmit mode if we were in it, and adjust + * SST and window variables accordingly. + * See RFC 5681 sec. 2.4 + * + * @since 0.9.49 + */ + private void exitFastRetransmit() { + if (_fastRetransmit.compareAndSet(true, false)) { + synchronized(this) { + // RFC 5681 sec. 2.4 #6 deflate the window + _sendWindowBytes = _slowStartThreshold; + _sendWindowBytesRemaining = _sendWindowBytes; + } + if (_log.shouldDebug()) + _log.debug("End of FAST RTX, deflated window: " + this); + } + } + /** * Transfer the basic activity/state from the old peer to the current peer * @@ -1894,6 +2057,19 @@ public class PeerState { } } + /** + * Message ID to sequence number. + * Insertion order. Caller must synch. + * @since 0.9.49 + */ + private static class AckedMessages extends LinkedHashMap { + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_SEND_MSGS_PENDING; + } + } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? /* @@ -1932,8 +2108,12 @@ public class PeerState { buf.append(" sendAttemptAge: ").append(now-_lastSendTime); buf.append(" sendACKAge: ").append(now-_lastACKSend); buf.append(" lifetime: ").append(now-_keyEstablishedTime); + buf.append(" RTT: ").append(_rtt); + buf.append(" RTO: ").append(_rto); buf.append(" cwin: ").append(_sendWindowBytes); buf.append(" acwin: ").append(_sendWindowBytesRemaining); + buf.append(" SST: ").append(_slowStartThreshold); + buf.append(" FRTX? ").append(_fastRetransmit); buf.append(" consecFail: ").append(_consecutiveFailedSends); buf.append(" msgs rcvd: ").append(_messagesReceived); buf.append(" msgs sent: ").append(_messagesSent);