Merge branch 'wplus-wip' into 'master'

SSU: Westwood+ congestion control (ticket #2427)

See merge request i2p-hackers/i2p.i2p!4
This commit is contained in:
zzz
2020-12-20 13:13:34 +00:00
5 changed files with 291 additions and 90 deletions

View File

@@ -0,0 +1,20 @@
package net.i2p.router.transport.udp;

/**
 * A Westwood bandwidth estimator.
 * Implementations are fed the size of each arriving ack and maintain
 * a running estimate of the achievable send bandwidth to the peer.
 *
 * @since 0.9.49 adapted from streaming
 */
interface BandwidthEstimator {

    /**
     * Records an arriving ack.
     * @param acked how many bytes were acked with this ack
     */
    public void addSample(int acked);

    /**
     * @return the current bandwidth estimate in bytes/ms.
     */
    public float getBandwidthEstimate();
}

View File

@@ -170,8 +170,6 @@ class EstablishmentManager {
_context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);

View File

@@ -154,6 +154,23 @@ class OutboundMessageState implements CDPQEntry {
return rv;
}
/**
 * How many of this message's fragments have not yet been acked.
 *
 * @return count of unacked fragments
 * @since 0.9.49
 */
public synchronized int getUnackedFragments() {
    // everything acked, nothing outstanding
    if (isComplete())
        return 0;
    // a single-fragment message that isn't complete has exactly
    // that one fragment outstanding; skip the scan
    if (_numFragments == 1)
        return 1;
    int pending = 0;
    for (int frag = _numFragments - 1; frag >= 0; frag--) {
        if (needsSending(frag))
            pending++;
    }
    return pending;
}
/**
* Is any fragment unsent?
*

View File

@@ -13,6 +13,7 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.OutNetMessage;
@@ -116,10 +117,7 @@ public class PeerState {
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private int _sendWindowBytesRemaining;
private long _lastSendRefill;
// smoothed value, for display only
private int _sendBps;
private int _sendBytes;
private final BandwidthEstimator _bwEstimator;
// smoothed value, for display only
private int _receiveBps;
private int _receiveBytes;
@@ -223,8 +221,6 @@ public class PeerState {
/** Last time it was made an introducer **/
private long _lastIntroducerTime;
private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/**
@@ -328,12 +324,8 @@ public class PeerState {
_lastReceiveTime = now;
_currentACKs = new ConcurrentHashSet<Long>();
_currentACKsResend = new LinkedBlockingQueue<ResendACK>();
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = now;
_receivePeriodBegin = now;
_lastCongestionOccurred = -1;
_remotePort = remotePort;
if (remoteIP.length == 4) {
_mtu = DEFAULT_MTU;
@@ -344,6 +336,13 @@ public class PeerState {
_mtuReceive = MIN_IPV6_MTU;
_largeMTU = transport.getMTU(true);
}
// RFC 5681 sec. 3.1
if (_mtu > 1095)
_sendWindowBytes = 3 * _mtu;
else
_sendWindowBytes = 4 * _mtu;
_sendWindowBytesRemaining = _sendWindowBytes;
_lastACKSend = -1;
_rto = INIT_RTO;
@@ -362,6 +361,7 @@ public class PeerState {
_remotePeer = remotePeer;
_isInbound = isInbound;
_remoteHostId = new RemoteHostId(remoteIP, remotePort);
_bwEstimator = new SimpleBandwidthEstimator(ctx, this);
}
/**
@@ -550,10 +550,10 @@ public class PeerState {
}
/**
* An approximation, for display only
* The Westwood+ bandwidth estimate
* @return the smoothed send transfer rate
*/
public int getSendBps() { return _sendBps; }
public int getSendBps() { return (int) (_bwEstimator.getBandwidthEstimate() * 1000); }
/**
* An approximation, for display only
@@ -588,16 +588,7 @@ public class PeerState {
*
* Caller should synch
*/
private boolean allocateSendingBytes(OutboundMessageState state, long now, boolean resetWindow) {
long duration = now - _lastSendRefill;
if (resetWindow || duration >= 1000) {
_sendWindowBytesRemaining = _sendWindowBytes;
if (duration <= 0)
duration = 10;
_sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
_sendBytes = 0;
_lastSendRefill = now;
}
private boolean allocateSendingBytes(OutboundMessageState state, long now) {
int messagePushCount = state.getPushCount();
if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) {
_consecutiveRejections++;
@@ -616,7 +607,6 @@ public class PeerState {
_consecutiveRejections = 0;
}
_sendWindowBytesRemaining -= size;
_sendBytes += size;
_lastSendTime = now;
return true;
} else {
@@ -706,7 +696,6 @@ public class PeerState {
_receiveBps = (int)(0.9f*_receiveBps + 0.1f*(_receiveBytes * (1000f/duration)));
_receiveBytes = 0;
_receivePeriodBegin = now;
_context.statManager().addRateData("udp.receiveBps", _receiveBps);
}
if (_wantACKSendSince <= 0)
@@ -758,28 +747,33 @@ public class PeerState {
* either they told us to back off, or we had to resend to get
* the data through.
* Caller should synch on this
* @return true if window shrunk, but nobody uses the return value
*/
private boolean congestionOccurred() {
private void congestionOccurred() {
long now = _context.clock().now();
if (_lastCongestionOccurred + _rto > now)
return false; // only shrink once every few seconds
return; // only shrink once every few seconds
_lastCongestionOccurred = now;
// 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
// 2. cut ssthresh to bandwidth estimate, window to 1 MTU
// 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
int congestionAt = _sendWindowBytes;
_sendWindowBytes = _sendWindowBytes/2; //(_sendWindowBytes*2) / 3;
if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
_sendWindowBytes = MINIMUM_WINDOW_BYTES;
_slowStartThreshold = congestionAt/2;
// If we reduced the MTU, then we won't be able to send any previously-fragmented messages,
// so set to the max MTU. This is the easiest fix, although it violates the RFC.
//_sendWindowBytes = _mtu;
_sendWindowBytes = isIPv6() ? MAX_IPV6_MTU : LARGE_MTU;
int oldsst = _slowStartThreshold;
float bwe = _bwEstimator.getBandwidthEstimate();
_slowStartThreshold = Math.max( (int)(bwe * _rtt), 2 * _mtu);
int oldRto = _rto;
long oldTimer = _retransmitTimer - now;
_rto = Math.min(MAX_RTO, Math.max(MIN_RTO, _rto << 1 ));
_retransmitTimer = now + _rto;
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
return true;
if (_log.shouldInfo())
_log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now) +
" window: " + congestionAt + " -> " + _sendWindowBytes +
" SST: " + oldsst + " -> " + _slowStartThreshold +
" BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps");
}
/**
@@ -1042,12 +1036,15 @@ public class PeerState {
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
} else {
float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1));
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob)
_sendWindowBytes += bytesACKed; //512; // bytesACKed;
if (v <= prob) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
}
}
} else {
int allow = _concurrentMessagesAllowed - 1;
@@ -1060,9 +1057,8 @@ public class PeerState {
_lastReceiveTime = _context.clock().now();
_lastSendFullyTime = _lastReceiveTime;
if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
_sendWindowBytesRemaining += bytesACKed;
else
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
if (numSends < 2) {
@@ -1094,10 +1090,9 @@ public class PeerState {
synchronized(this) {
locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued);
}
_bwEstimator.addSample(bytesACKed);
if (numSends >= 2 && _log.shouldDebug())
_log.debug(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps);
}
/** This is the value specified in RFC 2988 */
@@ -1421,17 +1416,37 @@ public class PeerState {
msg.timestamp("sending complete");
}
for (int i = 0; failed != null && i < failed.size(); i++) {
OutboundMessageState state = failed.get(i);
OutNetMessage msg = state.getMessage();
if (msg != null) {
msg.timestamp("expired in the active pool");
_transport.failed(state);
} else {
// it can not have an OutNetMessage if the source is the
// final after establishment message
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message: " + state + " to: " + this);
if (failed != null) {
int failedSize = 0;
int failedCount = 0;
for (int i = 0; i < failed.size(); i++) {
OutboundMessageState state = failed.get(i);
failedSize += state.getUnackedSize();
failedCount += state.getUnackedFragments();
OutNetMessage msg = state.getMessage();
if (msg != null) {
msg.timestamp("expired in the active pool");
_transport.failed(state);
if (_log.shouldWarn())
_log.warn("Message expired: " + state + " to: " + this);
} else {
// it can not have an OutNetMessage if the source is the
// final after establishment message
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send a direct message: " + state + " to: " + this);
}
if (failedSize > 0) {
// restore the window
synchronized(this) {
// this isn't exactly right, because some fragments may not have been sent at all,
// but that should be unlikely
_sendWindowBytesRemaining += failedSize;
_sendWindowBytesRemaining += failedCount * fragmentOverhead();
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
}
// no need to nudge(), this is called from OMF loop before allocateSend()
}
}
}
@@ -1491,29 +1506,17 @@ public class PeerState {
synchronized (_outboundMessages) {
if (canSendOld) {
for (OutboundMessageState state : _outboundMessages) {
boolean should = locked_shouldSend(state, now, true);
if (should) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
if (rv == null)
rv = new ArrayList<OutboundMessageState>((1 + _outboundMessages.size()) / 2);
rv.add(state);
if (rv.size() >= _outboundMessages.size() / 2)
return rv;
} else {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
if (_log.shouldLog(Log.DEBUG)) {
if (rv == null)
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
else
_log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
if (rv == null) {
rv = new ArrayList<OutboundMessageState>((1 + _outboundMessages.size()) / 2);
_lastSendTime = now;
}
rv.add(state);
// Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
// TODO this is fragments from half the messages... OK as is?
if (rv.size() >= _outboundMessages.size() / 2)
return rv;
}
}
return rv;
} else if (!_outboundMessages.isEmpty()) {
@@ -1521,7 +1524,7 @@ public class PeerState {
for (OutboundMessageState state : _outboundMessages) {
if (!state.hasUnsentFragments())
continue;
boolean should = locked_shouldSend(state, now, false);
boolean should = locked_shouldSend(state, now);
if (should) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending more fragments to " + _remotePeer + ": " + state.getMessageId());
@@ -1548,7 +1551,7 @@ public class PeerState {
OutboundMessageState state;
synchronized (_outboundQueue) {
while ((state = _outboundQueue.peek()) != null &&
locked_shouldSend(state, now, false)) {
locked_shouldSend(state, now)) {
// This is guaranted to be the same as what we got in peek(),
// due to locking and because we aren't using the dropping CDPBQ.
// If we do switch to CDPBQ,
@@ -1642,9 +1645,9 @@ public class PeerState {
/**
* Locks this.
*/
private boolean locked_shouldSend(OutboundMessageState state, long now, boolean resetWindow) {
private boolean locked_shouldSend(OutboundMessageState state, long now) {
synchronized(this) {
if (allocateSendingBytes(state, now, resetWindow)) {
if (allocateSendingBytes(state, now)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " Allocation allowed with "
+ getSendWindowBytesRemaining()
@@ -1773,20 +1776,24 @@ public class PeerState {
_context.statManager().addRateData("udp.partialACKReceived", numACKed);
long lifetime = state.getLifetime();
if (_log.shouldLog(Log.INFO))
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends"
+ " complete? " + isComplete
+ " newly-acked: " + ackedSize
+ ' ' + bitfield
+ " for: " + state);
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", lifetime);
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
if (_log.shouldDebug())
_log.debug("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " newly-acked: " + ackedSize
+ ", now complete for: " + state);
} else {
if (_log.shouldInfo())
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends"
+ " complete? false"
+ " newly-acked: " + ackedSize
+ ' ' + bitfield
+ " for: " + state);
}
if (ackedSize > 0) {
boolean anyQueued;

View File

@@ -0,0 +1,159 @@
package net.i2p.router.transport.udp;

import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;

/**
 * A Westwood+ bandwidth estimator with
 * a first stage anti-aliasing low pass filter based on RTT,
 * and the time-varying Westwood filter based on inter-arrival time.
 *
 * Estimates are in bytes per millisecond; the filter state is updated
 * at most once per RTT (floored at WESTWOOD_RTT_MIN) and decays toward
 * zero with virtual null samples while no acks arrive.
 *
 * Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
 * Casetti et al
 * (Westwood)
 *
 * Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
 * Grieco and Mascolo
 * (Westwood+)
 *
 * Adapted from: Linux kernel tcp_westwood.c (GPLv2)
 *
 * @since 0.9.49 adapted from streaming
 */
class SimpleBandwidthEstimator implements BandwidthEstimator {

    private final I2PAppContext _context;
    private final Log _log;
    // peer we are estimating for; consulted only for getRTT() (and getMTU() in toString())
    private final PeerState _state;
    // router-clock time (ms) of the last committed filter update
    private long _tAck;
    // bw_est, bw_ns_est  (kernel tcp_westwood.c naming)
    // _bKFiltered: the published (doubly-filtered) estimate, bytes/ms
    // _bK_ns_est:  the intermediate "non-smoothed" estimate feeding it
    private float _bKFiltered, _bK_ns_est;
    // bk: bytes acked since the last filter update; -1 until the first sample arrives
    private int _acked;

    // As in kernel tcp_westwood.c
    // Should probably match ConnectionOptions.TCP_ALPHA
    private static final int DECAY_FACTOR = 8;
    // floor (ms) for the sampling interval so short RTTs don't over-sample
    private static final int WESTWOOD_RTT_MIN = 500;

    SimpleBandwidthEstimator(I2PAppContext ctx, PeerState state) {
        _log = ctx.logManager().getLog(SimpleBandwidthEstimator.class);
        _context = ctx;
        _state = state;
        // assume we're about to send something
        _tAck = ctx.clock().now();
        _acked = -1;
    }

    /**
     * Records an arriving ack.
     * The first sample seeds both filter stages directly; later samples
     * accumulate in _acked and are folded in by computeBWE() once at
     * least one (floored) RTT has elapsed since the last update.
     *
     * @param acked how many bytes were acked with this ack
     */
    public synchronized void addSample(int acked) {
        long now = _context.clock().now();
        if (_acked < 0) {
            // first sample
            // use time since constructed as the RTT
            // getRTT() would return zero here.
            long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
            float bkdt = ((float) acked) / deltaT;
            _bKFiltered = bkdt;
            _bK_ns_est = bkdt;
            _acked = 0;
            _tAck = now;
            if (_log.shouldDebug())
                _log.debug("first sample packets: " + acked + " deltaT: " + deltaT + ' ' + this);
        } else {
            _acked += acked;
            // anti-aliasing filter
            // As in kernel tcp_westwood.c
            // and the Westwood+ paper
            if (now - _tAck >= Math.max(_state.getRTT(), WESTWOOD_RTT_MIN))
                computeBWE(now);
        }
    }

    /**
     * Reading the estimate may itself advance the filter: if an RTT has
     * passed since the last update, pending acked bytes (or a decay) are
     * folded in first so the returned value is current.
     *
     * @return the current bandwidth estimate in bytes/ms.
     */
    public synchronized float getBandwidthEstimate() {
        long now = _context.clock().now();
        // anti-aliasing filter
        // As in kernel tcp_westwood.c
        // and the Westwood+ paper
        if (now - _tAck >= Math.max(_state.getRTT(), WESTWOOD_RTT_MIN))
            return computeBWE(now);
        return _bKFiltered;
    }

    /**
     * Fold the bytes accumulated since the last update into the filter
     * and reset the accumulator. No-op (returns 0) before the first sample.
     *
     * @param now current router-clock time (ms)
     * @return the updated filtered estimate in bytes/ms
     */
    private synchronized float computeBWE(final long now) {
        if (_acked < 0)
            return 0.0f; // nothing ever sampled
        updateBK(now, _acked);
        _acked = 0;
        return _bKFiltered;
    }

    /**
     * Optimized version of updateBK with packets == 0
     */
    private void decay() {
        // equivalent to filtering in a zero sample: the estimate shrinks
        // by 1/DECAY_FACTOR each call
        _bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
        _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
    }

    /**
     * Here we insert virtual null samples if necessary as in Westwood,
     * And use a very simple EWMA (exponential weighted moving average)
     * time-varying filter, as in kernel tcp_westwood.c
     *
     * @param time the time of the measurement
     * @param packets number of packets acked
     */
    private void updateBK(long time, int packets) {
        long deltaT = time - _tAck;
        int rtt = Math.max(_state.getRTT(), WESTWOOD_RTT_MIN);
        if (deltaT > 2 * rtt) {
            // Decay with virtual null samples as in the Westwood paper
            // number of decays is capped at 2 * DECAY_FACTOR so a long idle
            // period doesn't drive the estimate all the way to zero
            int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
            for (int i = 0; i < numrtts; i++) {
                decay();
            }
            // charge the decayed RTTs against the interval so the real
            // sample below is measured over the remaining time only
            deltaT -= numrtts * rtt;
            if (_log.shouldDebug())
                _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
        }
        float bkdt;
        if (packets > 0) {
            // As in kernel tcp_westwood.c
            // two-stage EWMA: raw sample -> _bK_ns_est -> _bKFiltered
            bkdt = ((float) packets) / deltaT;
            _bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
            _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
        } else {
            bkdt = 0;
            decay();
        }
        _tAck = time;
        if (_log.shouldDebug())
            _log.debug("computeBWE packets: " + packets + " deltaT: " + deltaT +
                       " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
    }

    /**
     * EWMA step with weight (DECAY_FACTOR-1)/DECAY_FACTOR on the old value.
     * As in kernel tcp_westwood.c
     */
    private static float westwood_do_filter(float a, float b) {
        return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
    }

    @Override
    public synchronized String toString() {
        // NOTE(review): the Bps figure scales the per-ms byte estimate by
        // 1000 * MTU; presumably _bKFiltered is treated as packets/ms here —
        // confirm against how addSample() is fed by the caller
        return "SBE[" +
               " _bKFiltered " + _bKFiltered +
               " _tAck " + _tAck + "; " +
               DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000 * _state.getMTU()), false) +
               "Bps]";
    }
}