From 72a588bfbf36b4b548abe2bcfdd4c0a5419567b7 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 26 Dec 2009 20:20:11 +0000 Subject: [PATCH] * Tunnels - More code to detect improper reuse of cached objects after release - Don't pass a msg with a failed IV on to the FragmentHandler at the OBEP - More cleanups and comments --- .../router/tunnel/BatchedPreprocessor.java | 7 +++- .../i2p/router/tunnel/FragmentHandler.java | 33 +++++++++++++++---- .../i2p/router/tunnel/FragmentedMessage.java | 24 ++++++++++++++ .../router/tunnel/OutboundTunnelEndpoint.java | 11 ++++++- .../router/tunnel/RouterFragmentHandler.java | 10 ++---- .../router/tunnel/TrivialPreprocessor.java | 18 ++++++++-- .../net/i2p/router/tunnel/TunnelGateway.java | 8 +++++ .../i2p/router/tunnel/TunnelParticipant.java | 3 ++ 8 files changed, 97 insertions(+), 17 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java index cfaabcfcbd..879eb78e70 100644 --- a/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java @@ -363,6 +363,11 @@ public class BatchedPreprocessor extends TrivialPreprocessor { protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending); + + // Might as well take a buf from the cache; + // However it will never be returned to the cache. + // (TunnelDataMessage will not wrap the buffer in a new ByteArray and release() it) + // See also TDM for more discussion. byte preprocessed[] = _dataCache.acquire().getData(); int offset = 0; @@ -389,7 +394,7 @@ public class BatchedPreprocessor extends TrivialPreprocessor { _log.error("Error preprocessing the messages (offset=" + offset + " start=" + startAt + " through=" + sendThrough + " pending=" + pending.size() + " preproc=" + preprocessed.length); return; } - + long msgId = sender.sendPreprocessed(preprocessed, rec); for (int i = 0; i < pending.size(); i++) { TunnelGateway.Pending cur = pending.get(i); diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 6e4c00e1c8..18a06f0362 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -4,7 +4,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import net.i2p.I2PAppContext; import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; @@ -13,6 +12,7 @@ import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessageException; import net.i2p.data.i2np.I2NPMessageHandler; +import net.i2p.router.RouterContext; import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -30,6 +30,8 @@ fragments it across the necessary number of 1KB tunnel messages, and decides how each I2NP message should be handled by the tunnel endpoint, encoding that data into the raw tunnel payload:</p> <ul> +<li>The 4 byte Tunnel ID</li> +<li>The 16 byte IV</li> <li>the first 4 bytes of the SHA256 of (the remaining preprocessed data concatenated with the IV), using the IV as will be seen on the tunnel endpoint (for outbound tunnels), or the IV as was seen on the tunnel gateway (for inbound @@ -81,13 +83,15 @@ set, this is a follow on fragment.</p> </ul> <p>The I2NP message is encoded in its standard form, and the -preprocessed payload must be padded to a multiple of 16 bytes.</p> +preprocessed payload must be padded to a multiple of 16 bytes. +The total size, including the tunnel ID and IV, is 1028 bytes. +</p> * */ public class FragmentHandler { - private I2PAppContext _context; - private Log _log; + protected RouterContext _context; + protected Log _log; private final Map<Long, FragmentedMessage> _fragmentedMessages; private DefragmentedReceiver _receiver; private int _completed; @@ -98,7 +102,7 @@ public class FragmentHandler { static long MAX_DEFRAGMENT_TIME = 60*1000; private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE); - public FragmentHandler(I2PAppContext context, DefragmentedReceiver receiver) { + public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) { _context = context; _log = context.logManager().getLog(FragmentHandler.class); _fragmentedMessages = new HashMap(8); @@ -185,6 +189,9 @@ public class FragmentHandler { // each of the FragmentedMessages populated make a copy out of the // payload, which they release separately, so we can release // immediately + // + // This is certainly interesting, to wrap the 1024-byte array in a new ByteArray + // in order to put it in the pool, but it shouldn't cause any harm. _cache.release(new ByteArray(preprocessed)); } } @@ -204,6 +211,13 @@ public class FragmentHandler { * this. */ private boolean verifyPreprocessed(byte preprocessed[], int offset, int length) { + // ByteCache/ByteArray corruption detection + //byte[] orig = new byte[length]; + //System.arraycopy(preprocessed, 0, orig, 0, length); + //try { + // Thread.sleep(75); + //} catch (InterruptedException ie) {} + // now we need to verify that the message was received correctly int paddingEnd = HopProcessor.IV_LENGTH + 4; while (preprocessed[offset+paddingEnd] != (byte)0x00) { @@ -249,6 +263,13 @@ public class FragmentHandler { _context.statManager().addRateData("tunnel.fullFragments", 1, 0); } + // ByteCache/ByteArray corruption detection + //if (!DataHelper.eq(preprocessed, 0, orig, 0, length)) { + // _log.log(Log.CRIT, "Not equal! orig =\n" + Base64.encode(orig, 0, length) + + // "\nprep =\n" + Base64.encode(preprocessed, 0, length), + // new Exception("hosed")); + //} + return eq; } @@ -514,7 +535,7 @@ public class FragmentHandler { _failed++; noteFailure(_msg.getMessageId(), _msg.toString()); if (_log.shouldLog(Log.WARN)) - _log.warn("Dropped failed fragmented message: " + _msg); + _log.warn("Dropped incomplete fragmented message: " + _msg); _context.statManager().addRateData("tunnel.fragmentedDropped", _msg.getFragmentCount(), _msg.getLifetime()); _msg.failed(); } else { diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index 10841701d0..0515f813b5 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -188,6 +188,11 @@ public class FragmentedMessage { public int getCompleteSize() { if (!_lastReceived) throw new IllegalStateException("wtf, don't get the completed size when we're not complete"); + if (_releasedAfter > 0) { + RuntimeException e = new RuntimeException("use after free in FragmentedMessage"); + _log.error("FM completeSize()", e); + throw e; + } int size = 0; for (int i = 0; i <= _highFragmentNum; i++) { ByteArray ba = _fragments[i]; @@ -205,6 +210,11 @@ public class FragmentedMessage { public void writeComplete(OutputStream out) throws IOException { + if (_releasedAfter > 0) { + RuntimeException e = new RuntimeException("use after free in FragmentedMessage"); + _log.error("FM writeComplete()", e); + throw e; + } for (int i = 0; i <= _highFragmentNum; i++) { ByteArray ba = _fragments[i]; out.write(ba.getData(), ba.getOffset(), ba.getValid()); @@ -212,6 +222,11 @@ public class FragmentedMessage { _completed = true; } public void writeComplete(byte target[], int offset) { + if (_releasedAfter > 0) { + RuntimeException e = new RuntimeException("use after free in FragmentedMessage"); + _log.error("FM writeComplete() 2", e); + throw e; + } for (int i = 0; i <= _highFragmentNum; i++) { ByteArray ba = _fragments[i]; System.arraycopy(ba.getData(), ba.getOffset(), target, offset, ba.getValid()); @@ -241,6 +256,11 @@ public class FragmentedMessage { * */ private void releaseFragments() { + if (_releasedAfter > 0) { + RuntimeException e = new RuntimeException("double free in FragmentedMessage"); + _log.error("FM releaseFragments()", e); + throw e; + } _releasedAfter = getLifetime(); for (int i = 0; i <= _highFragmentNum; i++) { ByteArray ba = _fragments[i]; @@ -251,6 +271,7 @@ public class FragmentedMessage { } } +/**** public InputStream getInputStream() { return new FragmentInputStream(); } private class FragmentInputStream extends InputStream { private int _fragment; @@ -274,6 +295,7 @@ public class FragmentedMessage { } } } +****/ @Override public String toString() { @@ -301,6 +323,7 @@ public class FragmentedMessage { return buf.toString(); } +/***** public static void main(String args[]) { try { I2PAppContext ctx = I2PAppContext.getGlobalContext(); @@ -327,4 +350,5 @@ public class FragmentedMessage { e.printStackTrace(); } } +******/ } diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index 32e4c2c2ef..7ba086fdcf 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -30,7 +30,16 @@ public class OutboundTunnelEndpoint { } public void dispatch(TunnelDataMessage msg, Hash recvFrom) { _config.incrementProcessedMessages(); - _processor.process(msg.getData(), 0, msg.getData().length, recvFrom); + boolean ok = _processor.process(msg.getData(), 0, msg.getData().length, recvFrom); + if (!ok) { + // invalid IV + // If we pass it on to the handler, it will fail + // If we don't, the data buf won't get released from the cache... that's ok + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid IV, dropping at OBEP " + _config); + _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + return; + } _handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length); } diff --git a/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java b/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java index acb4231d27..50b9396bec 100644 --- a/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/RouterFragmentHandler.java @@ -7,31 +7,27 @@ import net.i2p.util.Log; * Minor extension to allow message history integration */ public class RouterFragmentHandler extends FragmentHandler { - private RouterContext _routerContext; - private Log _log; public RouterFragmentHandler(RouterContext context, DefragmentedReceiver receiver) { super(context, receiver); - _routerContext = context; - _log = context.logManager().getLog(RouterFragmentHandler.class); } @Override protected void noteReception(long messageId, int fragmentId, Object status) { if (_log.shouldLog(Log.INFO)) _log.info("Received fragment " + fragmentId + " for message " + messageId + ": " + status); - _routerContext.messageHistory().receiveTunnelFragment(messageId, fragmentId, status); + _context.messageHistory().receiveTunnelFragment(messageId, fragmentId, status); } @Override protected void noteCompletion(long messageId) { if (_log.shouldLog(Log.INFO)) _log.info("Received complete message " + messageId); - _routerContext.messageHistory().receiveTunnelFragmentComplete(messageId); + _context.messageHistory().receiveTunnelFragmentComplete(messageId); } @Override protected void noteFailure(long messageId, String status) { if (_log.shouldLog(Log.INFO)) _log.info("Dropped message " + messageId + ": " + status); - _routerContext.messageHistory().droppedFragmentedMessage(messageId, status); + _context.messageHistory().droppedFragmentedMessage(messageId, status); } } diff --git a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java index 6cda3071f3..0e32f0f937 100644 --- a/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java +++ b/router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java @@ -25,9 +25,17 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { public static final int PREPROCESSED_SIZE = 1024; protected static final int IV_SIZE = HopProcessor.IV_LENGTH; + + /** + * Here in tunnels, we take from the cache but never add to it. + * In other words, we take advantage of other places in the router also using 1024-byte ByteCaches + * (since ByteCache only maintains once instance for each size) + * Used in BatchedPreprocessor; see add'l comments there + */ protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE); - protected static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE); - protected static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH); + + private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE); + private static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH); public TrivialPreprocessor(RouterContext ctx) { _context = ctx; @@ -41,8 +49,10 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * Return true if there were messages remaining, and we should queue up * a delayed flush to clear them * + * NOTE: Unused here, see BatchedPreprocessor override, super is not called. */ public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) { + if (true) throw new IllegalArgumentException("unused, right?"); long begin = System.currentTimeMillis(); StringBuilder buf = null; if (_log.shouldLog(Log.DEBUG)) { @@ -87,6 +97,9 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { protected void notePreprocessing(long messageId, int numFragments, int totalLength, List<Long> messageIds, String msg) {} + /* + * @deprecated unused except by above + */ private byte[][] preprocess(TunnelGateway.Pending msg) { List fragments = new ArrayList(1); @@ -110,6 +123,7 @@ public class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor { * bytes after the IV, followed by the first 4 bytes of that SHA256, lining up * exactly to meet the beginning of the instructions. (i hope) * + * @deprecated unused except by above */ private byte[] preprocessFragment(TunnelGateway.Pending msg) { byte target[] = _dataCache.acquire().getData(); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index 1eb9897e94..bf093f7a26 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -224,6 +224,10 @@ public class TunnelGateway { public int getFragmentNumber() { return _fragmentNumber; } /** ok, fragment sent, increment what the next will be */ public void incrementFragmentNumber() { _fragmentNumber++; } + /** + * Add an ID to the list of the TunnelDataMssages this message was fragmented into. + * Unused except in notePreprocessing() calls for debugging + */ public void addMessageId(long id) { synchronized (Pending.this) { if (_messageIds == null) @@ -231,6 +235,10 @@ public class TunnelGateway { _messageIds.add(new Long(id)); } } + /** + * The IDs of the TunnelDataMssages this message was fragmented into. + * Unused except in notePreprocessing() calls for debugging + */ public List<Long> getMessageIds() { synchronized (Pending.this) { if (_messageIds != null) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index 6263f8ec70..54f22ce1c9 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -71,6 +71,9 @@ public class TunnelParticipant { if (_log.shouldLog(Log.WARN)) _log.warn("Failed to dispatch " + msg + ": processor=" + _processor + " inboundEndpoint=" + _inboundEndpointProcessor); + if (_config != null) + _config.incrementProcessedMessages(); + _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); return; } -- GitLab