introduce a lock just for _sendWindowBytesRemaining field

This commit is contained in:
Zlatin Balevsky
2021-09-17 17:31:45 +01:00
parent 67fea26638
commit 3355daa334

View File

@@ -121,6 +121,7 @@ public class PeerState {
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private int _sendWindowBytesRemaining;
private final Object _sendWindowBytesRemainingLock = new Object();
private final BandwidthEstimator _bwEstimator;
// smoothed value, for display only
private int _receiveBps;
@@ -452,7 +453,7 @@ public class PeerState {
/** how many bytes can we send to the peer in the current second */
public int getSendWindowBytesRemaining() {
synchronized(_outboundMessages) {
synchronized(_sendWindowBytesRemainingLock) {
return _sendWindowBytesRemaining;
}
}
@@ -606,10 +607,11 @@ public class PeerState {
_context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections);
return false;
}
if (_sendWindowBytesRemaining <= fragmentOverhead())
final int sendRemaining = getSendWindowBytesRemaining();
if (sendRemaining <= fragmentOverhead())
return false;
int size = state.getSendSize(_sendWindowBytesRemaining);
int size = state.getSendSize(sendRemaining);
if (size > 0) {
if (messagePushCount == 0) {
_context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed);
@@ -617,7 +619,9 @@ public class PeerState {
_context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size());
_consecutiveRejections = 0;
}
_sendWindowBytesRemaining -= size;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining -= size;
}
_lastSendTime = now;
return true;
} else {
@@ -1053,14 +1057,18 @@ public class PeerState {
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
}
} else {
float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1));
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
}
}
}
} else {
@@ -1074,9 +1082,11 @@ public class PeerState {
_lastReceiveTime = _context.clock().now();
_lastSendFullyTime = _lastReceiveTime;
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
}
if (numSends < 2) {
// caller synchs
@@ -1458,7 +1468,7 @@ public class PeerState {
}
if (failedSize > 0) {
// restore the window
synchronized(this) {
synchronized(_sendWindowBytesRemainingLock) {
// this isn't exactly right, because some fragments may not have been sent at all,
// but that should be unlikely
_sendWindowBytesRemaining += failedSize;
@@ -1935,7 +1945,9 @@ public class PeerState {
if (continueFast) {
// RFC 5681 sec. 3.2 #4 increase cwnd
_sendWindowBytes += _mtu;
_sendWindowBytesRemaining += _mtu;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining += _mtu;
}
if (_log.shouldDebug())
_log.debug("Continue FAST RTX, inflated window: " + this);
} else if (startFast) {
@@ -1945,7 +1957,9 @@ public class PeerState {
_slowStartThreshold = Math.max((int)(bwe * _rtt), 2 * _mtu);
// RFC 5681 sec. 3.2 #3 set cwnd
_sendWindowBytes = _slowStartThreshold + (3 * _mtu);
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining = _sendWindowBytes;
}
if (_log.shouldDebug())
_log.debug("Start of FAST RTX, inflated window: " + this);
}
@@ -1973,7 +1987,9 @@ public class PeerState {
synchronized(this) {
// RFC 5681 sec. 2.4 #6 deflate the window
_sendWindowBytes = _slowStartThreshold;
_sendWindowBytesRemaining = _sendWindowBytes;
synchronized(_sendWindowBytesRemainingLock) {
_sendWindowBytesRemaining = _sendWindowBytes;
}
}
if (_log.shouldDebug())
_log.debug("End of FAST RTX, deflated window: " + this);