forked from I2P_Developers/i2p.i2p
SSU2: Implement path challenge and connection migration
mark session dead after receiving destroy log tweaks
This commit is contained in:
@@ -1,3 +1,10 @@
|
||||
2022-08-25 zzz
|
||||
* Router: Fix deadlock via rebuildRouterAddress() and UDPTransport
|
||||
* SSU2: Implement path challenge and connection migration
|
||||
|
||||
2022-08-23 zzz
|
||||
* Router: Add deadlocks to event log
|
||||
|
||||
2022-08-22 1.9.0 released
|
||||
|
||||
2022-08-10 zzz
|
||||
|
||||
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Git";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 0;
|
||||
public final static long BUILD = 1;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
||||
@@ -208,7 +208,7 @@ public class PeerState {
|
||||
protected final UDPTransport _transport;
|
||||
|
||||
/** have we migrated away from this peer to another newer one? */
|
||||
private volatile boolean _dead;
|
||||
protected volatile boolean _dead;
|
||||
|
||||
/** The minimum number of outstanding messages (NOT fragments/packets) */
|
||||
private static final int MIN_CONCURRENT_MSGS = 8;
|
||||
|
||||
@@ -5,6 +5,7 @@ import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@@ -24,6 +25,7 @@ import net.i2p.data.i2np.I2NPMessageException;
|
||||
import net.i2p.data.i2np.I2NPMessageImpl;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
|
||||
import net.i2p.router.transport.TransportUtil;
|
||||
import net.i2p.router.transport.udp.InboundMessageFragments.ModifiableLong;
|
||||
import static net.i2p.router.transport.udp.SSU2Util.*;
|
||||
import net.i2p.util.HexDump;
|
||||
@@ -68,6 +70,28 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
private long _sessConfSentTime;
|
||||
private int _sessConfSentCount;
|
||||
|
||||
// Connection Migration, synch on _migrationLock
|
||||
private enum MigrationState {
|
||||
MIGRATION_STATE_NONE,
|
||||
MIGRATION_STATE_PENDING,
|
||||
// unused below here
|
||||
MIGRATION_STATE_CANCELLED,
|
||||
MIGRATION_STATE_FAILED,
|
||||
MIGRATION_STATE_SUCCESS
|
||||
}
|
||||
private final Object _migrationLock = new Object();
|
||||
private MigrationState _migrationState = MigrationState.MIGRATION_STATE_NONE;
|
||||
private long _migrationStarted;
|
||||
private long _migrationNextSendTime;
|
||||
private byte[] _pathChallengeData;
|
||||
private long _pathChallengeSendCount;
|
||||
private RemoteHostId _pendingRemoteHostId;
|
||||
private RemoteHostId _previousRemoteHostId;
|
||||
private static final int MAX_PATH_CHALLENGE_SENDS = 4;
|
||||
private static final long MAX_PATH_CHALLENGE_TIME = 30*1000;
|
||||
private static final long PATH_CHALLENGE_DELAY = 5*1000;
|
||||
|
||||
|
||||
// As SSU
|
||||
public static final int MIN_SSU_IPV4_MTU = 1292;
|
||||
public static final int MAX_SSU_IPV4_MTU = 1484;
|
||||
@@ -318,15 +342,9 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
*
|
||||
* @param from source address
|
||||
* @param packet fully encrypted, header and body decryption will be done here
|
||||
* @since 0.9.56
|
||||
* @since 0.9.55
|
||||
*/
|
||||
void receivePacket(RemoteHostId from, UDPPacket packet) {
|
||||
if (!from.equals(_remoteHostId)) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Got packet from " + from + " expected " + _remoteHostId + " on " + this);
|
||||
// Connection Migration TODO
|
||||
}
|
||||
|
||||
DatagramPacket dpacket = packet.getPacket();
|
||||
byte[] data = dpacket.getData();
|
||||
int off = dpacket.getOffset();
|
||||
@@ -386,17 +404,115 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
_log.warn("dup pkt rcvd: " + n + " on " + this);
|
||||
return;
|
||||
}
|
||||
|
||||
int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("New " + len + " byte pkt " + n + " rcvd on " + this);
|
||||
SSU2Payload.processPayload(_context, this, data, off + SHORT_HEADER_SIZE, payloadLen, false, from);
|
||||
packetReceived(payloadLen);
|
||||
|
||||
if (!_dead) {
|
||||
// Connection Migration
|
||||
// We process packets regardless of source.
|
||||
// This is after all the header checks, decryption, and payload processing.
|
||||
// Any failures will have thrown or returned before here.
|
||||
// Path Response callback is before this and will reset the state if successful.
|
||||
|
||||
boolean limitSending = false;
|
||||
synchronized(_migrationLock) {
|
||||
switch(_migrationState) {
|
||||
case MIGRATION_STATE_NONE:
|
||||
if (!from.equals(_remoteHostId)) {
|
||||
// QUIC: Must be highest set to protect against reordered packets
|
||||
if (SSU2Util.ENABLE_PATH_CHALLENGE &&
|
||||
from.getIP().length == _remoteHostId.getIP().length &&
|
||||
n == _receivedMessages.getHighestSet() &&
|
||||
TransportUtil.isValidPort(from.getPort()) &&
|
||||
_transport.isValid(from.getIP())) {
|
||||
// send challenge
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Start migration to " + from + " on " + this);
|
||||
_migrationState = MigrationState.MIGRATION_STATE_PENDING;
|
||||
_migrationStarted = _context.clock().now();
|
||||
_migrationNextSendTime = _migrationStarted + PATH_CHALLENGE_DELAY;
|
||||
_pathChallengeData = new byte[8];
|
||||
_context.random().nextBytes(_pathChallengeData);
|
||||
_pathChallengeSendCount = 1;
|
||||
_pendingRemoteHostId = from;
|
||||
sendPathChallenge(dpacket.getAddress(), from.getPort());
|
||||
} else {
|
||||
// don't attempt to switch
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Not migrating to " + from + " on " + this);
|
||||
}
|
||||
limitSending = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case MIGRATION_STATE_PENDING:
|
||||
if (from.equals(_remoteHostId)) {
|
||||
// cancel
|
||||
_migrationState = MigrationState.MIGRATION_STATE_NONE;
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Cancel migration on " + this);
|
||||
} else {
|
||||
// still waiting
|
||||
long now = _context.clock().now();
|
||||
if (now > _migrationStarted + MAX_PATH_CHALLENGE_TIME ||
|
||||
_pathChallengeSendCount > MAX_PATH_CHALLENGE_SENDS) {
|
||||
// time exceeded
|
||||
_migrationState = MigrationState.MIGRATION_STATE_NONE;
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Migration failed on " + this);
|
||||
} else if (from.equals(_pendingRemoteHostId)) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Migration pending, got another packet from " + from + " on " + this);
|
||||
if (now > _migrationNextSendTime) {
|
||||
// retransmit challenge
|
||||
_migrationNextSendTime = now + (PATH_CHALLENGE_DELAY << _pathChallengeSendCount);
|
||||
_pathChallengeSendCount++;
|
||||
sendPathChallenge(dpacket.getAddress(), from.getPort());
|
||||
}
|
||||
limitSending = true;
|
||||
} else {
|
||||
// a third ip/port ???
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Migration pending, got packet from 3rd address " + from + " on " + this);
|
||||
limitSending = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (limitSending)
|
||||
ECNReceived();
|
||||
} //// !_dead
|
||||
|
||||
} catch (Exception e) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Bad encrypted packet on: " + this + '\n' + HexDump.dump(data, off, len), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Caller must synch on _migrationState
|
||||
* @since 0.9.56
|
||||
*/
|
||||
private void sendPathChallenge(InetAddress toIP, int toPort) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Send path challenge to " + toIP + ' ' + toPort + " on " + this);
|
||||
List<SSU2Payload.Block> blocks = new ArrayList<SSU2Payload.Block>(3);
|
||||
blocks.add(new SSU2Payload.DateTimeBlock(_context));
|
||||
blocks.add(new SSU2Payload.AddressBlock(toIP.getAddress(), toPort));
|
||||
blocks.add(new SSU2Payload.PathChallengeBlock(_pathChallengeData));
|
||||
UDPPacket packet = _transport.getBuilder2().buildPacket(Collections.emptyList(), blocks, this);
|
||||
// fix up IP/port
|
||||
DatagramPacket pkt = packet.getPacket();
|
||||
pkt.setAddress(toIP);
|
||||
pkt.setPort(toPort);
|
||||
_transport.send(packet);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// begin payload callbacks
|
||||
/////////////////////////////////////////////////////////
|
||||
@@ -410,8 +526,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
}
|
||||
|
||||
public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got RI in data phase " + ri + "\non: " + this);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Got RI in data phase " + ri + "\non: " + this);
|
||||
try {
|
||||
Hash h = ri.getHash();
|
||||
if (h.equals(_context.routerHash()))
|
||||
@@ -643,6 +759,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
_transport.send(pkt);
|
||||
}
|
||||
_transport.getEstablisher().receiveSessionDestroy(_remoteHostId, this);
|
||||
_dead = true;
|
||||
}
|
||||
|
||||
public void gotPathChallenge(RemoteHostId from, byte[] data) {
|
||||
@@ -659,13 +776,51 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
public void gotPathResponse(RemoteHostId from, byte[] data) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got PATH RESPONSE block, length: " + data.length + " on " + this);
|
||||
// TODO
|
||||
synchronized(_migrationLock) {
|
||||
switch(_migrationState) {
|
||||
case MIGRATION_STATE_PENDING:
|
||||
if (from.equals(_pendingRemoteHostId) && DataHelper.eq(data, _pathChallengeData)) {
|
||||
// success
|
||||
_migrationState = MigrationState.MIGRATION_STATE_NONE;
|
||||
_pathChallengeData = null;
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Migration successful, changed address from " + _remoteHostId + " to " + from + " for " + this);
|
||||
_transport.changePeerAddress(this, from);
|
||||
_mtu = MIN_MTU;
|
||||
EstablishmentManager.Token token = _transport.getEstablisher().getInboundToken(from);
|
||||
SSU2Payload.Block block = new SSU2Payload.NewTokenBlock(token.token, token.expires);
|
||||
UDPPacket pkt = _transport.getBuilder2().buildPacket(Collections.emptyList(),
|
||||
Collections.singletonList(block),
|
||||
this);
|
||||
_transport.send(pkt);
|
||||
} else {
|
||||
// caller will handle
|
||||
// ACK-eliciting
|
||||
messagePartiallyReceived();
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
messagePartiallyReceived();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// end payload callbacks
|
||||
/////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Caller should sync; UDPTransport must remove and add to peersByRemoteHost map
|
||||
* @since 0.9.56
|
||||
*/
|
||||
void changeAddress(RemoteHostId id) {
|
||||
_previousRemoteHostId = _remoteHostId;
|
||||
_remoteHostId = id;
|
||||
_remotePort = id.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Do what MessageReceiver does, but inline and for SSU2.
|
||||
* Will always be more than one fragment.
|
||||
|
||||
@@ -21,7 +21,7 @@ final class SSU2Util {
|
||||
// features
|
||||
public static final boolean ENABLE_RELAY = true;
|
||||
public static final boolean ENABLE_PEER_TEST = true;
|
||||
public static final boolean ENABLE_PATH_CHALLENGE = false;
|
||||
public static final boolean ENABLE_PATH_CHALLENGE = true;
|
||||
|
||||
// lengths
|
||||
/** 32 */
|
||||
|
||||
@@ -1645,7 +1645,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
|
||||
/**
|
||||
* Get the state by SSU2 connection ID
|
||||
* @since 0.9.56
|
||||
* @since 0.9.55
|
||||
*/
|
||||
PeerState2 getPeerState(long rcvConnID) {
|
||||
return _peersByConnID.get(Long.valueOf(rcvConnID));
|
||||
@@ -1671,7 +1671,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
return _peersByIdent.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
* Remove and add to peersByRemoteHost map
|
||||
* @since 0.9.3
|
||||
*/
|
||||
@@ -1690,6 +1690,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
_log.info("Changed port from " + oldPort + " to " + newPort + " for " + peer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove and add to peersByRemoteHost map
|
||||
* @since 0.9.56
|
||||
*/
|
||||
void changePeerAddress(PeerState2 peer, RemoteHostId newAddress) {
|
||||
RemoteHostId oldAddress;
|
||||
synchronized (_addDropLock) {
|
||||
oldAddress = peer.getRemoteHostId();
|
||||
if (!oldAddress.equals(newAddress)) {
|
||||
_peersByRemoteHost.remove(oldAddress);
|
||||
peer.changeAddress(newAddress);
|
||||
_peersByRemoteHost.put(newAddress, peer);
|
||||
}
|
||||
}
|
||||
if (_log.shouldInfo() && !oldAddress.equals(newAddress))
|
||||
_log.info("Changed address from " + oldAddress + " to " + newAddress + " for " + peer);
|
||||
}
|
||||
|
||||
/**
|
||||
* For IntroductionManager
|
||||
* @return may be null if not started
|
||||
|
||||
Reference in New Issue
Block a user