From 634802c008c8e39c45c569ac02e760a6e434c9b2 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 30 Apr 2020 12:44:55 +0000 Subject: [PATCH] Streaming: Westwood+ congestion control (ticket #2719) Increase max slow start window to 64 Change RTT calculations from double to float Original idea from jogger Original patch from zlatinb Developed and tested with zlatinb --- .../streaming/impl/BandwidthEstimator.java | 20 +++ .../i2p/client/streaming/impl/Connection.java | 19 ++- .../streaming/impl/ConnectionOptions.java | 15 +- .../impl/ConnectionPacketHandler.java | 2 +- .../impl/SimpleBandwidthEstimator.java | 159 ++++++++++++++++++ history.txt | 18 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- 7 files changed, 223 insertions(+), 12 deletions(-) create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/impl/BandwidthEstimator.java create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/impl/SimpleBandwidthEstimator.java diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/BandwidthEstimator.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/BandwidthEstimator.java new file mode 100644 index 000000000..b0fb51ce0 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/BandwidthEstimator.java @@ -0,0 +1,20 @@ +package net.i2p.client.streaming.impl; + +/** + * A Westwood bandwidth estimator + * + * @since 0.9.46 + */ +interface BandwidthEstimator { + + /** + * Records an arriving ack. + * @param acked how many packets were acked with this ack + */ + public void addSample(int acked); + + /** + * @return the current bandwidth estimate in packets/ms. + */ + public float getBandwidthEstimate(); +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 558f31c15..16bc997b3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -89,6 +89,7 @@ class Connection { private final int _localPort; private final int _remotePort; private final SimpleTimer2 _timer; + private final BandwidthEstimator _bwEstimator; private final AtomicLong _lifetimeBytesSent = new AtomicLong(); /** TBD for tcpdump-compatible ack output */ @@ -114,7 +115,7 @@ class Connection { /** Maximum number of packets to retransmit when the timer hits */ private static final int MAX_RTX = 16; - + /**** public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { @@ -170,6 +171,7 @@ class Connection { _nextSendLock = new Object(); _connectionEvent = new ConEvent(); _retransmitEvent = new RetransmitEvent(); + _bwEstimator = new SimpleBandwidthEstimator(ctx, _options); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) @@ -579,6 +581,7 @@ class Connection { } if ((acked != null) && (!acked.isEmpty()) ) { _ackSinceCongestion.set(true); + _bwEstimator.addSample(acked.size()); if (anyLeft) { // RFC 6298 section 5.3 int rto = getOptions().getRTO(); @@ -1417,6 +1420,7 @@ class Connection { buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" ackThru ").append(_highestAckedThrough); buf.append(" ssThresh ").append(_ssthresh); + buf.append(" minRTT ").append(getOptions().getMinRTT()); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1477,7 +1481,7 @@ class Connection { final long now = _context.clock().now(); pushBackRTO(getOptions().doubleRTO()); - // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4) + // 2. cut ssthresh to bandwidth estimate, window to 1 List toResend = null; synchronized(_outboundPackets) { if (_outboundPackets.isEmpty()) { @@ -1490,8 +1494,8 @@ class Connection { if (oldest.getNumSends() == 1) { if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " cutting ssthresh and window"); - int flightSize = _outboundPackets.size(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh); getOptions().setWindowSize(1); } else if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " not cutting ssthresh and window"); @@ -1693,11 +1697,12 @@ class Connection { // This prevents being stuck at a window size of 1, retransmitting every packet, // never updating the RTT or RTO. getOptions().doubleRTO(); - getOptions().setWindowSize(1); if (_packet.getNumSends() == 1) { - int flightSize = getUnackedPacketsSent(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh); + int wsize = getOptions().getWindowSize(); + getOptions().setWindowSize(Math.min(_ssthresh, wsize)); } if (_log.shouldLog(Log.INFO)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index 4d9ae6ef5..a32592b2e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int _rtt; + private int _minRtt = Integer.MAX_VALUE; private int _rttDev; private int _rto = INITIAL_RTO; private int _resendDelay; @@ -83,9 +84,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * These values are specified in RFC 6298 * Do not change unless you know what you're doing */ - private static final double TCP_ALPHA = 1.0/8; - private static final double TCP_BETA = 1.0/4; - private static final double TCP_KAPPA = 4; + private static final float TCP_ALPHA = 1.0f/8; + private static final float TCP_BETA = 1.0f/4; + private static final float TCP_KAPPA = 4; private static final String PROP_INITIAL_RTO = "i2p.streaming.initialRTO"; private static final int INITIAL_RTO = 9000; @@ -578,6 +579,13 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public synchronized int getRTT() { return _rtt; } + /** + * @return minimum RTT observed over the life of the connection, greater than zero + * @since 0.9.46 + */ + public synchronized int getMinRTT() {return _minRtt; } + + /** * not public, use updateRTT() */ @@ -677,6 +685,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * @param measuredValue must be positive */ public synchronized void updateRTT(int measuredValue) { + _minRtt = Math.min(_minRtt, measuredValue); switch(_initState) { case INIT: _initState = AckInit.FIRST; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index dee04479b..4cf909d6a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -28,7 +28,7 @@ class ConnectionPacketHandler { private final Log _log; private final ByteCache _cache = ByteCache.getInstance(32, 4*1024); - public static final int MAX_SLOW_START_WINDOW = 24; + public static final int MAX_SLOW_START_WINDOW = 64; // see tickets 1939 and 2584 private static final int IMMEDIATE_ACK_DELAY = 150; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/SimpleBandwidthEstimator.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/SimpleBandwidthEstimator.java new file mode 100644 index 000000000..bcc1ccc45 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/SimpleBandwidthEstimator.java @@ -0,0 +1,159 @@ +package net.i2p.client.streaming.impl; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; + +/** + * A Westwood+ bandwidth estimator with + * a first stage anti-aliasing low pass filter based on RTT, + * and the time-varying Westwood filter based on inter-arrival time. + * + * Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks + * Casetti et al + * (Westwood) + * + * Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks + * Grieco and Mascolo + * (Westwood+) + * + * Adapted from: Linux kernel tcp_westwood.c (GPLv2) + * + * @since 0.9.46 + */ +class SimpleBandwidthEstimator implements BandwidthEstimator { + + private final I2PAppContext _context; + private final Log _log; + private final ConnectionOptions _opts; + + private long _tAck; + // bw_est, bw_ns_est + private float _bKFiltered, _bK_ns_est; + // bk + private int _acked; + + // As in kernel tcp_westwood.c + // Should probably match ConnectionOptions.TCP_ALPHA + private static final int DECAY_FACTOR = 8; + private static final int WESTWOOD_RTT_MIN = 500; + + SimpleBandwidthEstimator(I2PAppContext ctx, ConnectionOptions opts) { + _log = ctx.logManager().getLog(SimpleBandwidthEstimator.class); + _context = ctx; + _opts = opts; + // assume we're about to send something + _tAck = ctx.clock().now(); + _acked = -1; + } + + /** + * Records an arriving ack. + * @param acked how many packets were acked with this ack + */ + public synchronized void addSample(int acked) { + long now = _context.clock().now(); + if (_acked < 0) { + // first sample + // use time since constructed as the RTT + // getRTT() would return zero here. + long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN); + float bkdt = ((float) acked) / deltaT; + _bKFiltered = bkdt; + _bK_ns_est = bkdt; + _acked = 0; + _tAck = now; + if (_log.shouldDebug()) + _log.debug("first sample packets: " + acked + " deltaT: " + deltaT + ' ' + this); + } else { + _acked += acked; + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + if (now - _tAck >= Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN)) + computeBWE(now); + } + } + + /** + * @return the current bandwidth estimate in packets/ms. + */ + public synchronized float getBandwidthEstimate() { + long now = _context.clock().now(); + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + if (now - _tAck >= Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN)) + return computeBWE(now); + return _bKFiltered; + } + + private synchronized float computeBWE(final long now) { + if (_acked < 0) + return 0.0f; // nothing ever sampled + updateBK(now, _acked); + _acked = 0; + return _bKFiltered; + } + + /** + * Optimized version of updateBK with packets == 0 + */ + private void decay() { + _bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR; + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } + + /** + * Here we insert virtual null samples if necessary as in Westwood, + * And use a very simple EWMA (exponential weighted moving average) + * time-varying filter, as in kernel tcp_westwood.c + * + * @param time the time of the measurement + * @param packets number of packets acked + */ + private void updateBK(long time, int packets) { + long deltaT = time - _tAck; + int rtt = Math.max(_opts.getRTT(), WESTWOOD_RTT_MIN); + if (deltaT > 2 * rtt) { + // Decay with virtual null samples as in the Westwood paper + int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR); + for (int i = 0; i < numrtts; i++) { + decay(); + } + deltaT -= numrtts * rtt; + if (_log.shouldDebug()) + _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + float bkdt; + if (packets > 0) { + // As in kernel tcp_westwood.c + bkdt = ((float) packets) / deltaT; + _bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt); + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } else { + bkdt = 0; + decay(); + } + _tAck = time; + if (_log.shouldDebug()) + _log.debug("computeBWE packets: " + packets + " deltaT: " + deltaT + + " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + + /** + * As in kernel tcp_westwood.c + */ + private static float westwood_do_filter(float a, float b) { + return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR; + } + + @Override + public synchronized String toString() { + return "SBE[" + + " _bKFiltered " + _bKFiltered + + " _tAck " + _tAck + "; " + + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000 * _opts.getMaxMessageSize()), false) + + "Bps]"; + } +} diff --git a/history.txt b/history.txt index cf46c9d02..70c0756ca 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,21 @@ +2020-04-30 zzz + * Ratchet: Error handling fixes + * Streaming: Westwood+ congestion control (ticket #2719) + +2020-04-29 zzz + * Ratchet: Increase callback timeout + * Router: Replace old data structure classes + +2020-04-27 zzz + * Crypto: Reduce max ElG tagset expiration at receiver + * i2psnark: + - Don't unchoke when we don't have pieces + - Don't avoid partial pieces if there are several seeds + +2020-04-25 zzz + * i2psnark: Increase min size for video preview + * Ratchet: Variable tagset lookahead/trim limits + 2020-04-24 zzz * i2psnark: Don't mark torrent BAD on I2CP errors (ticket #2725) * Logging: Log to wrapper log after log manager shutdown (ticket #2725) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 7f4edb975..31c35f8c3 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 13; + public final static long BUILD = 14; /** for example "-test" */ public final static String EXTRA = "";