From f44eeaf7dd36dd2caeecc274a49dc9a2e045e59a Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 1 Sep 2012 21:39:14 +0000 Subject: [PATCH] TunnelGateway: Refactor TunnelGateway.Pending to its own file PendingGatewayMesasge --- .../router/tunnel/BatchedPreprocessor.java | 22 ++-- .../router/tunnel/PendingGatewayMessage.java | 109 ++++++++++++++++++ .../router/tunnel/PumpedTunnelGateway.java | 8 +- .../router/tunnel/TrivialPreprocessor.java | 10 +- .../net/i2p/router/tunnel/TunnelGateway.java | 103 +---------------- .../router/tunnel/TunnelGatewayPumper.java | 2 +- 6 files changed, 132 insertions(+), 122 deletions(-) create mode 100644 router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index c3e80b33b5..ff7bdedf8f 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -104,7 +104,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { /* See TunnelGateway.QueuePreprocessor for Javadoc */ @Override - public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { if (_log.shouldLog(Log.INFO)) display(0, pending, "Starting"); StringBuilder timingBuf = null; @@ -131,7 +131,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { // loop until we fill up a single message for (int i = 0; i < pending.size(); i++) { long pendingStart = System.currentTimeMillis(); - TunnelGateway.Pending msg = pending.get(i); + PendingGatewayMessage msg = pending.get(i); int instructionsSize = getInstructionsSize(msg); instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize); int curWanted = msg.getData().length - msg.getOffset() + instructionsSize; @@ -169,7 +169,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { // Remove what we sent from the pending queue for (int j = 0; j < i; j++) { - TunnelGateway.Pending cur = pending.remove(0); + PendingGatewayMessage cur = pending.remove(0); if (cur.getOffset() < cur.getData().length) throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset() + " len=" + cur.getData().length + " alloc=" + allocated); @@ -181,7 +181,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { } if (msg.getOffset() >= msg.getData().length) { // ok, this last message fit perfectly, remove it too - TunnelGateway.Pending cur = pending.remove(0); + PendingGatewayMessage cur = pending.remove(0); if (timingBuf != null) timingBuf.append(" sent perfect fit " + cur).append("."); notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending); @@ -230,7 +230,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { // Remove everything in the outgoing message from the pending queue int beforeSize = pending.size(); for (int i = 0; i < beforeSize; i++) { - TunnelGateway.Pending cur = pending.get(0); + PendingGatewayMessage cur = pending.get(0); if (cur.getOffset() < cur.getData().length) break; pending.remove(0); @@ -316,7 +316,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { * * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc. */ - private void display(long allocated, List<TunnelGateway.Pending> pending, String title) { + private void display(long allocated, List<PendingGatewayMessage> pending, String title) { if (_log.shouldLog(Log.INFO)) { long highestDelay = 0; StringBuilder buf = new StringBuilder(128); @@ -327,7 +327,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { if (_pendingSince > 0) buf.append(" delay: ").append(getDelayAmount(false)); for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending curPending = pending.get(i); + PendingGatewayMessage curPending = pending.get(i); buf.append(" [").append(i).append("]:"); buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/'); buf.append(curPending.getLifetime()); @@ -347,7 +347,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { * @param startAt first index in pending to send (inclusive) * @param sendThrough last index in pending to send (inclusive) */ - protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + protected void send(List<PendingGatewayMessage> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending); @@ -384,7 +384,7 @@ class BatchedPreprocessor extends TrivialPreprocessor { long msgId = sender.sendPreprocessed(preprocessed, rec); for (int i = 0; i < pending.size(); i++) { - TunnelGateway.Pending cur = pending.get(i); + PendingGatewayMessage cur = pending.get(i); cur.addMessageId(msgId); } if (_log.shouldLog(Log.DEBUG)) @@ -397,9 +397,9 @@ class BatchedPreprocessor extends TrivialPreprocessor { * * @return new offset into the target for further bytes to be written */ - private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) { + private int writeFragments(List<PendingGatewayMessage> pending, int startAt, int sendThrough, byte target[], int offset) { for (int i = startAt; i <= sendThrough; i++) { - TunnelGateway.Pending msg = pending.get(i); + PendingGatewayMessage msg = pending.get(i); int prevOffset = offset; if (msg.getOffset() == 0) { offset = writeFirstFragment(msg, target, offset); diff --git a/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java b/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java new file mode 100644 index 0000000000..302c4f94cd --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java @@ -0,0 +1,109 @@ +package net.i2p.router.tunnel; + +import java.util.ArrayList; +import java.util.List; + +import net.i2p.data.Hash; +import net.i2p.data.TunnelId; +import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.RouterContext; + +/** + * Stores all the state for an unsent or partially-sent message + * + * @since 0.9.3 refactored from TunnelGateway.Pending + */ +class PendingGatewayMessage { + protected final Hash _toRouter; + protected final TunnelId _toTunnel; + protected final long _messageId; + protected final long _expiration; + protected final byte _remaining[]; + protected int _offset; + protected int _fragmentNumber; + protected final long _created; + private List<Long> _messageIds; + + public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { + _toRouter = toRouter; + _toTunnel = toTunnel; + _messageId = message.getUniqueId(); + _expiration = message.getMessageExpiration(); + _remaining = message.toByteArray(); + _created = System.currentTimeMillis(); + } + + /** may be null */ + public Hash getToRouter() { return _toRouter; } + + /** may be null */ + public TunnelId getToTunnel() { return _toTunnel; } + + public long getMessageId() { return _messageId; } + + public long getExpiration() { return _expiration; } + + /** raw unfragmented message to send */ + public byte[] getData() { return _remaining; } + + /** index into the data to be sent */ + public int getOffset() { return _offset; } + + /** move the offset */ + public void setOffset(int offset) { _offset = offset; } + + public long getLifetime() { return System.currentTimeMillis()-_created; } + + /** which fragment are we working on (0 for the first fragment) */ + public int getFragmentNumber() { return _fragmentNumber; } + + /** ok, fragment sent, increment what the next will be */ + public void incrementFragmentNumber() { _fragmentNumber++; } + + /** + * Add an ID to the list of the TunnelDataMssages this message was fragmented into. + * Unused except in notePreprocessing() calls for debugging + */ + public void addMessageId(long id) { + synchronized (this) { + if (_messageIds == null) + _messageIds = new ArrayList(); + _messageIds.add(Long.valueOf(id)); + } + } + + /** + * The IDs of the TunnelDataMssages this message was fragmented into. + * Unused except in notePreprocessing() calls for debugging + */ + public List<Long> getMessageIds() { + synchronized (this) { + if (_messageIds != null) + return new ArrayList(_messageIds); + else + return new ArrayList(); + } + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(64); + buf.append("Message ").append(_messageId); //.append(" on "); + //buf.append(TunnelGateway.this.toString()); + if (_toRouter != null) { + buf.append(" targetting "); + buf.append(_toRouter.toBase64()).append(" "); + if (_toTunnel != null) + buf.append(_toTunnel.getTunnelId()); + } + buf.append(" actual lifetime "); + buf.append(getLifetime()).append("ms"); + buf.append(" potential lifetime "); + buf.append(_expiration - _created).append("ms"); + buf.append(" size ").append(_remaining.length); + buf.append(" offset ").append(_offset); + buf.append(" frag ").append(_fragmentNumber); + return buf.toString(); + } +} + diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java index 1d3718a00a..2e9bad9c89 100644 --- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java @@ -35,7 +35,7 @@ import net.i2p.util.Log; * */ class PumpedTunnelGateway extends TunnelGateway { - private final BlockingQueue<Pending> _prequeue; + private final BlockingQueue<PendingGatewayMessage> _prequeue; private final TunnelGatewayPumper _pumper; private static final int MAX_MSGS_PER_PUMP = 16; @@ -71,7 +71,7 @@ class PumpedTunnelGateway extends TunnelGateway { @Override public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { _messagesSent++; - Pending cur = new PendingImpl(msg, toRouter, toTunnel); + PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel); if (_prequeue.offer(cur)) _pumper.wantsPumping(this); else @@ -88,7 +88,7 @@ class PumpedTunnelGateway extends TunnelGateway { * @param queueBuf Empty list for convenience, to use as a temporary buffer. * Must be empty when called; will always be emptied before return. */ - void pump(List<Pending> queueBuf) { + void pump(List<PendingGatewayMessage> queueBuf) { _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP); if (queueBuf.isEmpty()) return; @@ -114,7 +114,7 @@ class PumpedTunnelGateway extends TunnelGateway { // expire any as necessary, even if its framented for (int i = 0; i < _queue.size(); i++) { - Pending m = _queue.get(i); + PendingGatewayMessage m = _queue.get(i); if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m); diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index 5ff23456c3..c9655e6873 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -50,7 +50,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * * NOTE: Unused here, see BatchedPreprocessor override, super is not called. */ - public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { throw new IllegalArgumentException("unused, right?"); } @@ -155,7 +155,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5); private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5); - protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) { + protected int writeFirstFragment(PendingGatewayMessage msg, byte target[], int offset) { boolean fragmented = false; int instructionsLength = getInstructionsSize(msg); int payloadLength = msg.getData().length - msg.getOffset(); @@ -221,7 +221,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { return offset; } - protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) { + protected int writeSubsequentFragment(PendingGatewayMessage msg, byte target[], int offset) { boolean isLast = true; int instructionsLength = getInstructionsSize(msg); @@ -269,7 +269,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * Does NOT include 4 for the message ID if the message will be fragmented; * call getInstructionAugmentationSize() for that. */ - protected int getInstructionsSize(TunnelGateway.Pending msg) { + protected int getInstructionsSize(PendingGatewayMessage msg) { if (msg.getFragmentNumber() > 0) return 7; // control byte @@ -287,7 +287,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { } /** @return 0 or 4 */ - protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) { + protected int getInstructionAugmentationSize(PendingGatewayMessage msg, int offset, int instructionsSize) { int payloadLength = msg.getData().length - msg.getOffset(); if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) { // requires fragmentation, so include the messageId diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index ae050784c1..bcc3bf2078 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -37,7 +37,7 @@ import net.i2p.util.SimpleTimer2; class TunnelGateway { protected final RouterContext _context; protected final Log _log; - protected final List<Pending> _queue; + protected final List<PendingGatewayMessage> _queue; protected final QueuePreprocessor _preprocessor; protected final Sender _sender; protected final Receiver _receiver; @@ -171,7 +171,7 @@ class TunnelGateway { * * @return true if we should delay before preprocessing again */ - public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver); + public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver); /** how long do we want to wait before flushing */ public long getDelayAmount(); @@ -184,106 +184,7 @@ class TunnelGateway { */ public long receiveEncrypted(byte encrypted[]); } - - /** - * Stores all the state for an unsent or partially-sent message - */ - public static class Pending { - protected final Hash _toRouter; - protected final TunnelId _toTunnel; - protected final long _messageId; - protected final long _expiration; - protected final byte _remaining[]; - protected int _offset; - protected int _fragmentNumber; - protected final long _created; - private List<Long> _messageIds; - - public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { - this(message, toRouter, toTunnel, System.currentTimeMillis()); - } - public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) { - _toRouter = toRouter; - _toTunnel = toTunnel; - _messageId = message.getUniqueId(); - _expiration = message.getMessageExpiration(); - _remaining = message.toByteArray(); - _created = now; - } - /** may be null */ - public Hash getToRouter() { return _toRouter; } - /** may be null */ - public TunnelId getToTunnel() { return _toTunnel; } - public long getMessageId() { return _messageId; } - public long getExpiration() { return _expiration; } - /** raw unfragmented message to send */ - public byte[] getData() { return _remaining; } - /** index into the data to be sent */ - public int getOffset() { return _offset; } - /** move the offset */ - public void setOffset(int offset) { _offset = offset; } - public long getLifetime() { return System.currentTimeMillis()-_created; } - /** which fragment are we working on (0 for the first fragment) */ - public int getFragmentNumber() { return _fragmentNumber; } - /** ok, fragment sent, increment what the next will be */ - public void incrementFragmentNumber() { _fragmentNumber++; } - /** - * Add an ID to the list of the TunnelDataMssages this message was fragmented into. - * Unused except in notePreprocessing() calls for debugging - */ - public void addMessageId(long id) { - synchronized (Pending.this) { - if (_messageIds == null) - _messageIds = new ArrayList(); - _messageIds.add(Long.valueOf(id)); - } - } - /** - * The IDs of the TunnelDataMssages this message was fragmented into. - * Unused except in notePreprocessing() calls for debugging - */ - public List<Long> getMessageIds() { - synchronized (Pending.this) { - if (_messageIds != null) - return new ArrayList(_messageIds); - else - return new ArrayList(); - } - } - } - - /** Extend for debugging */ - class PendingImpl extends Pending { - public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { - super(message, toRouter, toTunnel, _context.clock().now()); - } - - @Override - public String toString() { - StringBuilder buf = new StringBuilder(64); - buf.append("Message ").append(_messageId).append(" on "); - buf.append(TunnelGateway.this.toString()); - if (_toRouter != null) { - buf.append(" targetting "); - buf.append(_toRouter.toBase64()).append(" "); - if (_toTunnel != null) - buf.append(_toTunnel.getTunnelId()); - } - long now = _context.clock().now(); - buf.append(" actual lifetime "); - buf.append(now - _created).append("ms"); - buf.append(" potential lifetime "); - buf.append(_expiration - _created).append("ms"); - buf.append(" size ").append(_remaining.length); - buf.append(" offset ").append(_offset); - buf.append(" frag ").append(_fragmentNumber); - return buf.toString(); - } - @Override - public long getLifetime() { return _context.clock().now()-_created; } - } - protected class DelayedFlush extends SimpleTimer2.TimedEvent { DelayedFlush() { super(_context.simpleTimer2()); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index 1483f2e6ad..380023811c 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -65,7 +65,7 @@ class TunnelGatewayPumper implements Runnable { public void run() { PumpedTunnelGateway gw = null; - List<TunnelGateway.Pending> queueBuf = new ArrayList(32); + List<PendingGatewayMessage> queueBuf = new ArrayList(32); while (!_stop) { try { synchronized (_wantsPumping) { -- GitLab