diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 33b766d6c09650cc792ced9e1f326bc4e9d6ac74..78e41112f27d07e52edc950f4bbd501d06fd8ad1 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -2,6 +2,7 @@ package net.i2p.router.transport.udp; import java.util.Map; +import net.i2p.data.DataFormatException; import net.i2p.data.Hash; import net.i2p.router.RouterContext; import net.i2p.router.util.DecayingBloomFilter; @@ -121,22 +122,28 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ continue; } - InboundMessageState state = null; + InboundMessageState state; boolean messageComplete = false; boolean messageExpired = false; - boolean fragmentOK = false; + boolean fragmentOK; boolean partialACK = false; synchronized (messages) { boolean isNew = false; state = messages.get(messageId); if (state == null) { - state = new InboundMessageState(_context, mid, fromPeer); + try { + state = new InboundMessageState(_context, mid, fromPeer, data, i); + } catch (DataFormatException dfe) { + break; + } isNew = true; + fragmentOK = true; // we will add to messages shortly if it isn't complete + } else { + fragmentOK = state.receiveFragment(data, i); } - fragmentOK = state.receiveFragment(data, i); if (state.isComplete()) { messageComplete = true; diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index e4f8435ae40db793f6eaa7f6d71ffd1638483e5f..4c11aa0f4700c0e48532eca4d73522901345b4d0 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -1,6 +1,7 @@ package net.i2p.router.transport.udp; import net.i2p.data.ByteArray; +import net.i2p.data.DataFormatException; import net.i2p.data.Hash; import net.i2p.router.RouterContext; import net.i2p.router.util.CDQEntry; @@ -52,6 +53,33 @@ class InboundMessageState implements CDQEntry { _receiveBegin = ctx.clock().now(); } + /** + * Create a new IMS and read in the data from the fragment. + * Do NOT call receiveFragment for the same fragment afterwards. + * This is more efficient if the fragment is the last (and probably only) fragment. + * The main savings is not allocating ByteArray[64]. + * + * @throws DataFormatException if the fragment was corrupt + * @since 0.9.9 + */ + public InboundMessageState(RouterContext ctx, long messageId, Hash from, + UDPPacketReader.DataReader data, int dataFragment) + throws DataFormatException { + _context = ctx; + _log = ctx.logManager().getLog(InboundMessageState.class); + _messageId = messageId; + _from = from; + if (data.readMessageIsLast(dataFragment)) + _fragments = new ByteArray[1 + data.readMessageFragmentNum(dataFragment)]; + else + _fragments = new ByteArray[MAX_FRAGMENTS]; + _lastFragment = -1; + _completeSize = -1; + _receiveBegin = ctx.clock().now(); + if (!receiveFragment(data, dataFragment)) + throw new DataFormatException("corrupt"); + } + /** * Read in the data from the fragment. * Caller should synchronize. @@ -60,8 +88,9 @@ class InboundMessageState implements CDQEntry { */ public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) { int fragmentNum = data.readMessageFragmentNum(dataFragment); - if ( (fragmentNum < 0) || (fragmentNum >= MAX_FRAGMENTS)) { - _log.warn("Invalid fragment " + fragmentNum + '/' + MAX_FRAGMENTS); + if ( (fragmentNum < 0) || (fragmentNum >= _fragments.length)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid fragment " + fragmentNum + '/' + _fragments.length); return false; } if (_fragments[fragmentNum] == null) { @@ -105,7 +134,8 @@ class InboundMessageState implements CDQEntry { + ", isLast=" + isLast /* + ", data=" + Base64.encode(message.getData(), 0, size) */ ); } catch (ArrayIndexOutOfBoundsException aioobe) { - _log.warn("Corrupt SSU fragment " + fragmentNum, aioobe); + if (_log.shouldLog(Log.WARN)) + _log.warn("Corrupt SSU fragment " + fragmentNum, aioobe); return false; } } else { @@ -247,7 +277,7 @@ class InboundMessageState implements CDQEntry { public void releaseResources() { _released = true; - for (int i = 0; i < MAX_FRAGMENTS; i++) { + for (int i = 0; i < _fragments.length; i++) { if (_fragments[i] != null) { _fragmentCache.release(_fragments[i]); _fragments[i] = null;