From 2b841ad667776fee37baab46e38546b4246ef885 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Wed, 23 Nov 2005 16:04:52 +0000 Subject: [PATCH] 2005-11-23 jrandom * Removed spurious streaming lib RTO increase (it wasn't helpful) * Streamlined the tunnel batching to schedule batch transmissions more appropriately. * Default tunnel pool variance to 2 +0-1 hops --- .../net/i2p/client/streaming/Connection.java | 4 +- .../streaming/ConnectionPacketHandler.java | 4 +- history.txt | 8 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../net/i2p/router/TunnelPoolSettings.java | 2 +- .../router/tunnel/BatchedPreprocessor.java | 117 ++++++++++++++---- .../tunnel/BatchedRouterPreprocessor.java | 30 ++++- .../src/net/i2p/router/tunnel/FlushTimer.java | 12 ++ .../router/tunnel/TrivialPreprocessor.java | 5 +- .../net/i2p/router/tunnel/TunnelGateway.java | 50 ++++++-- 10 files changed, 193 insertions(+), 43 deletions(-) create mode 100644 router/java/src/net/i2p/router/tunnel/FlushTimer.java diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 1e7fa629b8..98e6178d8e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -72,7 +72,7 @@ public class Connection { private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; - public static final long MAX_RESEND_DELAY = 15*1000; + public static final long MAX_RESEND_DELAY = 10*1000; public static final long MIN_RESEND_DELAY = 2*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ @@ -992,7 +992,7 @@ public class Connection { newWindowSize = 1; // setRTT has its own ceiling - getOptions().setRTT(getOptions().getRTT() + 10*1000); + //getOptions().setRTT(getOptions().getRTT() + 10*1000); getOptions().setWindowSize(newWindowSize); if (_log.shouldLog(Log.WARN)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index be27fb71d6..c870bbeacd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -80,7 +80,7 @@ public class ConnectionPacketHandler { if (packet.getOptionalDelay() > 60000) { // requested choke choke = true; - con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); + //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); } } @@ -272,7 +272,7 @@ public class ConnectionPacketHandler { oldSize = 1; // setRTT has its own ceiling - con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); + //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); con.getOptions().setWindowSize(oldSize); if (_log.shouldLog(Log.DEBUG)) diff --git a/history.txt b/history.txt index f7432da80f..60e56ca75e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.325 2005/11/19 23:42:17 jrandom Exp $ +$Id: history.txt,v 1.326 2005/11/21 09:37:10 jrandom Exp $ + +2005-11-23 jrandom + * Removed spurious streaming lib RTO increase (it wasn't helpful) + * Streamlined the tunnel batching to schedule batch transmissions more + appropriately. + * Default tunnel pool variance to 2 +0-1 hops 2005-11-21 jrandom * IE doesn't strip SPAN from <button> form fields, so add in a workaround diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f6206b3aa5..cb80ef6b76 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.293 $ $Date: 2005/11/19 23:42:17 $"; + public final static String ID = "$Revision: 1.294 $ $Date: 2005/11/21 09:37:09 $"; public final static String VERSION = "0.6.1.5"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/TunnelPoolSettings.java b/router/java/src/net/i2p/router/TunnelPoolSettings.java index 14a6e66796..8df755a28c 100644 --- a/router/java/src/net/i2p/router/TunnelPoolSettings.java +++ b/router/java/src/net/i2p/router/TunnelPoolSettings.java @@ -44,7 +44,7 @@ public class TunnelPoolSettings { public static final int DEFAULT_REBUILD_PERIOD = 60*1000; public static final int DEFAULT_DURATION = 10*60*1000; public static final int DEFAULT_LENGTH = 2; - public static final int DEFAULT_LENGTH_VARIANCE = -1; + public static final int DEFAULT_LENGTH_VARIANCE = 1; public static final boolean DEFAULT_ALLOW_ZERO_HOP = true; public TunnelPoolSettings() { diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index 74b2794aee..775f7eec5a 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -19,14 +19,20 @@ import net.i2p.util.Log; public class BatchedPreprocessor extends TrivialPreprocessor { private Log _log; private long _pendingSince; + private String _name; - public BatchedPreprocessor(I2PAppContext ctx) { + public BatchedPreprocessor(I2PAppContext ctx, String name) { super(ctx); _log = ctx.logManager().getLog(BatchedPreprocessor.class); + _name = name; _pendingSince = 0; ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } private static final int FULL_SIZE = PREPROCESSED_SIZE @@ -37,14 +43,32 @@ public class BatchedPreprocessor extends TrivialPreprocessor { private static final boolean DISABLE_BATCHING = false; /* not final or private so the test code can adjust */ - static long DEFAULT_DELAY = 500; + static long DEFAULT_DELAY = 100; /** wait up to 2 seconds before sending a small message */ protected long getSendDelay() { return DEFAULT_DELAY; } + /** if we have 50 messages queued that are too small, flush them anyway */ + private static final int FORCE_BATCH_FLUSH = 50; + + /** how long do we want to wait before flushing */ + public long getDelayAmount() { return getDelayAmount(true); } + private long getDelayAmount(boolean shouldStat) { + long rv = -1; + long defaultAmount = getSendDelay(); + if (_pendingSince > 0) + rv = _pendingSince + defaultAmount - _context.clock().now(); + if (rv > defaultAmount) + rv = defaultAmount; + if (shouldStat) + _context.statManager().addRateData("tunnel.batchDelayAmount", rv, 0); + return rv; + } + public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Preprocess queue with " + pending.size() + " to send"); - + + if (false) { if (DISABLE_BATCHING || getSendDelay() <= 0) { if (_log.shouldLog(Log.INFO)) _log.info("No batching, send all messages immediately"); @@ -59,7 +83,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } return false; } + } + int batchCount = 0; + int beforeLooping = pending.size(); + while (pending.size() > 0) { int allocated = 0; for (int i = 0; i < pending.size(); i++) { @@ -78,39 +106,47 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (_log.shouldLog(Log.DEBUG)) _log.debug("Pushback of " + curWanted + " (message " + (i+1) + ")"); } - if (_pendingSince > 0) - _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0); + if (_pendingSince > 0) { + long waited = _context.clock().now() - _pendingSince; + _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited); + } _pendingSince = 0; send(pending, 0, i, sender, rec); if (_log.shouldLog(Log.INFO)) _log.info("Allocated=" + allocated + " so we sent " + (i+1) - + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")"); + + " (last complete? " + (msg.getOffset() >= msg.getData().length) + + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); for (int j = 0; j < i; j++) { TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + if (cur.getOffset() < cur.getData().length) + throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset() + + " len=" + cur.getData().length + " alloc=" + allocated); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); } if (msg.getOffset() >= msg.getData().length) { // ok, this last message fit perfectly, remove it too TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); } if (i > 0) _context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0); allocated = 0; - break; + // don't break - we may have enough source messages for multiple full tunnel messages + //break; + batchCount++; } } + display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size())); + if (allocated > 0) { // after going through the entire pending list, we still don't // have enough data to send a full message - if ( (_pendingSince > 0) && (_pendingSince + getSendDelay() <= _context.clock().now()) ) { - if (_log.shouldLog(Log.INFO)) - _log.info("Passed through pending list, with " + allocated + "/" + pending.size() - + " left to clean up, but we've waited, so flush"); - + if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) { // not even a full message, but we want to flush it anyway if (pending.size() > 1) @@ -119,21 +155,37 @@ public class BatchedPreprocessor extends TrivialPreprocessor { send(pending, 0, pending.size()-1, sender, rec); - while (pending.size() > 0) { - TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); - notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + int beforeSize = pending.size(); + for (int i = 0; i < pending.size(); i++) { + TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i); + if (cur.getOffset() >= cur.getData().length) { + pending.remove(i); + notePreprocessing(cur.getMessageId(), cur.getFragmentNumber()); + _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length); + i--; + } + } + if (pending.size() > 0) { + _pendingSince = _context.clock().now(); + _context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize); + display(allocated, pending, "flushed, some remain"); + return true; + } else { + long delayAmount = _context.clock().now() - _pendingSince; + _pendingSince = 0; + if (batchCount > 1) + _context.statManager().addRateData("tunnel.batchCount", batchCount, 0); + display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount); + return false; } - _pendingSince = 0; - return false; } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Passed through pending list, with " + allocated + "/"+ pending.size() - + " left to clean up, but we've haven't waited, so don't flush (wait=" - + (_context.clock().now() - _pendingSince) + " / " + _pendingSince + ")"); _context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0); if (_pendingSince <= 0) _pendingSince = _context.clock().now(); + if (batchCount > 1) + _context.statManager().addRateData("tunnel.batchCount", batchCount, 0); // not yet time to send the delayed flush + display(allocated, pending, "dont flush"); return true; } } else { @@ -149,6 +201,29 @@ public class BatchedPreprocessor extends TrivialPreprocessor { return false; } + private void display(long allocated, List pending, String title) { + if (_log.shouldLog(Log.INFO)) { + long highestDelay = 0; + StringBuffer buf = new StringBuffer(); + buf.append(_name).append(": "); + buf.append(title); + buf.append(" allocated: ").append(allocated); + buf.append(" pending: ").append(pending.size()); + if (_pendingSince > 0) + buf.append(" delay: ").append(getDelayAmount(false)); + for (int i = 0; i < pending.size(); i++) { + TunnelGateway.Pending curPending = (TunnelGateway.Pending)pending.get(i); + buf.append(" pending[").append(i).append("]: "); + buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/'); + buf.append(curPending.getLifetime()); + if (curPending.getLifetime() > highestDelay) + highestDelay = curPending.getLifetime(); + } + _log.info(buf.toString()); + } + } + + /** * Preprocess the messages from the pending list, grouping items startAt * through sendThrough (though only part of the last one may be fully diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java index ca980d8422..64c48f3f8c 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -18,21 +18,41 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { */ public static final String PROP_BATCH_FREQUENCY = "batchFrequency"; public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency"; - public static final int DEFAULT_BATCH_FREQUENCY = 500; + public static final int DEFAULT_BATCH_FREQUENCY = 100; public BatchedRouterPreprocessor(RouterContext ctx) { this(ctx, (HopConfig)null); } public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) { - super(ctx); + super(ctx, getName(cfg)); _routerContext = ctx; _config = cfg; } public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) { - super(ctx); + super(ctx, getName(cfg)); _routerContext = ctx; _hopConfig = cfg; } + + private static String getName(HopConfig cfg) { + if (cfg == null) return "[unknown]"; + if (cfg.getReceiveTunnel() != null) + return cfg.getReceiveTunnel().getTunnelId() + ""; + else if (cfg.getSendTunnel() != null) + return cfg.getSendTunnel().getTunnelId() + ""; + else + return "[n/a]"; + } + + private static String getName(TunnelCreatorConfig cfg) { + if (cfg == null) return "[unknown]"; + if (cfg.getReceiveTunnelId(0) != null) + return cfg.getReceiveTunnelId(0).getTunnelId() + ""; + else if (cfg.getSendTunnelId(0) != null) + return cfg.getSendTunnelId(0).getTunnelId() + ""; + else + return "[n/a]"; + } /** how long should we wait before flushing */ protected long getSendDelay() { @@ -41,9 +61,9 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { Properties opts = _config.getOptions(); if (opts != null) freq = opts.getProperty(PROP_BATCH_FREQUENCY); - } else { - freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY); } + if (freq == null) + freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY); if (freq != null) { try { diff --git a/router/java/src/net/i2p/router/tunnel/FlushTimer.java b/router/java/src/net/i2p/router/tunnel/FlushTimer.java new file mode 100644 index 0000000000..b18799ac6d --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/FlushTimer.java @@ -0,0 +1,12 @@ +package net.i2p.router.tunnel; + +import net.i2p.util.SimpleTimer; + +/** + * + */ +class FlushTimer extends SimpleTimer { + private static final FlushTimer _instance = new FlushTimer(); + public static final SimpleTimer getInstance() { return _instance; } + protected FlushTimer() { super("TunnelFlushTimer"); } +} diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index d73e412709..d1e3579e8a 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -32,7 +32,10 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { _context = ctx; _log = ctx.logManager().getLog(TrivialPreprocessor.class); } - + + /** how long do we want to wait before flushing */ + public long getDelayAmount() { return 0; } + /** * Return true if there were messages remaining, and we should queue up * a delayed flush to clear them diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index bb3d5b6968..6e343e5e83 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -64,6 +64,8 @@ public class TunnelGateway { _flushFrequency = 500; _delayedFlush = new DelayedFlush(); _lastFlush = _context.clock().now(); + _context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); } /** @@ -87,11 +89,18 @@ public class TunnelGateway { public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { _messagesSent++; boolean delayedFlush = false; + long delayAmount = -1; + int remaining = 0; + long beforeLock = _context.clock().now(); + long afterAdded = -1; Pending cur = new PendingImpl(msg, toRouter, toTunnel); synchronized (_queue) { _queue.add(cur); + afterAdded = _context.clock().now(); delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver); + if (delayedFlush) + delayAmount = _preprocessor.getDelayAmount(); _lastFlush = _context.clock().now(); // expire any as necessary, even if its framented @@ -104,11 +113,13 @@ public class TunnelGateway { i--; } } + remaining = _queue.size(); } if (delayedFlush) { - SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency); + FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount); } + _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining); } public int getMessagesSent() { return _messagesSent; } @@ -131,6 +142,9 @@ public class TunnelGateway { * @return true if we should delay before preprocessing again */ public boolean preprocessQueue(List pending, Sender sender, Receiver receiver); + + /** how long do we want to wait before flushing */ + public long getDelayAmount(); } public interface Receiver { @@ -148,8 +162,12 @@ public class TunnelGateway { protected byte _remaining[]; protected int _offset; protected int _fragmentNumber; + protected long _created; - public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { + public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { + this(message, toRouter, toTunnel, System.currentTimeMillis()); + } + public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) { _toRouter = toRouter; _toTunnel = toTunnel; _messageId = message.getUniqueId(); @@ -157,6 +175,7 @@ public class TunnelGateway { _remaining = message.toByteArray(); _offset = 0; _fragmentNumber = 0; + _created = now; } /** may be null */ public Hash getToRouter() { return _toRouter; } @@ -170,17 +189,15 @@ public class TunnelGateway { public int getOffset() { return _offset; } /** move the offset */ public void setOffset(int offset) { _offset = offset; } + public long getLifetime() { return System.currentTimeMillis()-_created; } /** which fragment are we working on (0 for the first fragment) */ public int getFragmentNumber() { return _fragmentNumber; } /** ok, fragment sent, increment what the next will be */ public void incrementFragmentNumber() { _fragmentNumber++; } } private class PendingImpl extends Pending { - private long _created; - public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { - super(message, toRouter, toTunnel); - _created = _context.clock().now(); + super(message, toRouter, toTunnel, _context.clock().now()); } public String toString() { @@ -203,20 +220,37 @@ public class TunnelGateway { buf.append(" frag ").append(_fragmentNumber); return buf.toString(); } + + public long getLifetime() { return _context.clock().now()-_created; } } private class DelayedFlush implements SimpleTimer.TimedEvent { public void timeReached() { boolean wantRequeue = false; + int remaining = 0; + long beforeLock = _context.clock().now(); + long afterChecked = -1; + long delayAmount = -1; + if (_queue.size() > 10000) // stay out of the synchronized block + System.out.println("foo!"); synchronized (_queue) { - if (_queue.size() > 0) + if (_queue.size() > 10000) // stay in the synchronized block + System.out.println("foo!"); + afterChecked = _context.clock().now(); + if (_queue.size() > 0) { wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver); + if (wantRequeue) + delayAmount = _preprocessor.getDelayAmount(); + } + remaining = _queue.size(); } if (wantRequeue) - SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency); + FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount); else _lastFlush = _context.clock().now(); + + _context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining); } } } -- GitLab