From 4908f760d9b6bb2b0e702c7382aae842a26a91e8 Mon Sep 17 00:00:00 2001 From: zzz <zzz@i2pmail.org> Date: Sat, 26 Feb 2022 11:31:04 -0500 Subject: [PATCH] SSU2: PeerState2 I2NP handling Handle complete messages and fragments in PeerState2 Send complete messages to transport Add SSU2 constructor and reader in IMS Change fragment callback to avoid an extra copy Fix checks of fragment blocks MessageReader cleanups for SSU1 Other cleanups WIP, untested, not hooked in --- .../transport/udp/InboundEstablishState2.java | 7 +- .../udp/InboundMessageFragments.java | 2 + .../transport/udp/InboundMessageState.java | 97 ++++++++++++++++++- .../router/transport/udp/MessageReceiver.java | 28 +++--- .../udp/OutboundEstablishState2.java | 12 +-- .../i2p/router/transport/udp/PeerState.java | 6 +- .../i2p/router/transport/udp/PeerState2.java | 86 +++++++++++++++- .../i2p/router/transport/udp/SSU2Payload.java | 25 +++-- 8 files changed, 211 insertions(+), 52 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java index 06f015fcd1..c966ff2374 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java @@ -191,7 +191,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotAddress(byte[] ip, int port) { - System.out.println("Got ADDRESS block: " + Addresses.toString(ip, port)); throw new IllegalStateException("Address in Handshake"); } @@ -204,7 +203,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotRelayTag(long tag) { - System.out.println("Got relay tag " + tag); throw new IllegalStateException("Relay tag in Handshake"); } @@ -218,19 +216,17 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa throw new IllegalStateException("I2NP in Sess Req"); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { + public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException { System.out.println("Got FRAGMENT block: " + messageID); if (getState() != InboundState.IB_STATE_CREATED_SENT) throw new IllegalStateException("I2NP in Sess Req"); } public void gotACK(long ackThru, int acks, byte[] ranges) { - System.out.println("Got ACK block: " + ackThru); throw new IllegalStateException("ACK in Handshake"); } public void gotTermination(int reason, long count) { - System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count); throw new IllegalStateException("Termination in Handshake"); } @@ -239,7 +235,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa } public void gotPadding(int paddingLength, int frameLength) { - System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength); } ///////////////////////////////////////////////////////// diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 1410d67ca9..90a945a7d1 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -68,6 +68,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ /** * Pull the fragments and ACKs out of the authenticated data packet + * + * SSU 1 only. */ public void receiveData(PeerState from, UDPPacketReader.DataReader data) { try { diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index e13a81c813..8405f7a752 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -45,6 +45,9 @@ class InboundMessageState implements CDQEntry { private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE; private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE); + /** + * Only for Poison right now. + */ public InboundMessageState(RouterContext ctx, long messageId, Hash from) { _context = ctx; _log = ctx.logManager().getLog(InboundMessageState.class); @@ -62,6 +65,9 @@ class InboundMessageState implements CDQEntry { * This is more efficient if the fragment is the last (and probably only) fragment. * The main savings is not allocating ByteArray[64]. * + * SSU 1 only. + * + * @param dataFragment the fragment index in the DataReader, NOT the fragment number * @throws DataFormatException if the fragment was corrupt * @since 0.9.9 */ @@ -86,11 +92,47 @@ class InboundMessageState implements CDQEntry { if (!receiveFragment(data, dataFragment)) throw new DataFormatException("corrupt"); } - + + /** + * Create a new IMS and read in the data from the fragment. + * Do NOT call receiveFragment for the same fragment afterwards. + * This is more efficient if the fragment is the last (and probably only) fragment. + * The main savings is not allocating ByteArray[64]. + * + * SSU 2 only. + * + * @param fragmentNum the fragment number + * @throws DataFormatException if the fragment was corrupt + * @since 0.9.54 + */ + public InboundMessageState(RouterContext ctx, long messageId, Hash from, + byte[] data, int off, int len, int fragmentNum, boolean isLast) + throws DataFormatException { + _context = ctx; + _log = ctx.logManager().getLog(InboundMessageState.class); + _messageId = messageId; + _from = from; + if (isLast) { + if (fragmentNum > MAX_FRAGMENTS) + throw new DataFormatException("corrupt - too many fragments: " + fragmentNum); + _fragments = new ByteArray[fragmentNum]; + } else { + _fragments = new ByteArray[MAX_FRAGMENTS]; + } + _lastFragment = -1; + _completeSize = -1; + _receiveBegin = ctx.clock().now(); + if (!receiveFragment(data, off, len, fragmentNum, isLast)) + throw new DataFormatException("corrupt"); + } + /** * Read in the data from the fragment. * Caller should synchronize. * + * SSU 1 only. + * + * @param dataFragment the fragment index in the DataReader, NOT the fragment number * @return true if the data was ok, false if it was corrupt */ public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) throws DataFormatException { @@ -153,7 +195,58 @@ class InboundMessageState implements CDQEntry { } return true; } - + + /** + * Read in the data from the fragment. + * Caller should synchronize. + * + * SSU 2 only. + * + * @param fragmentNum the fragment number + * @return true if the data was ok, false if it was corrupt + * @since 0.9.54 + */ + public boolean receiveFragment(byte[] data, int off, int len, int fragmentNum, boolean isLast) throws DataFormatException { + if (fragmentNum >= _fragments.length) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid fragment " + fragmentNum + '/' + _fragments.length); + return false; + } + if (_fragments[fragmentNum] == null) { + // new fragment, read it + ByteArray message = _fragmentCache.acquire(); + System.arraycopy(data, off, message.getData(), 0, len); + message.setValid(len); + _fragments[fragmentNum] = message; + if (isLast) { + // don't allow _lastFragment to be set twice + if (_lastFragment >= 0) { + if (_log.shouldWarn()) + _log.warn("Multiple last fragments for message " + _messageId + " from " + _from); + return false; + } + // TODO - check for non-last fragments after this one? + _lastFragment = fragmentNum; + } else if (_lastFragment >= 0 && fragmentNum >= _lastFragment) { + // don't allow non-last after last + if (_log.shouldWarn()) + _log.warn("Non-last fragment " + fragmentNum + " when last is " + _lastFragment + " for message " + _messageId + " from " + _from); + return false; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New fragment " + fragmentNum + " for message " + _messageId + + ", size=" + len + + ", isLast=" + isLast + /* + ", data=" + Base64.encode(message.getData(), 0, size) */ ); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received fragment " + fragmentNum + " for message " + _messageId + + " again, old size=" + _fragments[fragmentNum].getValid() + + " and new size=" + len); + } + return true; + } + /** * May not be valid after released. * Probably doesn't need to be synced by caller, given the order of diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 96e3148326..27b942ade4 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -187,33 +187,29 @@ class MessageReceiver { * @return null on error */ private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) { + int sz = state.getCompleteSize(); try { - //byte buf[] = new byte[state.getCompleteSize()]; I2NPMessage m; int numFragments = state.getFragmentCount(); if (numFragments > 1) { ByteArray fragments[] = state.getFragments(); int off = 0; + byte[] data = buf.getData(); for (int i = 0; i < numFragments; i++) { - System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid()); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " - // + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid()) - // + " (valid: " + fragments[i].getValid() - // + " raw: " + Base64.encode(fragments[i].getData()) + ")"); - off += fragments[i].getValid(); + ByteArray ba = fragments[i]; + int len = ba.getValid(); + System.arraycopy(ba.getData(), 0, data, off, len); + off += len; } - if (off != state.getCompleteSize()) { + if (off != sz) { if (_log.shouldLog(Log.WARN)) - _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize()); + _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz); return null; } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize())); - m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler); + m = I2NPMessageImpl.fromRawByteArray(_context, data, 0, sz, handler); } else { // zero copy for single fragment - m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler); + m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, sz, handler); } m.setUniqueId(state.getMessageId()); return m; @@ -227,8 +223,8 @@ class MessageReceiver { byte[] data = ba.getData(); _log.warn("Message invalid: " + state + " PeerState: " + _transport.getPeerState(state.getFrom()) + - "\nDUMP:\n" + HexDump.dump(data, 0, state.getCompleteSize()) + - "\nRAW:\n" + Base64.encode(data, 0, state.getCompleteSize()), + "\nDUMP:\n" + HexDump.dump(data, 0, sz) + + "\nRAW:\n" + Base64.encode(data, 0, sz), ime); } if (state.getFragments()[0].getData()[0] == DatabaseStoreMessage.MESSAGE_TYPE) { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java index 3d498b6ebb..3e945b8d74 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java @@ -149,12 +149,10 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException { - System.out.println("Got RI block: " + ri); throw new DataFormatException("RI in Sess Created"); } public void gotRIFragment(byte[] data, boolean isHandshake, boolean flood, boolean isGzipped, int frag, int totalFrags) { - System.out.println("Got RI fragment " + frag + " of " + totalFrags); throw new IllegalStateException("RI in Sess Created"); } @@ -167,7 +165,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotRelayTagRequest() { - System.out.println("Got relay tag request"); throw new IllegalStateException("Relay tag req in Sess Created"); } @@ -180,22 +177,18 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotI2NP(I2NPMessage msg) { - System.out.println("Got I2NP block: " + msg); throw new IllegalStateException("I2NP in Sess Created"); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { - System.out.println("Got FRAGMENT block: " + messageID); - throw new IllegalStateException("I2NP in Sess Created"); + public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException { + throw new DataFormatException("I2NP in Sess Created"); } public void gotACK(long ackThru, int acks, byte[] ranges) { - System.out.println("Got ACK block: " + ackThru); throw new IllegalStateException("ACK in Sess Created"); } public void gotTermination(int reason, long count) { - System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count); throw new IllegalStateException("Termination in Sess Created"); } @@ -204,7 +197,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl } public void gotPadding(int paddingLength, int frameLength) { - System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength); } ///////////////////////////////////////////////////////// diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 591a034491..fd716c55f8 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -46,7 +46,7 @@ public class PeerState { * receiving the connection this will be set only after the connection * is established. */ - private final Hash _remotePeer; + protected final Hash _remotePeer; /** * The AES key used to verify packets, set only after the connection is * established. @@ -192,7 +192,7 @@ public class PeerState { private boolean _mayDisconnect; /** list of InboundMessageState for active message */ - private final Map<Long, InboundMessageState> _inboundMessages; + protected final Map<Long, InboundMessageState> _inboundMessages; /** * Mostly messages that have been transmitted and are awaiting acknowledgement, @@ -212,7 +212,7 @@ public class PeerState { /** when the retransmit timer is about to trigger */ private long _retransmitTimer; - private final UDPTransport _transport; + protected final UDPTransport _transport; /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index 2fff406f1b..8519a24509 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -7,12 +7,15 @@ import java.util.concurrent.atomic.AtomicInteger; import com.southernstorm.noise.protocol.CipherState; +import net.i2p.data.ByteArray; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.router.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.data.i2np.I2NPMessageException; +import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.router.RouterContext; import static net.i2p.router.transport.udp.SSU2Util.*; import net.i2p.util.HexDump; @@ -116,7 +119,9 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback return; } } - processPayload(data, off + SHORT_HEADER_SIZE, len - (SHORT_HEADER_SIZE + MAC_LEN)); + int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN); + processPayload(data, off + SHORT_HEADER_SIZE, payloadLen); + packetReceived(payloadLen); } catch (GeneralSecurityException gse) { if (_log.shouldWarn()) _log.warn("Bad encrypted packet:\n" + HexDump.dump(data, off, len), gse); @@ -169,9 +174,50 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback } public void gotI2NP(I2NPMessage msg) { + // 9 byte header + int size = msg.getMessageSize() - 7; + // complete message, skip IMF and MessageReceiver + _transport.messageReceived(msg, null, _remotePeer, 0, size); } - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException { + public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException { + InboundMessageState state; + boolean messageComplete = false; + boolean messageExpired = false; + + synchronized (_inboundMessages) { + state = _inboundMessages.get(messageId); + if (state == null) { + state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast); + _inboundMessages.put(messageId, state); + } else { + boolean fragmentOK = state.receiveFragment(data, off, len, frag, isLast); + if (!fragmentOK) + return; + if (state.isComplete()) { + messageComplete = true; + _inboundMessages.remove(messageId); + } else if (state.isExpired()) { + messageExpired = true; + _inboundMessages.remove(messageId); + } + } + } + + if (messageComplete) { + messageFullyReceived(messageId, state.getCompleteSize()); + if (_log.shouldDebug()) + _log.debug("Message received completely! " + state); + _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); + _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); + receiveMessage(state); + } else if (messageExpired) { + if (_log.shouldWarn()) + _log.warn("Message expired while only being partially read: " + state); + _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); + // all state access must be before this + state.releaseResources(); + } } public void gotACK(long ackThru, int acks, byte[] ranges) { @@ -190,4 +236,40 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback // end payload callbacks ///////////////////////////////////////////////////////// + /** + * Do what MessageReceiver does, but inline and for SSU2. + * Will always be more than one fragment. + */ + private void receiveMessage(InboundMessageState state) { + int sz = state.getCompleteSize(); + try { + byte buf[] = new byte[sz]; + I2NPMessage m; + int numFragments = state.getFragmentCount(); + ByteArray fragments[] = state.getFragments(); + int off = 0; + for (int i = 0; i < numFragments; i++) { + ByteArray ba = fragments[i]; + int len = ba.getValid(); + System.arraycopy(ba.getData(), 0, buf, off, len); + off += len; + } + if (off != sz) { + if (_log.shouldWarn()) + _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz); + return; + } + I2NPMessage msg = I2NPMessageImpl.fromRawByteArrayNTCP2(_context, buf, 0, sz, null); + _transport.messageReceived(msg, null, _remotePeer, state.getLifetime(), sz); + } catch (I2NPMessageException ime) { + if (_log.shouldWarn()) + _log.warn("Message invalid: " + state + " PeerState: " + this, ime); + } catch (RuntimeException e) { + // e.g. AIOOBE + if (_log.shouldWarn()) + _log.warn("Error handling a message: " + state, e); + } finally { + state.releaseResources(); + } + } } diff --git a/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java b/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java index 510eff3229..f2d3b23368 100644 --- a/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java +++ b/router/java/src/net/i2p/router/transport/udp/SSU2Payload.java @@ -56,10 +56,13 @@ class SSU2Payload { public void gotI2NP(I2NPMessage msg) throws I2NPMessageException; /** - * @param expires 0 for frag greater than 1 - * @param type 0 for frag greater than 1 + * Data must be copied out in this method. + * Data starts at the 9 byte header for fragment 0. + * + * @param off offset in data + * @param len length of data to copy */ - public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException; + public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException; /** * @param ranges null if none @@ -193,28 +196,24 @@ class SSU2Payload { case BLOCK_FIRSTFRAG: { if (isHandshake) throw new IOException("Illegal block in handshake: " + type); - if (len < 9) + if (len <= 9) throw new IOException("Bad length for FIRSTFRAG: " + len); - int mtype = payload[i] & 0xff; long id = DataHelper.fromLong(payload, i + 1, 4); - long exp = DataHelper.fromLong(payload, i + 5, 4) * 1000; - byte[] data = new byte[len - 9]; - System.arraycopy(payload, i + 9, data, 0, len - 9); - cb.gotFragment(data, id, mtype, exp, 0, false); + cb.gotFragment(payload, i, len, id, 0, false); break; } case BLOCK_FOLLOWONFRAG: { if (isHandshake) throw new IOException("Illegal block in handshake: " + type); - if (len < 5) + if (len <= 5) throw new IOException("Bad length for FOLLOWON: " + len); int frag = (payload[i] & 0xff) >> 1; + if (frag == 0) + throw new IOException("0 frag for FOLLOWON"); boolean isLast = (payload[i] & 0x01) != 0; long id = DataHelper.fromLong(payload, i + 1, 4); - byte[] data = new byte[len - 5]; - System.arraycopy(payload, i + 5, data, 0, len - 5); - cb.gotFragment(data, id, 0, 0, frag, isLast); + cb.gotFragment(payload, i + 5, len - 5, id, frag, isLast); break; } -- GitLab