SSU2: PeerState2 I2NP handling

Handle complete messages and fragments in PeerState2
Send complete messages to transport
Add SSU2 constructor and reader in IMS
Change fragment callback to avoid an extra copy
Fix checks of fragment blocks
MessageReader cleanups for SSU1
Other cleanups

WIP, untested, not hooked in
This commit is contained in:
zzz
2022-02-26 11:31:04 -05:00
parent 6bb3657de2
commit 4908f760d9
8 changed files with 211 additions and 52 deletions

View File

@@ -191,7 +191,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
}
public void gotAddress(byte[] ip, int port) {
System.out.println("Got ADDRESS block: " + Addresses.toString(ip, port));
throw new IllegalStateException("Address in Handshake");
}
@@ -204,7 +203,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
}
public void gotRelayTag(long tag) {
System.out.println("Got relay tag " + tag);
throw new IllegalStateException("Relay tag in Handshake");
}
@@ -218,19 +216,17 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
throw new IllegalStateException("I2NP in Sess Req");
}
public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException {
public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException {
System.out.println("Got FRAGMENT block: " + messageID);
if (getState() != InboundState.IB_STATE_CREATED_SENT)
throw new IllegalStateException("I2NP in Sess Req");
}
public void gotACK(long ackThru, int acks, byte[] ranges) {
System.out.println("Got ACK block: " + ackThru);
throw new IllegalStateException("ACK in Handshake");
}
public void gotTermination(int reason, long count) {
System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count);
throw new IllegalStateException("Termination in Handshake");
}
@@ -239,7 +235,6 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
}
public void gotPadding(int paddingLength, int frameLength) {
System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength);
}
/////////////////////////////////////////////////////////

View File

@@ -68,6 +68,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
/**
* Pull the fragments and ACKs out of the authenticated data packet
*
* SSU 1 only.
*/
public void receiveData(PeerState from, UDPPacketReader.DataReader data) {
try {

View File

@@ -45,6 +45,9 @@ class InboundMessageState implements CDQEntry {
private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE;
private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE);
/**
* Only for Poison right now.
*/
public InboundMessageState(RouterContext ctx, long messageId, Hash from) {
_context = ctx;
_log = ctx.logManager().getLog(InboundMessageState.class);
@@ -62,6 +65,9 @@ class InboundMessageState implements CDQEntry {
* This is more efficient if the fragment is the last (and probably only) fragment.
* The main savings is not allocating ByteArray[64].
*
* SSU 1 only.
*
* @param dataFragment the fragment index in the DataReader, NOT the fragment number
* @throws DataFormatException if the fragment was corrupt
* @since 0.9.9
*/
@@ -86,11 +92,47 @@ class InboundMessageState implements CDQEntry {
if (!receiveFragment(data, dataFragment))
throw new DataFormatException("corrupt");
}
/**
* Create a new IMS and read in the data from the fragment.
* Do NOT call receiveFragment for the same fragment afterwards.
* This is more efficient if the fragment is the last (and probably only) fragment.
* The main savings is not allocating ByteArray[64].
*
* SSU 2 only.
*
* @param fragmentNum the fragment number
* @throws DataFormatException if the fragment was corrupt
* @since 0.9.54
*/
public InboundMessageState(RouterContext ctx, long messageId, Hash from,
byte[] data, int off, int len, int fragmentNum, boolean isLast)
throws DataFormatException {
_context = ctx;
_log = ctx.logManager().getLog(InboundMessageState.class);
_messageId = messageId;
_from = from;
if (isLast) {
if (fragmentNum > MAX_FRAGMENTS)
throw new DataFormatException("corrupt - too many fragments: " + fragmentNum);
_fragments = new ByteArray[fragmentNum];
} else {
_fragments = new ByteArray[MAX_FRAGMENTS];
}
_lastFragment = -1;
_completeSize = -1;
_receiveBegin = ctx.clock().now();
if (!receiveFragment(data, off, len, fragmentNum, isLast))
throw new DataFormatException("corrupt");
}
/**
* Read in the data from the fragment.
* Caller should synchronize.
*
* SSU 1 only.
*
* @param dataFragment the fragment index in the DataReader, NOT the fragment number
* @return true if the data was ok, false if it was corrupt
*/
public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) throws DataFormatException {
@@ -153,7 +195,58 @@ class InboundMessageState implements CDQEntry {
}
return true;
}
/**
* Read in the data from the fragment.
* Caller should synchronize.
*
* SSU 2 only.
*
* @param fragmentNum the fragment number
* @return true if the data was ok, false if it was corrupt
* @since 0.9.54
*/
public boolean receiveFragment(byte[] data, int off, int len, int fragmentNum, boolean isLast) throws DataFormatException {
if (fragmentNum >= _fragments.length) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid fragment " + fragmentNum + '/' + _fragments.length);
return false;
}
if (_fragments[fragmentNum] == null) {
// new fragment, read it
ByteArray message = _fragmentCache.acquire();
System.arraycopy(data, off, message.getData(), 0, len);
message.setValid(len);
_fragments[fragmentNum] = message;
if (isLast) {
// don't allow _lastFragment to be set twice
if (_lastFragment >= 0) {
if (_log.shouldWarn())
_log.warn("Multiple last fragments for message " + _messageId + " from " + _from);
return false;
}
// TODO - check for non-last fragments after this one?
_lastFragment = fragmentNum;
} else if (_lastFragment >= 0 && fragmentNum >= _lastFragment) {
// don't allow non-last after last
if (_log.shouldWarn())
_log.warn("Non-last fragment " + fragmentNum + " when last is " + _lastFragment + " for message " + _messageId + " from " + _from);
return false;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("New fragment " + fragmentNum + " for message " + _messageId
+ ", size=" + len
+ ", isLast=" + isLast
/* + ", data=" + Base64.encode(message.getData(), 0, size) */ );
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received fragment " + fragmentNum + " for message " + _messageId
+ " again, old size=" + _fragments[fragmentNum].getValid()
+ " and new size=" + len);
}
return true;
}
/**
* May not be valid after released.
* Probably doesn't need to be synced by caller, given the order of

View File

@@ -187,33 +187,29 @@ class MessageReceiver {
* @return null on error
*/
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
int sz = state.getCompleteSize();
try {
//byte buf[] = new byte[state.getCompleteSize()];
I2NPMessage m;
int numFragments = state.getFragmentCount();
if (numFragments > 1) {
ByteArray fragments[] = state.getFragments();
int off = 0;
byte[] data = buf.getData();
for (int i = 0; i < numFragments; i++) {
System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
// + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
// + " (valid: " + fragments[i].getValid()
// + " raw: " + Base64.encode(fragments[i].getData()) + ")");
off += fragments[i].getValid();
ByteArray ba = fragments[i];
int len = ba.getValid();
System.arraycopy(ba.getData(), 0, data, off, len);
off += len;
}
if (off != state.getCompleteSize()) {
if (off != sz) {
if (_log.shouldLog(Log.WARN))
_log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
_log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz);
return null;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
m = I2NPMessageImpl.fromRawByteArray(_context, data, 0, sz, handler);
} else {
// zero copy for single fragment
m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler);
m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, sz, handler);
}
m.setUniqueId(state.getMessageId());
return m;
@@ -227,8 +223,8 @@ class MessageReceiver {
byte[] data = ba.getData();
_log.warn("Message invalid: " + state +
" PeerState: " + _transport.getPeerState(state.getFrom()) +
"\nDUMP:\n" + HexDump.dump(data, 0, state.getCompleteSize()) +
"\nRAW:\n" + Base64.encode(data, 0, state.getCompleteSize()),
"\nDUMP:\n" + HexDump.dump(data, 0, sz) +
"\nRAW:\n" + Base64.encode(data, 0, sz),
ime);
}
if (state.getFragments()[0].getData()[0] == DatabaseStoreMessage.MESSAGE_TYPE) {

View File

@@ -149,12 +149,10 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
}
public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
System.out.println("Got RI block: " + ri);
throw new DataFormatException("RI in Sess Created");
}
public void gotRIFragment(byte[] data, boolean isHandshake, boolean flood, boolean isGzipped, int frag, int totalFrags) {
System.out.println("Got RI fragment " + frag + " of " + totalFrags);
throw new IllegalStateException("RI in Sess Created");
}
@@ -167,7 +165,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
}
public void gotRelayTagRequest() {
System.out.println("Got relay tag request");
throw new IllegalStateException("Relay tag req in Sess Created");
}
@@ -180,22 +177,18 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
}
public void gotI2NP(I2NPMessage msg) {
System.out.println("Got I2NP block: " + msg);
throw new IllegalStateException("I2NP in Sess Created");
}
public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException {
System.out.println("Got FRAGMENT block: " + messageID);
throw new IllegalStateException("I2NP in Sess Created");
public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException {
throw new DataFormatException("I2NP in Sess Created");
}
public void gotACK(long ackThru, int acks, byte[] ranges) {
System.out.println("Got ACK block: " + ackThru);
throw new IllegalStateException("ACK in Sess Created");
}
public void gotTermination(int reason, long count) {
System.out.println("Got TERMINATION block, reason: " + reason + " count: " + count);
throw new IllegalStateException("Termination in Sess Created");
}
@@ -204,7 +197,6 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
}
public void gotPadding(int paddingLength, int frameLength) {
System.out.println("Got PADDING block, len: " + paddingLength + " in frame len: " + frameLength);
}
/////////////////////////////////////////////////////////

View File

@@ -46,7 +46,7 @@ public class PeerState {
* receiving the connection this will be set only after the connection
* is established.
*/
private final Hash _remotePeer;
protected final Hash _remotePeer;
/**
* The AES key used to verify packets, set only after the connection is
* established.
@@ -192,7 +192,7 @@ public class PeerState {
private boolean _mayDisconnect;
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
protected final Map<Long, InboundMessageState> _inboundMessages;
/**
* Mostly messages that have been transmitted and are awaiting acknowledgement,
@@ -212,7 +212,7 @@ public class PeerState {
/** when the retransmit timer is about to trigger */
private long _retransmitTimer;
private final UDPTransport _transport;
protected final UDPTransport _transport;
/** have we migrated away from this peer to another newer one? */
private volatile boolean _dead;

View File

@@ -7,12 +7,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.southernstorm.noise.protocol.CipherState;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext;
import static net.i2p.router.transport.udp.SSU2Util.*;
import net.i2p.util.HexDump;
@@ -116,7 +119,9 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
return;
}
}
processPayload(data, off + SHORT_HEADER_SIZE, len - (SHORT_HEADER_SIZE + MAC_LEN));
int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN);
processPayload(data, off + SHORT_HEADER_SIZE, payloadLen);
packetReceived(payloadLen);
} catch (GeneralSecurityException gse) {
if (_log.shouldWarn())
_log.warn("Bad encrypted packet:\n" + HexDump.dump(data, off, len), gse);
@@ -169,9 +174,50 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
}
public void gotI2NP(I2NPMessage msg) {
// 9 byte header
int size = msg.getMessageSize() - 7;
// complete message, skip IMF and MessageReceiver
_transport.messageReceived(msg, null, _remotePeer, 0, size);
}
public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException {
public void gotFragment(byte[] data, int off, int len, long messageId,int frag, boolean isLast) throws DataFormatException {
InboundMessageState state;
boolean messageComplete = false;
boolean messageExpired = false;
synchronized (_inboundMessages) {
state = _inboundMessages.get(messageId);
if (state == null) {
state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast);
_inboundMessages.put(messageId, state);
} else {
boolean fragmentOK = state.receiveFragment(data, off, len, frag, isLast);
if (!fragmentOK)
return;
if (state.isComplete()) {
messageComplete = true;
_inboundMessages.remove(messageId);
} else if (state.isExpired()) {
messageExpired = true;
_inboundMessages.remove(messageId);
}
}
}
if (messageComplete) {
messageFullyReceived(messageId, state.getCompleteSize());
if (_log.shouldDebug())
_log.debug("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
receiveMessage(state);
} else if (messageExpired) {
if (_log.shouldWarn())
_log.warn("Message expired while only being partially read: " + state);
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
// all state access must be before this
state.releaseResources();
}
}
public void gotACK(long ackThru, int acks, byte[] ranges) {
@@ -190,4 +236,40 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
// end payload callbacks
/////////////////////////////////////////////////////////
/**
* Do what MessageReceiver does, but inline and for SSU2.
* Will always be more than one fragment.
*/
private void receiveMessage(InboundMessageState state) {
int sz = state.getCompleteSize();
try {
byte buf[] = new byte[sz];
I2NPMessage m;
int numFragments = state.getFragmentCount();
ByteArray fragments[] = state.getFragments();
int off = 0;
for (int i = 0; i < numFragments; i++) {
ByteArray ba = fragments[i];
int len = ba.getValid();
System.arraycopy(ba.getData(), 0, buf, off, len);
off += len;
}
if (off != sz) {
if (_log.shouldWarn())
_log.warn("Hmm, offset of the fragments = " + off + " while the state says " + sz);
return;
}
I2NPMessage msg = I2NPMessageImpl.fromRawByteArrayNTCP2(_context, buf, 0, sz, null);
_transport.messageReceived(msg, null, _remotePeer, state.getLifetime(), sz);
} catch (I2NPMessageException ime) {
if (_log.shouldWarn())
_log.warn("Message invalid: " + state + " PeerState: " + this, ime);
} catch (RuntimeException e) {
// e.g. AIOOBE
if (_log.shouldWarn())
_log.warn("Error handling a message: " + state, e);
} finally {
state.releaseResources();
}
}
}

View File

@@ -56,10 +56,13 @@ class SSU2Payload {
public void gotI2NP(I2NPMessage msg) throws I2NPMessageException;
/**
* @param expires 0 for frag greater than 1
* @param type 0 for frag greater than 1
* Data must be copied out in this method.
* Data starts at the 9 byte header for fragment 0.
*
* @param off offset in data
* @param len length of data to copy
*/
public void gotFragment(byte[] data, long messageID, int type, long expires, int frag, boolean isLast) throws DataFormatException;
public void gotFragment(byte[] data, int off, int len, long messageID, int frag, boolean isLast) throws DataFormatException;
/**
* @param ranges null if none
@@ -193,28 +196,24 @@ class SSU2Payload {
case BLOCK_FIRSTFRAG: {
if (isHandshake)
throw new IOException("Illegal block in handshake: " + type);
if (len < 9)
if (len <= 9)
throw new IOException("Bad length for FIRSTFRAG: " + len);
int mtype = payload[i] & 0xff;
long id = DataHelper.fromLong(payload, i + 1, 4);
long exp = DataHelper.fromLong(payload, i + 5, 4) * 1000;
byte[] data = new byte[len - 9];
System.arraycopy(payload, i + 9, data, 0, len - 9);
cb.gotFragment(data, id, mtype, exp, 0, false);
cb.gotFragment(payload, i, len, id, 0, false);
break;
}
case BLOCK_FOLLOWONFRAG: {
if (isHandshake)
throw new IOException("Illegal block in handshake: " + type);
if (len < 5)
if (len <= 5)
throw new IOException("Bad length for FOLLOWON: " + len);
int frag = (payload[i] & 0xff) >> 1;
if (frag == 0)
throw new IOException("0 frag for FOLLOWON");
boolean isLast = (payload[i] & 0x01) != 0;
long id = DataHelper.fromLong(payload, i + 1, 4);
byte[] data = new byte[len - 5];
System.arraycopy(payload, i + 5, data, 0, len - 5);
cb.gotFragment(data, id, 0, 0, frag, isLast);
cb.gotFragment(payload, i + 5, len - 5, id, frag, isLast);
break;
}