SSU: Implement fast retransmit (ticket #2427)

This partially fixes the issue of packets not being retransmitted
before they expire in 10 seconds, introduced in 0.9.48 as reported by
jogger at http://zzz.i2p/topics/3003
Fast retransmit was also suggested by jogger as a solution and discussed in that thread.
This code is based on the requirements for TCP fast retransmit
as specified in RFC 5681 but cannot precisely follow the RFC
as UDP messages can be dropped without affecting later messages:
- nack counter is per-message, not per-connection
- some interactions with the retransmit timer when in fast retx mode
- msg expiration is currently 10s but max RTO is 60s
- interactions with individual fragment transmission implemented in 0.9.48-5
- this is a sender-side fix but it depends on far-end ack resend strategy

Maintain a local message sequence number and store
it in OMF, previously unused as codel is disabled
Removed acked messages from _outboundMessages as usual,
but stores message and seq. numbers in a LinkedHashMap,
so we may interpret additional acks as nacks.
Calculate the highest-acked seq. number for every incoming packet.
Marks messages older than highest acked as nacked
Fast-retransmits after 3 nacks
Window and SST adjustments per RFC 5681 sec. 2.4
Reduce resend ack quantity and timeout to improve odds of receiving "nacks"
Disable wakeup of OMF from IMF; should not be needed now that PS calls nudge()
PS.acked(partial) now returns true if any fragment was acked, not if complete
Log tweaks

Still todo: possible additional changes to ack resend strategy;
possible parameter adjustments including msg expiration;
confirm that OMF wakeup in IMF is not required;
further testing and cleanups;
take additional ideas from alternative proposal in MR !8;
stat tweaks;
find related tickets to close

Reviewed by and contains code from zlatinb in MR !8
This builds on several previous SSU improvements; see #2427 for a list.
ref: gitlab MRs !8 !9 !10 !11
This commit is contained in:
zzz
2021-01-07 09:33:09 -05:00
parent b4e1fbd857
commit 41c7b7382a
5 changed files with 265 additions and 29 deletions

View File

@@ -1,4 +1,22 @@
2021-01-07 zzz
* SSU: Implement fast retransmit (ticket #2427)
2021-01-05 zzz
* Console: Reduce limit of concurrent graph generation on slow devices
* i2psnark: Add ability to remove I2CP options
* SusiDNS: Hide last-modified on details page if empty
2021-01-04 zzz
* Build:
- Gradle build fixes
- Update external javadoc links
2021-01-02 zzz
* Sybil: Reduce default threshold
* Tunnels: Improve error handling at OBEP
2020-12-31 zzz
* Console: Use local time on graphs by default
* NetDB:
- Drop lookups with replies going to us
- Extend lookup expire time

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 11;
public final static long BUILD = 12;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -213,6 +213,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) throws DataFormatException {
int rv = 0;
boolean newAck = false;
ModifiableLong highestSeqNumAcked = new ModifiableLong(-1);
if (data.readACKsIncluded()) {
int ackCount = data.readACKCount();
if (ackCount > 0) {
@@ -222,7 +223,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
for (int i = 0; i < ackCount; i++) {
long id = data.readACK(i);
if (from.acked(id)) {
if (from.acked(id, highestSeqNumAcked)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer());
newAck = true;
@@ -241,12 +242,10 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
//_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0);
for (int i = 0; i < bitfields.length; i++) {
if (from.acked(bitfields[i])) {
if (from.acked(bitfields[i], highestSeqNumAcked)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
_log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
newAck = true;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
}
}
}
@@ -256,12 +255,30 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
else
from.dataReceived();
long highest = highestSeqNumAcked.value;
if (highest >= 0) {
boolean retx = from.highestSeqNumAcked(highest);
if (retx)
newAck = true;
}
// Wake up the packet pusher if it is sleeping.
// By calling add(), this also is a failsafe against possible
// races in OutboundMessageFragments.
/*
if (newAck && from.getOutboundMessageCount() > 0)
_outbound.add(from, 0);
*/
return rv;
}
/**
* Modifiable Long, no locking
* @since 0.9.49
*/
public static class ModifiableLong {
public long value;
public ModifiableLong(long val) { value = val; }
}
}

View File

@@ -1,6 +1,7 @@
package net.i2p.router.transport.udp;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
@@ -38,9 +39,10 @@ class OutboundMessageState implements CDPQEntry {
private int _maxSends;
// we can't use the ones in _message since it is null for injections
private long _enqueueTime;
private long _seqNum;
private volatile long _seqNum;
/** how many bytes push() is allowed to send */
private int _allowedSendBytes;
private final AtomicInteger _nacks = new AtomicInteger();
public static final int MAX_MSG_SIZE = 32 * 1024;
@@ -114,6 +116,22 @@ class OutboundMessageState implements CDPQEntry {
public long getMessageId() { return _i2npMessage.getUniqueId(); }
/**
* @return new value
* @since 0.9.49
*/
public int incrementNACKs() { return _nacks.incrementAndGet(); }
/**
* @since 0.9.49
*/
public int getNACKs() { return _nacks.get(); }
/**
* @since 0.9.49
*/
public void clearNACKs() { _nacks.set(0); }
public PeerState getPeer() { return _peer; }
public boolean isExpired() {
@@ -489,6 +507,7 @@ class OutboundMessageState implements CDPQEntry {
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("OB Message ").append(_i2npMessage.getUniqueId());
buf.append(" seq ").append(_seqNum);
buf.append(" type ").append(_i2npMessage.getType());
buf.append(" size ").append(_messageBuf.length);
if (_numFragments > 1)
@@ -496,6 +515,8 @@ class OutboundMessageState implements CDPQEntry {
buf.append(" volleys: ").append(_maxSends);
buf.append(" lifetime: ").append(getLifetime());
if (!isComplete()) {
if (_nacks.get() > 0)
buf.append(" NACKs: ").append(_nacks);
if (_fragmentSends != null) {
buf.append(" unacked fragments: ");
for (int i = 0; i < _numFragments; i++) {

View File

@@ -6,11 +6,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.DataHelper;
@@ -18,6 +20,7 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.udp.InboundMessageFragments.ModifiableLong;
import net.i2p.router.util.CachedIteratorCollection;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.router.util.PriBlockingQueue;
@@ -176,6 +179,8 @@ public class PeerState {
private int _packetsTransmitted;
/** how many packets were retransmitted within the last RETRANSMISSION_PERIOD_WIDTH packets */
private int _packetsRetransmitted;
private long _nextSequenceNumber;
private final AtomicBoolean _fastRetransmit = new AtomicBoolean();
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private int _packetsReceivedDuplicate;
@@ -197,6 +202,8 @@ public class PeerState {
*/
//private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
/** Message ID to sequence number */
private final Map<Integer, Long> _ackedMessages;
/** when the retransmit timer is about to trigger */
private long _retransmitTimer;
@@ -297,18 +304,20 @@ public class PeerState {
/**
* The max number of acks we save to send as duplicates
*/
private static final int MAX_RESEND_ACKS = 64;
private static final int MAX_RESEND_ACKS = 32;
/**
* The max number of duplicate acks sent in each ack-only messge.
* Doesn't really matter, we have plenty of room...
* @since 0.7.13
*/
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3;
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS * 2 / 3;
/** for small MTU */
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5;
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS * 2 / 5;
private static final long RESEND_ACK_TIMEOUT = 5*60*1000;
private static final long RESEND_ACK_TIMEOUT = 60*1000;
/** if this many acks arrive out of order, fast rtx */
private static final int FAST_RTX_ACKS = 3;
/**
* @param rtt from the EstablishState, or 0 if not available
@@ -356,6 +365,7 @@ public class PeerState {
_outboundMessages = new CachedIteratorCollection<OutboundMessageState>();
//_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
_outboundQueue = new PriBlockingQueue<OutboundMessageState>(ctx, "UDP-PeerState", 32);
_ackedMessages = new AckedMessages();
// all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP;
_remotePeer = remotePeer;
@@ -760,19 +770,26 @@ public class PeerState {
// If we reduced the MTU, then we won't be able to send any previously-fragmented messages,
// so set to the max MTU. This is the easiest fix, although it violates the RFC.
//_sendWindowBytes = _mtu;
_sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU;
int oldsst = _slowStartThreshold;
float bwe = _bwEstimator.getBandwidthEstimate();
_slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu);
float bwe;
if (_fastRetransmit.get()) {
// window and SST set in highestSeqNumAcked()
bwe = -1; // for log below
} else {
_sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU;
bwe = _bwEstimator.getBandwidthEstimate();
_slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu);
}
int oldRto = _rto;
long oldTimer = _retransmitTimer - now;
_rto = Math.min(MAX_RTO, Math.max(MIN_RTO, _rto << 1 ));
_retransmitTimer = now + _rto;
if (_log.shouldInfo())
_log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now) +
_log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + _rto +
" window: " + congestionAt + " -> " + _sendWindowBytes +
" SST: " + oldsst + " -> " + _slowStartThreshold +
" FRTX? " + _fastRetransmit +
" BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps");
}
@@ -1070,6 +1087,7 @@ public class PeerState {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " nothing pending, cancelling timer");
_retransmitTimer = 0;
exitFastRetransmit();
} else {
// any time new data gets acked, push out the timer
long now = _context.clock().now();
@@ -1313,6 +1331,8 @@ public class PeerState {
boolean fail;
synchronized (_outboundQueue) {
fail = !_outboundQueue.offer(state);
// reuse of CDPQ value, don't do both
state.setSeqNum(_nextSequenceNumber++);
}
if (fail) {
if (_log.shouldLog(Log.WARN))
@@ -1447,6 +1467,14 @@ public class PeerState {
}
// no need to nudge(), this is called from OMF loop before allocateSend()
}
if (rv <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " nothing pending, cancelling timer");
synchronized(this) {
_retransmitTimer = 0;
exitFastRetransmit();
}
}
}
return rv + _outboundQueue.size();
@@ -1475,6 +1503,9 @@ public class PeerState {
_retransmitTimer = now + getRTO();
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
} else if (_fastRetransmit.get()) {
// right?
_retransmitTimer = now + getRTO();
}
}
} else if (canSendOld) {
@@ -1484,10 +1515,12 @@ public class PeerState {
isEmpty = _outboundMessages.isEmpty();
}
synchronized(this) {
if (isEmpty)
if (isEmpty) {
_retransmitTimer = 0;
else
exitFastRetransmit();
} else {
_retransmitTimer = now + 250;
}
}
}
return rv;
@@ -1505,8 +1538,16 @@ public class PeerState {
synchronized (_outboundMessages) {
if (canSendOld) {
for (OutboundMessageState state : _outboundMessages) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
if (_fastRetransmit.get()) {
// If fast retx flag set, just add those
if (state.getNACKs() < FAST_RTX_ACKS)
continue;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (FAST) to " + _remotePeer + ": " + state);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
}
if (rv == null) {
rv = new ArrayList<OutboundMessageState>((1 + _outboundMessages.size()) / 2);
_lastSendTime = now;
@@ -1514,7 +1555,7 @@ public class PeerState {
rv.add(state);
// Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
// TODO this is fragments from half the messages... OK as is?
if (rv.size() >= _outboundMessages.size() / 2)
if (rv.size() >= _outboundMessages.size() / 2 && !_fastRetransmit.get())
return rv;
}
return rv;
@@ -1577,7 +1618,7 @@ public class PeerState {
}
if ( rv == null && _log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now()));
" / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - now));
return rv;
}
@@ -1589,14 +1630,14 @@ public class PeerState {
*
* @param now what time it is now
* @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
* If ready now, will return 0.
*/
int getNextDelay(long now) {
int rv = Integer.MAX_VALUE;
if (_dead) return rv;
synchronized(this) {
if (_retransmitTimer >= now)
return (int) (_retransmitTimer - now);
if (_retransmitTimer > 0)
rv = Math.max(0, (int) (_retransmitTimer - now));
}
return rv;
}
@@ -1671,9 +1712,10 @@ public class PeerState {
* A full ACK was received.
* TODO if messages awaiting ack were a HashMap&lt;Long, OutboundMessageState&gt; this would be faster.
*
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if the message was acked for the first time
*/
boolean acked(long messageId) {
boolean acked(long messageId, ModifiableLong highestSeqNumAcked) {
if (_dead) return false;
OutboundMessageState state = null;
boolean anyPending;
@@ -1715,10 +1757,25 @@ public class PeerState {
anyQueued = !_outboundQueue.isEmpty();
}
}
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
// this adjusts the rtt/rto/window/etc
messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued);
} else {
// dupack, likely
Long seq;
synchronized(_ackedMessages) {
seq = _ackedMessages.get(Integer.valueOf((int) messageId));
}
if (seq != null) {
long sn = seq.longValue();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received an ACK for a message not pending: " + messageId);
}
@@ -1728,15 +1785,16 @@ public class PeerState {
/**
* A partial ACK was received. This is much less common than full ACKs.
*
* @return true if the message was completely acked for the first time
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if any fragment of the message was completely acked for the first time
*/
boolean acked(ACKBitfield bitfield) {
boolean acked(ACKBitfield bitfield, ModifiableLong highestSeqNumAcked) {
if (_dead)
return false;
final long messageId = bitfield.getMessageId();
if (bitfield.receivedComplete()) {
return acked(messageId);
return acked(messageId, highestSeqNumAcked);
}
OutboundMessageState state = null;
@@ -1795,6 +1853,7 @@ public class PeerState {
+ " for: " + state);
}
if (ackedSize > 0) {
state.clearNACKs();
boolean anyQueued;
if (anyPending) {
// locked_messageACKed will nudge()
@@ -1807,15 +1866,119 @@ public class PeerState {
// this adjusts the rtt/rto/window/etc
messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued);
}
return isComplete;
// we do this even if only partial
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
if (isComplete) {
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
}
return ackedSize > 0;
} else {
// dupack
Long seq;
synchronized(_ackedMessages) {
seq = _ackedMessages.get(Integer.valueOf((int) messageId));
}
if (seq != null) {
long sn = seq.longValue();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + bitfield);
return false;
}
}
/**
* Enter or leave fast retransmit mode, and adjust
* SST and window variables accordingly.
* See RFC 5681 sec. 2.4
*
* @param highest the highest sequence number that was acked
* @return true if we have something to fast-retransmit
* @since 0.9.49
*/
boolean highestSeqNumAcked(long highest) {
boolean rv = false;
boolean startFast = false;
boolean continueFast = false;
synchronized(_outboundMessages) {
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
OutboundMessageState state = iter.next();
long sn = state.getSeqNum();
if (sn >= highest)
break;
if (sn < highest) {
// this will also increment NACKs for a state that was just partially acked... ok?
int nacks = state.incrementNACKs();
if (nacks == FAST_RTX_ACKS) {
startFast = true;
rv = true;
} else if (nacks > FAST_RTX_ACKS) {
continueFast = true;
rv = true;
}
if (_log.shouldDebug())
_log.debug("Message NACKed: " + state);
}
}
if (rv) {
// set the variables for fast retransmit
// timer will be reset below
_fastRetransmit.set(true);
// caller (IMF) will wakeup OMF
if (continueFast) {
// RFC 5681 sec. 3.2 #4 increase cwnd
_sendWindowBytes += _mtu;
_sendWindowBytesRemaining += _mtu;
if (_log.shouldDebug())
_log.debug("Continue FAST RTX, inflated window: " + this);
} else if (startFast) {
// RFC 5681 sec. 3.2 #2 set SST (equation 4)
// But use W+ BWE instead
float bwe = _bwEstimator.getBandwidthEstimate();
_slowStartThreshold = Math.max((int)(bwe * _rtt), 2 * _mtu);
// RFC 5681 sec. 3.2 #3 set cwnd
_sendWindowBytes = _slowStartThreshold + (3 * _mtu);
_sendWindowBytesRemaining = _sendWindowBytes;
if (_log.shouldDebug())
_log.debug("Start of FAST RTX, inflated window: " + this);
}
} else {
exitFastRetransmit();
}
}
if (rv) {
synchronized(this) {
_retransmitTimer = _context.clock().now();
}
}
return rv;
}
/**
* Leave fast retransmit mode if we were in it, and adjust
* SST and window variables accordingly.
* See RFC 5681 sec. 2.4
*
* @since 0.9.49
*/
private void exitFastRetransmit() {
if (_fastRetransmit.compareAndSet(true, false)) {
synchronized(this) {
// RFC 5681 sec. 2.4 #6 deflate the window
_sendWindowBytes = _slowStartThreshold;
_sendWindowBytesRemaining = _sendWindowBytes;
}
if (_log.shouldDebug())
_log.debug("End of FAST RTX, deflated window: " + this);
}
}
/**
* Transfer the basic activity/state from the old peer to the current peer
*
@@ -1894,6 +2057,19 @@ public class PeerState {
}
}
/**
* Message ID to sequence number.
* Insertion order. Caller must synch.
* @since 0.9.49
*/
private static class AckedMessages extends LinkedHashMap<Integer, Long> {
@Override
protected boolean removeEldestEntry(Map.Entry<Integer, Long> eldest) {
return size() > MAX_SEND_MSGS_PENDING;
}
}
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/*
@@ -1932,8 +2108,12 @@ public class PeerState {
buf.append(" sendAttemptAge: ").append(now-_lastSendTime);
buf.append(" sendACKAge: ").append(now-_lastACKSend);
buf.append(" lifetime: ").append(now-_keyEstablishedTime);
buf.append(" RTT: ").append(_rtt);
buf.append(" RTO: ").append(_rto);
buf.append(" cwin: ").append(_sendWindowBytes);
buf.append(" acwin: ").append(_sendWindowBytesRemaining);
buf.append(" SST: ").append(_slowStartThreshold);
buf.append(" FRTX? ").append(_fastRetransmit);
buf.append(" consecFail: ").append(_consecutiveFailedSends);
buf.append(" msgs rcvd: ").append(_messagesReceived);
buf.append(" msgs sent: ").append(_messagesSent);