diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java index 06f015fcd155a05690837f349bf35c89d219f0e4..c966ff2374f0ae03e669fdfe689877ac1f98ee21 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java @@ -191,7 +191,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotAddress(byte[] ip, int port) { - System.out.println("Got ADDRESS block: " + Addresses.toString(ip, port)); throw new IllegalStateException("Address in Handshake"); } @@ -204,7 +203,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotRelayTag(long tag) { - System.out.println("Got relay tag " + tag); throw new IllegalStateException("Relay tag in Handshake"); } @@ -218,19 +216,17 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa throw new IllegalStateException("I2NP in Sess Req"); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { + public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException { System.out.println("Got FRAGMENT block: " + messageID); if (getState() != InboundState.IB_STATE_CREATED_SENT) throw new IllegalStateException("I2NP in Sess Req"); } public void gotACK(long ackThru, int acks, byte[] ranges) { - System.out.println("Got ACK block: " + ackThru); throw new IllegalStateException("ACK in Handshake"); } public void gotTermination(int reason, long count) { - System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count); throw new IllegalStateException("Termination in Handshake"); } @@ -239,7 +235,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotPadding(int paddingLength, int frameLength) { - System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength); } ///////////////////////////////////////////////////////// diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 1410d67ca90ff05a39bce72fc5fb54960d6553a0..90a945a7d167f6466d4646fe3ec97dcdb313649a 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -68,6 +68,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ /** * Pull the fragments and ACKs out of the authenticated data packet + * + * SSU 1 only. */ public void receiveData(PeerState from, UDPPacketReader.DataReader data) { try { diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index e13a81c813557f27aad568a0c9c8c1141b418821..8405f7a752ce77a66889cc69a366735ad453edea 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -45,6 +45,9 @@ class InboundMessageState implements CDQEntry { private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE; private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE); + /** + * Only for Poison right now. + */ public InboundMessageState(RouterContext ctx, long messageId, Hash from) { _context = ctx; _log = ctx.logManager().getLog(InboundMessageState.class); @@ -62,6 +65,9 @@ class InboundMessageState implements CDQEntry { * This is more efficient if the fragment is the last (and probably only) fragment. * The main savings is not allocating ByteArray[64]. * + * SSU 1 only. + * + * @param dataFragment the fragment index in the DataReader, NOT the fragment number * @throws DataFormatException if the fragment was corrupt * @since 0.9.9 */ @@ -86,11 +92,47 @@ class InboundMessageState implements CDQEntry { if (!receiveFragment(data, dataFragment)) throw new DataFormatException("corrupt"); } - + + /** + * Create a new IMS and read in the data from the fragment. + * Do NOT call receiveFragment for the same fragment afterwards. + * This is more efficient if the fragment is the last (and probably only) fragment. + * The main savings is not allocating ByteArray[64]. + * + * SSU 2 only. + * + * @param fragmentNum the fragment number + * @throws DataFormatException if the fragment was corrupt + * @since 0.9.54 + */ + public InboundMessageState(RouterContext ctx, long messageId, Hash from, + byte[] data, int off, int len, int fragmentNum, boolean isLast) + throws DataFormatException { + _context = ctx; + _log = ctx.logManager().getLog(InboundMessageState.class); + _messageId = messageId; + _from = from; + if (isLast) { + if (fragmentNum > MAX_FRAGMENTS) + throw new DataFormatException("corrupt - too many fragments: " + fragmentNum); + _fragments = new ByteArray[fragmentNum]; + } else { + _fragments = new ByteArray[MAX_FRAGMENTS]; + } + _lastFragment = -1; + _completeSize = -1; + _receiveBegin = ctx.clock().now(); + if (!receiveFragment(data, off, len, fragmentNum, isLast)) + throw new DataFormatException("corrupt"); + } + /** * Read in the data from the fragment. * Caller should synchronize. * + * SSU 1 only. + * + * @param dataFragment the fragment index in the DataReader, NOT the fragment number * @return true if the data was ok, false if it was corrupt */ public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) throws DataFormatException { @@ -153,7 +195,58 @@ class InboundMessageState implements CDQEntry { } return true; } - + + /** + * Read in the data from the fragment. + * Caller should synchronize. + * + * SSU 2 only. + * + * @param fragmentNum the fragment number + * @return true if the data was ok, false if it was corrupt + * @since 0.9.54 + */ + public boolean receiveFragment(byte[] data, int off, int len, int fragmentNum, boolean isLast) throws DataFormatException { + if (fragmentNum >= _fragments.length) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid fragment " + fragmentNum + '/' + _fragments.length); + return false; + } + if (_fragments[fragmentNum] == null) { + // new fragment, read it + ByteArray message = _fragmentCache.acquire(); + System.arraycopy(data, off, message.getData(), 0, len); + message.setValid(len); + _fragments[fragmentNum] = message; + if (isLast) { + // don't allow _lastFragment to be set twice + if (_lastFragment >= 0) { + if (_log.shouldWarn()) + _log.warn("Multiple last fragments for message " + _messageId + " from " + _from); + return false; + } + // TODO - check for non-last fragments after this one? + _lastFragment = fragmentNum; + } else if (_lastFragment >= 0 && fragmentNum >= _lastFragment) { + // don't allow non-last after last + if (_log.shouldWarn()) + _log.warn("Non-last fragment " + fragmentNum + " when last is " + _lastFragment + " for message " + _messageId + " from " + _from); + return false; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New fragment " + fragmentNum + " for message " + _messageId + + ", size=" + len + + ", isLast=" + isLast + /* + ", data=" + Base64.encode(message.getData(), 0, size) */ ); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received fragment " + fragmentNum + " for message " + _messageId + + " again, old size=" + _fragments[fragmentNum].getValid() + + " and new size=" + len); + } + return true; + } + /** * May not be valid after released. * Probably doesn't need to be synced by caller, given the order of diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 96e3148326371f2c6e37d5ffafdbe58b574ddbb1..27b942ade47b562300daddc1d3341e2eef417bc0 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -187,33 +187,29 @@ class MessageReceiver { * @return null on error */ private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) { + int sz = state.getCompleteSize(); try { - //byte buf[] = new byte[state.getCompleteSize()]; I2NPMessage m; int numFragments = state.getFragmentCount(); if (numFragments > 1) { ByteArray fragments[] = state.getFragments(); int off = 0; + byte[] data = buf.getData(); for (int i = 0; i < numFragments; i++) { - System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid()); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " - // + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid()) - // + " (valid: " + fragments[i].getValid() - // + " raw: " + Base64.encode(fragments[i].getData()) + ")"); - off += fragments[i].getValid(); + ByteArray ba = fragments[i]; + int len = ba.getValid(); + System.arraycopy(ba.getData(), 0, data, off, len); + off += len; } - if (off != state.getCompleteSize()) { + if (off != sz) { if (_log.shouldLog(Log.WARN)) - _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize()); + _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz); return null; } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize())); - m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler); + m = I2NPMessageImpl.fromRawByteArray(_context, data, 0, sz, handler); } else { // zero copy for single fragment - m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler); + m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, sz, handler); } m.setUniqueId(state.getMessageId()); return m; @@ -227,8 +223,8 @@ class MessageReceiver { byte[] data = ba.getData(); _log.warn("Message invalid: " + state + " PeerState: " + _transport.getPeerState(state.getFrom()) + - "\nDUMP:\n" + HexDump.dump(data, 0, state.getCompleteSize()) + - "\nRAW:\n" + Base64.encode(data, 0, state.getCompleteSize()), + "\nDUMP:\n" + HexDump.dump(data, 0, sz) + + "\nRAW:\n" + Base64.encode(data, 0, sz), ime); } if (state.getFragments()[0].getData()[0] == DatabaseStoreMessage.MESSAGE_TYPE) { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java index 3d498b6ebbf3d9a5db8882177c349a8ca8cdbc89..3e945b8d7479017756b7730ca1a0e479b3bf22b8 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java @@ -149,12 +149,10 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException { - System.out.println("Got RI block: " + ri); throw new DataFormatException("RI in Sess Created"); } public void gotRIFragment(byte[] data, boolean isHandshake, boolean flood, boolean isGzipped, int frag, int totalFrags) { - System.out.println("Got RI fragment " + frag + " of " + totalFrags); throw new IllegalStateException("RI in Sess Created"); } @@ -167,7 +165,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotRelayTagRequest() { - System.out.println("Got relay tag request"); throw new IllegalStateException("Relay tag req in Sess Created"); } @@ -180,22 +177,18 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotI2NP(I2NPMessage msg) { - System.out.println("Got I2NP block: " + msg); throw new IllegalStateException("I2NP in Sess Created"); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { - System.out.println("Got FRAGMENT block: " + messageID); - throw new IllegalStateException("I2NP in Sess Created"); + public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException { + throw new DataFormatException("I2NP in Sess Created"); } public void gotACK(long ackThru, int acks, byte[] ranges) { - System.out.println("Got ACK block: " + ackThru); throw new IllegalStateException("ACK in Sess Created"); } public void gotTermination(int reason, long count) { - System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count); throw new IllegalStateException("Termination in Sess Created"); } @@ -204,7 +197,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotPadding(int paddingLength, int frameLength) { - System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength); } ///////////////////////////////////////////////////////// diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 591a034491d843f692806c865f8617f2f825da6f..fd716c55f8836f6ec7dc127e49d6af76b43c9e43 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -46,7 +46,7 @@ public class PeerState { * receiving the connection this will be set only after the connection * is established. */ - private final Hash _remotePeer; + protected final Hash _remotePeer; /** * The AES key used to verify packets, set only after the connection is * established. @@ -192,7 +192,7 @@ public class PeerState { private boolean _mayDisconnect; /** list of InboundMessageState for active message */ - private final Map<Long, InboundMessageState> _inboundMessages; + protected final Map<Long, InboundMessageState> _inboundMessages; /** * Mostly messages that have been transmitted and are awaiting acknowledgement, @@ -212,7 +212,7 @@ public class PeerState { /** when the retransmit timer is about to trigger */ private long _retransmitTimer; - private final UDPTransport _transport; + protected final UDPTransport _transport; /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index 2fff406f1bbda7a2a015bb10c40a766fc713bb2e..8519a245093bb3ec5f9e5d34dbc056b2bd0d7c00 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -7,12 +7,15 @@ import java.util.concurrent.atomic.AtomicInteger; import com.southernstorm.noise.protocol.CipherState; +import net.i2p.data.ByteArray; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.router.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.data.i2np.I2NPMessageException; +import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.router.RouterContext; import static net.i2p.router.transport.udp.SSU2Util.*; import net.i2p.util.HexDump; @@ -116,7 +119,9 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback return; } } - processPayload(data, off + SHORT_HEADER_SIZE, len - (SHORT_HEADER_SIZE + MAC_LEN)); + int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN); + processPayload(data, off + SHORT_HEADER_SIZE, payloadLen); + packetReceived(payloadLen); } catch (GeneralSecurityException gse) { if (_log.shouldWarn()) _log.warn("Bad encrypted packet:\n" + HexDump.dump(data, off, len), gse); @@ -169,9 +174,50 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback } public void gotI2NP(I2NPMessage msg) { + // 9 byte header + int size = msg.getMessageSize() - 7; + // complete message, skip IMF and MessageReceiver + _transport.messageReceived(msg, null, _remotePeer, 0, size); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { + public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException { + InboundMessageState state; + boolean messageComplete = false; + boolean messageExpired = false; + + synchronized (_inboundMessages) { + state = _inboundMessages.get(messageId); + if (state == null) { + state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast); + _inboundMessages.put(messageId, state); + } else { + boolean fragmentOK = state.receiveFragment(data, off, len, frag, isLast); + if (!fragmentOK) + return; + if (state.isComplete()) { + messageComplete = true; + _inboundMessages.remove(messageId); + } else if (state.isExpired()) { + messageExpired = true; + _inboundMessages.remove(messageId); + } + } + } + + if (messageComplete) { + messageFullyReceived(messageId, state.getCompleteSize()); + if (_log.shouldDebug()) + _log.debug("Message received completely! " + state); + _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); + _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); + receiveMessage(state); + } else if (messageExpired) { + if (_log.shouldWarn()) + _log.warn("Message expired while only being partially read: " + state); + _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); + // all state access must be before this + state.releaseResources(); + } } public void gotACK(long ackThru, int acks, byte[] ranges) { @@ -190,4 +236,40 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback // end payload callbacks ///////////////////////////////////////////////////////// + /** + * Do what MessageReceiver does, but inline and for SSU2. + * Will always be more than one fragment. + */ + private void receiveMessage(InboundMessageState state) { + int sz = state.getCompleteSize(); + try { + byte buf[] = new byte[sz]; + I2NPMessage m; + int numFragments = state.getFragmentCount(); + ByteArray fragments[] = state.getFragments(); + int off = 0; + for (int i = 0; i < numFragments; i++) { + ByteArray ba = fragments[i]; + int len = ba.getValid(); + System.arraycopy(ba.getData(), 0, buf, off, len); + off += len; + } + if (off != sz) { + if (_log.shouldWarn()) + _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz); + return; + } + I2NPMessage msg = I2NPMessageImpl.fromRawByteArrayNTCP2(_context, buf, 0, sz, null); + _transport.messageReceived(msg, null, _remotePeer, state.getLifetime(), sz); + } catch (I2NPMessageException ime) { + if (_log.shouldWarn()) + _log.warn("Message invalid: " + state + " PeerState: " + this, ime); + } catch (RuntimeException e) { + // e.g. AIOOBE + if (_log.shouldWarn()) + _log.warn("Error handling a message: " + state, e); + } finally { + state.releaseResources(); + } + } } diff --git a/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java b/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java index 510eff32296eb8663c79cb4da37fcf549060e093..f2d3b23368ce1391ea21f9f405e7a11687b23f5f 100644 --- a/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java +++ b/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java @@ -56,10 +56,13 @@ class SSU2Payload { public void gotI2NP(I2NPMessage msg) throws I2NPMessageException; /** - * @param expires 0 for frag greater than 1 - * @param type 0 for frag greater than 1 + * Data must be copied out in this method. + * Data starts at the 9 byte header for fragment 0. + * + * @param off offset in data + * @param len length of data to copy */ - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException; + public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException; /** * @param ranges null if none @@ -193,28 +196,24 @@ class SSU2Payload { case BLOCK_FIRSTFRAG: { if (isHandshake) throw new IOException("Illegal block in handshake: " + type); - if (len < 9) + if (len <= 9) throw new IOException("Bad length for FIRSTFRAG: " + len); - int mtype = payload[i] & 0xff; long id = DataHelper.fromLong(payload, i + 1, 4); - long exp = DataHelper.fromLong(payload, i + 5, 4) * 1000; - byte[] data = new byte[len - 9]; - System.arraycopy(payload, i + 9, data, 0, len - 9); - cb.gotFragment(data, id, mtype, exp, 0, false); + cb.gotFragment(payload, i, len, id, 0, false); break; } case BLOCK_FOLLOWONFRAG: { if (isHandshake) throw new IOException("Illegal block in handshake: " + type); - if (len < 5) + if (len <= 5) throw new IOException("Bad length for FOLLOWON: " + len); int frag = (payload[i] & 0xff) >> 1; + if (frag == 0) + throw new IOException("0 frag for FOLLOWON"); boolean isLast = (payload[i] & 0x01) != 0; long id = DataHelper.fromLong(payload, i + 1, 4); - byte[] data = new byte[len - 5]; - System.arraycopy(payload, i + 5, data, 0, len - 5); - cb.gotFragment(data, id, 0, 0, frag, isLast); + cb.gotFragment(payload, i + 5, len - 5, id, frag, isLast); break; }