diff --git a/history.txt b/history.txt index 05c1777a0..6aed39dce 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,19 @@ +2011-12-04 zzz + * Console: + - Less icons on configclients.jsp + - Fix some browsers breaking line on negative numbers + - Tab CSS tweaks + * i2psnark: Fix directories not always deleted when torrent is deleted + * IRC Client: Lower log level for connect error to warn (thx echelon/sponge) + * Tunnel RED: + - Complete rework of participating traffic RED. + Implement an accurate bandwidth tracker in FIFOBandwidthRefiller. + - Fix drop priority of VTBM at OBEP + - Lower drop priority of VTBRM at IBGW + - Raise threshold from 95% to 120% + - Remove unused things in HopConfig + * UDP: Fix i2np.udp.allowLocal operation (thx Quizzers) + 2011-12-02 zzz * Console: - Summary bar tweaks diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 31c35f8c3..ea17ad55f 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 14; + public final static long BUILD = 15; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index f5aed1edc..c44598599 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -51,18 +51,20 @@ public class FIFOBandwidthLimiter { private int _maxInbound; /** how large _availableOutbound can get - aka our outbound rate during a burst */ private int _maxOutbound; - /** shortcut of whether our outbound rate is unlimited */ + /** shortcut of whether our outbound rate is unlimited - UNUSED always false for now */ private boolean _outboundUnlimited; - /** shortcut of whether our inbound rate is unlimited */ + /** shortcut of whether our inbound rate is unlimited - UNUSED always false for now */ private boolean _inboundUnlimited; /** lifetime counter of bytes received */ private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong(); /** lifetime counter of bytes sent */ private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong(); + /** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */ - private final AtomicLong _totalWastedInboundBytes = new AtomicLong(); + //private final AtomicLong _totalWastedInboundBytes = new AtomicLong(); /** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */ - private final AtomicLong _totalWastedOutboundBytes = new AtomicLong(); + //private final AtomicLong _totalWastedOutboundBytes = new AtomicLong(); + private final FIFOBandwidthRefiller _refiller; private final Thread _refillerThread; @@ -101,25 +103,41 @@ public class FIFOBandwidthLimiter { //public long getAvailableOutboundBytes() { return _availableOutboundBytes; } public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes.get(); } public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes.get(); } - public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); } - public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); } + //public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); } + //public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); } //public long getMaxInboundBytes() { return _maxInboundBytes; } //public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; } //public long getMaxOutboundBytes() { return _maxOutboundBytes; } //public void setMaxOutboundBytes(int numBytes) { _maxOutboundBytes = numBytes; } - public boolean getInboundUnlimited() { return _inboundUnlimited; } - public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; } - public boolean getOutboundUnlimited() { return _outboundUnlimited; } - public void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; } + + /** @deprecated unused for now, we are always limited */ + void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; } + + /** @deprecated unused for now, we are always limited */ + void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; } + + /** @return smoothed one second rate */ public float getSendBps() { return _sendBps; } + + /** @return smoothed one second rate */ public float getReceiveBps() { return _recvBps; } + + /** @return smoothed 15 second rate */ public float getSendBps15s() { return _sendBps15s; } + + /** @return smoothed 15 second rate */ public float getReceiveBps15s() { return _recvBps15s; } - /** These are the configured maximums, not the current rate */ + /** The configured maximum, not the current rate */ public int getOutboundKBytesPerSecond() { return _refiller.getOutboundKBytesPerSecond(); } + + /** The configured maximum, not the current rate */ public int getInboundKBytesPerSecond() { return _refiller.getInboundKBytesPerSecond(); } + + /** The configured maximum, not the current rate */ public int getOutboundBurstKBytesPerSecond() { return _refiller.getOutboundBurstKBytesPerSecond(); } + + /** The configured maximum, not the current rate */ public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); } public void reinitialize() { @@ -146,10 +164,31 @@ public class FIFOBandwidthLimiter { _maxOutboundBurst = 0; _unavailableInboundBurst.set(0); _unavailableOutboundBurst.set(0); - _inboundUnlimited = false; - _outboundUnlimited = false; + // always limited for now + //_inboundUnlimited = false; + //_outboundUnlimited = false; } + /** + * We sent a message. + * + * @param size bytes + * @since 0.8.12 + */ + public void sentParticipatingMessage(int size) { + _refiller.incrementParticipatingMessageBytes(size); + } + + /** + * Out bandwidth. Actual bandwidth, not smoothed, not bucketed. + * + * @return Bps in recent period (a few seconds) + * @since 0.8.12 + */ + public int getCurrentParticipatingBandwidth() { + return _refiller.getCurrentParticipatingBandwidth(); + } + public Request createRequest() { return new SimpleRequest(); } /** @@ -241,6 +280,7 @@ public class FIFOBandwidthLimiter { * More bytes are available - add them to the queue and satisfy any requests * we can * + * @param buf contains satisfied outbound requests, really just to avoid object thrash, not really used * @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative) * @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative) */ @@ -261,7 +301,7 @@ public class FIFOBandwidthLimiter { int uib = _unavailableInboundBurst.addAndGet(avi - _maxInbound); _availableInbound.set(_maxInbound); if (uib > _maxInboundBurst) { - _totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst); + //_totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst); _unavailableInboundBurst.set(_maxInboundBurst); } } else { @@ -292,7 +332,7 @@ public class FIFOBandwidthLimiter { _availableOutbound.set(_maxOutbound); if (uob > _maxOutboundBurst) { - _totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst); + //_totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst); _unavailableOutboundBurst.set(_maxOutboundBurst); } } else { @@ -376,7 +416,7 @@ public class FIFOBandwidthLimiter { * Go through the queue, satisfying as many requests as possible (notifying * each one satisfied that the request has been granted). * - * @param buffer returned with the satisfied outbound requests only + * @param buffer Out parameter, returned with the satisfied outbound requests only */ private final void satisfyRequests(List buffer) { buffer.clear(); @@ -385,6 +425,9 @@ public class FIFOBandwidthLimiter { satisfyOutboundRequests(buffer); } + /** + * @param satisfied Out parameter, returned with the satisfied requests added + */ private final void satisfyInboundRequests(List satisfied) { synchronized (_pendingInboundRequests) { if (_inboundUnlimited) { @@ -529,6 +572,9 @@ public class FIFOBandwidthLimiter { } } + /** + * @param satisfied Out parameter, returned with the satisfied requests added + */ private final void satisfyOutboundRequests(List satisfied) { synchronized (_pendingOutboundRequests) { if (_outboundUnlimited) { @@ -895,6 +941,7 @@ public class FIFOBandwidthLimiter { /** thar be dragons */ public void init(int in, int out, String target); public void setCompleteListener(CompleteListener lsnr); + /** Only supported if the request is not satisfied */ public void attach(Object obj); public Object attachment(); public CompleteListener getCompleteListener(); @@ -905,9 +952,8 @@ public class FIFOBandwidthLimiter { } private static final NoopRequest _noop = new NoopRequest(); + private static class NoopRequest implements Request { - private CompleteListener _lsnr; - private Object _attachment; public void abort() {} public boolean getAborted() { return false; } public int getPendingInboundRequested() { return 0; } @@ -918,12 +964,13 @@ public class FIFOBandwidthLimiter { public int getTotalOutboundRequested() { return 0; } public void waitForNextAllocation() {} public void init(int in, int out, String target) {} - public CompleteListener getCompleteListener() { return _lsnr; } + public CompleteListener getCompleteListener() { return null; } public void setCompleteListener(CompleteListener lsnr) { - _lsnr = lsnr; lsnr.complete(NoopRequest.this); } - public void attach(Object obj) { _attachment = obj; } - public Object attachment() { return _attachment; } + public void attach(Object obj) { + throw new UnsupportedOperationException("Don't attach to a satisfied request"); + } + public Object attachment() { return null; } } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index d01374a54..6e572921e 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -2,10 +2,22 @@ package net.i2p.router.transport; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; import net.i2p.util.Log; +/** + * Thread that runs every 100 ms to "give" bandwidth to + * FIFOBandwidthLimiter. + * Instantiated by FIFOBandwidthLimiter. + * + * As of 0.8.12, this also contains a counter for outbound participating bandwidth. + * This was a good place for it since we needed a 100ms thread for it. + * + * Public only for the properties and defaults. + */ public class FIFOBandwidthRefiller implements Runnable { private final Log _log; private final I2PAppContext _context; @@ -63,7 +75,7 @@ public class FIFOBandwidthRefiller implements Runnable { */ private static final long REPLENISH_FREQUENCY = 100; - public FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) { + FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) { _limiter = limiter; _context = context; _log = context.logManager().getLog(FIFOBandwidthRefiller.class); @@ -72,7 +84,7 @@ public class FIFOBandwidthRefiller implements Runnable { } /** @since 0.8.8 */ - public void shutdown() { + void shutdown() { _isRunning = false; } @@ -88,6 +100,7 @@ public class FIFOBandwidthRefiller implements Runnable { _lastCheckConfigTime = now; } + updateParticipating(now); boolean updated = updateQueues(buffer, now); if (updated) { _lastRefillTime = now; @@ -97,7 +110,7 @@ public class FIFOBandwidthRefiller implements Runnable { } } - public void reinitialize() { + void reinitialize() { _lastRefillTime = _limiter.now(); checkConfig(); _lastCheckConfigTime = _lastRefillTime; @@ -105,8 +118,8 @@ public class FIFOBandwidthRefiller implements Runnable { private boolean updateQueues(List buffer, long now) { long numMs = (now - _lastRefillTime); - if (_log.shouldLog(Log.INFO)) - _log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString() + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString() + " rate in=" + _inboundKBytesPerSecond + ", out=" + _outboundKBytesPerSecond +")"); @@ -120,6 +133,7 @@ public class FIFOBandwidthRefiller implements Runnable { if (inboundToAdd < 0) inboundToAdd = 0; if (outboundToAdd < 0) outboundToAdd = 0; + /**** Always limited for now if (_inboundKBytesPerSecond <= 0) { _limiter.setInboundUnlimited(true); inboundToAdd = 0; @@ -132,15 +146,16 @@ public class FIFOBandwidthRefiller implements Runnable { } else { _limiter.setOutboundUnlimited(false); } + ****/ long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000; long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000; _limiter.refillBandwidthQueues(buffer, inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut); - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable"); - _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable"); - } + //if (_log.shouldLog(Log.DEBUG)) { + // _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable"); + // _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable"); + //} return true; } else { if (_log.shouldLog(Log.DEBUG)) @@ -157,17 +172,9 @@ public class FIFOBandwidthRefiller implements Runnable { updateInboundPeak(); updateOutboundPeak(); - if (_inboundKBytesPerSecond <= 0) { - _limiter.setInboundUnlimited(true); - } else { - _limiter.setInboundUnlimited(false); - } - if (_outboundKBytesPerSecond <= 0) { - _limiter.setOutboundUnlimited(true); - } else { - _limiter.setOutboundUnlimited(false); - } - + // We are always limited for now + //_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0); + //_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0); } private void updateInboundRate() { @@ -185,6 +192,7 @@ public class FIFOBandwidthRefiller implements Runnable { if (_inboundKBytesPerSecond <= 0) _inboundKBytesPerSecond = DEFAULT_INBOUND_BANDWIDTH; } + private void updateOutboundRate() { int out = _context.getProperty(PROP_OUTBOUND_BANDWIDTH, DEFAULT_OUTBOUND_BANDWIDTH); if (out != _outboundKBytesPerSecond) { @@ -276,4 +284,87 @@ public class FIFOBandwidthRefiller implements Runnable { int getInboundKBytesPerSecond() { return _inboundKBytesPerSecond; } int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; } int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; } + + /** + * Participating counter stuff below here + * TOTAL_TIME needs to be high enough to get a burst without dropping + * @since 0.8.12 + */ + private static final int TOTAL_TIME = 4000; + private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY; + /** count in current 100 ms period */ + private final AtomicInteger _currentParticipating = new AtomicInteger(); + private long _lastPartUpdateTime; + private int _lastTotal; + /** the actual length of last total period as coalesced (nominally TOTAL_TIME) */ + private long _lastTotalTime; + private int _lastIndex; + /** buffer of count per 100 ms period, last is at _lastIndex, older at higher indexes (wraps) */ + private final int[] _counts = new int[PERIODS]; + /** the actual length of the period (nominally REPLENISH_FREQUENCY) */ + private final long[] _times = new long[PERIODS]; + + /** + * We sent a message. + * + * @param size bytes + * @since 0.8.12 + */ + void incrementParticipatingMessageBytes(int size) { + _currentParticipating.addAndGet(size); + } + + /** + * Out bandwidth. Actual bandwidth, not smoothed, not bucketed. + * + * @return Bps in recent period (a few seconds) + * @since 0.8.12 + */ + synchronized int getCurrentParticipatingBandwidth() { + int current = _currentParticipating.get(); + long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime; + if (totalTime <= 0) + return 0; + // 1000 for ms->seconds in denominator + long bw = 1000l * (current + _lastTotal) / totalTime; + if (bw > Integer.MAX_VALUE) + return 0; + return (int) bw; + } + + /** + * Run once every 100 ms + * + * @since 0.8.12 + */ + private synchronized void updateParticipating(long now) { + long elapsed = now - _lastPartUpdateTime; + if (elapsed <= 0) { + // glitch in the matrix + _lastPartUpdateTime = now; + return; + } + _lastPartUpdateTime = now; + if (--_lastIndex < 0) + _lastIndex = PERIODS - 1; + _counts[_lastIndex] = _currentParticipating.getAndSet(0); + _times[_lastIndex] = elapsed; + _lastTotal = 0; + _lastTotalTime = 0; + // add up total counts and times + for (int i = 0; i < PERIODS; i++) { + int idx = (_lastIndex + i) % PERIODS; + _lastTotal += _counts[idx]; + _lastTotalTime += _times[idx]; + if (_lastTotalTime >= TOTAL_TIME) + break; + } + if (_lastIndex == 0 && _lastTotalTime > 0) { + long bw = 1000l * _lastTotal / _lastTotalTime; + _context.statManager().addRateData("tunnel.participatingBandwidthOut", bw); + if (_lastTotal > 0 && _log.shouldLog(Log.INFO)) + _log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " + + DataHelper.formatSize(bw) + " Bps"); + } + } } diff --git a/router/java/src/net/i2p/router/tunnel/HopConfig.java b/router/java/src/net/i2p/router/tunnel/HopConfig.java index 042829fdf..b2ad431dc 100644 --- a/router/java/src/net/i2p/router/tunnel/HopConfig.java +++ b/router/java/src/net/i2p/router/tunnel/HopConfig.java @@ -25,13 +25,14 @@ public class HopConfig { private ByteArray _replyIV; private long _creation; private long _expiration; - private Map _options; + //private Map _options; + // these 4 were longs, let's save some space // 2 billion * 1KB / 10 minutes = 3 GBps in a single tunnel private int _messagesProcessed; private int _oldMessagesProcessed; - private int _messagesSent; - private int _oldMessagesSent; + //private int _messagesSent; + //private int _oldMessagesSent; /** IV length for {@link #getReplyIV} */ public static final int REPLY_IV_LENGTH = 16; @@ -48,6 +49,7 @@ public class HopConfig { _receiveTunnel = getTunnel(_receiveTunnelId); return _receiveTunnel; } + public void setReceiveTunnelId(byte id[]) { _receiveTunnelId = id; } public void setReceiveTunnelId(TunnelId id) { _receiveTunnelId = DataHelper.toLong(4, id.getTunnelId()); } @@ -106,11 +108,13 @@ public class HopConfig { * would be a Boolean, etc). * */ - public Map getOptions() { return _options; } - public void setOptions(Map options) { _options = options; } + //public Map getOptions() { return _options; } + //public void setOptions(Map options) { _options = options; } - /** take note of a message being pumped through this tunnel */ - /** "processed" is for incoming and "sent" is for outgoing (could be dropped in between) */ + /** + * Take note of a message being pumped through this tunnel. + * "processed" is for incoming and "sent" is for outgoing (could be dropped in between) + */ public void incrementProcessedMessages() { _messagesProcessed++; } public int getProcessedMessagesCount() { return _messagesProcessed; } @@ -121,6 +125,11 @@ public class HopConfig { return rv; } + /** + * Take note of a message being pumped through this tunnel. + * "processed" is for incoming and "sent" is for outgoing (could be dropped in between) + */ + /**** public void incrementSentMessages() { _messagesSent++; } public int getSentMessagesCount() { return _messagesSent; } @@ -130,7 +139,9 @@ public class HopConfig { _oldMessagesSent = _messagesSent; return rv; } + ****/ + /** */ @Override public String toString() { StringBuilder buf = new StringBuilder(64); diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index 204fc1087..1131808c3 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -48,7 +48,8 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver { // We do this before the preprocessor now (i.e. before fragmentation) //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length)) // return -1; - _config.incrementSentMessages(); + //_config.incrementSentMessages(); + _context.bandwidthLimiter().sentParticipatingMessage(1024); TunnelDataMessage msg = new TunnelDataMessage(_context); msg.setData(encrypted); msg.setTunnelId(_config.getSendTunnel()); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index 9a139f268..7f8142445 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -50,11 +50,18 @@ class OutboundTunnelEndpoint { + " to be forwarded on to " + (toRouter != null ? toRouter.toBase64().substring(0,4) : "") + (toTunnel != null ? ":" + toTunnel.getTunnelId() : "")); + int size = msg.getMessageSize(); // don't drop it if we are the target - if ((!_context.routerHash().equals(toRouter)) && - _context.tunnelDispatcher().shouldDropParticipatingMessage("OBEP " + msg.getType(), msg.getMessageSize())) + boolean toUs = _context.routerHash().equals(toRouter); + if ((!toUs) && + _context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, msg.getType(), size)) return; - _config.incrementSentMessages(); + // this overstates the stat somewhat, but ok for now + //int kb = (size + 1023) / 1024; + //for (int i = 0; i < kb; i++) + // _config.incrementSentMessages(); + if (!toUs) + _context.bandwidthLimiter().sentParticipatingMessage(size); _outDistributor.distribute(msg, toRouter, toTunnel); } } diff --git a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java index 72b9e7239..efb65aa47 100644 --- a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java @@ -1,8 +1,5 @@ package net.i2p.router.tunnel; -import java.util.ArrayList; -import java.util.List; - import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; @@ -38,13 +35,13 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway { // for the purpose of estimating outgoing size. // We assume that it's the outbound bandwidth that is the issue... int size = Math.max(msg.getMessageSize(), 1024/2); - if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) { + if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) { // this overstates the stat somewhat, but ok for now int kb = (size + 1023) / 1024; for (int i = 0; i < kb; i++) _config.incrementProcessedMessages(); return; } - super.add(msg, toRouter,toTunnel); + super.add(msg, toRouter, toTunnel); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 4025124f9..c37d931c9 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -12,8 +12,12 @@ import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.data.i2np.TunnelBuildMessage; +import net.i2p.data.i2np.TunnelBuildReplyMessage; import net.i2p.data.i2np.TunnelDataMessage; import net.i2p.data.i2np.TunnelGatewayMessage; +import net.i2p.data.i2np.VariableTunnelBuildMessage; +import net.i2p.data.i2np.VariableTunnelBuildReplyMessage; import net.i2p.router.JobImpl; import net.i2p.router.Router; import net.i2p.router.RouterContext; @@ -41,9 +45,12 @@ public class TunnelDispatcher implements Service { private BloomFilterIVValidator _validator; private final LeaveTunnel _leaveJob; /** what is the date/time we last deliberately dropped a tunnel? **/ - private long _lastDropTime; + //private long _lastDropTime; private final TunnelGatewayPumper _pumper; + /** for shouldDropParticipatingMessage() */ + enum Location {OBEP, PARTICIPANT, IBGW} + private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 }; /** Creates a new instance of TunnelDispatcher */ @@ -198,13 +205,13 @@ public class TunnelDispatcher implements Service { TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper); TunnelId outId = cfg.getConfig(0).getSendTunnel(); _outboundGateways.put(outId, gw); - _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0); + _context.statManager().addRateData("tunnel.joinOutboundGateway", 1); _context.messageHistory().tunnelJoined("outbound", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId outId = cfg.getConfig(0).getSendTunnel(); _outboundGateways.put(outId, gw); - _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0); + _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1); _context.messageHistory().tunnelJoined("outboundZeroHop", cfg); } } @@ -220,13 +227,13 @@ public class TunnelDispatcher implements Service { TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator)); TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel(); _participants.put(recvId, participant); - _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0); + _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1); _context.messageHistory().tunnelJoined("inboundEndpoint", cfg); } else { TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg); TunnelId recvId = cfg.getConfig(0).getReceiveTunnel(); _inboundGateways.put(recvId, gw); - _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0); + _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1); _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg); } } @@ -243,7 +250,7 @@ public class TunnelDispatcher implements Service { _participants.put(recvId, participant); _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("participant", cfg); - _context.statManager().addRateData("tunnel.joinParticipant", 1, 0); + _context.statManager().addRateData("tunnel.joinParticipant", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); _leaveJob.add(cfg); @@ -261,7 +268,7 @@ public class TunnelDispatcher implements Service { _outboundEndpoints.put(recvId, endpoint); _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("outboundEndpoint", cfg); - _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0); + _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); @@ -284,7 +291,7 @@ public class TunnelDispatcher implements Service { _inboundGateways.put(recvId, gw); _participatingConfig.put(recvId, cfg); _context.messageHistory().tunnelJoined("inboundGateway", cfg); - _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0); + _context.statManager().addRateData("tunnel.joinInboundGateway", 1); if (cfg.getExpiration() > _lastParticipatingExpiration) _lastParticipatingExpiration = cfg.getExpiration(); @@ -388,7 +395,7 @@ public class TunnelDispatcher implements Service { + recvFrom.toBase64().substring(0,4)); _context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "participant"); participant.dispatch(msg, recvFrom); - _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0); + _context.statManager().addRateData("tunnel.dispatchParticipant", 1); } else { OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj()); if (endpoint != null) { @@ -399,7 +406,7 @@ public class TunnelDispatcher implements Service { _context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "outbound endpoint"); endpoint.dispatch(msg, recvFrom); - _context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0); + _context.statManager().addRateData("tunnel.dispatchEndpoint", 1); } else { _context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId()); int level = (_context.router().getUptime() > 10*60*1000 ? Log.WARN : Log.DEBUG); @@ -446,7 +453,7 @@ public class TunnelDispatcher implements Service { // + msg.getTunnelId().getTunnelId() + " as inbound gateway"); _context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getMessage().getUniqueId(), msg.getTunnelId().getTunnelId(), "inbound gateway"); gw.add(msg); - _context.statManager().addRateData("tunnel.dispatchInbound", 1, 0); + _context.statManager().addRateData("tunnel.dispatchInbound", 1); } else { _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId()); int level = (_context.router().getUptime() > 10*60*1000 ? Log.WARN : Log.INFO); @@ -481,6 +488,7 @@ public class TunnelDispatcher implements Service { public void dispatchOutbound(I2NPMessage msg, TunnelId outboundTunnel, Hash targetPeer) { dispatchOutbound(msg, outboundTunnel, null, targetPeer); } + /** * We are the outbound tunnel gateway (we created it), so wrap up this message * with instructions to be forwarded to the targetTunnel on the targetPeer when @@ -523,9 +531,9 @@ public class TunnelDispatcher implements Service { _context.messageHistory().tunnelDispatched(msg.getUniqueId(), tid1, tid2, targetPeer, "outbound gateway"); gw.add(msg, targetPeer, targetTunnel); if (targetTunnel == null) - _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0); + _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1); else - _context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1, 0); + _context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1); } else { _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), outboundTunnel.getTunnelId()); @@ -561,22 +569,16 @@ public class TunnelDispatcher implements Service { * and computing the average from that. */ public void updateParticipatingStats(int ms) { - List participating = listParticipatingTunnels(); - int size = participating.size(); long count = 0; long bw = 0; - long bwOut = 0; + //long bwOut = 0; long tcount = 0; long tooYoung = _context.clock().now() - 60*1000; long tooOld = tooYoung - 9*60*1000; - for (int i = 0; i < size; i++) { - HopConfig cfg = participating.get(i); - // rare NPE seen here, guess CHS.values() isn't atomic? - if (cfg == null) - continue; + for (HopConfig cfg : _participatingConfig.values()) { long c = cfg.getRecentMessagesCount(); bw += c; - bwOut += cfg.getRecentSentMessagesCount(); + //bwOut += cfg.getRecentSentMessagesCount(); long created = cfg.getCreation(); if (created > tooYoung || created < tooOld) continue; @@ -587,8 +589,9 @@ public class TunnelDispatcher implements Service { count = count * 30 / tcount; _context.statManager().addRateData("tunnel.participatingMessageCount", count, ms); _context.statManager().addRateData("tunnel.participatingBandwidth", bw*1024/(ms/1000), ms); - _context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms); - _context.statManager().addRateData("tunnel.participatingTunnels", size, 0); + // moved to FIFOBandwidthRefiller + //_context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms); + _context.statManager().addRateData("tunnel.participatingTunnels", tcount); } /** @@ -609,12 +612,22 @@ public class TunnelDispatcher implements Service { * Also, the OBEP is the earliest identifiable hop in the message's path * (a plain participant could be earlier or later, but on average is later) * - * @param type message hop location and type + * @param loc message hop location + * @param type I2NP message type * @param length the length of the message */ - public boolean shouldDropParticipatingMessage(String type, int length) { + public boolean shouldDropParticipatingMessage(Location loc, int type, int length) { if (length <= 0) return false; + /**** + Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old. + Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops + during a burst. + We now use the bandwidth limiter to track outbound participating bandwidth + over the last few seconds. + ****/ + + /**** RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth"); if (rs == null) return false; @@ -630,34 +643,45 @@ public class TunnelDispatcher implements Service { bw = (int) r.getLifetimeAverageValue(); int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn()); - usedIn = Math.min(usedIn, bw); + if (bw < usedIn) + usedIn = bw; if (usedIn <= 0) return false; int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true)); - usedOut = Math.min(usedOut, bw); + if (bw < usedOut) + usedOut = bw; if (usedOut <= 0) return false; int used = Math.min(usedIn, usedOut); + ****/ + int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth(); + int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(), _context.bandwidthLimiter().getOutboundKBytesPerSecond()); float share = (float) _context.router().getSharePercentage(); - // start dropping at 95% of the limit - float maxBps = maxKBps * share * 1024f * 0.95f; + // start dropping at 120% of the limit, + // as we rely on Throttle for long-term bandwidth control by rejecting tunnels + float maxBps = maxKBps * share * (1024f * 1.20f); float pctDrop = (used - maxBps) / used; if (pctDrop <= 0) return false; // increase the drop probability for OBEP, - // (except lower it for tunnel build messages (type 21)), + // (except lower it for tunnel build messages type 21/22/23/24), // and lower it for IBGW, for network efficiency double len = length; - if (type.startsWith("OBEP")) { - if (type.equals("OBEP 21")) + if (loc == Location.OBEP) { + // we don't need to check for VTBRM/TBRM as that happens at tunnel creation + if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE) len /= 1.5; else len *= 1.5; - } else if (type.startsWith("IBGW")) { - len /= 1.5; + } else if (loc == Location.IBGW) { + // we don't need to check for VTBM/TBM as that happens at tunnel creation + if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE) + len /= 1.5 * 1.5 * 1.5; + else + len /= 1.5; } // drop in proportion to size w.r.t. a standard 1024-byte message // this is a little expensive but we want to adjust the curve between 0 and 1 @@ -671,9 +695,9 @@ public class TunnelDispatcher implements Service { int availBps = (int) (((maxKBps*1024)*share) - used); _log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/" + used + " %Drop = " + pctDrop - + ' ' + type + ' ' + length); + + ' ' + loc + ' ' + type + ' ' + length); } - _context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0); + _context.statManager().addRateData("tunnel.participatingMessageDropped", 1); } return reject; } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index b22e0a173..dfb3ac1e1 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -181,9 +181,11 @@ class TunnelParticipant { } private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) { - if (_context.tunnelDispatcher().shouldDropParticipatingMessage("TDM", 1024)) + if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.PARTICIPANT, + TunnelDataMessage.MESSAGE_TYPE, 1024)) return; - _config.incrementSentMessages(); + //_config.incrementSentMessages(); + _context.bandwidthLimiter().sentParticipatingMessage(1024); long oldId = msg.getUniqueId(); long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE); _context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId);