From 3355daa33437a659208d457b3463528182c77e69 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 17 Sep 2021 17:31:45 +0100 Subject: [PATCH 1/2] introduce a lock just for _sendWindowBytesRemaining field --- .../i2p/router/transport/udp/PeerState.java | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 115960ec0..6b3aa142f 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -121,6 +121,7 @@ public class PeerState { private int _sendWindowBytes; /** how many bytes can we send to the peer in the current second */ private int _sendWindowBytesRemaining; + private final Object _sendWindowBytesRemainingLock = new Object(); private final BandwidthEstimator _bwEstimator; // smoothed value, for display only private int _receiveBps; @@ -452,7 +453,7 @@ public class PeerState { /** how many bytes can we send to the peer in the current second */ public int getSendWindowBytesRemaining() { - synchronized(_outboundMessages) { + synchronized(_sendWindowBytesRemainingLock) { return _sendWindowBytesRemaining; } } @@ -606,10 +607,11 @@ public class PeerState { _context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections); return false; } - if (_sendWindowBytesRemaining <= fragmentOverhead()) + final int sendRemaining = getSendWindowBytesRemaining(); + if (sendRemaining <= fragmentOverhead()) return false; - int size = state.getSendSize(_sendWindowBytesRemaining); + int size = state.getSendSize(sendRemaining); if (size > 0) { if (messagePushCount == 0) { _context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed); @@ -617,7 +619,9 @@ public class PeerState { _context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size()); _consecutiveRejections = 0; } - _sendWindowBytesRemaining -= size; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining -= size; + } _lastSendTime = now; return true; } else { @@ -1053,14 +1057,18 @@ public class PeerState { if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; - _sendWindowBytesRemaining += bytesACKed; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining += bytesACKed; + } } else { float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1)); float v = _context.random().nextFloat(); if (v < 0) v = 0-v; if (v <= prob) { _sendWindowBytes += bytesACKed; - _sendWindowBytesRemaining += bytesACKed; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining += bytesACKed; + } } } } else { @@ -1074,9 +1082,11 @@ public class PeerState { _lastReceiveTime = _context.clock().now(); _lastSendFullyTime = _lastReceiveTime; - _sendWindowBytesRemaining += bytesACKed; - if (_sendWindowBytesRemaining > _sendWindowBytes) - _sendWindowBytesRemaining = _sendWindowBytes; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining += bytesACKed; + if (_sendWindowBytesRemaining > _sendWindowBytes) + _sendWindowBytesRemaining = _sendWindowBytes; + } if (numSends < 2) { // caller synchs @@ -1458,7 +1468,7 @@ public class PeerState { } if (failedSize > 0) { // restore the window - synchronized(this) { + synchronized(_sendWindowBytesRemainingLock) { // this isn't exactly right, because some fragments may not have been sent at all, // but that should be unlikely _sendWindowBytesRemaining += failedSize; @@ -1935,7 +1945,9 @@ public class PeerState { if (continueFast) { // RFC 5681 sec. 3.2 #4 increase cwnd _sendWindowBytes += _mtu; - _sendWindowBytesRemaining += _mtu; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining += _mtu; + } if (_log.shouldDebug()) _log.debug("Continue FAST RTX, inflated window: " + this); } else if (startFast) { @@ -1945,7 +1957,9 @@ public class PeerState { _slowStartThreshold = Math.max((int)(bwe * _rtt), 2 * _mtu); // RFC 5681 sec. 3.2 #3 set cwnd _sendWindowBytes = _slowStartThreshold + (3 * _mtu); - _sendWindowBytesRemaining = _sendWindowBytes; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining = _sendWindowBytes; + } if (_log.shouldDebug()) _log.debug("Start of FAST RTX, inflated window: " + this); } @@ -1973,7 +1987,9 @@ public class PeerState { synchronized(this) { // RFC 5681 sec. 2.4 #6 deflate the window _sendWindowBytes = _slowStartThreshold; - _sendWindowBytesRemaining = _sendWindowBytes; + synchronized(_sendWindowBytesRemainingLock) { + _sendWindowBytesRemaining = _sendWindowBytes; + } } if (_log.shouldDebug()) _log.debug("End of FAST RTX, deflated window: " + this); From f14b7d53a3a7dd193987a80ce09a688a35277376 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 17 Sep 2021 17:32:37 +0100 Subject: [PATCH 2/2] reduce the delay in ACKs to the minimum of rtt/2 and the constant --- router/java/src/net/i2p/router/transport/udp/ACKSender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 62e1e0d60..8d806f74d 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -78,7 +78,7 @@ class ACKSender implements Runnable { // the unsentACKThreshold to figure out when to send an ACK instead of // using the timer, so we can set the timeout/frequency higher if (timeSinceACK < 2*1000) - return Math.max(rtt/2, ACK_FREQUENCY); + return Math.min(rtt/2, ACK_FREQUENCY); else return ACK_FREQUENCY; }