From 38c422bbc01d77266bfd7fa27aa5e74bfd7e6f38 Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 6 Oct 2004 21:03:51 +0000 Subject: [PATCH] 2004-10-06 jrandom * Implement an active queue management scheme on the TCP transports, dropping messages probabalistically as the queue fills up. The estimated queue capacity is determined by the rate at which messages have been sent to the peer (averaged at 1, 5, and 60m periods). As we exceed 1/2 of the estimated capacity, we drop messages throughout the queue probabalistically with regards to their size. This is based on RFC 2309's RED, with the minimum threshold set to 1/2 the estimated connection capacity. We may want to consider using a send rate and queue size measured across all connections, to deal with our own local bandwidth saturation, but we'll try the per-con metrics first. --- history.txt | 14 ++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../router/transport/tcp/TCPConnection.java | 91 ++++++++++++++++++- .../router/transport/tcp/TCPTransport.java | 6 ++ 4 files changed, 111 insertions(+), 4 deletions(-) diff --git a/history.txt b/history.txt index 6b32a6042..8e23ebbea 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.35 2004/10/05 20:12:04 jrandom Exp $ +$Id: history.txt,v 1.36 2004/10/06 08:23:38 jrandom Exp $ + +2004-10-06 jrandom + * Implement an active queue management scheme on the TCP transports, + dropping messages probabalistically as the queue fills up. The + estimated queue capacity is determined by the rate at which messages + have been sent to the peer (averaged at 1, 5, and 60m periods). As + we exceed 1/2 of the estimated capacity, we drop messages throughout + the queue probabalistically with regards to their size. This is based + on RFC 2309's RED, with the minimum threshold set to 1/2 the + estimated connection capacity. We may want to consider using a send + rate and queue size measured across all connections, to deal with our + own local bandwidth saturation, but we'll try the per-con metrics first. 2004-10-06 jrandom * Enable explicit disabling of the systray entirely for windows machines diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 337969e16..f56c4ce3a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.43 $ $Date: 2004/10/05 10:39:19 $"; + public final static String ID = "$Revision: 1.44 $ $Date: 2004/10/05 20:12:03 $"; public final static String VERSION = "0.4.1.1"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 1d7a3db0d..2e10c5b88 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -16,6 +16,8 @@ import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessageReader; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.stat.RateStat; +import net.i2p.stat.Rate; import net.i2p.util.Log; /** @@ -35,6 +37,7 @@ public class TCPConnection { private TCPTransport _transport; private ConnectionRunner _runner; private I2NPMessageReader _reader; + private RateStat _sendRate; private long _started; private boolean _closed; @@ -51,6 +54,9 @@ public class TCPConnection { _started = -1; _closed = false; _runner = new ConnectionRunner(_context, this); + _context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); + _context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); + _context.statManager().createRateStat("tcp.sendBps", "How fast are we sending data to a peer?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); } /** Who are we talking with (or null if not identified) */ @@ -72,7 +78,11 @@ public class TCPConnection { * */ public void runConnection() { - String name = "TCP Read [" + _ident.calculateHash().toBase64().substring(0,6) + "]"; + String peer = _ident.calculateHash().toBase64().substring(0,6); + String name = "TCP Read [" + peer + "]"; + + _sendRate = new RateStat("tcp.sendRatePeer", "How many bytes are in the messages sent to " + peer, peer, new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _reader = new I2NPMessageReader(_context, _in, new MessageHandler(_transport, this), name); _reader.startReading(); _runner.startRunning(); @@ -143,10 +153,87 @@ public class TCPConnection { msg.timestamp("TCPConnection.addMessage"); synchronized (_pendingMessages) { _pendingMessages.add(msg); + locked_throttle(); _pendingMessages.notifyAll(); } } + /** + * Implement a probabalistic dropping of messages on the queue to the + * peer along the lines of RFC2309. + * + */ + private void locked_throttle() { + int bytesQueued = 0; + long earliestExpiration = -1; + for (int i = 0; i < _pendingMessages.size(); i++) { + OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); + bytesQueued += (int)msg.getMessageSize(); + if ( (earliestExpiration < 0) || (msg.getExpiration() < earliestExpiration) ) + earliestExpiration = msg.getExpiration(); + } + + if (bytesQueued > 0) + _context.statManager().addRateData("tcp.queueSize", bytesQueued, _pendingMessages.size()); + + long sendRate = getSendRate(); + long bytesSendableUntilFirstExpire = sendRate * (earliestExpiration - _context.clock().now()) / 1000; + + // try to keep the queue less than half full + long excessQueued = bytesQueued - (bytesSendableUntilFirstExpire/2); + if ( (excessQueued > 0) && (_pendingMessages.size() > 1) && (_transport != null) ) + locked_probabalisticDrop(excessQueued); + } + + /** how many Bps we are sending data to the peer (or 2KBps if we don't know) */ + public long getSendRate() { + if (_sendRate == null) return 2*1024; + _sendRate.coallesceStats(); + Rate r = _sendRate.getRate(60*1000); + if (r == null) { + return 2*1024; + } else if (r.getLastEventCount() <= 2) { + r = _sendRate.getRate(5*60*1000); + if (r.getLastEventCount() <= 2) + r = _sendRate.getRate(60*60*1000); + } + + if (r.getLastEventCount() <= 2) { + return 2*1024; + } else { + long bps = (long)(r.getLastTotalValue() * 1000 / r.getLastTotalEventTime()); + _context.statManager().addRateData("tcp.sendBps", bps, 0); + return bps; + } + } + + /** + * Probabalistically drop messages in relation to their size vs how much + * we've exceeded our target queue usage. + */ + private void locked_probabalisticDrop(long excessBytesQueued) { + for (int i = 0; i < _pendingMessages.size() && excessBytesQueued > 0; i++) { + OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); + int p = getDropProbability(msg.getMessageSize(), excessBytesQueued); + if (_context.random().nextInt(100) > p) { + _pendingMessages.remove(i); + i--; + msg.timestamp("Probabalistically dropped due to queue size " + excessBytesQueued); + sent(msg, false, -1); + _context.statManager().addRateData("tcp.probabalisticDropQueueSize", excessBytesQueued, msg.getLifetime()); + // since we've already dropped down this amount, lets reduce the + // number of additional messages dropped + excessBytesQueued -= msg.getMessageSize(); + } + } + } + + private int getDropProbability(long msgSize, long excessBytesQueued) { + if (msgSize > excessBytesQueued) + return 100; + return (int)(100.0*(msgSize/excessBytesQueued)); + } + /** * Blocking call to retrieve the next pending message. As a side effect, * this fails messages on the queue that have expired, and in turn never @@ -244,5 +331,7 @@ public class TCPConnection { */ void sent(OutNetMessage msg, boolean ok, long time) { _transport.afterSend(msg, ok, true, time); + if (ok) + _sendRate.addData(msg.getMessageSize(), msg.getLifetime()); } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 2db4da6aa..da6ecd4a8 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -698,6 +698,12 @@ public class TCPTransport extends TransportImpl { buf.append("
  • "); buf.append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); buf.append(": up for ").append(DataHelper.formatDuration(con.getLifetime())); + buf.append(" transferring at "); + long bps = con.getSendRate(); + if (bps < 1024) + buf.append(bps).append("Bps"); + else + buf.append((int)(bps/1024)).append("KBps"); buf.append("
  • \n"); } buf.append("\n");