diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index 5ab204560..f37d661c3 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -8,6 +8,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.router.transport.FIFOBandwidthLimiter.Request; +import net.i2p.util.BandwidthEstimator; import net.i2p.util.Log; /** @@ -24,6 +25,8 @@ public class FIFOBandwidthRefiller implements Runnable { private final Log _log; private final I2PAppContext _context; private final FIFOBandwidthLimiter _limiter; + private final BandwidthEstimator _partBWE; + /** how many KBps do we want to allow? */ private int _inboundKBytesPerSecond; /** how many KBps do we want to allow? */ @@ -88,6 +91,7 @@ public class FIFOBandwidthRefiller implements Runnable { _context = context; _log = context.logManager().getLog(FIFOBandwidthRefiller.class); reinitialize(); + _partBWE = new SimpleBandwidthEstimator(context); _isRunning = true; } @@ -100,6 +104,7 @@ public class FIFOBandwidthRefiller implements Runnable { // bootstrap 'em with nothing _lastRefillTime = _limiter.now(); List buffer = new ArrayList(2); + int i = 0; while (_isRunning) { long now = _limiter.now(); if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { @@ -107,8 +112,9 @@ public class FIFOBandwidthRefiller implements Runnable { now = _limiter.now(); _lastCheckConfigTime = now; } + if ((++i & 0x3f) == 0) + updateParticipating(now); - updateParticipating(now); boolean updated = updateQueues(buffer, now); if (updated) { _lastRefillTime = now; @@ -326,7 +332,8 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ void incrementParticipatingMessageBytes(int size) { - _currentParticipating.addAndGet(size); + //_currentParticipating.addAndGet(size); + _partBWE.addSample(size); } /** @@ -336,14 +343,18 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ int getCurrentParticipatingBandwidth() { + return (int) (_partBWE.getBandwidthEstimate() * 1000f); +/* _updateLock.readLock().lock(); try { return locked_getCurrentParticipatingBandwidth(); } finally { _updateLock.readLock().unlock(); } +*/ } +/* private int locked_getCurrentParticipatingBandwidth() { int current = _currentParticipating.get(); long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime; @@ -355,6 +366,7 @@ public class FIFOBandwidthRefiller implements Runnable { return 0; return (int) bw; } +*/ /** * Run once every replenish period @@ -362,14 +374,18 @@ public class FIFOBandwidthRefiller implements Runnable { * @since 0.8.12 */ private void updateParticipating(long now) { + _context.statManager().addRateData("tunnel.participatingBandwidthOut", getCurrentParticipatingBandwidth()); +/* _updateLock.writeLock().lock(); try { locked_updateParticipating(now); } finally { _updateLock.writeLock().unlock(); } +*/ } +/* private void locked_updateParticipating(long now) { long elapsed = now - _lastPartUpdateTime; if (elapsed <= 0) { @@ -400,4 +416,5 @@ public class FIFOBandwidthRefiller implements Runnable { DataHelper.formatSize(bw) + " Bps"); } } +*/ } diff --git a/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java b/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java new file mode 100644 index 000000000..402a24be6 --- /dev/null +++ b/router/java/src/net/i2p/router/transport/SimpleBandwidthEstimator.java @@ -0,0 +1,168 @@ +package net.i2p.router.transport; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.BandwidthEstimator; +import net.i2p.util.Log; + +/** + * A Westwood+ bandwidth estimator with + * a first stage anti-aliasing low pass filter based on RTT, + * and the time-varying Westwood filter based on inter-arrival time. + * + * Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks + * Casetti et al + * (Westwood) + * + * Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks + * Grieco and Mascolo + * (Westwood+) + * + * Adapted from: Linux kernel tcp_westwood.c (GPLv2) + * + * @since 0.9.49 adapted from streaming + */ +class SimpleBandwidthEstimator implements BandwidthEstimator { + + private final I2PAppContext _context; + private final Log _log; + + private long _tAck; + // bw_est, bw_ns_est + private float _bKFiltered, _bK_ns_est; + // bk + private int _acked; + + // As in kernel tcp_westwood.c + // Should probably match ConnectionOptions.TCP_ALPHA + private static final int DECAY_FACTOR = 8; + private static final int WESTWOOD_RTT_MIN = 500; + + SimpleBandwidthEstimator(I2PAppContext ctx) { + _log = ctx.logManager().getLog(SimpleBandwidthEstimator.class); + _context = ctx; + // assume we're about to send something + _tAck = ctx.clock().now(); + _acked = -1; + } + + /** + * Records an arriving ack. + * @param acked how many bytes were acked with this ack + */ + public void addSample(int acked) { + long now = _context.clock().now(); + addSample(acked, now); + } + + private synchronized void addSample(int acked, long now) { + if (_acked < 0) { + // first sample + // use time since constructed as the RTT + // getRTT() would return zero here. + long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN); + float bkdt = ((float) acked) / deltaT; + _bKFiltered = bkdt; + _bK_ns_est = bkdt; + _acked = 0; + _tAck = now; + if (_log.shouldDebug()) + _log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this); + } else { + _acked += acked; + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + long deltaT = now - _tAck; + if (deltaT >= WESTWOOD_RTT_MIN) + computeBWE(now, (int) deltaT); + } + } + + /** + * @return the current bandwidth estimate in bytes/ms. + */ + public float getBandwidthEstimate() { + long now = _context.clock().now(); + // anti-aliasing filter + // As in kernel tcp_westwood.c + // and the Westwood+ paper + synchronized(this) { + long deltaT = now - _tAck; + if (deltaT >= WESTWOOD_RTT_MIN) + return computeBWE(now, (int) deltaT); + return _bKFiltered; + } + } + + private synchronized float computeBWE(final long now, final int rtt) { + if (_acked < 0) + return 0.0f; // nothing ever sampled + updateBK(now, _acked, rtt); + _acked = 0; + return _bKFiltered; + } + + /** + * Optimized version of updateBK with packets == 0 + */ + private void decay() { + _bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR; + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } + + /** + * Here we insert virtual null samples if necessary as in Westwood, + * And use a very simple EWMA (exponential weighted moving average) + * time-varying filter, as in kernel tcp_westwood.c + * + * @param time the time of the measurement + * @param packets number of bytes acked + * @param rtt current rtt + */ + private void updateBK(long time, int packets, int rtt) { + long deltaT = time - _tAck; + if (rtt < WESTWOOD_RTT_MIN) + rtt = WESTWOOD_RTT_MIN; + if (deltaT > 2 * rtt) { + // Decay with virtual null samples as in the Westwood paper + int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR); + for (int i = 0; i < numrtts; i++) { + decay(); + } + deltaT -= numrtts * rtt; + if (_log.shouldDebug()) + _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + float bkdt; + if (packets > 0) { + // As in kernel tcp_westwood.c + bkdt = ((float) packets) / deltaT; + _bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt); + _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est); + } else { + bkdt = 0; + decay(); + } + _tAck = time; + if (_log.shouldDebug()) + _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT + + " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this); + } + + /** + * As in kernel tcp_westwood.c + */ + private static float westwood_do_filter(float a, float b) { + return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR; + } + + @Override + public synchronized String toString() { + return "SBE[" + + " _bKFiltered " + _bKFiltered + + " _tAck " + _tAck + "; " + + DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) + + "Bps]"; + } +} diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 63c31a121..9e3cce913 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -812,7 +812,7 @@ public class TunnelDispatcher implements Service { // start dropping at 120% of the limit, // as we rely on Throttle for long-term bandwidth control by rejecting tunnels - float maxBps = maxKBps * share * (1024f * 1.20f); + float maxBps = maxKBps * share * (1024f * 0.95f); float pctDrop = (used - maxBps) / used; if (pctDrop <= 0) return false;