SSU2: Prep for fragmented RI (WIP)

Fix IES2 not being removed from pending after complete
Don't send DSM after handshake
Validate conn ID before pkt type for data pkts
More log tweaks

SSU: Increase min pending establish states limit (unless slow)
Reduce IB establish timeout to 15s
Log pending inbound establish states when limit exceeded
Log tweaks to add establish state lifetime
Bump -11
This commit is contained in:
zzz
2022-03-27 12:36:12 -04:00
parent 8c59c514b2
commit b1852c127b
11 changed files with 134 additions and 56 deletions

View File

@@ -1,3 +1,24 @@
2022-03-27 zzz
* Crypto: Fix CertUtil loading EdDSA certs, check sigs
* Router: Validate family sig at startup
* SSU: Increase min pending establish states limit
* SSU2: Misc. fixes, prep for fragmented RI
2022-03-26 zzz
* Crypto: Add official EdDSA OIDs to provider
* SSU: Reduce ack delay to minimize addition to measured RTT
* SSU2: Hook in IMF Bloom filter to detect dups
2022-03-25 zzz
* Console: Prevent creating a family that another router is using
2022-03-23 zzz
* I2CP: Synch fixes
* SSU2: Refactor tokens
2022-03-22 zzz
* Router family fixes
2022-03-20 zzz
* NetDB:
- Refactor family validation
@@ -66,7 +87,7 @@
* SSU: SSU2 classes and keys (WIP)
2022-02-23 zzz
* i2psnark: Load sytem mime types if available
* i2psnark: Load system mime types if available
* SSU: More SSU2 prep and support (WIP)
2022-02-22 zzz

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Git";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 10;
public final static long BUILD = 11;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -33,6 +33,7 @@ import net.i2p.util.Addresses;
import net.i2p.util.I2PThread;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
import net.i2p.util.VersionComparator;
/**
@@ -103,7 +104,7 @@ class EstablishmentManager {
/** max outbound in progress - max inbound is half of this */
private final int DEFAULT_MAX_CONCURRENT_ESTABLISH;
private static final int DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH = 20;
private static final int DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH = SystemVersion.isSlow() ? 20 : 40;
private static final int DEFAULT_HIGH_MAX_CONCURRENT_ESTABLISH = 150;
private static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
@@ -130,7 +131,7 @@ class EstablishmentManager {
* Kill any inbound that takes more than this
* One round trip (Created-Confirmed)
*/
private static final int MAX_IB_ESTABLISH_TIME = 20*1000;
private static final int MAX_IB_ESTABLISH_TIME = 15*1000;
/** max before receiving a response to a single message during outbound establishment */
public static final int OB_MESSAGE_TIMEOUT = 15*1000;
@@ -513,8 +514,17 @@ class EstablishmentManager {
// TODO this is insufficient to prevent DoSing, especially if
// IP spoofing is used. For further study.
if (!shouldAllowInboundEstablishment()) {
if (_log.shouldLog(Log.WARN))
if (_log.shouldLog(Log.WARN)) {
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
if (_log.shouldDebug()) {
StringBuilder buf = new StringBuilder(4096);
buf.append("Active: ").append(_inboundStates.size()).append('\n');
for (InboundEstablishState ies : _inboundStates.values()) {
buf.append(ies.toString()).append('\n');
}
_log.debug(buf.toString());
}
}
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
@@ -697,8 +707,14 @@ class EstablishmentManager {
state.fail();
return;
}
// we are done, go right to ps2
handleCompletelyEstablished(state);
InboundEstablishState.InboundState istate = state.getState();
if (istate == IB_STATE_CONFIRMED_COMPLETELY ||
istate == IB_STATE_COMPLETE) {
// we are done, go right to ps2
handleCompletelyEstablished(state);
} else {
// More RI blocks to come, TODO
}
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session confirmed from: " + state);
@@ -997,13 +1013,19 @@ class EstablishmentManager {
// SimpleTimer.getInstance().addEvent(new PublishToNewInbound(peer), 10*1000);
if (_log.shouldDebug())
_log.debug("IB confirm: " + peer);
DeliveryStatusMessage dsm = new DeliveryStatusMessage(_context);
dsm.setArrival(_networkID); // overloaded, sure, but future versions can check this
DeliveryStatusMessage dsm;
if (peer.getVersion() == 1) {
dsm = new DeliveryStatusMessage(_context);
dsm.setArrival(_networkID); // overloaded, sure, but future versions can check this
// This causes huge values in the inNetPool.droppedDeliveryStatusDelay stat
// so it needs to be caught in InNetMessagePool.
dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
// sent below
dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
// sent below
} else {
// SSU 2 uses an ACK of packet 0
dsm = null;
}
// just do this inline
//_context.simpleTimer2().addEvent(new PublishToNewInbound(peer), 0);
@@ -1016,10 +1038,11 @@ class EstablishmentManager {
// bundle the two messages together for efficiency
DatabaseStoreMessage dbsm = getOurInfo();
List<I2NPMessage> msgs = new ArrayList<I2NPMessage>(2);
msgs.add(dsm);
if (dsm != null)
msgs.add(dsm);
msgs.add(dbsm);
_transport.send(msgs, peer);
} else {
} else if (dsm != null) {
_transport.send(dsm, peer);
// nuh uh.
if (_log.shouldLog(Log.WARN))
@@ -1465,6 +1488,7 @@ class EstablishmentManager {
private void sendDestroy(InboundEstablishState state) {
if (state.getVersion() > 1)
return;
// TODO ban the IP for a while, like we do in NTCP?
UDPPacket packet = _builder.buildSessionDestroyPacket(state);
if (packet != null) {
if (_log.shouldLog(Log.DEBUG))
@@ -1486,7 +1510,8 @@ class EstablishmentManager {
for (Iterator<InboundEstablishState> iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
InboundEstablishState cur = iter.next();
if (cur.getState() == IB_STATE_CONFIRMED_COMPLETELY) {
InboundEstablishState.InboundState istate = cur.getState();
if (istate == IB_STATE_CONFIRMED_COMPLETELY) {
// completely received (though the signature may be invalid)
iter.remove();
inboundState = cur;
@@ -1498,13 +1523,12 @@ class EstablishmentManager {
iter.remove();
inboundState = cur;
//_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Removing expired inbound state");
if (_log.shouldDebug())
_log.debug("Expired: " + cur);
expired = true;
break;
} else if (cur.getState() == IB_STATE_FAILED) {
} else if (istate == IB_STATE_FAILED || istate == IB_STATE_COMPLETE) {
iter.remove();
//_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
} else {
// this will always be > 0
long next = cur.getNextSendTime();

View File

@@ -550,6 +550,7 @@ class InboundEstablishState {
//if (_sentY != null)
// buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4));
//buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
buf.append(" lifetime: ").append(DataHelper.formatDuration(getLifetime()));
buf.append(" RelayTag: ").append(_sentRelayTag);
//buf.append(" SignedOn: ").append(_sentSignedOnTime);
buf.append(' ').append(_currentState);

View File

@@ -163,8 +163,10 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
if (_timeReceived == 0)
throw new GeneralSecurityException("No DateTime block in Session/Token Request");
_skew = _establishBegin - _timeReceived;
if (_skew > MAX_SKEW || _skew < 0 - MAX_SKEW)
if (_skew > MAX_SKEW || _skew < 0 - MAX_SKEW) {
// TODO send retry with termination
throw new GeneralSecurityException("Skew exceeded in Session/Token Request: " + _skew);
}
packetReceived();
}
@@ -593,6 +595,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
sender.initializeKey(d_ba, 0);
ChaChaPolyCipherState rcvr = new ChaChaPolyCipherState();
rcvr.initializeKey(d_ab, 0);
/****
if (_log.shouldDebug())
_log.debug("split()\nGenerated Chain key: " + Base64.encode(ckd) +
"\nGenerated split key for A->B: " + Base64.encode(k_ab) +
@@ -603,6 +606,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
"\nIntro key for Bob: " + Base64.encode(_rcvHeaderEncryptKey1) +
"\nGenerated header key 2 for A->B: " + Base64.encode(h_ab) +
"\nGenerated header key 2 for B->A: " + Base64.encode(h_ba));
****/
_handshakeState.destroy();
if (_createdSentCount == 1)
_rtt = (int) ( _context.clock().now() - _lastSend );
@@ -706,6 +710,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
StringBuilder buf = new StringBuilder(128);
buf.append("IES2 ");
buf.append(Addresses.toString(_aliceIP, _alicePort));
buf.append(" lifetime: ").append(DataHelper.formatDuration(getLifetime()));
buf.append(" Rcv ID: ").append(_rcvConnID);
buf.append(" Send ID: ").append(_sendConnID);
buf.append(" RelayTag: ").append(_sentRelayTag);

View File

@@ -768,6 +768,8 @@ class OutboundEstablishState {
/** @since 0.8.9 */
@Override
public String toString() {
return "OES " + _remoteHostId + ' ' + _currentState;
return "OES " + _remoteHostId +
" lifetime: " + DataHelper.formatDuration(getLifetime()) +
' ' + _currentState;
}
}

View File

@@ -6,6 +6,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.List;
import com.southernstorm.noise.protocol.ChaChaPolyCipherState;
import com.southernstorm.noise.protocol.CipherState;
@@ -461,13 +462,15 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
* note that we just sent the SessionConfirmed packets
* and save them for retransmission
*
* @param riFrags if non-null, the RI was fragmented, and these are the
* remaining fragments to be sent in the PeerState.
* @return the new PeerState2, may also be retrieved from getPeerState()
*/
public synchronized PeerState2 confirmedPacketsSent(UDPPacket[] packets) {
public synchronized PeerState2 confirmedPacketSent(UDPPacket packet, List<SSU2Payload.RIBlock> riFrags) {
if (_sessConfForReTX == null) {
// store pkt for retx
// only one supported right now
DatagramPacket pkt = packets[0].getPacket();
DatagramPacket pkt = packet.getPacket();
byte data[] = pkt.getData();
int off = pkt.getOffset();
int len = pkt.getLength();
@@ -515,7 +518,7 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
_sendConnID, _rcvConnID,
_sendHeaderEncryptKey1, h_ab, h_ba);
_currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY;
_pstate.confirmedPacketsSent(_sessConfForReTX);
_pstate.confirmedPacketSent(_sessConfForReTX, riFrags);
// PS2.super adds CLOCK_SKEW_FUDGE that doesn't apply here
_pstate.adjustClockSkew(_skew - (_rtt / 2) - PeerState.CLOCK_SKEW_FUDGE);
_pstate.setHisMTU(_mtu);
@@ -554,6 +557,7 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl
@Override
public String toString() {
return "OES2 " + _remoteHostId +
" lifetime: " + DataHelper.formatDuration(getLifetime()) +
" Rcv ID: " + _rcvConnID +
" Send ID: " + _sendConnID +
' ' + _currentState;

View File

@@ -443,15 +443,12 @@ class PacketBuilder2 {
* Build a new series of SessionConfirmed packets for the given peer,
* encrypting it as necessary.
*
* Note that while a SessionConfirmed could in theory be fragmented,
* in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max,
* so it will never be fragmented.
* If the RI is large enough that it is fragmented, this will still only return
* a single Session Confirmed message. The remaining RI blocks will be passed to
* the establish state via confirmedPacketsSent(), and the state will
* transmit them via the new PeerState2.
*
* @return ready to send packets, or null if there was a problem
*
* TODO: doesn't really return null, and caller doesn't handle null return
* (null SigningPrivateKey should cause this?)
* Should probably return null if buildSessionConfirmedPacket() returns null for any fragment
*/
public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState2 state, RouterInfo ourInfo) {
boolean gzip = false;
@@ -493,17 +490,24 @@ class PacketBuilder2 {
len = info.length;
}
UDPPacket packets[] = new UDPPacket[numFragments];
UDPPacket packets[] = new UDPPacket[1];
packets[0] = buildSessionConfirmedPacket(state, numFragments, info, len, gzip);
List<SSU2Payload.RIBlock> riFrags;
if (numFragments > 1) {
// get PeerState from OES
riFrags = new ArrayList<SSU2Payload.RIBlock>(numFragments - 1);
int off = len;
for (int i = 1; i < numFragments; i++) {
//packets[i] = buildSessionConfirmedPacket(state, i, numFragments, info, gzip);
if (i == numFragments - 1)
len = info.length - off;
SSU2Payload.RIBlock block = new SSU2Payload.RIBlock(info, off, len,
false, gzip, i, numFragments);
riFrags.add(block);
off += len;
}
// TODO numFragments > 1 requires shift to data phase
throw new IllegalArgumentException("TODO");
} else {
riFrags = null;
}
state.confirmedPacketsSent(packets);
state.confirmedPacketSent(packets[0], riFrags);
return packets;
}

View File

@@ -881,7 +881,7 @@ class PacketHandler {
if (header.getPacketNumber() != 0 ||
header.getType() != SSU2Util.SESSION_CONFIRMED_FLAG_BYTE) {
if (_log.shouldWarn())
_log.warn("Queue possible data packet on: " + state);
_log.warn("Queue possible data packet with header " + header + " on: " + state);
// TODO either attempt to decrypt as a retransmitted
// Session Request or Token Request,
// or just tell establisher so it can retransmit Session Created or Retry

View File

@@ -1570,8 +1570,8 @@ public class PeerState {
// no need to nudge(), this is called from OMF loop before allocateSend()
}
if (rv <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " nothing pending, cancelling timer");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(_remotePeer + " nothing pending, cancelling timer");
synchronized(this) {
_retransmitTimer = 0;
exitFastRetransmit();

View File

@@ -5,6 +5,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,6 +61,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
// Session Confirmed retransmit
private byte[] _sessConfForReTX;
private List<SSU2Payload.RIBlock> _riFragsForReTX;
private long _sessConfSentTime;
private int _sessConfSentCount;
@@ -292,20 +294,20 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
//if (_log.shouldDebug())
// _log.debug("Packet before header decryption:\n" + HexDump.dump(data, off, len));
try {
if (len < MIN_DATA_LEN) {
SSU2Header.Header header = SSU2Header.trialDecryptShortHeader(packet, _rcvHeaderEncryptKey1, _rcvHeaderEncryptKey2);
if (header == null) {
if (_log.shouldWarn())
_log.warn("Inbound packet too short " + len + " on " + this);
return;
}
SSU2Header.Header header = SSU2Header.trialDecryptShortHeader(packet, _rcvHeaderEncryptKey1, _rcvHeaderEncryptKey2);
if (header == null) {
if (header.getDestConnID() != _rcvConnID) {
if (_log.shouldWarn())
_log.warn("bad data header on " + this);
_log.warn("bad Dest Conn id " + header.getDestConnID() + " size " + len + " on " + this);
return;
}
if (header.getType() != DATA_FLAG_BYTE) {
if (_log.shouldWarn())
_log.warn("bad data pkt type " + (header.getType() & 0xff) + " on " + this);
_log.warn("bad data pkt type " + (header.getType() & 0xff) + " size " + len + " on " + this);
// TODO if it's early:
// If inbound, could be a retransmitted Session Confirmed,
// ack it again.
@@ -316,11 +318,6 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
// we didn't know the session has disconnected yet.
return;
}
if (header.getDestConnID() != _rcvConnID) {
if (_log.shouldWarn())
_log.warn("bad Dest Conn id " + header.getDestConnID() + " on " + this);
return;
}
long n = header.getPacketNumber();
SSU2Header.acceptTrialDecrypt(packet, header);
//if (_log.shouldDebug())
@@ -341,8 +338,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
return;
}
int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN);
if (_log.shouldInfo())
_log.info("New " + len + " byte pkt " + n + " rcvd on " + this);
if (_log.shouldDebug())
_log.debug("New " + len + " byte pkt " + n + " rcvd on " + this);
processPayload(data, off + SHORT_HEADER_SIZE, payloadLen);
packetReceived(payloadLen);
} catch (GeneralSecurityException gse) {
@@ -642,12 +639,23 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
}
/**
* note that we just sent the SessionConfirmed packets
* and save them for retransmission
* note that we just sent the SessionConfirmed packet
* and save it for retransmission.
*
* @param riFrags if non-null, the RI was fragmented, and these are the
* remaining fragments to be sent and saved for retransmission.
*/
public synchronized void confirmedPacketsSent(byte[] data) {
public synchronized void confirmedPacketSent(byte[] data, List<SSU2Payload.RIBlock> riFrags) {
if (_sessConfForReTX == null)
_sessConfForReTX = data;
if (riFrags != null) {
if (_riFragsForReTX == null)
_riFragsForReTX = riFrags;
for (SSU2Payload.RIBlock block : riFrags) {
UDPPacket pkt = _transport.getBuilder2().buildPacket(Collections.emptyList(), Collections.singletonList(block), this);
_transport.send(pkt);
}
}
_sessConfSentTime = _context.clock().now();
_sessConfSentCount++;
}
@@ -659,7 +667,10 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
if (_sessConfForReTX == null)
return null;
UDPPacket packet = UDPPacket.acquire(_context, false);
UDPPacket[] rv = new UDPPacket[1];
int count = 1;
if (_riFragsForReTX != null)
count += _riFragsForReTX.size();
UDPPacket[] rv = new UDPPacket[count];
rv[0] = packet;
DatagramPacket pkt = packet.getPacket();
byte data[] = pkt.getData();
@@ -670,6 +681,12 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
pkt.setPort(_remotePort);
packet.setMessageType(PacketBuilder2.TYPE_CONF);
packet.setPriority(PacketBuilder2.PRIORITY_HIGH);
if (_riFragsForReTX != null) {
int i = 1;
for (SSU2Payload.RIBlock block : _riFragsForReTX) {
rv[i++] = _transport.getBuilder2().buildPacket(Collections.emptyList(), Collections.singletonList(block), this);
}
}
return rv;
}