diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index cf3c33ad313b8d2449db3bc16ff51ff92b87aceb..962e205fc0b3d56343c2bf41cef7f9fb96d53384 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -99,8 +99,8 @@ class FragmentHandler { private final AtomicInteger _completed = new AtomicInteger(); private final AtomicInteger _failed = new AtomicInteger(); - /** don't wait more than 60s to defragment the partial message */ - static long MAX_DEFRAGMENT_TIME = 60*1000; + /** don't wait more than this long to completely receive a fragmented message */ + static long MAX_DEFRAGMENT_TIME = 45*1000; private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE); public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) { @@ -125,7 +125,7 @@ class FragmentHandler { _log.warn("Unable to verify preprocessed data (pre.length=" + preprocessed.length + " off=" +offset + " len=" + length); _cache.release(new ByteArray(preprocessed)); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + _context.statManager().addRateData("tunnel.corruptMessage", 1); return; } offset += HopProcessor.IV_LENGTH; // skip the IV @@ -136,7 +136,9 @@ class FragmentHandler { // AIOOBE http://forum.i2p/viewtopic.php?t=3187 if (offset >= TrivialPreprocessor.PREPROCESSED_SIZE) { _cache.release(new ByteArray(preprocessed)); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + _context.statManager().addRateData("tunnel.corruptMessage", 1); + if (_log.shouldWarn()) + _log.warn("Corrupt fragment received: off = " + offset); return; } padding++; @@ -150,21 +152,25 @@ class FragmentHandler { while (offset < length) { int off = receiveFragment(preprocessed, offset, length); if (off < 0) { - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + _context.statManager().addRateData("tunnel.corruptMessage", 1); + if (_log.shouldWarn()) + _log.warn("Corrupt fragment received: off = " + off); return; } offset = off; } } catch (ArrayIndexOutOfBoundsException aioobe) { - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + _context.statManager().addRateData("tunnel.corruptMessage", 1); + if (_log.shouldWarn()) + _log.warn("Corrupt fragment received: offset = " + offset, aioobe); } catch (NullPointerException npe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Corrupt fragment received: offset = " + offset, npe); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + if (_log.shouldWarn()) + _log.warn("Corrupt fragment received: offset = " + offset, npe); + _context.statManager().addRateData("tunnel.corruptMessage", 1); } catch (RuntimeException e) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Corrupt fragment received: offset = " + offset, e); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + if (_log.shouldWarn()) + _log.warn("Corrupt fragment received: offset = " + offset, e); + _context.statManager().addRateData("tunnel.corruptMessage", 1); // java.lang.IllegalStateException: don't get the completed size when we're not complete - null fragment i=0 of 1 // at net.i2p.router.tunnel.FragmentedMessage.getCompleteSize(FragmentedMessage.java:194) // at net.i2p.router.tunnel.FragmentedMessage.toByteArray(FragmentedMessage.java:223) @@ -284,7 +290,8 @@ class FragmentHandler { static final short TYPE_UNDEF = 3; /** - * @return the offset for the next byte after the received fragment + * @return the offset for the next byte after the received fragment or -1 on error + * @throws RuntimeException */ private int receiveFragment(byte preprocessed[], int offset, int length) { if (_log.shouldLog(Log.DEBUG)) @@ -299,7 +306,8 @@ class FragmentHandler { /** * Handle the initial fragment in a message (or a full message, if it fits) * - * @return offset after reading the full fragment + * @return offset after reading the full fragment or -1 on error + * @throws RuntimeException */ private int receiveInitialFragment(byte preprocessed[], int offset, int length) { if (_log.shouldLog(Log.DEBUG)) @@ -317,7 +325,11 @@ class FragmentHandler { if (offset + 4 >= preprocessed.length) return -1; long id = DataHelper.fromLong(preprocessed, offset, 4); - tunnelId = new TunnelId(id); + // i2pd 2.19 bug? 0 will throw IAE. + // message checked and discarded below. + // don't throw so we can process the other fragments if any, if they're from a different message + if (id != 0) + tunnelId = new TunnelId(id); offset += 4; } if ( (type == TYPE_ROUTER) || (type == TYPE_TUNNEL) ) { @@ -359,6 +371,18 @@ class FragmentHandler { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping msg at tunnel endpoint with unsupported delivery instruction type " + type + " rcvr: " + _receiver); + _context.statManager().addRateData("tunnel.corruptMessage", 1); + } else if (type == TYPE_TUNNEL && tunnelId == null) { + // do this after the above since we have to return offset + // i2pd 2.19 bug? see above + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping msg at tunnel endpoint with delivery instruction to tunnel 0" + + " gw: " + router + + " fragmented? " + fragmented + + " id: " + messageId + + " size: " + size + + " type: " + (preprocessed[offset] & 0xff)); + _context.statManager().addRateData("tunnel.corruptMessage", 1); } else if (fragmented) { FragmentedMessage msg; synchronized (_fragmentedMessages) { @@ -408,7 +432,8 @@ class FragmentHandler { /** * Handle a fragment beyond the initial fragment in a message * - * @return offset after reading the full fragment + * @return offset after reading the full fragment or -1 on error + * @throws RuntimeException */ private int receiveSubsequentFragment(byte preprocessed[], int offset, int length) { if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index af5b908f441107ab2c339ab02ae7beebacad653d..b394f514df4e64372f21f9540a2bb7af1e4c1dbb 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -39,7 +39,6 @@ class OutboundTunnelEndpoint { // If we don't, the data buf won't get released from the cache... that's ok if (_log.shouldLog(Log.WARN)) _log.warn("Invalid IV, dropping at OBEP " + _config); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); return; } _handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index a1ca5bc9d23ee2fda544325cf8f0d926de364aa9..c27ceecb39f56479fe7cde56cc10923ccf6aa2bc 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -85,10 +85,11 @@ class TunnelParticipant { public void dispatch(TunnelDataMessage msg, Hash recvFrom) { boolean ok = false; + byte[] data = msg.getData(); if (_processor != null) - ok = _processor.process(msg.getData(), 0, msg.getData().length, recvFrom); + ok = _processor.process(data, 0, data.length, recvFrom); else if (_inboundEndpointProcessor != null) - ok = _inboundEndpointProcessor.retrievePreprocessedData(msg.getData(), 0, msg.getData().length, recvFrom); + ok = _inboundEndpointProcessor.retrievePreprocessedData(data, 0, data.length, recvFrom); if (!ok) { if (_log.shouldLog(Log.WARN)) @@ -96,7 +97,6 @@ class TunnelParticipant { + " inboundEndpoint=" + _inboundEndpointProcessor); if (_config != null) _config.incrementProcessedMessages(); - _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); return; } @@ -125,7 +125,7 @@ class TunnelParticipant { _inboundEndpointProcessor.getConfig().incrementProcessedMessages(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive fragment: on " + _config + ": " + msg); - _handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length); + _handler.receiveTunnelMessage(data, 0, data.length); } }