From 542efa0d9a19701bffdd6ee7a910a8155454f393 Mon Sep 17 00:00:00 2001
From: zzz <zzz@i2pmail.org>
Date: Fri, 18 Dec 2020 14:46:26 -0500
Subject: [PATCH] SSU: Westwood+ congestion control (ticket #2427)

Reduce initial congestion window to 3-4 MTUs per RFC 5681 sec. 3.1
Restore unacked bytes to the send window on message expiration/failure
Remove two now-unused bps stats (udp.sendBps, udp.receiveBps)
Log tweaks
---
 .../transport/udp/EstablishmentManager.java   |   2 -
 .../transport/udp/OutboundMessageState.java   |  17 ++
 .../i2p/router/transport/udp/PeerState.java   | 176 +++++++++---------
 3 files changed, 107 insertions(+), 88 deletions(-)

diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
index 1393eb7f4a..27a331cc38 100644
--- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
+++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
@@ -170,8 +170,6 @@ class EstablishmentManager {
         _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
index 9b6c7aeb2e..4d76978e1f 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
@@ -154,6 +154,23 @@ class OutboundMessageState implements CDPQEntry {
         return rv;
     }
 
+    /**
+     * @return count of unacked fragments
+     * @since 0.9.49
+     */
+    public synchronized int getUnackedFragments() {
+        if (isComplete())
+            return 0;
+        if (_numFragments == 1)
+            return 1;
+        int rv = 0;
+        for (int i = 0; i < _numFragments; i++) {
+            if (needsSending(i))
+                rv++;
+        }
+        return rv;
+    }
+
     /**
      *  Is any fragment unsent?
      *
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index ecfb93d543..98411efc79 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -13,6 +13,7 @@ import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
 import net.i2p.data.SessionKey;
 import net.i2p.router.OutNetMessage;
@@ -116,10 +117,7 @@ public class PeerState {
     private int _sendWindowBytes;
     /** how many bytes can we send to the peer in the current second */
     private int _sendWindowBytesRemaining;
-    private long _lastSendRefill;
-    // smoothed value, for display only
-    private int _sendBps;
-    private int _sendBytes;
+    private final BandwidthEstimator _bwEstimator;
     // smoothed value, for display only
     private int _receiveBps;
     private int _receiveBytes;
@@ -223,8 +221,6 @@ public class PeerState {
     /** Last time it was made an introducer **/
     private long _lastIntroducerTime;
 
-    private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
-    private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
 
     /**
@@ -328,12 +324,8 @@ public class PeerState {
         _lastReceiveTime = now;
         _currentACKs = new ConcurrentHashSet<Long>();
         _currentACKsResend = new LinkedBlockingQueue<ResendACK>();
-        _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
-        _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
         _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
-        _lastSendRefill = now;
         _receivePeriodBegin = now;
-        _lastCongestionOccurred = -1;
         _remotePort = remotePort;
         if (remoteIP.length == 4) {
             _mtu = DEFAULT_MTU;
@@ -344,6 +336,13 @@ public class PeerState {
             _mtuReceive = MIN_IPV6_MTU;
             _largeMTU = transport.getMTU(true);
         }
+        // RFC 5681 sec. 3.1
+        if (_mtu > 1095)
+            _sendWindowBytes = 3 * _mtu;
+        else
+            _sendWindowBytes = 4 * _mtu;
+        _sendWindowBytesRemaining = _sendWindowBytes;
+
         _lastACKSend = -1;
 
         _rto = INIT_RTO;
@@ -362,6 +361,7 @@ public class PeerState {
         _remotePeer = remotePeer;
         _isInbound = isInbound;
         _remoteHostId = new RemoteHostId(remoteIP, remotePort);
+        _bwEstimator = new SimpleBandwidthEstimator(ctx, this);
     }
     
     /** 
@@ -550,10 +550,10 @@ public class PeerState {
     }
 
     /**
-     * An approximation, for display only
+     * The Westwood+ bandwidth estimate
      * @return the smoothed send transfer rate
      */
-    public int getSendBps() { return _sendBps; }
+    public int getSendBps() { return (int) (_bwEstimator.getBandwidthEstimate() * 1000); }
 
     /**
      * An approximation, for display only
@@ -588,16 +588,7 @@ public class PeerState {
      *
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(OutboundMessageState state, long now, boolean resetWindow) {
-        long duration = now - _lastSendRefill;
-        if (resetWindow || duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            if (duration <= 0)
-                duration = 10;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            _sendBytes = 0;
-            _lastSendRefill = now;
-        }
+    private boolean allocateSendingBytes(OutboundMessageState state, long now) {
         int messagePushCount = state.getPushCount();
         if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) {
             _consecutiveRejections++;
@@ -616,7 +607,6 @@ public class PeerState {
                 _consecutiveRejections = 0;
             }
             _sendWindowBytesRemaining -= size; 
-            _sendBytes += size;
             _lastSendTime = now;
             return true;
         } else {
@@ -706,7 +696,6 @@ public class PeerState {
             _receiveBps = (int)(0.9f*_receiveBps + 0.1f*(_receiveBytes * (1000f/duration)));
             _receiveBytes = 0;
             _receivePeriodBegin = now;
-           _context.statManager().addRateData("udp.receiveBps", _receiveBps);
         }
         
         if (_wantACKSendSince <= 0)
@@ -758,28 +747,33 @@ public class PeerState {
      * either they told us to back off, or we had to resend to get 
      * the data through.  
      *  Caller should synch on this
-     *  @return true if window shrunk, but nobody uses the return value
      */
-    private boolean congestionOccurred() {
+    private void congestionOccurred() {
         long now = _context.clock().now();
         if (_lastCongestionOccurred + _rto > now)
-            return false; // only shrink once every few seconds
+            return; // only shrink once every few seconds
         _lastCongestionOccurred = now;
-        
+        // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+        // 2. cut ssthresh to bandwidth estimate, window to 1 MTU
+        // 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
         int congestionAt = _sendWindowBytes;
-        _sendWindowBytes = _sendWindowBytes/2; //(_sendWindowBytes*2) / 3;
-        if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
-            _sendWindowBytes = MINIMUM_WINDOW_BYTES;
-        _slowStartThreshold = congestionAt/2;
+        // If we reduced the MTU, then we won't be able to send any previously-fragmented messages,
+        // so set to the max MTU. This is the easiest fix, although it violates the RFC.
+        //_sendWindowBytes = _mtu;
+        _sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU;
+        int oldsst = _slowStartThreshold;
+        float bwe = _bwEstimator.getBandwidthEstimate();
+        _slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu);
 
         int oldRto = _rto;
         long oldTimer = _retransmitTimer - now;
         _rto = Math.min(MAX_RTO, Math.max(MIN_RTO, _rto << 1 ));
         _retransmitTimer = now + _rto;
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
-
-        return true;
+        if (_log.shouldInfo())
+            _log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now) +
+                                    " window: " + congestionAt + " -> " + _sendWindowBytes +
+                                    " SST: " + oldsst + " -> " + _slowStartThreshold +
+                                    " BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000)) + "bps");
     }
     
     /**
@@ -1060,9 +1054,8 @@ public class PeerState {
         _lastReceiveTime = _context.clock().now();
         _lastSendFullyTime = _lastReceiveTime;
         
-        if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
-            _sendWindowBytesRemaining += bytesACKed;
-        else
+        _sendWindowBytesRemaining += bytesACKed;
+        if (_sendWindowBytesRemaining > _sendWindowBytes)
             _sendWindowBytesRemaining = _sendWindowBytes;
         
         if (numSends < 2) {
@@ -1094,10 +1087,9 @@ public class PeerState {
         synchronized(this) {
             locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued);
         }
+        _bwEstimator.addSample(bytesACKed);
         if (numSends >= 2 && _log.shouldDebug())
             _log.debug(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
-        
-        _context.statManager().addRateData("udp.sendBps", _sendBps);
     }
 
     /** This is the value specified in RFC 2988 */
@@ -1421,17 +1413,37 @@ public class PeerState {
                 msg.timestamp("sending complete");
         }
         
-        for (int i = 0; failed != null && i < failed.size(); i++) {
-            OutboundMessageState state = failed.get(i);
-            OutNetMessage msg = state.getMessage();
-            if (msg != null) {
-                msg.timestamp("expired in the active pool");
-                _transport.failed(state);
-            } else {
-                // it can not have an OutNetMessage if the source is the
-                // final after establishment message
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn("Unable to send a direct message: " + state + " to: " + this);
+        if (failed != null) {
+            int failedSize = 0;
+            int failedCount = 0;
+            for (int i = 0; i < failed.size(); i++) {
+                OutboundMessageState state = failed.get(i);
+                failedSize += state.getUnackedSize();
+                failedCount += state.getUnackedFragments();
+                OutNetMessage msg = state.getMessage();
+                if (msg != null) {
+                    msg.timestamp("expired in the active pool");
+                    _transport.failed(state);
+                    if (_log.shouldWarn())
+                        _log.warn("Message expired: " + state + " to: " + this);
+                } else {
+                    // it can not have an OutNetMessage if the source is the
+                    // final after establishment message
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Unable to send a direct message: " + state + " to: " + this);
+                }
+                if (failedSize > 0) {
+                    // restore the window
+                    synchronized(this) {
+                        // this isn't exactly right, because some fragments may not have been sent at all,
+                        // but that should be unlikely
+                        _sendWindowBytesRemaining += failedSize;
+                        _sendWindowBytesRemaining += failedCount * fragmentOverhead();
+                        if (_sendWindowBytesRemaining > _sendWindowBytes)
+                            _sendWindowBytesRemaining = _sendWindowBytes;
+                    }
+                    // no need to nudge(), this is called from OMF loop before allocateSend()
+                }
             }
         }
         
@@ -1491,29 +1503,17 @@ public class PeerState {
         synchronized (_outboundMessages) {
             if (canSendOld) {
                 for (OutboundMessageState state : _outboundMessages) {
-                    boolean should = locked_shouldSend(state, now, true);
-                    if (should) {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
-                        if (rv == null)
-                            rv = new ArrayList<OutboundMessageState>((1 + _outboundMessages.size()) / 2);
-                        rv.add(state);
-                        if (rv.size() >= _outboundMessages.size() / 2)
-                            return rv;
-                    } else {
-                        // no more bandwidth available
-                        // we don't bother looking for a smaller msg that would fit.
-                        // By not looking further, we keep strict sending order, and that allows
-                        // some efficiency in acked() below.
-                        if (_log.shouldLog(Log.DEBUG)) {
-                            if (rv == null)
-                                _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
-                                       " / " + _outboundQueue.size() + " remaining");
-                            else 
-                               _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
-                        }
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
+                    if (rv == null) {
+                        rv = new ArrayList<OutboundMessageState>((1 + _outboundMessages.size()) / 2);
+                        _lastSendTime = now;
+                    }
+                    rv.add(state);
+                    // Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+                    // TODO this is fragments from half the messages... OK as is?
+                    if (rv.size() >= _outboundMessages.size() / 2)
                         return rv;
-                    } 
                 }
                 return rv;
             } else if (!_outboundMessages.isEmpty()) {
@@ -1521,7 +1521,7 @@ public class PeerState {
                 for (OutboundMessageState state : _outboundMessages) {
                     if (!state.hasUnsentFragments())
                         continue;
-                    boolean should = locked_shouldSend(state, now, false);
+                    boolean should = locked_shouldSend(state, now);
                     if (should) {
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending more fragments to " + _remotePeer + ": " + state.getMessageId());
@@ -1548,7 +1548,7 @@ public class PeerState {
             OutboundMessageState state;
             synchronized (_outboundQueue) {
                 while ((state = _outboundQueue.peek()) != null &&
-                       locked_shouldSend(state, now, false)) {
+                       locked_shouldSend(state, now)) {
                     // This is guaranted to be the same as what we got in peek(),
                     // due to locking and because we aren't using the dropping CDPBQ.
                     // If we do switch to CDPBQ,
@@ -1642,9 +1642,9 @@ public class PeerState {
     /**
      *  Locks this.
      */
-    private boolean locked_shouldSend(OutboundMessageState state, long now, boolean resetWindow) {
+    private boolean locked_shouldSend(OutboundMessageState state, long now) {
         synchronized(this) {
-            if (allocateSendingBytes(state, now, resetWindow)) {
+            if (allocateSendingBytes(state, now)) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug(_remotePeer + " Allocation allowed with " 
                               + getSendWindowBytesRemaining() 
@@ -1773,20 +1773,24 @@ public class PeerState {
             _context.statManager().addRateData("udp.partialACKReceived", numACKed);
             
             long lifetime = state.getLifetime();
-            if (_log.shouldLog(Log.INFO))
-                _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
-                          + " after " + lifetime + " and " + numSends + " sends"
-                          + " complete? " + isComplete
-                          + " newly-acked: " + ackedSize
-                          + ' ' + bitfield
-                          + " for: " + state);
-            
             if (isComplete) {
                 _context.statManager().addRateData("udp.sendConfirmTime", lifetime);
                 if (state.getFragmentCount() > 1)
                     _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
                 _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
                 _transport.succeeded(state);
+                if (_log.shouldDebug())
+                    _log.debug("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+                          + " newly-acked: " + ackedSize
+                          + ", now complete for: " + state);
+            } else {
+                if (_log.shouldInfo())
+                    _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+                          + " after " + lifetime + " and " + numSends + " sends"
+                          + " complete? false"
+                          + " newly-acked: " + ackedSize
+                          + ' ' + bitfield
+                          + " for: " + state);
             }
             if (ackedSize > 0) {
                 boolean anyQueued;
-- 
GitLab