From 30b7dbf1f7dd4a69b9879cc6ee2c2077f3e895f7 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 19 Dec 2009 22:20:58 +0000 Subject: [PATCH] Tunnel fragmenter cleanup, prep for enhancements --- .../router/tunnel/BatchedPreprocessor.java | 29 +++++++++---------- .../tunnel/BatchedRouterPreprocessor.java | 4 +-- .../i2p/router/tunnel/FragmentHandler.java | 10 +++---- .../router/tunnel/TrivialPreprocessor.java | 12 ++++---- .../tunnel/TrivialRouterPreprocessor.java | 1 + .../i2p/router/tunnel/TunnelDispatcher.java | 17 ++++++----- .../net/i2p/router/tunnel/TunnelGateway.java | 4 ++- .../pool/PooledTunnelCreatorConfig.java | 5 ++-- 8 files changed, 44 insertions(+), 38 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index 37c173c75c..8028aa13c3 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -2,7 +2,7 @@ package net.i2p.router.tunnel; import java.util.List; -import net.i2p.I2PAppContext; +import net.i2p.router.RouterContext; import net.i2p.util.Log; /** @@ -45,11 +45,10 @@ import net.i2p.util.Log; * } */ public class BatchedPreprocessor extends TrivialPreprocessor { - private Log _log; private long _pendingSince; private String _name; - public BatchedPreprocessor(I2PAppContext ctx, String name) { + public BatchedPreprocessor(RouterContext ctx, String name) { super(ctx); _log = ctx.logManager().getLog(BatchedPreprocessor.class); _name = name; @@ -99,7 +98,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } @Override - public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { StringBuilder timingBuf = null; if (_log.shouldLog(Log.DEBUG)) { _log.debug("Preprocess queue with " + pending.size() + " to send"); @@ -121,7 +120,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { long beforePendingLoop = System.currentTimeMillis(); for (int i = 0; i < pending.size(); i++) { long pendingStart = System.currentTimeMillis(); - TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i); + TunnelGateway.Pending msg = pending.get(i); int instructionsSize = getInstructionsSize(msg); instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize); int curWanted = msg.getData().length - msg.getOffset() + instructionsSize; @@ -135,7 +134,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { // the instructions alone exceed the size, so we won't get any // of the message into it. don't include it i--; - msg = (TunnelGateway.Pending)pending.get(i); + msg = pending.get(i); allocated -= curWanted; if (_log.shouldLog(Log.DEBUG)) _log.debug("Pushback of " + curWanted + " (message " + (i+1) + " in " + pending + ")"); @@ -155,7 +154,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); for (int j = 0; j < i; j++) { - TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + TunnelGateway.Pending cur = pending.remove(0); if (cur.getOffset() < cur.getData().length) throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset() + " len=" + cur.getData().length + " alloc=" + allocated); @@ -167,7 +166,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } if (msg.getOffset() >= msg.getData().length) { // ok, this last message fit perfectly, remove it too - TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0); + TunnelGateway.Pending cur = pending.remove(0); if (timingBuf != null) timingBuf.append(" sent perfect fit " + cur).append("."); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending); @@ -211,7 +210,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { int beforeSize = pending.size(); for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i); + TunnelGateway.Pending cur = pending.get(i); if (cur.getOffset() >= cur.getData().length) { pending.remove(i); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), cur.getData().length, cur.getMessageIds(), "flushed remaining"); @@ -283,7 +282,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { return false; } - private void display(long allocated, List pending, String title) { + private void display(long allocated, List<TunnelGateway.Pending> pending, String title) { if (_log.shouldLog(Log.INFO)) { long highestDelay = 0; StringBuilder buf = new StringBuilder(); @@ -294,7 +293,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { if (_pendingSince > 0) buf.append(" delay: ").append(getDelayAmount(false)); for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending curPending = (TunnelGateway.Pending)pending.get(i); + TunnelGateway.Pending curPending = pending.get(i); buf.append(" pending[").append(i).append("]: "); buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/'); buf.append(curPending.getLifetime()); @@ -314,7 +313,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { * @param startAt first index in pending to send (inclusive) * @param sendThrough last index in pending to send (inclusive) */ - protected void send(List pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending); byte preprocessed[] = _dataCache.acquire().getData(); @@ -346,7 +345,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { long msgId = sender.sendPreprocessed(preprocessed, rec); for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i); + TunnelGateway.Pending cur = pending.get(i); cur.addMessageId(msgId); } if (_log.shouldLog(Log.DEBUG)) @@ -359,9 +358,9 @@ public class BatchedPreprocessor extends TrivialPreprocessor { * * @return new offset into the target for further bytes to be written */ - private int writeFragments(List pending, int startAt, int sendThrough, byte target[], int offset) { + private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) { for (int i = startAt; i <= sendThrough; i++) { - TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.get(i); + TunnelGateway.Pending msg = pending.get(i); int prevOffset = offset; if (msg.getOffset() == 0) { offset = writeFirstFragment(msg, target, offset); diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java index 2e5e988855..fa8e32f1e3 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -11,9 +11,9 @@ import net.i2p.router.RouterContext; * */ public class BatchedRouterPreprocessor extends BatchedPreprocessor { - private RouterContext _routerContext; + protected RouterContext _routerContext; private TunnelCreatorConfig _config; - private HopConfig _hopConfig; + protected HopConfig _hopConfig; /** * How frequently should we flush non-full messages, in milliseconds diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 57e44203e6..f294d5c785 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -30,9 +30,9 @@ fragments it across the necessary number of 1KB tunnel messages, and decides how each I2NP message should be handled by the tunnel endpoint, encoding that data into the raw tunnel payload:</p> <ul> -<li>the first 4 bytes of the SHA256 of the remaining preprocessed data concatenated - with the IV, using the IV as will be seen on the tunnel endpoint (for - outbound tunnels) or the IV as was seen on the tunnel gateway (for inbound +<li>the first 4 bytes of the SHA256 of (the remaining preprocessed data concatenated + with the IV), using the IV as will be seen on the tunnel endpoint (for + outbound tunnels), or the IV as was seen on the tunnel gateway (for inbound tunnels) (see below for IV processing).</li> <li>0 or more bytes containing random nonzero integers</li> <li>1 byte containing 0x00</li> @@ -208,7 +208,7 @@ public class FragmentHandler { if (_log.shouldLog(Log.WARN)) _log.warn("cannot verify, going past the end [off=" + offset + " len=" + length + " paddingEnd=" - + paddingEnd + " data:\n" + + paddingEnd + " data: " + Base64.encode(preprocessed, offset, length)); return false; } @@ -232,7 +232,7 @@ public class FragmentHandler { _log.warn("Corrupt tunnel message - verification fails: " + Base64.encode(preprocessed, offset+HopProcessor.IV_LENGTH, 4) + " != " + Base64.encode(v.getData(), 0, 4)); _log.warn("No matching endpoint: # pad bytes: " + (paddingEnd-(HopProcessor.IV_LENGTH+4)-1) - + " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + + " offset=" + offset + " length=" + length + " paddingEnd=" + paddingEnd + ' ' + Base64.encode(preprocessed, offset, length)); } } diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index 4792b14c26..538397f27d 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -3,11 +3,11 @@ package net.i2p.router.tunnel; import java.util.ArrayList; import java.util.List; -import net.i2p.I2PAppContext; import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.router.RouterContext; import net.i2p.util.ByteCache; import net.i2p.util.Log; @@ -19,8 +19,8 @@ import net.i2p.util.Log; * */ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { - protected I2PAppContext _context; - private Log _log; + protected RouterContext _context; + protected Log _log; public static final int PREPROCESSED_SIZE = 1024; protected static final int IV_SIZE = HopProcessor.IV_LENGTH; @@ -28,7 +28,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE); protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH); - public TrivialPreprocessor(I2PAppContext ctx) { + public TrivialPreprocessor(RouterContext ctx) { _context = ctx; _log = ctx.logManager().getLog(TrivialPreprocessor.class); } @@ -41,7 +41,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * a delayed flush to clear them * */ - public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { long begin = System.currentTimeMillis(); StringBuilder buf = null; if (_log.shouldLog(Log.DEBUG)) { @@ -49,7 +49,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { buf.append("Trivial preprocessing of ").append(pending.size()).append(" "); } while (pending.size() > 0) { - TunnelGateway.Pending msg = (TunnelGateway.Pending)pending.remove(0); + TunnelGateway.Pending msg = pending.remove(0); long beforePreproc = System.currentTimeMillis(); byte preprocessed[][] = preprocess(msg); long afterPreproc = System.currentTimeMillis(); diff --git a/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java index ad7322cb61..fe853e49d7 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java @@ -7,6 +7,7 @@ import net.i2p.router.RouterContext; /** * Minor extension to track fragmentation * + * @deprecated unused */ public class TrivialRouterPreprocessor extends TrivialPreprocessor { private RouterContext _routerContext; diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 6b5457719b..527426416e 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -131,17 +131,20 @@ public class TunnelDispatcher implements Service { new long[] { 60*1000l, 10*60*1000l, 60*60*1000l }); } + /** for IBGW */ private TunnelGateway.QueuePreprocessor createPreprocessor(HopConfig cfg) { - if (true) - return new BatchedRouterPreprocessor(_context, cfg); - else - return new TrivialRouterPreprocessor(_context); + //if (true) + return new DroppingBatchedRouterPreprocessor(_context, cfg); + //else + // return new TrivialRouterPreprocessor(_context); } + + /** for OBGW */ private TunnelGateway.QueuePreprocessor createPreprocessor(TunnelCreatorConfig cfg) { - if (true) + //if (true) return new BatchedRouterPreprocessor(_context, cfg); - else - return new TrivialRouterPreprocessor(_context); + //else + // return new TrivialRouterPreprocessor(_context); } /** diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 40c935e795..5416abf594 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -154,12 +154,14 @@ public class TunnelGateway { public interface QueuePreprocessor { /** + * Caller must synchronize on the list! + * * @param pending list of Pending objects for messages either unsent * or partly sent. This list should be update with any * values removed (the preprocessor owns the lock) * @return true if we should delay before preprocessing again */ - public boolean preprocessQueue(List pending, Sender sender, Receiver receiver); + public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver); /** how long do we want to wait before flushing */ public long getDelayAmount(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index b2cb7c704f..dfbdd3ecac 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -79,8 +79,9 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { public TunnelPool getTunnelPool() { return _pool; } - /* FIXME Exporting non-public type through public API FIXME */ - public void setTestJob(TestJob job) { _testJob = job; } + /** @deprecated unused, which makes _testJob unused - why is it here */ + void setTestJob(TestJob job) { _testJob = job; } + /** does nothing, to be deprecated */ public void setExpireJob(Job job) { /* _expireJob = job; */ } /** -- GitLab