SSU2: Implement peer test state machine

msgs 5-7 still TODO
WIP, untested
This commit is contained in:
zzz
2022-03-18 10:21:30 -04:00
parent d9c629a6b1
commit c62884ef85
8 changed files with 428 additions and 10 deletions

View File

@@ -364,6 +364,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
return;
if (_receivedConfirmedIdentity == null)
throw new IllegalStateException("RI must be first");
_transport.getPeerTestManager().receiveTest(_remoteHostId, _pstate, msg, status, h, data);
}
public void gotToken(long token, long expires) {

View File

@@ -423,6 +423,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
public void gotPeerTest(int msg, int status, Hash h, byte[] data) {
if (!ENABLE_PEER_TEST)
return;
_transport.getPeerTestManager().receiveTest(_remoteHostId, this, msg, status, h, data);
}
public void gotToken(long token, long expires) {

View File

@@ -79,7 +79,7 @@ class PeerTestEvent extends SimpleTimer2.TimedEvent {
private void locked_runTest(boolean isIPv6) {
_lastTestIPv6 = isIPv6;
PeerState bob = _transport.pickTestPeer(BOB, isIPv6, null);
PeerState bob = _transport.pickTestPeer(BOB, 0, isIPv6, null);
if (bob != null) {
if (_log.shouldLog(Log.INFO))
_log.info("Running periodic test with bob = " + bob);

View File

@@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.Inet6Address;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -10,12 +11,17 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.data.SigningPrivateKey;
import net.i2p.data.SigningPublicKey;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.router.CommSystemFacade.Status;
import net.i2p.router.RouterContext;
import static net.i2p.router.transport.udp.PeerTestState.Role.*;
import net.i2p.router.transport.TransportImpl;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -124,6 +130,7 @@ class PeerTestManager {
private final Log _log;
private final UDPTransport _transport;
private final PacketBuilder _packetBuilder;
private final PacketBuilder2 _packetBuilder2;
/** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */
private final Map<Long, PeerTestState> _activeTests;
/** current test we are running (as Alice), or null */
@@ -151,6 +158,7 @@ class PeerTestManager {
/** initial - ContinueTest adds backoff */
private static final int RESEND_TIMEOUT = 4*1000;
private static final int MAX_TEST_TIME = 30*1000;
private static final long MAX_SKEW = 2*60*1000;
private static final long MAX_NONCE = (1l << 32) - 1l;
/**
@@ -165,6 +173,7 @@ class PeerTestManager {
_activeTests = new ConcurrentHashMap<Long, PeerTestState>();
_recentTests = new LinkedBlockingQueue<Long>();
_packetBuilder = transport.getBuilder();
_packetBuilder2 = transport.getBuilder2();
_throttle = new IPThrottler(MAX_PER_IP, THROTTLE_CLEAN_TIME);
_context.statManager().createRateStat("udp.statusKnownCharlie", "How often the bob we pick passes us to a charlie we already have a session with?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveTestReply", "How often we get a reply to our peer test?", "udp", UDPTransport.RATES);
@@ -680,6 +689,379 @@ class PeerTestManager {
}
}
}
/**
* Entry point for all incoming packets.
*
* SSU 2 only.
*
* Receive a test message of some sort from the given peer, queueing up any packet
* that should be sent in response, or if its a reply to our own current testing,
* adjusting our test state.
*
* We could be Alice, Bob, or Charlie.
*
* @param from non-null
* @param fromPeer non-null if an associated session was found, otherwise null
* @param msg 1-7
* @param status 0 = accept, 1-255 = reject
* @param h Alice or Charlie hash for msg 2 and 4, null for msg 1, 3, 5-7
* @param data excludes flag, includes signature
*/
public void receiveTest(RemoteHostId from, PeerState2 fromPeer, int msg, int status, Hash h, byte[] data) {
PeerTestState.Role role;
if (data[0] == 1) {
role = ALICE;
} else if (data[0] == 3) {
role = CHARLIE;
} else {
if (_log.shouldWarn())
_log.warn("Bad role " + (data[0] & 0xff) + " from " + from + ' ' + fromPeer);
return;
}
if (data[1] != 2) {
if (_log.shouldWarn())
_log.warn("Bad version " + (data[1] & 0xff) + " from " + from + ' ' + fromPeer);
return;
}
long nonce = DataHelper.fromLong(data, 2, 4);
long time = DataHelper.fromLong(data, 6, 4) * 1000;
int iplen = data[10] & 0xff;
if (iplen != 4 && iplen != 16) {
if (_log.shouldLog(Log.WARN))
_log.warn("Bad IP length " + iplen);
return;
}
boolean isIPv6 = iplen == 16;
byte[] testIP = new byte[iplen];
System.arraycopy(data, 11, testIP, 0, iplen);
int testPort = (int) DataHelper.fromLong(data, 11 + iplen, 2);
Long lNonce = Long.valueOf(nonce);
PeerTestState state;
if (msg == 4 || msg == 5 || msg == 7)
state = _currentTest;
else
state = _activeTests.get(lNonce);
if (_log.shouldDebug())
_log.debug("Got peer test from " + from + ' ' + fromPeer +
" msg: " + msg +
" status: " + status +
" hash: " + h +
" role: " + role +
" nonce: " + nonce +
" time: " + DataHelper.formatTime(time) +
" ip/port: " + Addresses.toString(testIP, testPort) +
" state: " + state);
byte[] fromIP = from.getIP();
int fromPort = from.getPort();
// no need to do these checks if we received it in-session
if (fromPeer == null) {
if (!TransportUtil.isValidPort(fromPort) ||
(!_transport.isValid(fromIP)) ||
_transport.isTooClose(fromIP) ||
_context.blocklist().isBlocklisted(fromIP)) {
// spoof check, and don't respond to privileged ports
if (_log.shouldWarn())
_log.warn("Invalid PeerTest address: " + Addresses.toString(fromIP, fromPort));
_context.statManager().addRateData("udp.testBadIP", 1);
return;
}
}
if (_throttle.shouldThrottle(fromIP)) {
if (_log.shouldLog(Log.WARN))
_log.warn("PeerTest throttle from " + Addresses.toString(fromIP, fromPort));
return;
}
// common checks
// use the same counter for both from and to IPs
if (_throttle.shouldThrottle(testIP)) {
if (_log.shouldLog(Log.WARN))
_log.warn("PeerTest throttle to " + Addresses.toString(testIP, testPort));
return;
}
if (msg >= 1 && msg <= 4) {
if (fromPeer == null) {
if (_log.shouldWarn())
_log.warn("Bad msg " + msg + " out-of-session from " + from);
return;
}
} else {
if (fromPeer != null) {
if (_log.shouldWarn())
_log.warn("Bad msg " + msg + " in-session from " + fromPeer);
return;
}
}
if (msg < 3) {
if (state != null) {
if (_log.shouldWarn())
_log.warn("Dup msg " + msg + " from " + fromPeer);
return;
}
if (_activeTests.size() >= MAX_ACTIVE_TESTS) {
if (_log.shouldWarn())
_log.warn("Too many active tests, droppping from " + Addresses.toString(fromIP, fromPort));
UDPPacket packet;
if (msg == 1)
packet = _packetBuilder2.buildPeerTestToAlice(SSU2Util.TEST_REJECT_BOB_LIMIT,
Hash.FAKE_HASH, data, fromPeer);
else
packet = _packetBuilder2.buildPeerTestToBob(SSU2Util.TEST_REJECT_CHARLIE_LIMIT,
data, fromPeer);
_transport.send(packet);
return;
}
} else {
if (state == null) {
if (_log.shouldWarn())
_log.warn("No state found for msg " + msg + " from " + fromPeer);
return;
}
}
long now = _context.clock().now();
long skew = time - now;
if (skew > MAX_SKEW || skew < 0 - MAX_SKEW) {
if (_log.shouldWarn())
_log.warn("Too skewed for msg " + msg + " from " + fromPeer);
return;
}
switch (msg) {
// alice to bob, in-session
case 1: {
PeerState charlie = _transport.pickTestPeer(CHARLIE, fromPeer.getVersion(), isIPv6, from);
if (charlie == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to pick a charlie (no peer), IPv6? " + isIPv6);
// send reject
UDPPacket packet = _packetBuilder2.buildPeerTestToAlice(SSU2Util.TEST_REJECT_BOB_NO_CHARLIE,
Hash.FAKE_HASH, data, fromPeer);
_transport.send(packet);
return;
}
Hash alice = fromPeer.getRemotePeer();
RouterInfo aliceRI = _context.netDb().lookupRouterInfoLocally(alice);
if (aliceRI == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No alice RI");
// send reject
UDPPacket packet = _packetBuilder2.buildPeerTestToAlice(SSU2Util.TEST_REJECT_BOB_UNSPEC,
Hash.FAKE_HASH, data, fromPeer);
_transport.send(packet);
return;
}
InetAddress aliceIP;
try {
aliceIP = InetAddress.getByAddress(testIP);
} catch (UnknownHostException uhe) {
return;
}
state = new PeerTestState(BOB, null, isIPv6, nonce, now);
state.setAlice(fromPeer);
state.setAlice(aliceIP, testPort, alice);
state.setCharlie(charlie.getRemoteIPAddress(), charlie.getRemotePort(), charlie.getRemotePeer());
state.setReceiveAliceTime(now);
_activeTests.put(lNonce, state);
// send alice RI to charlie
DatabaseStoreMessage dbsm = new DatabaseStoreMessage(_context);
dbsm.setEntry(aliceRI);
dbsm.setMessageExpiration(now + 10*1000);
_transport.send(dbsm, charlie);
// forward to charlie, don't bother to validate signed data
UDPPacket packet = _packetBuilder2.buildPeerTestToCharlie(alice, data, (PeerState2) charlie);
_transport.send(packet);
break;
}
// bob to charlie, in-session
case 2: {
InetAddress aliceIP;
try {
aliceIP = InetAddress.getByAddress(testIP);
} catch (UnknownHostException uhe) {
return;
}
RouterInfo aliceRI = null;
SessionKey aliceIntroKey = null;
int rcode;
if (_context.banlist().isBanlisted(h)) {
rcode = SSU2Util.TEST_REJECT_CHARLIE_BANNED;
} else if (!TransportUtil.isValidPort(testPort) ||
!_transport.isValid(testIP) ||
_transport.isTooClose(testIP) ||
_context.blocklist().isBlocklisted(testIP)) {
rcode = SSU2Util.TEST_REJECT_CHARLIE_ADDRESS;
} else {
// bob should have sent it to us. Don't bother to lookup
// remotely if he didn't, or it was out-of-order or lost.
aliceRI = _context.netDb().lookupRouterInfoLocally(h);
if (aliceRI != null) {
// validate signed data
SigningPublicKey spk = aliceRI.getIdentity().getSigningPublicKey();
if (SSU2Util.validateSig(_context, SSU2Util.PEER_TEST_PROLOGUE,
fromPeer.getRemotePeer(), h, data, spk)) {
aliceIntroKey = getIntroKey(aliceRI, isIPv6);
if (aliceIntroKey != null)
rcode = SSU2Util.TEST_ACCEPT;
else
rcode = SSU2Util.TEST_REJECT_CHARLIE_ADDRESS;
} else {
rcode = SSU2Util.TEST_REJECT_CHARLIE_SIGFAIL;
}
} else {
rcode = SSU2Util.TEST_REJECT_CHARLIE_UNKNOWN_ALICE;
}
}
if (rcode == SSU2Util.TEST_ACCEPT) {
state = new PeerTestState(CHARLIE, fromPeer, isIPv6, nonce, now);
state.setAlice(aliceIP, testPort, h);
state.setAliceIntroKey(aliceIntroKey);
state.setReceiveBobTime(now);
_activeTests.put(lNonce, state);
}
// TODO generate our signed data
UDPPacket packet = _packetBuilder2.buildPeerTestToBob(rcode, data, fromPeer);
_transport.send(packet);
// delay, then send msg 5
break;
}
// charlie to bob, in-session
case 3: {
PeerState2 alice = state.getAlice();
Hash charlie = fromPeer.getRemotePeer();
RouterInfo charlieRI = _context.netDb().lookupRouterInfoLocally(charlie);
if (charlieRI != null) {
// send charlie RI to alice
DatabaseStoreMessage dbsm = new DatabaseStoreMessage(_context);
dbsm.setEntry(charlieRI);
dbsm.setMessageExpiration(now + 10*1000);
_transport.send(dbsm, alice);
} else {
// oh well, maybe alice has it
if (_log.shouldLog(Log.WARN))
_log.warn("No charlie RI");
}
// forward to alice, don't bother to validate signed data
UDPPacket packet = _packetBuilder2.buildPeerTestToAlice(status, charlie, data, alice);
_transport.send(packet);
// we are done
_activeTests.remove(lNonce);
break;
}
// bob to alice, in-session
case 4: {
PeerTestState test = _currentTest;
if (test == null || test.getNonce() != nonce) {
if (_log.shouldWarn())
_log.warn("Test nonce mismatch?");
return;
}
InetAddress charlieIP;
try {
charlieIP = InetAddress.getByAddress(testIP);
} catch (UnknownHostException uhe) {
return;
}
boolean fail = false;
RouterInfo charlieRI = null;
SessionKey charlieIntroKey = null;
if (_context.banlist().isBanlisted(h) ||
!TransportUtil.isValidPort(testPort) ||
!_transport.isValid(testIP) ||
_transport.isTooClose(testIP) ||
_context.blocklist().isBlocklisted(testIP)) {
if (_log.shouldWarn())
_log.warn("Test fail ban/ip/port");
} else {
// bob should have sent it to us. Don't bother to lookup
// remotely if he didn't, or it was out-of-order or lost.
charlieRI = _context.netDb().lookupRouterInfoLocally(h);
if (charlieRI != null) {
// validate signed data
SigningPublicKey spk = charlieRI.getIdentity().getSigningPublicKey();
if (SSU2Util.validateSig(_context, SSU2Util.PEER_TEST_PROLOGUE,
fromPeer.getRemotePeer(), h, data, spk)) {
charlieIntroKey = getIntroKey(charlieRI, isIPv6);
}
}
}
if (charlieIntroKey == null) {
// reset all state
// so testComplete() will return UNKNOWN
test.setAlicePortFromCharlie(0);
test.setReceiveCharlieTime(0);
test.setReceiveBobTime(0);
testComplete();
return;
}
state.setCharlie(charlieIP, testPort, h);
// delay, await msg 5
break;
}
// charlie to alice, out-of-session
case 5:
break;
// alice to charlie, out-of-session
case 6:
break;
// charlie to alice, out-of-session
case 7:
break;
default:
return;
}
}
/**
* Get an intro key out of a RI. SSU2 only.
*
* @since 0.9.54
*/
private SessionKey getIntroKey(RouterInfo ri, boolean isIPv6) {
List<RouterAddress> addrs = _transport.getTargetAddresses(ri);
RouterAddress ra = null;
for (RouterAddress addr : addrs) {
// skip SSU 1 address w/o "s"
if (addrs.size() > 1 && addr.getTransportStyle().equals("SSU") && addr.getOption("s") == null)
continue;
String host = addr.getHost();
if (host == null)
host = "";
String caps = addr.getOption(UDPAddress.PROP_CAPACITY);
if (caps == null)
caps = "";
if (isIPv6) {
if (!host.contains(":") && !caps.contains(TransportImpl.CAP_IPV6))
continue;
} else {
if (!host.contains(".") && !caps.contains(TransportImpl.CAP_IPV4))
continue;
}
ra = addr;
break;
}
if (ra == null)
return null;
String siv = ra.getOption("i");
if (siv == null)
return null;
byte[] ik = Base64.decode(siv);
if (ik == null)
return null;
return new SessionKey(ik);
}
// Below here are methods for when we are Bob or Charlie
@@ -786,7 +1168,7 @@ class PeerTestManager {
// _log.warn("PeerTest over IPv6 from Alice as Bob? " + from);
// return;
//}
charlie = _transport.pickTestPeer(CHARLIE, isIPv6, from);
charlie = _transport.pickTestPeer(CHARLIE, alice.getVersion(), isIPv6, from);
} else {
charlie = _transport.getPeerState(new RemoteHostId(state.getCharlieIP().getAddress(), state.getCharliePort()));
}

View File

@@ -15,6 +15,7 @@ class PeerTestState {
private final long _testNonce;
private final Role _ourRole;
private final boolean _isIPv6;
private PeerState2 _alice;
private InetAddress _aliceIP;
private int _alicePort;
private final PeerState _bob;
@@ -74,6 +75,18 @@ class PeerTestState {
*
*/
public InetAddress getAliceIP() { return _aliceIP; }
/**
* SSU2 only
* @since 0.9.54
*/
public PeerState2 getAlice() { return _alice; }
/**
* SSU2 only
* @since 0.9.54
*/
public void setAlice(PeerState2 alice) {
_alice = alice;
}
/**
* @param hash SSU2 only, null for SSU1
* @since 0.9.54

View File

@@ -112,9 +112,9 @@ class SSU2Payload {
public void gotRelayIntro(Hash aliceHash, byte[] data);
/**
* @param msg 1-4
* @param msg 1-7
* @param status 0 = accept, 1-255 = reject
* @param h Alice or Charlie hash for msg 2 and 4, null for msg 1 and 3
* @param h Alice or Charlie hash for msg 2 and 4, null for msg 1, 3, 5-7
* @param data excludes flag, includes signature
*/
public void gotPeerTest(int msg, int status, Hash h, byte[] data);

View File

@@ -119,6 +119,18 @@ final class SSU2Util {
public static final byte[] RELAY_RESPONSE_PROLOGUE = DataHelper.getASCII("RelayAgreementOK");
public static final byte[] PEER_TEST_PROLOGUE = DataHelper.getASCII("PeerTestValidate");
public static final int TEST_ACCEPT = 0;
public static final int TEST_REJECT_BOB_UNSPEC = 1;
public static final int TEST_REJECT_BOB_NO_CHARLIE = 2;
public static final int TEST_REJECT_BOB_LIMIT = 3;
public static final int TEST_REJECT_BOB_SIGFAIL = 4;
public static final int TEST_REJECT_CHARLIE_UNSPEC = 64;
public static final int TEST_REJECT_CHARLIE_ADDRESS = 65;
public static final int TEST_REJECT_CHARLIE_LIMIT = 66;
public static final int TEST_REJECT_CHARLIE_SIGFAIL = 67;
public static final int TEST_REJECT_CHARLIE_CONNECTED = 68;
public static final int TEST_REJECT_CHARLIE_BANNED = 69;
public static final int TEST_REJECT_CHARLIE_UNKNOWN_ALICE = 70;
private SSU2Util() {}

View File

@@ -3778,19 +3778,28 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* Ditto for v6.
*
* @param peerRole The role of the peer we are looking for, BOB or CHARLIE only (NOT our role)
* @param version 1 or 2 for role CHARLIE; ignored for role BOB
* @param isIPv6 true to get a v6-capable peer back
* @param dontInclude may be null
* @return IPv4 peer or null
* @return peer or null
*/
PeerState pickTestPeer(PeerTestState.Role peerRole, boolean isIPv6, RemoteHostId dontInclude) {
PeerState pickTestPeer(PeerTestState.Role peerRole, int version, boolean isIPv6, RemoteHostId dontInclude) {
if (peerRole == ALICE)
throw new IllegalArgumentException();
if (peerRole == CHARLIE && version != 1 && !SSU2Util.ENABLE_PEER_TEST)
return null;
List<PeerState> peers = new ArrayList<PeerState>(_peersByIdent.values());
for (Iterator<PeerState> iter = new RandomIterator<PeerState>(peers); iter.hasNext(); ) {
PeerState peer = iter.next();
// Skip SSU2 until we have support for peer test
if (peer.getVersion() != 1 && !SSU2Util.ENABLE_PEER_TEST)
continue;
if (peerRole == BOB) {
// Skip SSU2 until we have support for peer test
if (peer.getVersion() != 1 && !SSU2Util.ENABLE_PEER_TEST)
continue;
} else {
// charlie must be same version
if (peer.getVersion() != version)
continue;
}
if ( (dontInclude != null) && (dontInclude.equals(peer.getRemoteHostId())) )
continue;
// enforce IPv4/v6 connection if we are ALICE looking for a BOB