From 45c3fa842df5d74447e249fcd3bd77df379dc50a Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 10 Mar 2021 07:21:44 -0500 Subject: [PATCH] Part. tunnel RED test Change BW est. to exponential moving average instead of 40 ms bucket Also use BW est. for tunnel.participatingBandwidthOut stat Comment out linear moving average code previously used for stat Reduce RED threshold from 120% to 95% of limit For testing only, not to be merged --- .../transport/FIFOBandwidthRefiller.java | 21 ++- .../transport/SimpleBandwidthEstimator.java | 168 ++++++++++++++++++ .../i2p/router/tunnel/TunnelDispatcher.java | 2 +- 3 files changed, 188 insertions(+), 3 deletions(-) create mode 100644 router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index 5ab204560..f37d661c3 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.router.transport.FIFOBandwidthLimiter.Request; +import net.i2p.util.BandwidthEstimator; import net.i2p.util.Log; /** @@ -24,6 +25,8 @@ public class FIFOBandwidthRefiller implements Runnable { private final Log _log; private final I2PAppContext _context; private final FIFOBandwidthLimiter _limiter; + private final BandwidthEstimator _partBWE; + /** how many KBps do we want to allow? */ private int _inboundKBytesPerSecond; /** how many KBps do we want to allow? */ @@ -88,6 +91,7 @@ public class FIFOBandwidthRefiller implements Runnable { _context = context; _log = context.logManager().getLog(FIFOBandwidthRefiller.class); reinitialize(); + _partBWE = new SimpleBandwidthEstimator(context); _isRunning = true; } @@ -100,6 +104,7 @@ public class FIFOBandwidthRefiller implements Runnable { // bootstrap 'em with nothing _lastRefillTime = _limiter.now(); List buffer = new ArrayList(2); + int i = 0; while (_isRunning) { long now = _limiter.now(); if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { @@ -107,8 +112,9 @@ public class FIFOBandwidthRefiller implements Runnable { now = _limiter.now(); _lastCheckConfigTime = now; } + if ((++i & 0x3f) == 0) + updateParticipating(now); - updateParticipating(now); boolean updated = updateQueues(buffer, now); if (updated) { _lastRefillTime = now; @@ -326,7 +332,8 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ void incrementParticipatingMessageBytes(int size) { - _currentParticipating.addAndGet(size); + //_currentParticipating.addAndGet(size); + _partBWE.addSample(size); } /** @@ -336,14 +343,18 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ int getCurrentParticipatingBandwidth() { + return (int) (_partBWE.getBandwidthEstimate() * 1000f); +/* _updateLock.readLock().lock(); try { return locked_getCurrentParticipatingBandwidth(); } finally { _updateLock.readLock().unlock(); } +*/ } +/* private int locked_getCurrentParticipatingBandwidth() { int current = _currentParticipating.get(); long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime; @@ -355,6 +366,7 @@ public class FIFOBandwidthRefiller implements Runnable { return 0; return (int) bw; } +*/ /** * Run once every replenish period @@ -362,14 +374,18 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ private void updateParticipating(long now) { + _context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth()); +/* _updateLock.writeLock().lock(); try { locked_updateParticipating(now); } finally { _updateLock.writeLock().unlock(); } +*/ } +/* private void locked_updateParticipating(long now) { long elapsed = now - _lastPartUpdateTime; if (elapsed <= 0) { @@ -400,4 +416,5 @@ public class FIFOBandwidthRefiller implements Runnable { DataHelper.formatSize(bw) + " Bps"); } } +*/ } diff --git a/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java b/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java new file mode 100644 index 000000000..402a24be6 --- /dev/null +++ b/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java @@ -0,0 +1,168 @@ +package net.i2p.router.transport; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.BandwidthEstimator; +import net.i2p.util.Log; + +/** + * A Westwood+ bandwidth estimator with + * a first stage anti-aliasing low pass filter based on RTT, + * and the time-varying Westwood filter based on inter-arrival time. + * + * Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks + * Casetti et al + * (Westwood) + * + * Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks + * Grieco and Mascolo + * (Westwood+) + * + * Adapted from: Linux kernel tcp_westwood.c (GPLv2) + * + * @since 0.9.49 adapted from streaming + */ +class SimpleBandwidthEstimator implements BandwidthEstimator { + + private final I2PAppContext _context; + private final Log _log; + + private long _tAck; + // bw_est, bw_ns_est + private float _bKFiltered, _bK_ns_est; + // bk + private int _acked; + + // As in kernel tcp_westwood.c + // Should probably match ConnectionOptions.TCP_ALPHA + private static final int DECAY_FACTOR = 8; + private static final int WESTWOOD_RTT_MIN = 500; + + SimpleBandwidthEstimator(I2PAppContext ctx) { + _log = ctx.logManager().getLog(SimpleBandwidthEstimator.class); + _context = ctx; + // assume we're about to send something + _tAck = ctx.clock().now(); + _acked = -1; + } + + /** + * Records an arriving ack. + * @param acked how many bytes were acked with this ack + */ + public void addSample(int acked) { + long now = _context.clock().now(); + addSample(acked, now); + } + + private synchronized void addSample(int acked, long now) { + if (_acked < 0) { + // first sample + // use time since constructed as the RTT + // getRTT() would return zero here. + long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN); + float bkdt = ((float) acked) / deltaT; + _bKFiltered = bkdt; + _bK_ns_est = bkdt; + _acked = 0; + _tAck = now; + if (_log.shouldDebug()) + _log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this); + } else { + _acked += acked; + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + long deltaT = now - _tAck; + if (deltaT >= WESTWOOD_RTT_MIN) + computeBWE(now, (int) deltaT); + } + } + + /** + * @return the current bandwidth estimate in bytes/ms. + */ + public float getBandwidthEstimate() { + long now = _context.clock().now(); + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + synchronized(this) { + long deltaT = now - _tAck; + if (deltaT >= WESTWOOD_RTT_MIN) + return computeBWE(now, (int) deltaT); + return _bKFiltered; + } + } + + private synchronized float computeBWE(final long now, final int rtt) { + if (_acked < 0) + return 0.0f; // nothing ever sampled + updateBK(now, _acked, rtt); + _acked = 0; + return _bKFiltered; + } + + /** + * Optimized version of updateBK with packets == 0 + */ + private void decay() { + _bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR; + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } + + /** + * Here we insert virtual null samples if necessary as in Westwood, + * And use a very simple EWMA (exponential weighted moving average) + * time-varying filter, as in kernel tcp_westwood.c + * + * @param time the time of the measurement + * @param packets number of bytes acked + * @param rtt current rtt + */ + private void updateBK(long time, int packets, int rtt) { + long deltaT = time - _tAck; + if (rtt < WESTWOOD_RTT_MIN) + rtt = WESTWOOD_RTT_MIN; + if (deltaT > 2 * rtt) { + // Decay with virtual null samples as in the Westwood paper + int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR); + for (int i = 0; i < numrtts; i++) { + decay(); + } + deltaT -= numrtts * rtt; + if (_log.shouldDebug()) + _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + float bkdt; + if (packets > 0) { + // As in kernel tcp_westwood.c + bkdt = ((float) packets) / deltaT; + _bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt); + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } else { + bkdt = 0; + decay(); + } + _tAck = time; + if (_log.shouldDebug()) + _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT + + " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + + /** + * As in kernel tcp_westwood.c + */ + private static float westwood_do_filter(float a, float b) { + return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR; + } + + @Override + public synchronized String toString() { + return "SBE[" + + " _bKFiltered " + _bKFiltered + + " _tAck " + _tAck + "; " + + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) + + "Bps]"; + } +} diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 63c31a121..9e3cce913 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -812,7 +812,7 @@ public class TunnelDispatcher implements Service { // start dropping at 120% of the limit, // as we rely on Throttle for long-term bandwidth control by rejecting tunnels - float maxBps = maxKBps * share * (1024f * 1.20f); + float maxBps = maxKBps * share * (1024f * 0.95f); float pctDrop = (used - maxBps) / used; if (pctDrop <= 0) return false;