diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index 8028aa13c3ddb6ce95baa94dbfcf6f51ca76bb57..30fbbdd27bbbdc371479b6e827a0d713490ec0a2 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -97,6 +97,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { return rv; } + /* See TunnelGateway.QueuePreprocessor for Javadoc */ @Override public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { StringBuilder timingBuf = null; @@ -115,9 +116,12 @@ public class BatchedPreprocessor extends TrivialPreprocessor { int batchCount = 0; int beforeLooping = pending.size(); + // loop until the queue is empty while (pending.size() > 0) { int allocated = 0; long beforePendingLoop = System.currentTimeMillis(); + + // loop until we fill up a single message for (int i = 0; i < pending.size(); i++) { long pendingStart = System.currentTimeMillis(); TunnelGateway.Pending msg = pending.get(i); @@ -143,6 +147,8 @@ public class BatchedPreprocessor extends TrivialPreprocessor { long waited = _context.clock().now() - _pendingSince; _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited); } + + // Send the message long beforeSend = System.currentTimeMillis(); _pendingSince = 0; send(pending, 0, i, sender, rec); @@ -153,6 +159,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")"); + // Remove what we sent from the pending queue for (int j = 0; j < i; j++) { TunnelGateway.Pending cur = pending.remove(0); if (cur.getOffset() < cur.getData().length) @@ -185,18 +192,18 @@ public class BatchedPreprocessor extends TrivialPreprocessor { + "/" + (beforeSend-start) + " pending current " + (pendingEnd-pendingStart)).append("."); break; - } + } // if >= full size if (timingBuf != null) timingBuf.append(" After pending loop " + (System.currentTimeMillis()-beforePendingLoop)).append("."); - } + } // for long afterCleared = System.currentTimeMillis(); if (_log.shouldLog(Log.INFO)) display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size())); long afterDisplayed = System.currentTimeMillis(); if (allocated > 0) { - // after going through the entire pending list, we still don't - // have enough data to send a full message + // After going through the entire pending list, we have only a partial message. + // We might flush it or might not, but we are returning either way. if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) { // not even a full message, but we want to flush it anyway @@ -208,6 +215,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { send(pending, 0, pending.size()-1, sender, rec); _context.statManager().addRateData("tunnel.batchSmallFragments", FULL_SIZE - allocated, 0); + // Remove everything in the message from the pending queue int beforeSize = pending.size(); for (int i = 0; i < pending.size(); i++) { TunnelGateway.Pending cur = pending.get(i); @@ -245,7 +253,9 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } return false; } + // won't get here, we returned } else { + // We didn't flush. Note that the messages remain on the pending list. _context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0); if (_pendingSince <= 0) _pendingSince = _context.clock().now(); @@ -261,14 +271,15 @@ public class BatchedPreprocessor extends TrivialPreprocessor { } return true; } + // won't get here, we returned } else { // ok, we sent some, but haven't gone back for another // pass yet. keep looping if (timingBuf != null) timingBuf.append(" Keep looping"); - } - } + } // if allocated + } // while if (_log.shouldLog(Log.DEBUG)) _log.debug("Sent everything on the list (pending=" + pending.size() + ")"); diff --git a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java index fa8e32f1e3f61b1c90a62f914d1d8bad73aa3e6c..6af65e88e2e23991812d1678ac42bb9657016797 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java @@ -79,7 +79,7 @@ public class BatchedRouterPreprocessor extends BatchedPreprocessor { } @Override - protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) { + protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) { if (_config != null) _routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, _config, msg); else diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index 538397f27db763db68a4371ffce52322272c396b..bd7bc73628c135167d29739138db86beacb81828 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -84,7 +84,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { return false; } - protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds, String msg) {} + protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {} private byte[][] preprocess(TunnelGateway.Pending msg) { List fragments = new ArrayList(1); diff --git a/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java index fe853e49d774bc7a16bd78856cedc8249fcee991..6ddf636e4f77a5e766dafeafdb22e3cf8bd07b0d 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialRouterPreprocessor.java @@ -17,7 +17,7 @@ public class TrivialRouterPreprocessor extends TrivialPreprocessor { _routerContext = ctx; } - protected void notePreprocessing(long messageId, int numFragments, int totalLength, List messageIds) { + protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds) { _routerContext.messageHistory().fragmentMessage(messageId, numFragments, totalLength, messageIds, null); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 5416abf5949260264e21eefa0155a37db232e0ed..2b394e6dae5a1d7bb8844b68a681de0ed2568107 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -159,6 +159,10 @@ public class TunnelGateway { * @param pending list of Pending objects for messages either unsent * or partly sent. This list should be update with any * values removed (the preprocessor owns the lock) + * Messages are not removed from the list until actually sent. + * The status of unsent and partially-sent messages is stored in + * the Pending structure. + * * @return true if we should delay before preprocessing again */ public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver); @@ -175,6 +179,9 @@ public class TunnelGateway { public long receiveEncrypted(byte encrypted[]); } + /** + * Stores all the state for an unsent or partially-sent message + */ public static class Pending { protected Hash _toRouter; protected TunnelId _toTunnel; @@ -184,7 +191,7 @@ public class TunnelGateway { protected int _offset; protected int _fragmentNumber; protected long _created; - private List _messageIds; + private List<Long> _messageIds; public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { this(message, toRouter, toTunnel, System.currentTimeMillis()); @@ -224,7 +231,7 @@ public class TunnelGateway { _messageIds.add(new Long(id)); } } - public List getMessageIds() { + public List<Long> getMessageIds() { synchronized (Pending.this) { if (_messageIds != null) return new ArrayList(_messageIds); @@ -233,6 +240,8 @@ public class TunnelGateway { } } } + + /** Extend for debugging */ class PendingImpl extends Pending { public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { super(message, toRouter, toTunnel, _context.clock().now());