diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 736a19a284d162ce705e1b1c110adce2b6431fd8..bb054be38e89a115fad3781cf8b5917657f1605f 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -106,7 +106,7 @@ class FragmentHandler { public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) { _context = context; _log = context.logManager().getLog(FragmentHandler.class); - _fragmentedMessages = new HashMap(8); + _fragmentedMessages = new HashMap(16); _receiver = receiver; // all createRateStat in TunnelDispatcher } @@ -351,8 +351,8 @@ class FragmentHandler { int size = (int)DataHelper.fromLong(preprocessed, offset, 2); offset += 2; - FragmentedMessage msg = null; if (fragmented) { + FragmentedMessage msg; synchronized (_fragmentedMessages) { msg = _fragmentedMessages.get(Long.valueOf(messageId)); if (msg == null) { @@ -360,11 +360,7 @@ class FragmentHandler { _fragmentedMessages.put(Long.valueOf(messageId), msg); } } - } else { - msg = new FragmentedMessage(_context); - } - - if (fragmented) { + // synchronized is required, fragments may be arriving in different threads synchronized(msg) { boolean ok = msg.receive(messageId, preprocessed, offset, size, false, router, tunnelId); @@ -387,18 +383,17 @@ class FragmentHandler { } } } - } else { - // synchronized not required if !fragmented - boolean ok = msg.receive(messageId, preprocessed, offset, size, true, router, tunnelId); - if (!ok) return -1; + } else { + // Unfragmented + // synchronized not required // always complete, never an expire event - receiveComplete(msg); + receiveComplete(preprocessed, offset, size, router, tunnelId); } offset += size; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Handling finished message " + msg.getMessageId() + " at offset " + offset); return offset; } @@ -462,6 +457,7 @@ class FragmentHandler { return offset; } + private void receiveComplete(FragmentedMessage msg) { if (msg == null) return; @@ -500,6 +496,37 @@ class FragmentHandler { } } + /** + * Zero-copy reception of an unfragmented message + * @since 0.9 + */ + private void receiveComplete(byte[] data, int offset, int len, Hash router, TunnelId tunnelId) { + _completed++; + try { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("RECV unfrag(" + len + ')'); + + // TODO read in as unknown message for outbound tunnels, + // since this will just be packaged in a TunnelGatewayMessage. + // Not a big savings since most everything is a GarlicMessage + // and so the readMessage() call is fast. + // The unencrypted messages at the OBEP are (V)TBMs + // and perhaps an occasional DatabaseLookupMessage + I2NPMessageHandler h = new I2NPMessageHandler(_context); + h.readMessage(data, offset, len); + I2NPMessage m = h.lastRead(); + noteReception(m.getUniqueId(), 0, "complete: ");// + msg.toString()); + noteCompletion(m.getUniqueId()); + _receiver.receiveComplete(m, router, tunnelId); + } catch (I2NPMessageException ime) { + if (_log.shouldLog(Log.WARN)) { + _log.warn("Error receiving unfragmented message (corrupt?)", ime); + _log.warn("DUMP:\n" + HexDump.dump(data, offset, len)); + _log.warn("RAW:\n" + Base64.encode(data, offset, len)); + } + } + } + protected void noteReception(long messageId, int fragmentId, Object status) {} protected void noteCompletion(long messageId) {} protected void noteFailure(long messageId, String status) {} @@ -523,10 +550,12 @@ class FragmentHandler { } private class RemoveFailed implements SimpleTimer.TimedEvent { - private FragmentedMessage _msg; + private final FragmentedMessage _msg; + public RemoveFailed(FragmentedMessage msg) { _msg = msg; } + public void timeReached() { boolean removed = false; synchronized (_fragmentedMessages) { diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index 7a84522f473a32c098618d748c84997786aabe0f..592cc0e9a2af72cfedd007862df30496380c66d8 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -1,18 +1,10 @@ package net.i2p.router.tunnel; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - import net.i2p.I2PAppContext; -import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.TunnelId; -import net.i2p.data.i2np.DataMessage; -import net.i2p.data.i2np.I2NPMessage; -import net.i2p.data.i2np.I2NPMessageHandler; import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -30,7 +22,7 @@ class FragmentedMessage { private long _messageId; private Hash _toRouter; private TunnelId _toTunnel; - private ByteArray _fragments[]; + private final ByteArray _fragments[]; private boolean _lastReceived; private int _highFragmentNum; private final long _createdOn; @@ -93,9 +85,9 @@ class FragmentedMessage { ba.setValid(length); ba.setOffset(0); //System.arraycopy(payload, offset, ba.getData(), 0, length); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: " - + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("fragment[" + fragmentNum + "/" + offset + "/" + length + "]: " + // + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); _fragments[fragmentNum] = ba; _lastReceived = _lastReceived || isLast; @@ -145,9 +137,9 @@ class FragmentedMessage { ba.setValid(length); ba.setOffset(0); //System.arraycopy(payload, offset, ba.getData(), 0, length); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("fragment[0/" + offset + "/" + length + "]: " - + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("fragment[0/" + offset + "/" + length + "]: " + // + Base64.encode(ba.getData(), ba.getOffset(), ba.getValid())); _fragments[0] = ba; _lastReceived = _lastReceived || isLast; _toRouter = toRouter; @@ -160,6 +152,7 @@ class FragmentedMessage { public long getMessageId() { return _messageId; } public Hash getTargetRouter() { return _toRouter; } public TunnelId getTargetTunnel() { return _toTunnel; } + public int getFragmentCount() { int found = 0; for (int i = 0; i < _fragments.length; i++) @@ -204,6 +197,7 @@ class FragmentedMessage { public boolean getReleased() { return _completed; } +/**** public void writeComplete(OutputStream out) throws IOException { if (_releasedAfter > 0) { RuntimeException e = new RuntimeException("use after free in FragmentedMessage"); @@ -216,7 +210,10 @@ class FragmentedMessage { } _completed = true; } - public void writeComplete(byte target[], int offset) { +****/ + + /** */ + private void writeComplete(byte target[], int offset) { if (_releasedAfter > 0) { RuntimeException e = new RuntimeException("use after free in FragmentedMessage"); _log.error("FM writeComplete() 2", e); @@ -229,6 +226,7 @@ class FragmentedMessage { } _completed = true; } + public byte[] toByteArray() { synchronized (this) { if (_releasedAfter > 0) return null;