diff --git a/history.txt b/history.txt index 6b32a6042..8e23ebbea 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,16 @@ -$Id: history.txt,v 1.35 2004/10/05 20:12:04 jrandom Exp $ +$Id: history.txt,v 1.36 2004/10/06 08:23:38 jrandom Exp $ + +2004-10-06 jrandom + * Implement an active queue management scheme on the TCP transports, + dropping messages probabalistically as the queue fills up. The + estimated queue capacity is determined by the rate at which messages + have been sent to the peer (averaged at 1, 5, and 60m periods). As + we exceed 1/2 of the estimated capacity, we drop messages throughout + the queue probabalistically with regards to their size. This is based + on RFC 2309's RED, with the minimum threshold set to 1/2 the + estimated connection capacity. We may want to consider using a send + rate and queue size measured across all connections, to deal with our + own local bandwidth saturation, but we'll try the per-con metrics first. 2004-10-06 jrandom * Enable explicit disabling of the systray entirely for windows machines diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 337969e16..f56c4ce3a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.43 $ $Date: 2004/10/05 10:39:19 $"; + public final static String ID = "$Revision: 1.44 $ $Date: 2004/10/05 20:12:03 $"; public final static String VERSION = "0.4.1.1"; - public final static long BUILD = 9; + public final static long BUILD = 10; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 1d7a3db0d..2e10c5b88 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -16,6 +16,8 @@ import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessageReader; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.stat.RateStat; +import net.i2p.stat.Rate; import net.i2p.util.Log; /** @@ -35,6 +37,7 @@ public class TCPConnection { private TCPTransport _transport; private ConnectionRunner _runner; private I2NPMessageReader _reader; + private RateStat _sendRate; private long _started; private boolean _closed; @@ -51,6 +54,9 @@ public class TCPConnection { _started = -1; _closed = false; _runner = new ConnectionRunner(_context, this); + _context.statManager().createRateStat("tcp.probabalisticDropQueueSize", "How many bytes were queued to be sent when a message as dropped probabalistically?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); + _context.statManager().createRateStat("tcp.queueSize", "How many bytes were queued on a connection?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); + _context.statManager().createRateStat("tcp.sendBps", "How fast are we sending data to a peer?", "TCP", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l } ); } /** Who are we talking with (or null if not identified) */ @@ -72,7 +78,11 @@ public class TCPConnection { * */ public void runConnection() { - String name = "TCP Read [" + _ident.calculateHash().toBase64().substring(0,6) + "]"; + String peer = _ident.calculateHash().toBase64().substring(0,6); + String name = "TCP Read [" + peer + "]"; + + _sendRate = new RateStat("tcp.sendRatePeer", "How many bytes are in the messages sent to " + peer, peer, new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + _reader = new I2NPMessageReader(_context, _in, new MessageHandler(_transport, this), name); _reader.startReading(); _runner.startRunning(); @@ -143,10 +153,87 @@ public class TCPConnection { msg.timestamp("TCPConnection.addMessage"); synchronized (_pendingMessages) { _pendingMessages.add(msg); + locked_throttle(); _pendingMessages.notifyAll(); } } + /** + * Implement a probabalistic dropping of messages on the queue to the + * peer along the lines of RFC2309. + * + */ + private void locked_throttle() { + int bytesQueued = 0; + long earliestExpiration = -1; + for (int i = 0; i < _pendingMessages.size(); i++) { + OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); + bytesQueued += (int)msg.getMessageSize(); + if ( (earliestExpiration < 0) || (msg.getExpiration() < earliestExpiration) ) + earliestExpiration = msg.getExpiration(); + } + + if (bytesQueued > 0) + _context.statManager().addRateData("tcp.queueSize", bytesQueued, _pendingMessages.size()); + + long sendRate = getSendRate(); + long bytesSendableUntilFirstExpire = sendRate * (earliestExpiration - _context.clock().now()) / 1000; + + // try to keep the queue less than half full + long excessQueued = bytesQueued - (bytesSendableUntilFirstExpire/2); + if ( (excessQueued > 0) && (_pendingMessages.size() > 1) && (_transport != null) ) + locked_probabalisticDrop(excessQueued); + } + + /** how many Bps we are sending data to the peer (or 2KBps if we don't know) */ + public long getSendRate() { + if (_sendRate == null) return 2*1024; + _sendRate.coallesceStats(); + Rate r = _sendRate.getRate(60*1000); + if (r == null) { + return 2*1024; + } else if (r.getLastEventCount() <= 2) { + r = _sendRate.getRate(5*60*1000); + if (r.getLastEventCount() <= 2) + r = _sendRate.getRate(60*60*1000); + } + + if (r.getLastEventCount() <= 2) { + return 2*1024; + } else { + long bps = (long)(r.getLastTotalValue() * 1000 / r.getLastTotalEventTime()); + _context.statManager().addRateData("tcp.sendBps", bps, 0); + return bps; + } + } + + /** + * Probabalistically drop messages in relation to their size vs how much + * we've exceeded our target queue usage. + */ + private void locked_probabalisticDrop(long excessBytesQueued) { + for (int i = 0; i < _pendingMessages.size() && excessBytesQueued > 0; i++) { + OutNetMessage msg = (OutNetMessage)_pendingMessages.get(i); + int p = getDropProbability(msg.getMessageSize(), excessBytesQueued); + if (_context.random().nextInt(100) > p) { + _pendingMessages.remove(i); + i--; + msg.timestamp("Probabalistically dropped due to queue size " + excessBytesQueued); + sent(msg, false, -1); + _context.statManager().addRateData("tcp.probabalisticDropQueueSize", excessBytesQueued, msg.getLifetime()); + // since we've already dropped down this amount, lets reduce the + // number of additional messages dropped + excessBytesQueued -= msg.getMessageSize(); + } + } + } + + private int getDropProbability(long msgSize, long excessBytesQueued) { + if (msgSize > excessBytesQueued) + return 100; + return (int)(100.0*(msgSize/excessBytesQueued)); + } + /** * Blocking call to retrieve the next pending message. As a side effect, * this fails messages on the queue that have expired, and in turn never @@ -244,5 +331,7 @@ public class TCPConnection { */ void sent(OutNetMessage msg, boolean ok, long time) { _transport.afterSend(msg, ok, true, time); + if (ok) + _sendRate.addData(msg.getMessageSize(), msg.getLifetime()); } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 2db4da6aa..da6ecd4a8 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -698,6 +698,12 @@ public class TCPTransport extends TransportImpl { buf.append("
  • "); buf.append(con.getRemoteRouterIdentity().getHash().toBase64().substring(0,6)); buf.append(": up for ").append(DataHelper.formatDuration(con.getLifetime())); + buf.append(" transferring at "); + long bps = con.getSendRate(); + if (bps < 1024) + buf.append(bps).append("Bps"); + else + buf.append((int)(bps/1024)).append("KBps"); buf.append("
  • \n"); } buf.append("\n");