NTCP2: Establishment and data phase

- Address generation and validation fixes to match proposal changes
- Fixes for persistence of static s and iv
- Add methods for keygen and getters for static s and iv
- Add OutboundNTCP2State for outbound establishment
- Add support to InboundEstablishState
- Add data phase support to NTCPConnection
- Refactor NTCPConnection for multiple protocols
- Support concurrent pending outbound messages in NTCPConnection
NTCP1: Cleanups and performance improvements
- EventPumper tweaks and logging
- Eliminate extra data copy in NTCPConnection message sending
- Remove _meta field in NTCPConnection
- Locking changes in NTCPConnection and EstablishState classes
- Zero out DH keys when done
- Fix read when buffer position nonzero in NTCPConnection
- NTCPConnection make more methods package private
- Do AES decryption in data phase all at once when possible
- Drop expired outbound messages in NTCPConnection before sending
- Pass extra data from EstablishState directly to NTCPConnection
  to avoid race, remove getExtraBytes() method
- Remove getException, getError, getFailedBySkew methods and calls from Reader
This commit is contained in:
zzz
2018-06-26 16:47:53 +00:00
parent 49221add97
commit ae8779e004
10 changed files with 2790 additions and 670 deletions

View File

@@ -37,6 +37,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import static net.i2p.router.transport.Transport.AddressSource.*;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.router.transport.crypto.X25519KeyFactory;
import net.i2p.router.transport.ntcp.NTCPTransport;
import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.util.Addresses;
@@ -68,6 +69,7 @@ public class TransportManager implements TransportEventListener {
private final RouterContext _context;
private final UPnPManager _upnpManager;
private final DHSessionKeyBuilder.PrecalcRunner _dhThread;
private final X25519KeyFactory _xdhThread;
/** default true */
public final static String PROP_ENABLE_UDP = "i2np.udp.enable";
@@ -76,6 +78,9 @@ public class TransportManager implements TransportEventListener {
/** default true */
public final static String PROP_ENABLE_UPNP = "i2np.upnp.enable";
private static final String PROP_NTCP2_ENABLE = "i2np.ntcp2.enable";
private static final boolean DEFAULT_NTCP2_ENABLE = false;
private static final String PROP_ADVANCED = "routerconsole.advanced";
/** not forever, since they may update */
@@ -98,6 +103,9 @@ public class TransportManager implements TransportEventListener {
else
_upnpManager = null;
_dhThread = new DHSessionKeyBuilder.PrecalcRunner(context);
boolean enableNTCP2 = isNTCPEnabled(context) &&
context.getProperty(PROP_NTCP2_ENABLE, DEFAULT_NTCP2_ENABLE);
_xdhThread = enableNTCP2 ? new X25519KeyFactory(context) : null;
}
/**
@@ -172,7 +180,7 @@ public class TransportManager implements TransportEventListener {
initializeAddress(udp);
}
if (isNTCPEnabled(_context)) {
Transport ntcp = new NTCPTransport(_context, _dhThread);
Transport ntcp = new NTCPTransport(_context, _dhThread, _xdhThread);
addTransport(ntcp);
initializeAddress(ntcp);
if (udp != null) {
@@ -309,6 +317,8 @@ public class TransportManager implements TransportEventListener {
synchronized void startListening() {
if (_dhThread.getState() == Thread.State.NEW)
_dhThread.start();
if (_xdhThread != null && _xdhThread.getState() == Thread.State.NEW)
_xdhThread.start();
// For now, only start UPnP if we have no publicly-routable addresses
// so we don't open the listener ports to the world.
// Maybe we need a config option to force on? Probably not.
@@ -719,6 +729,7 @@ public class TransportManager implements TransportEventListener {
_context.banlist().banlistRouterForever(peer, _x("Unsupported signature type"));
} else if (unreachableTransports >= _transports.size() && countActivePeers() > 0) {
// Don't banlist if we aren't talking to anybody, as we may have a network connection issue
// TODO if we are IPv6 only, ban for longer
boolean incompat = false;
RouterInfo us = _context.router().getRouterInfo();
if (us != null) {

View File

@@ -9,6 +9,11 @@ import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Inbound NTCP 1 or 2. Outbound NTCP 1 only.
* OutboundNTCP2State does not extend this.
*
* NTCP 1 establishement overview:
*
* Handle the 4-phase establishment, which is as follows:
*
* <pre>
@@ -33,7 +38,7 @@ import net.i2p.util.SimpleByteCache;
* X, Y: 256 byte DH keys
* H(): 32 byte SHA256 Hash
* E(data, session key, IV): AES256 Encrypt
* S(): 40 byte DSA Signature
* S(): 40 byte DSA Signature, or length as implied by sig type
* tsA, tsB: timestamps (4 bytes, seconds since epoch)
* sk: 32 byte Session key
* sz: 2 byte size of Alice identity to follow
@@ -85,17 +90,11 @@ abstract class EstablishBase implements EstablishState {
/** bytes received so far */
protected int _received;
private byte _extra[];
protected final DHSessionKeyBuilder _dh;
protected final NTCPTransport _transport;
protected final NTCPConnection _con;
/** error causing the corruption */
private String _err;
/** exception causing the error */
private Exception _e;
private boolean _failedBySkew;
protected static final int MIN_RI_SIZE = 387;
protected static final int MAX_RI_SIZE = 3072;
@@ -133,6 +132,42 @@ abstract class EstablishBase implements EstablishState {
/** got 1, sent 2, got 3 */
IB_GOT_RI,
/**
* Next state IB_NTCP2_GOT_X
* @since 0.9.36
*/
IB_NTCP2_INIT,
/**
* Got Noise part of msg 1
* Next state IB_NTCP2_GOT_PADDING or IB_NTCP2_READ_RANDOM on fail
* @since 0.9.36
*/
IB_NTCP2_GOT_X,
/**
* Got msg 1 incl. padding
* Next state IB_NTCP2_SENT_Y
* @since 0.9.36
*/
IB_NTCP2_GOT_PADDING,
/**
* Sent msg 2 and padding
* Next state IB_NTCP2_GOT_RI
* @since 0.9.36
*/
IB_NTCP2_SENT_Y,
/**
* Got msg 3
* Next state VERIFIED
* @since 0.9.36
*/
IB_NTCP2_GOT_RI,
/**
* Got msg 1 and failed AEAD
* Next state CORRUPT
* @since 0.9.36
*/
IB_NTCP2_READ_RANDOM,
/** OB: got and verified 4; IB: got and verified 3 and sent 4 */
VERIFIED,
CORRUPT
@@ -178,13 +213,13 @@ abstract class EstablishBase implements EstablishState {
}
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the data are
* copieed out so that the next read will be the (still encrypted) remaining
* data (available from getExtraBytes)
* Parse the contents of the buffer as part of the handshake.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* If there are additional data in the buffer after the handshake is complete,
* the EstablishState is responsible for passing it to NTCPConnection.
*/
public synchronized void receive(ByteBuffer src) {
synchronized(_stateLock) {
@@ -202,11 +237,6 @@ abstract class EstablishBase implements EstablishState {
*/
public void prepareOutbound() {}
/**
* Was this connection failed because of clock skew?
*/
public synchronized boolean getFailedBySkew() { return _failedBySkew; }
/** did the handshake fail for some reason? */
public boolean isCorrupt() {
synchronized(_stateLock) {
@@ -234,31 +264,6 @@ abstract class EstablishBase implements EstablishState {
*/
public abstract int getVersion();
/** Anything left over in the byte buffer after verification is extra
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* State must be VERIFIED.
* Caller must synch.
*/
protected void prepareExtra(ByteBuffer buf) {
int remaining = buf.remaining();
if (remaining > 0) {
_extra = new byte[remaining];
buf.get(_extra);
_received += remaining;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
}
/**
* if complete, this will contain any bytes received as part of the
* handshake that were after the actual handshake. This may return null.
*/
public synchronized byte[] getExtraBytes() { return _extra; }
/**
* Release resources on timeout.
* @param e may be null
@@ -281,12 +286,12 @@ abstract class EstablishBase implements EstablishState {
return;
changeState(State.CORRUPT);
}
_failedBySkew = bySkew;
_err = reason;
_e = e;
if (_log.shouldLog(Log.WARN))
_log.warn(prefix()+"Failed to establish: " + _err, e);
_log.warn(prefix() + "Failed to establish: " + reason, e);
if (!bySkew)
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
releaseBufs(false);
// con.close()?
}
/**
@@ -303,10 +308,6 @@ abstract class EstablishBase implements EstablishState {
_transport.returnUnused(_dh);
}
public synchronized String getError() { return _err; }
public synchronized Exception getException() { return _e; }
/**
* XOR a into b. Modifies b. a is unmodified.
* @param a 32 bytes
@@ -328,7 +329,7 @@ abstract class EstablishBase implements EstablishState {
buf.append("IBES ");
else
buf.append("OBES ");
buf.append(System.identityHashCode(this));
buf.append(_con.toString());
buf.append(' ').append(_state);
if (_con.isEstablished()) buf.append(" established");
buf.append(": ");
@@ -347,10 +348,20 @@ abstract class EstablishBase implements EstablishState {
public int getVersion() { return 1; }
/*
* @throws IllegalStateException always
*/
@Override
public void receive(ByteBuffer src) {
throw new IllegalStateException("receive() " + src.remaining() + " on verified state, doing nothing!");
}
/*
* @throws IllegalStateException always
*/
@Override
public void prepareOutbound() {
Log log = RouterContext.getCurrentContext().logManager().getLog(VerifiedEstablishState.class);
log.warn("prepareOutbound() on verified state, doing nothing!");
throw new IllegalStateException("prepareOutbound() on verified state, doing nothing!");
}
@Override
@@ -369,10 +380,20 @@ abstract class EstablishBase implements EstablishState {
public int getVersion() { return 1; }
/*
* @throws IllegalStateException always
*/
@Override
public void receive(ByteBuffer src) {
throw new IllegalStateException("receive() " + src.remaining() + " on failed state, doing nothing!");
}
/*
* @throws IllegalStateException always
*/
@Override
public void prepareOutbound() {
Log log = RouterContext.getCurrentContext().logManager().getLog(VerifiedEstablishState.class);
log.warn("prepareOutbound() on verified state, doing nothing!");
throw new IllegalStateException("prepareOutbound() on failed state, doing nothing!");
}
@Override

View File

@@ -9,13 +9,15 @@ import java.nio.ByteBuffer;
interface EstablishState {
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the data are
* copieed out so that the next read will be the (still encrypted) remaining
* data (available from getExtraBytes)
* Parse the contents of the buffer as part of the handshake.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* If there are additional data in the buffer after the handshake is complete,
* the EstablishState is responsible for passing it to NTCPConnection.
*
* @throws IllegalStateException
*/
public void receive(ByteBuffer src);
@@ -23,14 +25,11 @@ interface EstablishState {
* Does nothing. Outbound (Alice) must override.
* We are establishing an outbound connection, so prepare ourselves by
* queueing up the write of the first part of the handshake
*
* @throws IllegalStateException
*/
public void prepareOutbound();
/**
* Was this connection failed because of clock skew?
*/
public boolean getFailedBySkew();
/** did the handshake fail for some reason? */
public boolean isCorrupt();
@@ -43,12 +42,6 @@ interface EstablishState {
*/
public boolean isComplete();
/**
* if complete, this will contain any bytes received as part of the
* handshake that were after the actual handshake. This may return null.
*/
public byte[] getExtraBytes();
/**
* Get the NTCP version
* @return 1, 2, or 0 if unknown
@@ -63,7 +56,4 @@ interface EstablishState {
*/
public void close(String reason, Exception e);
public String getError();
public Exception getException();
}

View File

@@ -283,6 +283,8 @@ class EventPumper implements Runnable {
con.getTimeSinceReceive() > expire) {
// we haven't sent or received anything in a really long time, so lets just close 'er up
con.close();
if (_log.shouldInfo())
_log.info("Failsafe or expire close for " + con);
failsafeCloses++;
}
} catch (CancelledKeyException cke) {
@@ -300,6 +302,7 @@ class EventPumper implements Runnable {
}
} else {
// another 100% CPU workaround
// TODO remove or only if we appear to be looping with no interest ops
if ((loopCount % 512) == 511) {
if (_log.shouldLog(Log.INFO))
_log.info("EventPumper throttle " + loopCount + " loops in " +
@@ -549,7 +552,9 @@ class EventPumper implements Runnable {
chan.socket().setKeepAlive(true);
SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ);
new NTCPConnection(_context, _transport, chan, ckey);
NTCPConnection con = new NTCPConnection(_context, _transport, chan, ckey);
ckey.attach(con);
_transport.establishing(con);
} catch (IOException ioe) {
_log.error("Error accepting", ioe);
}
@@ -565,6 +570,7 @@ class EventPumper implements Runnable {
if (connected) {
if (shouldSetKeepAlive(chan))
chan.socket().setKeepAlive(true);
// key was already set when the channel was created, why do it again here?
con.setKey(key);
con.outboundConnected();
_context.statManager().addRateData("ntcp.connectSuccessful", 1);
@@ -619,7 +625,7 @@ class EventPumper implements Runnable {
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
_log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
@@ -682,11 +688,11 @@ class EventPumper implements Runnable {
ByteArray ba = new ByteArray(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con, ioe);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
_log.warn("IOE on inbound before receiving any: " + con);
_log.warn("IOE on inbound before receiving any: " + con, ioe);
}
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
} else {

View File

@@ -5,17 +5,35 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import com.southernstorm.noise.protocol.CipherState;
import com.southernstorm.noise.protocol.CipherStatePair;
import com.southernstorm.noise.protocol.HandshakeState;
import net.i2p.crypto.SigType;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.router.RouterInfo;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import static net.i2p.router.transport.ntcp.OutboundNTCP2State.*;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
@@ -25,7 +43,7 @@ import net.i2p.util.SimpleByteCache;
*
* @since 0.9.35 pulled out of EstablishState
*/
class InboundEstablishState extends EstablishBase {
class InboundEstablishState extends EstablishBase implements NTCP2Payload.PayloadCallback {
/** current encrypted block we are reading (IB only) or an IV buf used at the end for OB */
private byte _curEncrypted[];
@@ -39,7 +57,36 @@ class InboundEstablishState extends EstablishBase {
/** how long we expect _sz_aliceIdent_tsA_padding_aliceSig to be when its full */
private int _sz_aliceIdent_tsA_padding_aliceSigSize;
//// NTCP2 things
private HandshakeState _handshakeState;
private int _padlen1;
private int _msg3p2len;
private int _msg3p2FailReason = -1;
private ByteArray _msg3tmp;
private NTCP2Options _hisPadding;
// same as I2PTunnelRunner
private static final int BUFFER_SIZE = 4*1024;
private static final int MAX_DATA_READ_BUFS = 32;
private static final ByteCache _dataReadBufs = ByteCache.getInstance(MAX_DATA_READ_BUFS, BUFFER_SIZE);
private static final int NTCP1_MSG1_SIZE = XY_SIZE + HXY_SIZE;
// 287 - 64 = 223
private static final int PADDING1_MAX = TOTAL1_MAX - MSG1_SIZE;
private static final int PADDING1_FAIL_MAX = 128;
private static final int PADDING2_MAX = 64;
// DSA RI, no options, no addresses
private static final int RI_MIN = 387 + 8 + 1 + 1 + 2 + 40;
private static final int MSG3P2_MIN = 1 + 2 + 1 + RI_MIN + MAC_SIZE;
// absolute max, let's enforce less
//private static final int MSG3P2_MAX = BUFFER_SIZE - MSG3P1_SIZE;
private static final int MSG3P2_MAX = 6000;
private static final Set<State> STATES_NTCP2 =
EnumSet.of(State.IB_NTCP2_INIT, State.IB_NTCP2_GOT_X, State.IB_NTCP2_GOT_PADDING,
State.IB_NTCP2_SENT_Y, State.IB_NTCP2_GOT_RI, State.IB_NTCP2_READ_RANDOM);
public InboundEstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
super(ctx, transport, con);
@@ -50,13 +97,13 @@ class InboundEstablishState extends EstablishBase {
}
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the data are
* copieed out so that the next read will be the (still encrypted) remaining
* data (available from getExtraBytes)
* Parse the contents of the buffer as part of the handshake.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* If there are additional data in the buffer after the handshake is complete,
* the EstablishState is responsible for passing it to NTCPConnection.
*/
@Override
public synchronized void receive(ByteBuffer src) {
@@ -77,7 +124,8 @@ class InboundEstablishState extends EstablishBase {
synchronized (_stateLock) {
if (_state == State.IB_INIT)
return 0;
// TODO NTCP2 states
if (STATES_NTCP2.contains(_state))
return 2;
return 1;
}
}
@@ -91,15 +139,24 @@ class InboundEstablishState extends EstablishBase {
*
* Caller must synch.
*
* FIXME none of the _state comparisons use _stateLock, but whole thing
* is synchronized, should be OK. See isComplete()
*/
private void receiveInbound(ByteBuffer src) {
if (STATES_NTCP2.contains(_state)) {
receiveInboundNTCP2(src);
return;
}
// TODO if less than 64, buffer and decide later?
if (_state == State.IB_INIT && src.hasRemaining()) {
int remaining = src.remaining();
//if (remaining < NTCP1_MSG1_SIZE && _transport.isNTCP2Enabled()) {
// // NTCP2
//}
if (remaining < NTCP1_MSG1_SIZE && _transport.isNTCP2Enabled()) {
// NTCP2
// TODO can't change our mind later if we get more than 287
_con.setVersion(2);
changeState(State.IB_NTCP2_INIT);
receiveInboundNTCP2(src);
// releaseBufs() will return the unused DH
return;
}
int toGet = Math.min(remaining, XY_SIZE - _received);
src.get(_X, _received, toGet);
_received += toGet;
@@ -188,6 +245,10 @@ class InboundEstablishState extends EstablishBase {
_context.statManager().addRateData("ntcp.invalidDH", 1);
fail("Invalid X", e);
return;
} catch (IllegalStateException ise) {
// setPeerPublicValue()
fail("reused keys?", ise);
return;
}
}
@@ -281,9 +342,7 @@ class InboundEstablishState extends EstablishBase {
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "got the sig");
verifyInbound();
if (_state == State.VERIFIED && src.hasRemaining())
prepareExtra(src);
verifyInbound(src);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size()
+ " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize
@@ -291,10 +350,15 @@ class InboundEstablishState extends EstablishBase {
+ ')');
return;
}
} else {
}
}
// check for remaining data
if ((_state == State.VERIFIED || _state == State.CORRUPT) && src.hasRemaining()) {
if (_log.shouldWarn())
_log.warn("Received unexpected " + src.remaining() + " on " + this, new Exception());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"done with the data, not yet complete or corrupt");
}
@@ -343,6 +407,7 @@ class InboundEstablishState extends EstablishBase {
/**
* We are Bob. Verify message #3 from Alice, then send message #4 to Alice.
* NTCP 1 only.
*
* _aliceIdentSize and _aliceIdent must be set.
* _sz_aliceIdent_tsA_padding_aliceSig must contain at least
@@ -358,9 +423,12 @@ class InboundEstablishState extends EstablishBase {
* transport
*
* State must be IB_GOT_RI.
* This will always change the state to VERIFIED or CORRUPT.
* Caller must synch.
*
* @param buf possibly containing "extra" data for data phase
*/
private void verifyInbound() {
private void verifyInbound(ByteBuffer buf) {
byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray();
try {
int sz = _aliceIdentSize;
@@ -393,64 +461,35 @@ class InboundEstablishState extends EstablishBase {
System.arraycopy(b, b.length-s.length, s, 0, s.length);
Signature sig = new Signature(type, s);
boolean ok = _context.dsa().verifySignature(sig, toVerify, _aliceIdent.getSigningPublicKey());
Hash aliceHash = _aliceIdent.calculateHash();
if (ok) {
ok = verifyInbound(aliceHash);
}
if (ok) {
// get inet-addr
InetAddress addr = this._con.getChannel().socket().getInetAddress();
byte[] ip = (addr == null) ? null : addr.getAddress();
if (_context.banlist().isBanlistedForever(_aliceIdent.calculateHash())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound connection from permanently banlisted peer: " + _aliceIdent.calculateHash());
// So next time we will not accept the con from this IP,
// rather than doing the whole handshake
if(ip != null)
_context.blocklist().add(ip);
fail("Peer is banlisted forever: " + _aliceIdent.calculateHash());
return;
}
if(ip != null)
_transport.setIP(_aliceIdent.calculateHash(), ip);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con);
long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
// This isn't very likely, outbound will do it first
// We are Bob, she is Alice, adjust to match Alice
_context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff);
_transport.markReachable(_aliceIdent.calculateHash(), true);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
_aliceIdent.calculateHash(),
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
_con.setRemotePeer(_aliceIdent);
sendInboundConfirm(_aliceIdent, tsA);
sendInboundConfirm(aliceHash, tsA);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"e_bobSig is " + _e_bobSig.length + " bytes long");
byte iv[] = _curEncrypted; // reuse buf
System.arraycopy(_e_bobSig, _e_bobSig.length-AES_SIZE, iv, 0, AES_SIZE);
// this does not copy the IV, do not release to cache
// We are Bob, she is Alice, clock skew is Alice-Bob
_con.finishInboundEstablishment(_dh.getSessionKey(), _peerSkew, iv, _prevEncrypted); // skew in seconds
// skew in seconds
_con.finishInboundEstablishment(_dh.getSessionKey(), _peerSkew, iv, _prevEncrypted);
changeState(State.VERIFIED);
if (buf.hasRemaining()) {
// process "extra" data
// This is unlikely for inbound, as we must reply with message 4
if (_log.shouldWarn())
_log.warn("extra data " + buf.remaining() + " on " + this);
_con.recvEncryptedI2NP(buf);
}
releaseBufs(true);
if (_log.shouldLog(Log.INFO))
_log.info(prefix()+"Verified remote peer as " + _aliceIdent.calculateHash());
changeState(State.VERIFIED);
_log.info(prefix()+"Verified remote peer as " + aliceHash);
} else {
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1);
fail("Peer verification failed - spoof of " + _aliceIdent.calculateHash() + "?");
// verifyInbound(aliceHash) called fail()
}
} catch (IOException ioe) {
_context.statManager().addRateData("ntcp.invalidInboundIOE", 1);
@@ -458,19 +497,76 @@ class InboundEstablishState extends EstablishBase {
}
}
/**
* Common validation things for both NTCP 1 and 2.
* Call after receiving Alice's RouterIdentity (in message 3).
* _peerSkew must be set.
*
* Side effect: sets _msg3p2FailReason when returning false
*
* @return success or calls fail() and returns false
* @since 0.9.36 pulled out of verifyInbound()
*/
private boolean verifyInbound(Hash aliceHash) {
// get inet-addr
InetAddress addr = this._con.getChannel().socket().getInetAddress();
byte[] ip = (addr == null) ? null : addr.getAddress();
if (_context.banlist().isBanlistedForever(aliceHash)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound connection from permanently banlisted peer: " + aliceHash);
// So next time we will not accept the con from this IP,
// rather than doing the whole handshake
if(ip != null)
_context.blocklist().add(ip);
fail("Peer is banlisted forever: " + aliceHash);
_msg3p2FailReason = NTCPConnection.REASON_BANNED;
return false;
}
if(ip != null)
_transport.setIP(aliceHash, ip);
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "verification successful for " + _con);
long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
// This isn't very likely, outbound will do it first
// We are Bob, she is Alice, adjust to match Alice
_context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidInboundSkew", diff);
_transport.markReachable(aliceHash, true);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
aliceHash,
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true);
_msg3p2FailReason = NTCPConnection.REASON_SKEW;
return false;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
return true;
}
/**
* We are Bob. Send message #4 to Alice.
*
* State must be VERIFIED.
* Caller must synch.
*
* @param h Alice's Hash
*/
private void sendInboundConfirm(RouterIdentity alice, long tsA) {
private void sendInboundConfirm(Hash h, long tsA) {
// send Alice E(S(X+Y+Alice.identHash+tsA+tsB), sk, prev)
byte toSign[] = new byte[XY_SIZE + XY_SIZE + 32+4+4];
int off = 0;
System.arraycopy(_X, 0, toSign, off, XY_SIZE); off += XY_SIZE;
System.arraycopy(_Y, 0, toSign, off, XY_SIZE); off += XY_SIZE;
Hash h = alice.calculateHash();
System.arraycopy(h.getData(), 0, toSign, off, 32); off += 32;
DataHelper.toLong(toSign, off, 4, tsA); off += 4;
DataHelper.toLong(toSign, off, 4, _tsB); off += 4;
@@ -496,6 +592,438 @@ class InboundEstablishState extends EstablishBase {
_transport.getPumper().wantsWrite(_con, _e_bobSig);
}
//// NTCP2 below here
/**
* NTCP2 only. State must be one of IB_NTCP2_*
*
* we are Bob, so receive these bytes as part of an inbound connection
* This method receives messages 1 and 3, and sends message 2.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* @since 0.9.36
*/
private synchronized void receiveInboundNTCP2(ByteBuffer src) {
if (_state == State.IB_NTCP2_INIT && src.hasRemaining()) {
// use _X for the buffer
int toGet = Math.min(src.remaining(), MSG1_SIZE - _received);
src.get(_X, _received, toGet);
_received += toGet;
if (_received < MSG1_SIZE) {
// TODO if we got less than 64 should we even be here?
if (_log.shouldWarn())
_log.warn("Short buffer got " + toGet + " total now " + _received);
return;
}
changeState(State.IB_NTCP2_GOT_X);
_received = 0;
// replay check using encrypted key
if (!_transport.isHXHIValid(_X)) {
_context.statManager().addRateData("ntcp.replayHXxorBIH", 1);
fail("Replay msg 1, eX = " + Base64.encode(_X, 0, KEY_SIZE));
return;
}
try {
_handshakeState = new HandshakeState(HandshakeState.RESPONDER, _transport.getXDHFactory());
} catch (GeneralSecurityException gse) {
throw new IllegalStateException("bad proto", gse);
}
_handshakeState.getLocalKeyPair().setPublicKey(_transport.getNTCP2StaticPubkey(), 0);
_handshakeState.getLocalKeyPair().setPrivateKey(_transport.getNTCP2StaticPrivkey(), 0);
Hash h = _context.routerHash();
SessionKey bobHash = new SessionKey(h.getData());
// save encrypted data for CBC for msg 2
System.arraycopy(_X, KEY_SIZE - IV_SIZE, _prevEncrypted, 0, IV_SIZE);
_context.aes().decrypt(_X, 0, _X, 0, bobHash, _transport.getNTCP2StaticIV(), KEY_SIZE);
if (DataHelper.eqCT(_X, 0, ZEROKEY, 0, KEY_SIZE)) {
fail("Bad msg 1, X = 0");
return;
}
byte options[] = new byte[OPTIONS1_SIZE];
try {
_handshakeState.start();
if (_log.shouldWarn())
_log.warn("After start: " + _handshakeState.toString());
_handshakeState.readMessage(_X, 0, MSG1_SIZE, options, 0);
} catch (GeneralSecurityException gse) {
// Read a random number of bytes, store wanted in _padlen1
_padlen1 = _context.random().nextInt(PADDING1_FAIL_MAX) - src.remaining();
if (_padlen1 > 0) {
// delayed fail for probing resistance
// need more bytes before failure
if (_log.shouldWarn())
_log.warn("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE) + " with " + src.remaining() +
" more bytes, waiting for " + _padlen1 + " more bytes", gse);
changeState(State.IB_NTCP2_READ_RANDOM);
} else {
// got all we need, fail now
fail("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE) + " remaining = " + src.remaining(), gse);
}
return;
} catch (RuntimeException re) {
fail("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE), re);
return;
}
if (_log.shouldWarn())
_log.warn("After msg 1: " + _handshakeState.toString());
int v = options[1] & 0xff;
if (v != NTCPTransport.NTCP2_INT_VERSION) {
fail("Bad version: " + v);
return;
}
_padlen1 = (int) DataHelper.fromLong(options, 2, 2);
_msg3p2len = (int) DataHelper.fromLong(options, 4, 2);
long tsA = DataHelper.fromLong(options, 8, 4);
long now = _context.clock().now();
// In NTCP1, timestamp comes in msg 3 so we know the RTT.
// In NTCP2, it comes in msg 1, so just guess.
// We could defer this to msg 3 to calculate the RTT?
long rtt = 250;
_peerSkew = (now - (tsA * 1000) - (rtt / 2) + 500) / 1000;
if ((_peerSkew > MAX_SKEW || _peerSkew < 0 - MAX_SKEW) &&
!_context.clock().getUpdatedSuccessfully()) {
// If not updated successfully, allow it.
// This isn't very likely, outbound will do it first
// See verifyInbound() above.
fail("Clock Skew: " + _peerSkew, null, true);
return;
}
if (_padlen1 > PADDING1_MAX) {
fail("bad msg 1 padlen: " + _padlen1);
return;
}
if (_msg3p2len < MSG3P2_MIN || _msg3p2len > MSG3P2_MAX) {
fail("bad msg3p2 len: " + _msg3p2len);
return;
}
if (_padlen1 <= 0) {
// No padding specified, go straight to sending msg 2
changeState(State.IB_NTCP2_GOT_PADDING);
if (src.hasRemaining()) {
// Inbound conn can never have extra data after msg 1
fail("Extra data after msg 1: " + src.remaining());
} else {
// write msg 2
prepareOutbound2();
}
return;
}
}
// delayed fail for probing resistance
if (_state == State.IB_NTCP2_READ_RANDOM && src.hasRemaining()) {
// read more bytes before failing
_received += src.remaining();
if (_received < _padlen1) {
if (_log.shouldWarn())
_log.warn("Bad msg 1, got " + src.remaining() +
" more bytes, waiting for " + (_padlen1 - _received) + " more bytes");
} else {
fail("Bad msg 1, failing after getting " + src.remaining() + " more bytes");
}
return;
}
if (_state == State.IB_NTCP2_GOT_X && src.hasRemaining()) {
// skip this if _padlen1 == 0;
// use _X for the buffer
int toGet = Math.min(src.remaining(), _padlen1 - _received);
src.get(_X, _received, toGet);
_received += toGet;
if (_received < _padlen1)
return;
changeState(State.IB_NTCP2_GOT_PADDING);
_handshakeState.mixHash(_X, 0, _padlen1);
if (_log.shouldWarn())
_log.warn("After mixhash padding " + _padlen1 + " msg 1: " + _handshakeState.toString());
_received = 0;
if (src.hasRemaining()) {
// Inbound conn can never have extra data after msg 1
fail("Extra data after msg 1: " + src.remaining());
} else {
// write msg 2
prepareOutbound2();
}
return;
}
if (_state == State.IB_NTCP2_SENT_Y && src.hasRemaining()) {
int msg3tot = MSG3P1_SIZE + _msg3p2len;
if (_msg3tmp == null)
_msg3tmp = _dataReadBufs.acquire();
// use _X for the buffer FIXME too small
byte[] tmp = _msg3tmp.getData();
int toGet = Math.min(src.remaining(), msg3tot - _received);
src.get(tmp, _received, toGet);
_received += toGet;
if (_received < msg3tot)
return;
changeState(State.IB_NTCP2_GOT_RI);
_received = 0;
ByteArray ptmp = _dataReadBufs.acquire();
byte[] payload = ptmp.getData();
try {
_handshakeState.readMessage(tmp, 0, msg3tot, payload, 0);
} catch (GeneralSecurityException gse) {
// TODO delayed failure per spec, as in NTCPConnection.delayedClose()
_dataReadBufs.release(ptmp, false);
fail("Bad msg 3, part 1 is:\n" + net.i2p.util.HexDump.dump(tmp, 0, MSG3P1_SIZE), gse);
return;
} catch (RuntimeException re) {
_dataReadBufs.release(ptmp, false);
fail("Bad msg 3", re);
return;
}
if (_log.shouldWarn())
_log.warn("After msg 3: " + _handshakeState.toString());
try {
// calls callbacks below
NTCP2Payload.processPayload(_context, this, payload, 0, _msg3p2len - MAC_SIZE, true);
} catch (IOException ioe) {
fail("Bad msg 3 payload", ioe);
// probably payload frame/block problems
// setDataPhase() will send termination
if (_msg3p2FailReason < 0)
_msg3p2FailReason = NTCPConnection.REASON_FRAMING;
} catch (DataFormatException dfe) {
fail("Bad msg 3 payload", dfe);
// probably RI problems
// setDataPhase() will send termination
if (_msg3p2FailReason < 0)
_msg3p2FailReason = NTCPConnection.REASON_SIGFAIL;
_context.statManager().addRateData("ntcp.invalidInboundSignature", 1);
} catch (I2NPMessageException ime) {
// shouldn't happen, no I2NP msgs in msg3p2
fail("Bad msg 3 payload", ime);
// setDataPhase() will send termination
if (_msg3p2FailReason < 0)
_msg3p2FailReason = 0;
} finally {
_dataReadBufs.release(ptmp, false);
}
// pass buffer for processing of "extra" data
setDataPhase(src);
}
// TODO check for remaining data and log/throw
}
/**
* Write the 2nd NTCP2 message.
* IV (CBC from msg 1) must be in _prevEncrypted
*
* @since 0.9.36
*/
private synchronized void prepareOutbound2() {
// create msg 2 payload
byte[] options2 = new byte[OPTIONS2_SIZE];
int padlen2 = _context.random().nextInt(PADDING2_MAX);
DataHelper.toLong(options2, 2, 2, padlen2);
long now = _context.clock().now() / 1000;
DataHelper.toLong(options2, 8, 4, now);
byte[] tmp = new byte[MSG2_SIZE + padlen2];
try {
_handshakeState.writeMessage(tmp, 0, options2, 0, OPTIONS2_SIZE);
} catch (GeneralSecurityException gse) {
// buffer length error
if (!_log.shouldWarn())
_log.error("Bad msg 2 out", gse);
fail("Bad msg 2 out", gse);
return;
} catch (RuntimeException re) {
if (!_log.shouldWarn())
_log.error("Bad msg 2 out", re);
fail("Bad msg 2 out", re);
return;
}
if (_log.shouldWarn())
_log.warn("After msg 2: " + _handshakeState.toString());
Hash h = _context.routerHash();
SessionKey bobHash = new SessionKey(h.getData());
_context.aes().encrypt(tmp, 0, tmp, 0, bobHash, _prevEncrypted, KEY_SIZE);
if (padlen2 > 0) {
_context.random().nextBytes(tmp, MSG2_SIZE, padlen2);
_handshakeState.mixHash(tmp, MSG2_SIZE, padlen2);
if (_log.shouldWarn())
_log.warn("After mixhash padding " + padlen2 + " msg 2: " + _handshakeState.toString());
}
changeState(State.IB_NTCP2_SENT_Y);
// send it all at once
_transport.getPumper().wantsWrite(_con, tmp);
}
/**
* KDF for NTCP2 data phase,
* then calls con.finishInboundEstablishment(),
* passing over the final keys and states to the con.
*
* This changes the state to VERIFIED.
*
* @param buf possibly containing "extra" data for data phase
* @since 0.9.36
*/
private synchronized void setDataPhase(ByteBuffer buf) {
// Data phase ChaChaPoly keys
CipherStatePair ckp = _handshakeState.split();
CipherState rcvr = ckp.getReceiver();
CipherState sender = ckp.getSender();
byte[] k_ab = rcvr.getKey();
byte[] k_ba = sender.getKey();
// Data phase SipHash keys
byte[][] sipkeys = generateSipHashKeys(_context, _handshakeState);
byte[] sip_ab = sipkeys[0];
byte[] sip_ba = sipkeys[1];
if (_msg3p2FailReason >= 0) {
if (_log.shouldWarn())
_log.warn("Failed msg3p2, code " + _msg3p2FailReason + " for " + this);
_con.failInboundEstablishment(sender, sip_ba, _msg3p2FailReason);
} else {
if (_log.shouldWarn()) {
_log.warn("Finished establishment for " + this +
"\nGenerated ChaCha key for A->B: " + Base64.encode(k_ab) +
"\nGenerated ChaCha key for B->A: " + Base64.encode(k_ba) +
"\nGenerated SipHash key for A->B: " + Base64.encode(sip_ab) +
"\nGenerated SipHash key for B->A: " + Base64.encode(sip_ba));
}
// skew in seconds
_con.finishInboundEstablishment(sender, rcvr, sip_ba, sip_ab, _peerSkew, _hisPadding);
changeState(State.VERIFIED);
if (buf.hasRemaining()) {
// process "extra" data
// This is very likely for inbound, as data should come right after message 3
if (_log.shouldInfo())
_log.info("extra data " + buf.remaining() + " on " + this);
_con.recvEncryptedI2NP(buf);
}
}
// zero out everything
releaseBufs(true);
_handshakeState.destroy();
Arrays.fill(sip_ab, (byte) 0);
Arrays.fill(sip_ba, (byte) 0);
}
//// PayloadCallbacks
/**
* Get "s" static key out of RI, compare to what we got in the handshake.
* Tell NTCPConnection who it is.
*
* @param isHandshake always true
* @throws DataFormatException on bad sig, unknown SigType, no static key,
* static key mismatch, IP checks in verifyInbound()
* @since 0.9.36
*/
public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
// Validate Alice static key
String s = null;
// find address with matching version
List<RouterAddress> addrs = ri.getTargetAddresses(NTCPTransport.STYLE, NTCPTransport.STYLE2);
for (RouterAddress addr : addrs) {
String v = addr.getOption("v");
if (v == null ||
(!v.equals(NTCPTransport.NTCP2_VERSION) && !v.startsWith(NTCPTransport.NTCP2_VERSION_ALT))) {
continue;
}
s = addr.getOption("s");
if (s != null)
break;
}
if (s == null) {
_msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
throw new DataFormatException("no s in RI");
}
byte[] sb = Base64.decode(s);
if (sb == null || sb.length != KEY_SIZE) {
_msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
throw new DataFormatException("bad s in RI");
}
byte[] nb = new byte[32];
// compare to the _handshakeState
_handshakeState.getRemotePublicKey().getPublicKey(nb, 0);
if (!DataHelper.eqCT(sb, 0, nb, 0, KEY_SIZE)) {
_msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
throw new DataFormatException("s mismatch in RI");
}
_aliceIdent = ri.getIdentity();
Hash h = _aliceIdent.calculateHash();
// this sets the reason
boolean ok = verifyInbound(h);
if (!ok)
throw new DataFormatException("NTCP2 verifyInbound() fail");
try {
RouterInfo old = _context.netDb().store(h, ri);
if (flood && !ri.equals(old)) {
FloodfillNetworkDatabaseFacade fndf = (FloodfillNetworkDatabaseFacade) _context.netDb();
if (fndf.floodConditional(ri)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Flooded the RI: " + h);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Flood request but we didn't: " + h);
}
}
} catch (IllegalArgumentException iae) {
// hash collision?
_msg3p2FailReason = NTCPConnection.REASON_UNSPEC;
throw new DataFormatException("RI store fail", iae);
}
_con.setRemotePeer(_aliceIdent);
}
/** @since 0.9.36 */
public void gotOptions(byte[] options, boolean isHandshake) {
if (options.length < 12) {
if (_log.shouldWarn())
_log.warn("Got options length " + options.length + " on: " + this);
return;
}
float tmin = (options[0] & 0xff) / 16.0f;
float tmax = (options[1] & 0xff) / 16.0f;
float rmin = (options[2] & 0xff) / 16.0f;
float rmax = (options[3] & 0xff) / 16.0f;
int tdummy = (int) DataHelper.fromLong(options, 4, 2);
int rdummy = (int) DataHelper.fromLong(options, 6, 2);
int tdelay = (int) DataHelper.fromLong(options, 8, 2);
int rdelay = (int) DataHelper.fromLong(options, 10, 2);
_hisPadding = new NTCP2Options(tmin, tmax, rmin, rmax,
tdummy, rdummy, tdelay, rdelay);
}
/** @since 0.9.36 */
public void gotPadding(int paddingLength, int frameLength) {}
// Following 4 are illegal in handshake, we will never get them
/** @since 0.9.36 */
public void gotTermination(int reason, long lastReceived) {}
/** @since 0.9.36 */
public void gotUnknown(int type, int len) {}
/** @since 0.9.36 */
public void gotDateTime(long time) {}
/** @since 0.9.36 */
public void gotI2NP(I2NPMessage msg) {}
/**
* @since 0.9.16
*/
@Override
protected synchronized void fail(String reason, Exception e, boolean bySkew) {
super.fail(reason, e, bySkew);
if (_handshakeState != null) {
if (_log.shouldWarn())
_log.warn("State at failure: " + _handshakeState.toString());
_handshakeState.destroy();
}
}
/**
* Only call once. Caller must synch.
* @since 0.9.16
@@ -507,6 +1035,11 @@ class InboundEstablishState extends EstablishBase {
// NTCPConnection to use as the IV
if (!isVerified)
SimpleByteCache.release(_curEncrypted);
Arrays.fill(_X, (byte) 0);
SimpleByteCache.release(_X);
if (_msg3tmp != null) {
_dataReadBufs.release(_msg3tmp, false);
_msg3tmp = null;
}
}
}

View File

@@ -8,6 +8,7 @@ import java.net.Inet6Address;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.KeyPair;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
@@ -45,6 +46,9 @@ import net.i2p.router.transport.TransportImpl;
import net.i2p.router.transport.TransportUtil;
import static net.i2p.router.transport.TransportUtil.IPv6Config.*;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import net.i2p.router.transport.crypto.X25519KeyFactory;
import net.i2p.router.transport.crypto.X25519PublicKey;
import net.i2p.router.transport.crypto.X25519PrivateKey;
import net.i2p.router.util.DecayingHashSet;
import net.i2p.router.util.DecayingBloomFilter;
import net.i2p.util.Addresses;
@@ -97,13 +101,15 @@ public class NTCPTransport extends TransportImpl {
public final static String PROP_I2NP_NTCP_AUTO_PORT = "i2np.ntcp.autoport";
public final static String PROP_I2NP_NTCP_AUTO_IP = "i2np.ntcp.autoip";
private static final String PROP_ADVANCED = "routerconsole.advanced";
public static final int DEFAULT_COST = 10;
private static final int DEFAULT_COST = 10;
private static final int NTCP2_OUTBOUND_COST = 14;
/** this is rarely if ever used, default is to bind to wildcard address */
public static final String PROP_BIND_INTERFACE = "i2np.ntcp.bindInterface";
private final NTCPSendFinisher _finisher;
private final DHSessionKeyBuilder.Factory _dhFactory;
private final X25519KeyFactory _xdhFactory;
private long _lastBadSkew;
private static final long[] RATES = { 10*60*1000 };
@@ -114,28 +120,32 @@ public class NTCPTransport extends TransportImpl {
// NTCP2 stuff
public static final String STYLE = "NTCP";
private static final String STYLE2 = "NTCP2";
private static final String PROP_NTCP2_ENABLE = "i2np.ntcp2.enable";
private static final boolean DEFAULT_NTCP2_ENABLE = false;
private boolean _enableNTCP2;
private static final String NTCP2_PROTO_SHORT = "NXK2CS";
private static final String OPT_NTCP2_SK = 'N' + NTCP2_PROTO_SHORT + "2s";
public static final String STYLE2 = "NTCP2";
static final int NTCP2_INT_VERSION = 2;
private static final String NTCP2_VERSION = Integer.toString(NTCP2_INT_VERSION);
/** "2" */
static final String NTCP2_VERSION = Integer.toString(NTCP2_INT_VERSION);
/** "2," */
static final String NTCP2_VERSION_ALT = NTCP2_VERSION + ',';
/** b64 static private key */
private static final String PROP_NTCP2_SP = "i2np.ntcp2.sp";
public static final String PROP_NTCP2_SP = "i2np.ntcp2.sp";
/** b64 static IV */
private static final String PROP_NTCP2_IV = "i2np.ntcp2.iv";
private static final int NTCP2_IV_LEN = 16;
private static final int NTCP2_KEY_LEN = 32;
public static final String PROP_NTCP2_IV = "i2np.ntcp2.iv";
private static final int NTCP2_IV_LEN = OutboundNTCP2State.IV_SIZE;
private static final int NTCP2_KEY_LEN = OutboundNTCP2State.KEY_SIZE;
private final boolean _enableNTCP2;
private final byte[] _ntcp2StaticPubkey;
private final byte[] _ntcp2StaticPrivkey;
private final byte[] _ntcp2StaticIV;
private final String _b64Ntcp2StaticPubkey;
private final String _b64Ntcp2StaticIV;
public NTCPTransport(RouterContext ctx, DHSessionKeyBuilder.Factory dh) {
/**
* @param xdh null to disable NTCP2
*/
public NTCPTransport(RouterContext ctx, DHSessionKeyBuilder.Factory dh, X25519KeyFactory xdh) {
super(ctx);
_dhFactory = dh;
_xdhFactory = xdh;
_log = ctx.logManager().getLog(getClass());
_context.statManager().createRateStat("ntcp.sendTime", "Total message lifetime when sent completely", "ntcp", RATES);
@@ -222,27 +232,31 @@ public class NTCPTransport extends TransportImpl {
_nearCapacityCostBid = new SharedBid(105);
_transientFail = new SharedBid(TransportBid.TRANSIENT_FAIL);
//_enableNTCP2 = ctx.getProperty(PROP_NTCP2_ENABLE, DEFAULT_NTCP2_ENABLE);
_enableNTCP2 = false;
_enableNTCP2 = xdh != null;
if (_enableNTCP2) {
boolean shouldSave = false;
byte[] priv = null;
byte[] iv = null;
String b64Pub = null;
String b64IV = null;
String s = ctx.getProperty(PROP_NTCP2_SP);
if (s != null) {
priv = Base64.decode(s);
}
if (priv == null || priv.length != NTCP2_KEY_LEN) {
priv = new byte[NTCP2_KEY_LEN];
ctx.random().nextBytes(priv);
KeyPair keys = xdh.getKeys();
_ntcp2StaticPrivkey = keys.getPrivate().getEncoded();
_ntcp2StaticPubkey = keys.getPublic().getEncoded();
shouldSave = true;
} else {
_ntcp2StaticPrivkey = priv;
_ntcp2StaticPubkey = (new X25519PrivateKey(priv)).toPublic().getEncoded();
}
s = ctx.getProperty(PROP_NTCP2_IV);
if (s != null) {
iv = Base64.decode(s);
b64IV = s;
if (!shouldSave) {
s = ctx.getProperty(PROP_NTCP2_IV);
if (s != null) {
iv = Base64.decode(s);
b64IV = s;
}
}
if (iv == null || iv.length != NTCP2_IV_LEN) {
iv = new byte[NTCP2_IV_LEN];
@@ -251,17 +265,17 @@ public class NTCPTransport extends TransportImpl {
}
if (shouldSave) {
Map<String, String> changes = new HashMap<String, String>(2);
String b64Priv = Base64.encode(priv);
String b64Priv = Base64.encode(_ntcp2StaticPrivkey);
b64IV = Base64.encode(iv);
changes.put(PROP_NTCP2_SP, b64Priv);
changes.put(PROP_NTCP2_IV, b64IV);
ctx.router().saveConfig(changes, null);
}
_ntcp2StaticPrivkey = priv;
_ntcp2StaticIV = iv;
_b64Ntcp2StaticPubkey = "TODO"; // priv->pub
_b64Ntcp2StaticPubkey = Base64.encode(_ntcp2StaticPubkey);
_b64Ntcp2StaticIV = b64IV;
} else {
_ntcp2StaticPubkey = null;
_ntcp2StaticPrivkey = null;
_ntcp2StaticIV = null;
_b64Ntcp2StaticPubkey = null;
@@ -299,17 +313,17 @@ public class NTCPTransport extends TransportImpl {
RouterIdentity ident = target.getIdentity();
Hash ih = ident.calculateHash();
NTCPConnection con = null;
boolean isNew = false;
int newVersion = 0;
boolean fail = false;
synchronized (_conLock) {
con = _conByIdent.get(ih);
if (con == null) {
isNew = true;
RouterAddress addr = getTargetAddress(target);
if (addr != null) {
int ver = getNTCPVersion(addr);
if (ver != 0) {
con = new NTCPConnection(_context, this, ident, addr, ver);
newVersion = getNTCPVersion(addr);
if (newVersion != 0) {
con = new NTCPConnection(_context, this, ident, addr, newVersion);
establishing(con);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Send on a new con: " + con + " at " + addr + " for " + ih);
// Note that outbound conns go in the map BEFORE establishment
@@ -331,9 +345,7 @@ public class NTCPTransport extends TransportImpl {
afterSend(msg, false);
return;
}
if (isNew) {
// doesn't do anything yet, just enqueues it
con.send(msg);
if (newVersion != 0) {
// As of 0.9.12, don't send our info if the first message is
// doing the same (common when connecting to a floodfill).
// Also, put the info message after whatever we are trying to send
@@ -341,16 +353,27 @@ public class NTCPTransport extends TransportImpl {
// Prior to 0.9.12, Bob would not send his RI unless he had ours,
// but that's fixed in 0.9.12.
boolean shouldSkipInfo = false;
boolean shouldFlood = false;
I2NPMessage m = msg.getMessage();
if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) {
DatabaseStoreMessage dsm = (DatabaseStoreMessage) m;
if (dsm.getKey().equals(_context.routerHash())) {
shouldSkipInfo = true;
shouldFlood = dsm.getReplyToken() != 0;
// TODO tell the NTCP2 con to flood in the handshake and mark success when sent
}
}
if (!shouldSkipInfo) {
// Queue the message, and our RI
// doesn't do anything yet, just enqueues it
con.send(msg);
con.enqueueInfoMessage();
} else if (shouldFlood || newVersion == 1) {
// Queue the message, which is a DSM of our RI
con.send(msg);
} else if (_log.shouldLog(Log.INFO)) {
// Send nothing, the handshake has the RI
// version == 2 && shouldSkipInfo && !shouldFlood
_log.info("SKIPPING INFO message: " + con);
}
@@ -365,6 +388,10 @@ public class NTCPTransport extends TransportImpl {
_log.error("Error opening a channel", ioe);
_context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1);
con.close();
afterSend(msg, false);
} catch (IllegalStateException ise) {
_log.error("Failed opening a channel", ise);
afterSend(msg, false);
}
} else {
con.send(msg);
@@ -677,6 +704,7 @@ public class NTCPTransport extends TransportImpl {
long tooOld = _context.clock().now() - 10*60*1000;
for (NTCPConnection con : _conByIdent.values()) {
// TODO skip isEstablished() check?
if (con.isEstablished() && con.getCreated() > tooOld)
skews.addElement(Long.valueOf(con.getClockSkew()));
}
@@ -696,7 +724,7 @@ public class NTCPTransport extends TransportImpl {
* As there is no timestamp in the first message, we can't detect
* something long-delayed. To be fixed in next version of NTCP.
*
* @param hxhi 32 bytes
* @param hxhi using first 8 bytes only
* @return valid
* @since 0.9.12
*/
@@ -740,19 +768,37 @@ public class NTCPTransport extends TransportImpl {
replaceAddress(addr);
} else if (port > 0) {
// all detected interfaces
for (InetAddress ia : getSavedLocalAddresses()) {
OrderedProperties props = new OrderedProperties();
props.setProperty(RouterAddress.PROP_HOST, ia.getHostAddress());
props.setProperty(RouterAddress.PROP_PORT, Integer.toString(port));
addNTCP2Options(props);
int cost = getDefaultCost(ia instanceof Inet6Address);
myAddress = new RouterAddress(STYLE, props, cost);
replaceAddress(myAddress);
Collection<InetAddress> addrs = getSavedLocalAddresses();
if (!addrs.isEmpty()) {
for (InetAddress ia : addrs) {
OrderedProperties props = new OrderedProperties();
props.setProperty(RouterAddress.PROP_HOST, ia.getHostAddress());
props.setProperty(RouterAddress.PROP_PORT, Integer.toString(port));
addNTCP2Options(props);
int cost = getDefaultCost(ia instanceof Inet6Address);
myAddress = new RouterAddress(STYLE, props, cost);
replaceAddress(myAddress);
}
} else if (_enableNTCP2) {
setOutboundNTCP2Address();
}
} else if (_enableNTCP2) {
setOutboundNTCP2Address();
}
// TransportManager.startListening() calls router.rebuildRouterInfo()
}
/**
* Outbound only, NTCP2 with "s" and "v" only
* @since 0.9.36
*/
private void setOutboundNTCP2Address() {
OrderedProperties props = new OrderedProperties();
addNTCP2Options(props);
RouterAddress myAddress = new RouterAddress(STYLE2, props, NTCP2_OUTBOUND_COST);
replaceAddress(myAddress);
}
/**
* Only called by externalAddressReceived().
* Calls replaceAddress() or removeAddress().
@@ -966,6 +1012,14 @@ public class NTCPTransport extends TransportImpl {
return _dhFactory.getBuilder();
}
/**
* @return null if not configured for NTCP2
* @since 0.9.36
*/
X25519KeyFactory getXDHFactory() {
return _xdhFactory;
}
/**
* Return an unused DH key builder
* to be put back onto the queue for reuse.
@@ -1071,15 +1125,17 @@ public class NTCPTransport extends TransportImpl {
}
/**
* Add the required options to the properties for a NTCP2 address
* Add the required options to the properties for a NTCP2 address.
* Host/port must already be set in props if they are going to be.
*
* @since 0.9.35
*/
private void addNTCP2Options(Properties props) {
if (!_enableNTCP2)
return;
props.setProperty("i", _b64Ntcp2StaticIV);
props.setProperty("n", NTCP2_PROTO_SHORT);
// only set i if we are not firewalled
if (props.containsKey("host"))
props.setProperty("i", _b64Ntcp2StaticIV);
props.setProperty("s", _b64Ntcp2StaticPubkey);
props.setProperty("v", NTCP2_VERSION);
}
@@ -1091,6 +1147,15 @@ public class NTCPTransport extends TransportImpl {
*/
boolean isNTCP2Enabled() { return _enableNTCP2; }
/**
* The static priv key
*
* @since 0.9.36
*/
byte[] getNTCP2StaticPubkey() {
return _ntcp2StaticPubkey;
}
/**
* The static priv key
*
@@ -1101,7 +1166,17 @@ public class NTCPTransport extends TransportImpl {
}
/**
* Get the valid NTCP version of this NTCP address.
* The static IV
*
* @since 0.9.36
*/
byte[] getNTCP2StaticIV() {
return _ntcp2StaticIV;
}
/**
* Get the valid NTCP version of Bob's NTCP address
* for our outbound connections as Alice.
*
* @return the valid version 1 or 2, or 0 if unusable
* @since 0.9.35
@@ -1116,18 +1191,21 @@ public class NTCPTransport extends TransportImpl {
} else if (style.equals(STYLE2)) {
if (!_enableNTCP2)
return 0;
rv = 2;
rv = NTCP2_INT_VERSION;
} else {
return 0;
}
if (addr.getOption("s") == null ||
// check version == "2" || version starts with "2,"
// and static key, and iv
String v = addr.getOption("v");
if (v == null ||
addr.getOption("i") == null ||
!NTCP2_VERSION.equals(addr.getOption("v")) ||
!NTCP2_PROTO_SHORT.equals(addr.getOption("n"))) {
addr.getOption("s") == null ||
(!v.equals(NTCP2_VERSION) && !v.startsWith(NTCP2_VERSION_ALT))) {
return (rv == 1) ? 1 : 0;
}
// todo validate s/i b64, or just catch it later?
return rv;
return NTCP2_INT_VERSION;
}
/**
@@ -1314,7 +1392,6 @@ public class NTCPTransport extends TransportImpl {
int cost;
if (oldAddr == null) {
cost = getDefaultCost(isIPv6);
addNTCP2Options(newProps);
} else {
cost = oldAddr.getCost();
newProps.putAll(oldAddr.getOptionsMap());
@@ -1436,6 +1513,7 @@ public class NTCPTransport extends TransportImpl {
return;
}
}
addNTCP2Options(newProps);
// stopListening stops the pumper, readers, and writers, so required even if
// oldAddr == null since startListening starts them all again

View File

@@ -3,6 +3,7 @@ package net.i2p.router.transport.ntcp;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import net.i2p.crypto.SigType;
import net.i2p.data.DataFormatException;
@@ -33,13 +34,13 @@ class OutboundEstablishState extends EstablishBase {
}
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the data are
* copieed out so that the next read will be the (still encrypted) remaining
* data (available from getExtraBytes)
* Parse the contents of the buffer as part of the handshake.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* If there are additional data in the buffer after the handshake is complete,
* the EstablishState is responsible for passing it to NTCPConnection.
*/
@Override
public synchronized void receive(ByteBuffer src) {
@@ -65,124 +66,129 @@ class OutboundEstablishState extends EstablishBase {
*
* Caller must synch.
*
* FIXME none of the _state comparisons use _stateLock, but whole thing
* is synchronized, should be OK. See isComplete()
*/
private void receiveOutbound(ByteBuffer src) {
// recv Y+E(H(X+Y)+tsB, sk, Y[239:255])
// Read in Y, which is the first part of message #2
while (_state == State.OB_SENT_X && src.hasRemaining()) {
byte c = src.get();
_Y[_received++] = c;
if (_received >= XY_SIZE) {
try {
_dh.setPeerPublicValue(_Y);
_dh.getSessionKey(); // force the calc
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
changeState(State.OB_GOT_Y);
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1);
fail("Invalid X", e);
return;
}
if (_state == State.OB_SENT_X && src.hasRemaining()) {
int toGet = Math.min(src.remaining(), XY_SIZE - _received);
src.get(_Y, _received, toGet);
_received += toGet;
if (_received < XY_SIZE)
return;
try {
_dh.setPeerPublicValue(_Y);
_dh.getSessionKey(); // force the calc
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
changeState(State.OB_GOT_Y);
_received = 0;
} catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
_context.statManager().addRateData("ntcp.invalidDH", 1);
fail("Invalid X", e);
return;
} catch (IllegalStateException ise) {
// setPeerPublicValue()
fail("reused keys?", ise);
return;
}
}
// Read in Y, which is the first part of message #2
// Read in the rest of message #2
while (_state == State.OB_GOT_Y && src.hasRemaining()) {
int i = _received-XY_SIZE;
_received++;
byte c = src.get();
_e_hXY_tsB[i] = c;
if (i+1 >= HXY_TSB_PAD_SIZE) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully");
byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE];
_context.aes().decrypt(_e_hXY_tsB, 0, hXY_tsB, 0, _dh.getSessionKey(), _Y, XY_SIZE-AES_SIZE, HXY_TSB_PAD_SIZE);
byte XY[] = new byte[XY_SIZE + XY_SIZE];
System.arraycopy(_X, 0, XY, 0, XY_SIZE);
System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE);
byte[] h = SimpleByteCache.acquire(HXY_SIZE);
_context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0);
if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) {
SimpleByteCache.release(h);
_context.statManager().addRateData("ntcp.invalidHXY", 1);
fail("Invalid H(X+Y) - mitm attack attempted?");
return;
}
if (_state == State.OB_GOT_Y && src.hasRemaining()) {
int toGet = Math.min(src.remaining(), HXY_TSB_PAD_SIZE - _received);
src.get(_e_hXY_tsB, _received, toGet);
_received += toGet;
if (_received < HXY_TSB_PAD_SIZE)
return;
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully");
byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE];
_context.aes().decrypt(_e_hXY_tsB, 0, hXY_tsB, 0, _dh.getSessionKey(), _Y, XY_SIZE-AES_SIZE, HXY_TSB_PAD_SIZE);
byte XY[] = new byte[XY_SIZE + XY_SIZE];
System.arraycopy(_X, 0, XY, 0, XY_SIZE);
System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE);
byte[] h = SimpleByteCache.acquire(HXY_SIZE);
_context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0);
if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) {
SimpleByteCache.release(h);
changeState(State.OB_GOT_HXY);
// their (Bob's) timestamp in seconds
_tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4);
long now = _context.clock().now();
// rtt from sending #1 to receiving #2
long rtt = now - _con.getCreated();
// our (Alice's) timestamp in seconds
_tsA = (now + 500) / 1000;
_peerSkew = (now - (_tsB * 1000) - (rtt / 2) + 500) / 1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, skew = " + _peerSkew);
// the skew is not authenticated yet, but it is certainly fatal to
// the establishment, so fail hard if appropriate
long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
// We are Alice, he is Bob, adjust to match Bob
_context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff);
_transport.markReachable(_con.getRemotePeer().calculateHash(), false);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
_con.getRemotePeer().calculateHash(),
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
// now prepare and send our response
// send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
int sigSize = XY_SIZE + XY_SIZE + HXY_SIZE + 4+4;//+12;
byte preSign[] = new byte[sigSize];
System.arraycopy(_X, 0, preSign, 0, XY_SIZE);
System.arraycopy(_Y, 0, preSign, XY_SIZE, XY_SIZE);
System.arraycopy(_con.getRemotePeer().calculateHash().getData(), 0, preSign, XY_SIZE + XY_SIZE, HXY_SIZE);
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA);
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB);
// hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32)
Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
// handle variable signature size
int min = 2 + ident.length + 4 + sig.length();
int rem = min % AES_SIZE;
int padding = 0;
if (rem > 0)
padding = AES_SIZE - rem;
byte preEncrypt[] = new byte[min+padding];
DataHelper.toLong(preEncrypt, 0, 2, ident.length);
System.arraycopy(ident, 0, preEncrypt, 2, ident.length);
DataHelper.toLong(preEncrypt, 2+ident.length, 4, _tsA);
if (padding > 0)
_context.random().nextBytes(preEncrypt, 2 + ident.length + 4, padding);
System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, sig.length());
_prevEncrypted = new byte[preEncrypt.length];
_context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(),
_hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length);
changeState(State.OB_SENT_RI);
_transport.getPumper().wantsWrite(_con, _prevEncrypted);
_context.statManager().addRateData("ntcp.invalidHXY", 1);
fail("Invalid H(X+Y) - mitm attack attempted?");
return;
}
SimpleByteCache.release(h);
changeState(State.OB_GOT_HXY);
_received = 0;
// their (Bob's) timestamp in seconds
_tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4);
long now = _context.clock().now();
// rtt from sending #1 to receiving #2
long rtt = now - _con.getCreated();
// our (Alice's) timestamp in seconds
_tsA = (now + 500) / 1000;
_peerSkew = (now - (_tsB * 1000) - (rtt / 2) + 500) / 1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix()+"h(X+Y) is correct, skew = " + _peerSkew);
// the skew is not authenticated yet, but it is certainly fatal to
// the establishment, so fail hard if appropriate
long diff = 1000*Math.abs(_peerSkew);
if (!_context.clock().getUpdatedSuccessfully()) {
// Adjust the clock one time in desperation
// We are Alice, he is Bob, adjust to match Bob
_context.clock().setOffset(1000 * (0 - _peerSkew), true);
_peerSkew = 0;
if (diff != 0)
_log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
} else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
_context.statManager().addRateData("ntcp.invalidOutboundSkew", diff);
_transport.markReachable(_con.getRemotePeer().calculateHash(), false);
// Only banlist if we know what time it is
_context.banlist().banlistRouter(DataHelper.formatDuration(diff),
_con.getRemotePeer().calculateHash(),
_x("Excessive clock skew: {0}"));
_transport.setLastBadSkew(_peerSkew);
fail("Clocks too skewed (" + diff + " ms)", null, true);
return;
} else if (_log.shouldLog(Log.DEBUG)) {
_log.debug(prefix()+"Clock skew: " + diff + " ms");
}
// now prepare and send our response
// send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
int sigSize = XY_SIZE + XY_SIZE + HXY_SIZE + 4+4;//+12;
byte preSign[] = new byte[sigSize];
System.arraycopy(_X, 0, preSign, 0, XY_SIZE);
System.arraycopy(_Y, 0, preSign, XY_SIZE, XY_SIZE);
System.arraycopy(_con.getRemotePeer().calculateHash().getData(), 0, preSign, XY_SIZE + XY_SIZE, HXY_SIZE);
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA);
DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB);
// hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32)
Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
// handle variable signature size
int min = 2 + ident.length + 4 + sig.length();
int rem = min % AES_SIZE;
int padding = 0;
if (rem > 0)
padding = AES_SIZE - rem;
byte preEncrypt[] = new byte[min+padding];
DataHelper.toLong(preEncrypt, 0, 2, ident.length);
System.arraycopy(ident, 0, preEncrypt, 2, ident.length);
DataHelper.toLong(preEncrypt, 2+ident.length, 4, _tsA);
if (padding > 0)
_context.random().nextBytes(preEncrypt, 2 + ident.length + 4, padding);
System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, sig.length());
_prevEncrypted = new byte[preEncrypt.length];
_context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(),
_hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length);
changeState(State.OB_SENT_RI);
_transport.getPumper().wantsWrite(_con, _prevEncrypted);
}
// Read in message #4
@@ -205,7 +211,7 @@ class OutboundEstablishState extends EstablishBase {
_log.debug(prefix() + "receiving E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev) (remaining? " +
src.hasRemaining() + ")");
} else {
off = _received - XY_SIZE - HXY_TSB_PAD_SIZE;
off = _received;
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "continuing to receive E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev) (remaining? " +
src.hasRemaining() + " off=" + off + " recv=" + _received + ")");
@@ -223,6 +229,7 @@ class OutboundEstablishState extends EstablishBase {
// handle variable signature size
SigType type = _con.getRemotePeer().getSigningPublicKey().getType();
int siglen = type.getSigLen();
// we don't need to do this if no padding!
byte bobSigData[] = new byte[siglen];
System.arraycopy(bobSig, 0, bobSigData, 0, siglen);
Signature sig = new Signature(type, bobSigData);
@@ -242,23 +249,36 @@ class OutboundEstablishState extends EstablishBase {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "signature verified from Bob. done!");
prepareExtra(src);
byte nextWriteIV[] = SimpleByteCache.acquire(AES_SIZE);
System.arraycopy(_prevEncrypted, _prevEncrypted.length-AES_SIZE, nextWriteIV, 0, AES_SIZE);
// this does not copy the nextWriteIV, do not release to cache
// We are Alice, he is Bob, clock skew is Bob - Alice
_con.finishOutboundEstablishment(_dh.getSessionKey(), _peerSkew, nextWriteIV, _e_bobSig); // skew in seconds
// skew in seconds
_con.finishOutboundEstablishment(_dh.getSessionKey(), _peerSkew, nextWriteIV, _e_bobSig);
changeState(State.VERIFIED);
if (src.hasRemaining()) {
// process "extra" data
// This is fairly common for outbound, where Bob may send his updated RI
if (_log.shouldInfo())
_log.info("extra data " + src.remaining() + " on " + this);
_con.recvEncryptedI2NP(src);
}
releaseBufs(true);
// if socket gets closed this will be null - prevent NPE
InetAddress ia = _con.getChannel().socket().getInetAddress();
if (ia != null)
_transport.setIP(_con.getRemotePeer().calculateHash(), ia.getAddress());
changeState(State.VERIFIED);
}
return;
}
}
}
// check for remaining data
if ((_state == State.VERIFIED || _state == State.CORRUPT) && src.hasRemaining()) {
if (_log.shouldWarn())
_log.warn("Received unexpected " + src.remaining() + " on " + this, new Exception());
}
}
/**
@@ -266,13 +286,12 @@ class OutboundEstablishState extends EstablishBase {
* We are establishing an outbound connection, so prepare ourselves by
* queueing up the write of the first part of the handshake
* This method sends message #1 to Bob.
*
* @throws IllegalStateException
*/
@Override
public synchronized void prepareOutbound() {
boolean shouldSend;
synchronized(_stateLock) {
shouldSend = _state == State.OB_INIT;
}
if (shouldSend) {
if (_state == State.OB_INIT) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(prefix() + "send X");
byte toWrite[] = new byte[XY_SIZE + _hX_xor_bobIdentHash.length];
@@ -281,8 +300,7 @@ class OutboundEstablishState extends EstablishBase {
changeState(State.OB_SENT_X);
_transport.getPumper().wantsWrite(_con, toWrite);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn(prefix() + "unexpected prepareOutbound()");
throw new IllegalStateException(prefix() + "unexpected prepareOutbound()");
}
}
@@ -293,6 +311,7 @@ class OutboundEstablishState extends EstablishBase {
@Override
protected void releaseBufs(boolean isVerified) {
super.releaseBufs(isVerified);
Arrays.fill(_Y, (byte) 0);
SimpleByteCache.release(_Y);
}
}

View File

@@ -0,0 +1,520 @@
package net.i2p.router.transport.ntcp;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import com.southernstorm.noise.protocol.CipherState;
import com.southernstorm.noise.protocol.CipherStatePair;
import com.southernstorm.noise.protocol.HandshakeState;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.router.RouterInfo;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.ntcp.NTCP2Payload.Block;
import net.i2p.util.Log;
/**
*
* NTCP 2 only. We are Alice.
*
* Also contains static constants and methods used by InboundEstablishState for NTCP2.
* Does not extend EstablishBase.
*
* @since 0.9.35
*/
class OutboundNTCP2State implements EstablishState {
private final RouterContext _context;
private final Log _log;
private final NTCPTransport _transport;
private final NTCPConnection _con;
private final byte[] _tmp;
/** bytes received so far */
private int _received;
private long _peerSkew;
public static final int KEY_SIZE = 32;
public static final int MAC_SIZE = 16;
public static final int IV_SIZE = 16;
public static final int OPTIONS1_SIZE = 16;
/** 64 */
public static final int MSG1_SIZE = KEY_SIZE + OPTIONS1_SIZE + MAC_SIZE;
/** one less than 288 byte NTCP1 msg 1 */
public static final int TOTAL1_MAX = 287;
private static final int PADDING1_MAX = 64;
private static final int PADDING3_MAX = 64;
public static final int OPTIONS2_SIZE = 16;
public static final int MSG2_SIZE = KEY_SIZE + OPTIONS2_SIZE + MAC_SIZE;
/** 48 */
public static final int MSG3P1_SIZE = KEY_SIZE + MAC_SIZE;
private static final int OPTIONS3_SIZE = 12;
/** in SECONDS */
public static final long MAX_SKEW = 60;
// SipHash KDF things
private static final byte[] ZEROLEN = new byte[0];
private static final byte[] ONE = new byte[] { 1 };
public static final byte[] ZEROKEY = new byte[KEY_SIZE];
/** for SipHash keygen */
private static final byte[] ASK = new byte[] { (byte) 'a', (byte) 's', (byte) 'k', 1 };
/** for SipHash keygen */
private static final byte[] SIPHASH = DataHelper.getASCII("siphash");
private final Object _stateLock = new Object();
private State _state;
private final HandshakeState _handshakeState;
private final RouterInfo _aliceRI;
private final int _aliceRISize;
private int _padlen1;
private int _padlen2;
private final int _padlen3;
private final SessionKey _bobHash;
private final byte[] _bobIV;
private enum State {
OB_INIT,
/** sent 1 */
OB_SENT_X,
/** sent 1, got 2 but not padding */
OB_GOT_HXY,
/** sent 1, got 2 incl. padding */
OB_GOT_PADDING,
/** sent 1, got 2 incl. padding, sent 3 */
VERIFIED,
CORRUPT
}
public OutboundNTCP2State(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_transport = transport;
_con = con;
_state = State.OB_INIT;
_tmp = new byte[TOTAL1_MAX];
try {
_handshakeState = new HandshakeState(HandshakeState.INITIATOR, _transport.getXDHFactory());
} catch (GeneralSecurityException gse) {
throw new IllegalStateException("bad proto", gse);
}
// save because we must know length
_aliceRI = ctx.router().getRouterInfo();
if (_aliceRI == null)
throw new IllegalStateException("no RI yet");
_aliceRISize = _aliceRI.toByteArray().length;
_padlen3 = _context.random().nextInt(PADDING3_MAX);
Hash h = _con.getRemotePeer().calculateHash();
_bobHash = new SessionKey(h.getData());
String s = _con.getRemoteAddress().getOption("i");
if (s == null)
throw new IllegalArgumentException("no NTCP2 IV");
_bobIV = Base64.decode(s);
if (_bobIV == null || _bobIV.length != IV_SIZE ||
DataHelper.eq(_bobIV, 0, ZEROKEY, 0, IV_SIZE))
throw new IllegalArgumentException("bad NTCP2 IV");
}
private void changeState(State state) {
synchronized (_stateLock) {
_state = state;
}
}
/**
* Parse the contents of the buffer as part of the handshake.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*/
@Override
public synchronized void receive(ByteBuffer src) {
if (_state == State.VERIFIED || _state == State.CORRUPT)
throw new IllegalStateException(this + "received unexpected data on " + _con);
if (_log.shouldLog(Log.DEBUG))
_log.debug(this + "Receiving: " + src.remaining() + " Received: " + _received);
if (!src.hasRemaining())
return; // nothing to receive
receiveOutbound(src);
}
/** did the handshake fail for some reason? */
public boolean isCorrupt() {
synchronized (_stateLock) {
return _state == State.CORRUPT;
}
}
/**
* Don't synchronize this, deadlocks all over.
*
* @return is the handshake complete and valid?
*/
public boolean isComplete() {
synchronized (_stateLock) {
return _state == State.VERIFIED;
}
}
/**
* Get the NTCP version
* @return 2
*/
public int getVersion() { return 2; }
/**
* We are Alice.
* We are establishing an outbound connection, so prepare ourselves by
* writing the first message in the handshake.
* Encrypt X and write X, the options block, and the padding.
* Save last half of encrypted X as IV for message 2 AES.
*
* @throws IllegalStateException
*/
public synchronized void prepareOutbound() {
if (!(_state == State.OB_INIT)) {
throw new IllegalStateException(this + "unexpected prepareOutbound()");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(this + "send X");
byte options[] = new byte[OPTIONS1_SIZE];
options[1] = NTCPTransport.NTCP2_INT_VERSION;
int padlen1 = _context.random().nextInt(PADDING1_MAX);
DataHelper.toLong(options, 2, 2, padlen1);
int msg3p2len = NTCP2Payload.BLOCK_HEADER_SIZE + 1 + _aliceRISize +
NTCP2Payload.BLOCK_HEADER_SIZE + OPTIONS3_SIZE +
NTCP2Payload.BLOCK_HEADER_SIZE + _padlen3 +
MAC_SIZE;
DataHelper.toLong(options, 4, 2, msg3p2len);
long now = (_context.clock().now() + 500) / 1000;
DataHelper.toLong(options, 8, 4, now);
// set keys
String s = _con.getRemoteAddress().getOption("s");
if (s == null) {
fail("no NTCP2 S");
return;
}
byte[] bk = Base64.decode(s);
if (bk == null || bk.length != KEY_SIZE ||
DataHelper.eq(bk, 0, ZEROKEY, 0, KEY_SIZE)) {
fail("bad NTCP2 S: " + s);
return;
}
_handshakeState.getRemotePublicKey().setPublicKey(bk, 0);
_handshakeState.getLocalKeyPair().setPublicKey(_transport.getNTCP2StaticPubkey(), 0);
_handshakeState.getLocalKeyPair().setPrivateKey(_transport.getNTCP2StaticPrivkey(), 0);
// output to _tmp
try {
_handshakeState.start();
if (_log.shouldWarn())
_log.warn("After start: " + _handshakeState.toString());
_handshakeState.writeMessage(_tmp, 0, options, 0, OPTIONS1_SIZE);
} catch (GeneralSecurityException gse) {
// buffer length error
if (!_log.shouldWarn())
_log.error("Bad msg 1 out", gse);
fail("Bad msg 1 out", gse);
return;
} catch (RuntimeException re) {
if (!_log.shouldWarn())
_log.error("Bad msg 1 out", re);
fail("Bad msg 1 out", re);
return;
}
if (_log.shouldWarn())
_log.warn("After msg 1: " + _handshakeState.toString());
// encrypt key before writing
_context.aes().encrypt(_tmp, 0, _tmp, 0, _bobHash, _bobIV, KEY_SIZE);
// overwrite _bobIV with last 16 encrypted bytes, CBC for message 2
System.arraycopy(_tmp, KEY_SIZE - IV_SIZE, _bobIV, 0, IV_SIZE);
// add padding
if (padlen1 > 0) {
_context.random().nextBytes(_tmp, MSG1_SIZE, padlen1);
_handshakeState.mixHash(_tmp, MSG1_SIZE, padlen1);
if (_log.shouldWarn())
_log.warn("After mixhash padding " + padlen1 + " msg 1: " + _handshakeState.toString());
}
changeState(State.OB_SENT_X);
// send it all at once
_transport.getPumper().wantsWrite(_con, _tmp, 0, MSG1_SIZE + padlen1);
}
/**
* We are Alice, so receive these bytes as part of an outbound connection.
* This method receives message 2, and sends message 3.
*
* IV (CBC from msg 1) must be in _bobIV
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*
* Caller must synch
*/
private void receiveOutbound(ByteBuffer src) {
// Read in message #2 except for the padding
if (_state == State.OB_SENT_X && src.hasRemaining()) {
int toGet = Math.min(src.remaining(), MSG2_SIZE - _received);
src.get(_tmp, _received, toGet);
_received += toGet;
if (_received < MSG2_SIZE)
return;
_context.aes().decrypt(_tmp, 0, _tmp, 0, _bobHash, _bobIV, KEY_SIZE);
if (DataHelper.eqCT(_tmp, 0, ZEROKEY, 0, KEY_SIZE)) {
fail("Bad msg 2, Y = 0");
return;
}
byte[] options2 = new byte[OPTIONS2_SIZE];
try {
_handshakeState.readMessage(_tmp, 0, MSG2_SIZE, options2, 0);
} catch (GeneralSecurityException gse) {
fail("Bad msg 2, Y = " + Base64.encode(_tmp, 0, KEY_SIZE), gse);
return;
} catch (RuntimeException re) {
fail("Bad msg 2, Y = " + Base64.encode(_tmp, 0, KEY_SIZE), re);
return;
}
if (_log.shouldWarn())
_log.warn("After msg 2: " + _handshakeState.toString());
_padlen2 = (int) DataHelper.fromLong(options2, 2, 2);
long tsB = DataHelper.fromLong(options2, 8, 4);
long now = _context.clock().now();
// rtt from sending #1 to receiving #2
long rtt = now - _con.getCreated();
_peerSkew = (now - (tsB * 1000) - (rtt / 2) + 500) / 1000;
if (_peerSkew > MAX_SKEW || _peerSkew < 0 - MAX_SKEW) {
fail("Clock Skew: " + _peerSkew, null, true);
return;
}
changeState(State.OB_GOT_HXY);
_received = 0;
}
// Read in the padding for message #2
if (_state == State.OB_GOT_HXY && src.hasRemaining()) {
int toGet = Math.min(src.remaining(), _padlen2 - _received);
src.get(_tmp, _received, toGet);
_received += toGet;
if (_received < _padlen2)
return;
if (_padlen2 > 0) {
_handshakeState.mixHash(_tmp, 0, _padlen2);
if (_log.shouldWarn())
_log.warn("After mixhash padding " + _padlen2 + " msg 2: " + _handshakeState.toString());
}
changeState(State.OB_GOT_PADDING);
if (src.hasRemaining()) {
// Outbound conn can never have extra data after msg 2
fail("Extra data after msg 2: " + src.remaining());
return;
}
prepareOutbound3();
return;
}
// check for remaining data
if ((_state == State.VERIFIED || _state == State.CORRUPT) && src.hasRemaining()) {
if (_log.shouldWarn())
_log.warn("Received unexpected " + src.remaining() + " on " + this, new Exception());
}
}
/**
* We are Alice.
* Write the 3rd message.
*
* Caller must synch
*/
private void prepareOutbound3() {
// create msg 3 part 2 payload
// payload without MAC
int msg3p2len = NTCP2Payload.BLOCK_HEADER_SIZE + 1 + _aliceRISize +
NTCP2Payload.BLOCK_HEADER_SIZE + OPTIONS3_SIZE +
NTCP2Payload.BLOCK_HEADER_SIZE + _padlen3;
// total for parts 1 and 2 with mac
byte[] tmp = new byte[MSG3P1_SIZE + msg3p2len + MAC_SIZE];
List<Block> blocks = new ArrayList<Block>(3);
Block block = new NTCP2Payload.RIBlock(_aliceRI, false);
blocks.add(block);
byte[] opts = new byte[OPTIONS3_SIZE];
opts[0] = NTCPConnection.PADDING_MIN_DEFAULT_INT;
opts[1] = NTCPConnection.PADDING_MAX_DEFAULT_INT;
opts[2] = NTCPConnection.PADDING_MIN_DEFAULT_INT;
opts[3] = NTCPConnection.PADDING_MAX_DEFAULT_INT;
DataHelper.toLong(opts, 4, 2, NTCPConnection.DUMMY_DEFAULT);
DataHelper.toLong(opts, 6, 2, NTCPConnection.DUMMY_DEFAULT);
DataHelper.toLong(opts, 8, 2, NTCPConnection.DELAY_DEFAULT);
DataHelper.toLong(opts, 10, 2, NTCPConnection.DELAY_DEFAULT);
block = new NTCP2Payload.OptionsBlock(opts);
blocks.add(block);
// all zeros is fine here
//block = new NTCP2Payload.PaddingBlock(_context, _padlen3);
block = new NTCP2Payload.PaddingBlock(_padlen3);
blocks.add(block);
// we put it at the offset so it doesn't get overwritten by HandshakeState
// when it copies the static key in there
int newoff = NTCP2Payload.writePayload(tmp, MSG3P1_SIZE, blocks);
int expect = MSG3P1_SIZE + msg3p2len;
if (newoff != expect)
throw new IllegalStateException("msg3 size mismatch expected " + expect + " got " + newoff);
try {
_handshakeState.writeMessage(tmp, 0, tmp, MSG3P1_SIZE, msg3p2len);
} catch (GeneralSecurityException gse) {
// buffer length error
if (!_log.shouldWarn())
_log.error("Bad msg 3 out", gse);
fail("Bad msg 3 out", gse);
return;
} catch (RuntimeException re) {
if (!_log.shouldWarn())
_log.error("Bad msg 3 out", re);
fail("Bad msg 3 out", re);
return;
}
// send it all at once
if (_log.shouldWarn())
_log.warn("Sending msg3, part 1 is:\n" + net.i2p.util.HexDump.dump(tmp, 0, MSG3P1_SIZE));
_transport.getPumper().wantsWrite(_con, tmp);
if (_log.shouldWarn())
_log.warn("After msg 3: " + _handshakeState.toString());
setDataPhase();
}
/**
* KDF for data phase,
* then calls con.finishOutboundEstablishment(),
* passing over the final keys and states to the con.
*
* Caller must synch
*/
private void setDataPhase() {
// Data phase ChaChaPoly keys
CipherStatePair ckp = _handshakeState.split();
CipherState rcvr = ckp.getReceiver();
CipherState sender = ckp.getSender();
byte[] k_ab = sender.getKey();
byte[] k_ba = rcvr.getKey();
// Data phase SipHash keys
byte[][] sipkeys = generateSipHashKeys(_context, _handshakeState);
byte[] sip_ab = sipkeys[0];
byte[] sip_ba = sipkeys[1];
if (_log.shouldWarn()) {
_log.warn("Finished establishment for " + this +
"\nGenerated ChaCha key for A->B: " + Base64.encode(k_ab) +
"\nGenerated ChaCha key for B->A: " + Base64.encode(k_ba) +
"\nGenerated SipHash key for A->B: " + Base64.encode(sip_ab) +
"\nGenerated SipHash key for B->A: " + Base64.encode(sip_ba));
}
// skew in seconds
_con.finishOutboundEstablishment(sender, rcvr, sip_ab, sip_ba, _peerSkew);
changeState(State.VERIFIED);
// no extra data possible
releaseBufs(true);
_handshakeState.destroy();
Arrays.fill(sip_ab, (byte) 0);
Arrays.fill(sip_ba, (byte) 0);
}
/**
* KDF for SipHash
*
* @return rv[0] is sip_ab, rv[1] is sip_ba
*/
static byte[][] generateSipHashKeys(RouterContext ctx, HandshakeState state) {
// TODO use noise HMAC or HKDF method instead?
// ask_master = HKDF(ck, zerolen, info="ask")
SessionKey tk = new SessionKey(state.getChainingKey());
byte[] temp_key = doHMAC(ctx, tk, ZEROLEN);
tk = new SessionKey(temp_key);
byte[] ask_master = doHMAC(ctx, tk, ASK);
byte[] tmp = new byte[32 + SIPHASH.length];
byte[] hash = state.getHash();
System.arraycopy(hash, 0, tmp, 0, 32);
System.arraycopy(SIPHASH, 0, tmp, 32, SIPHASH.length);
tk = new SessionKey(ask_master);
temp_key = doHMAC(ctx, tk, tmp);
tk = new SessionKey(temp_key);
byte[] sip_master = doHMAC(ctx, tk, ONE);
tk = new SessionKey(sip_master);
temp_key = doHMAC(ctx, tk, ZEROLEN);
tk = new SessionKey(temp_key);
// Output 1
byte[] sip_ab = doHMAC(ctx, tk, ONE);
// Output 2
tmp = new byte[KEY_SIZE + 1];
System.arraycopy(sip_ab, 0, tmp, 0, 32);
tmp[32] = 2;
byte[] sip_ba = doHMAC(ctx, tk, tmp);
Arrays.fill(temp_key, (byte) 0);
Arrays.fill(tmp, (byte) 0);
return new byte[][] { sip_ab, sip_ba };
}
private static byte[] doHMAC(RouterContext ctx, SessionKey key, byte data[]) {
byte[] rv = new byte[32];
ctx.hmac256().calculate(key, data, 0, data.length, rv, 0);
return rv;
}
/**
* Release resources on timeout.
* @param e may be null
* @since 0.9.16
*/
public synchronized void close(String reason, Exception e) {
fail(reason, e);
}
protected void fail(String reason) { fail(reason, null); }
protected void fail(String reason, Exception e) { fail(reason, e, false); }
protected synchronized void fail(String reason, Exception e, boolean bySkew) {
if (_state == State.CORRUPT || _state == State.VERIFIED)
return;
changeState(State.CORRUPT);
if (_log.shouldWarn()) {
_log.warn(this + "Failed to establish: " + reason, e);
_log.warn("State at failure: " + _handshakeState.toString());
}
_handshakeState.destroy();
if (!bySkew)
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
releaseBufs(false);
}
/**
* Only call once.
*
* Caller must synch
*/
private void releaseBufs(boolean isVerified) {
Arrays.fill(_tmp, (byte) 0);
// TODO
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("OBES2 ");
buf.append(System.identityHashCode(this));
buf.append(' ').append(_state);
if (_con.isEstablished()) buf.append(" established");
buf.append(": ");
return buf.toString();
}
}

View File

@@ -149,8 +149,6 @@ class Reader {
if ((buf = con.getNextReadBuf()) == null)
return;
EstablishState est = con.getEstablishState();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
if (est.isComplete()) {
// why is it complete yet !con.isEstablished?
@@ -163,20 +161,13 @@ class Reader {
est.receive(buf);
EventPumper.releaseBuf(buf);
if (est.isCorrupt()) {
if (_log.shouldLog(Log.WARN))
_log.warn("closing connection on establishment because: " +est.getError(), est.getException());
if (!est.getFailedBySkew())
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
con.close();
return;
}
if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
// EstablishState is responsible for passing "extra" data to the con
}
while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
// decrypt the data and push it into an i2np message
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
con.recvEncryptedI2NP(buf);
EventPumper.releaseBuf(buf);
}