diff --git a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java index 26784ed0d2a413a4e754e629c7ef7b52165bb125..ff43d294769f54cb2f68850be8ce4129c6635971 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java @@ -123,7 +123,7 @@ class EstablishState { private boolean _failedBySkew; private static final int MIN_RI_SIZE = 387; - private static final int MAX_RI_SIZE = 2048; + private static final int MAX_RI_SIZE = 3072; private static final int AES_SIZE = 16; private static final int XY_SIZE = 256; @@ -256,14 +256,6 @@ class EstablishState { while (_state == State.IB_INIT && src.hasRemaining()) { byte c = src.get(); _X[_received++] = c; - //if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received); - //if (_received >= _X.length) { - // if (isCheckInfo(_context, _context.routerHash(), _X)) { - // _context.statManager().addRateData("ntcp.inboundCheckConnection", 1); - // fail("Incoming connection was a check connection"); - // return; - // } - //} if (_received >= XY_SIZE) changeState(State.IB_GOT_X); } @@ -272,7 +264,6 @@ class EstablishState { _received++; byte c = src.get(); _hX_xor_bobIdentHash[i] = c; - //if (_log.shouldLog(Log.DEBUG)) _log.debug("recv bih" + (int)c + " received=" + _received); if (i >= HXY_SIZE - 1) changeState(State.IB_GOT_HX); } @@ -287,12 +278,6 @@ class EstablishState { byte[] realXor = SimpleByteCache.acquire(HXY_SIZE); _context.sha().calculateHash(_X, 0, XY_SIZE, realXor, 0); xor32(_context.routerHash().getData(), realXor); - //if (_log.shouldLog(Log.DEBUG)) { - //_log.debug(prefix()+"_X = " + Base64.encode(_X)); - // _log.debug(prefix()+"hx = " + Base64.encode(hX.getData())); - // _log.debug(prefix()+"bih=" + Base64.encode(_context.routerHash().getData())); - // _log.debug(prefix()+"xor=" + Base64.encode(realXor)); - //} if (!DataHelper.eq(realXor, _hX_xor_bobIdentHash)) { SimpleByteCache.release(realXor); _context.statManager().addRateData("ntcp.invalidHXxorBIH", 1); @@ -327,11 +312,8 @@ class EstablishState { System.arraycopy(hxy, 0, toEncrypt, 0, HXY_SIZE); byte tsB[] = DataHelper.toLong(4, _tsB); System.arraycopy(tsB, 0, toEncrypt, HXY_SIZE, tsB.length); - //DataHelper.toLong(toEncrypt, hxy.getData().length, 4, _tsB); _context.random().nextBytes(toEncrypt, HXY_SIZE + 4, 12); if (_log.shouldLog(Log.DEBUG)) { - //_log.debug(prefix()+"Y="+Base64.encode(_Y)); - //_log.debug(prefix()+"x+y="+Base64.encode(xy)); _log.debug(prefix()+"h(x+y)="+Base64.encode(hxy)); _log.debug(prefix() + "tsb = " + _tsB); _log.debug(prefix()+"unencrypted H(X+Y)+tsB+padding: " + Base64.encode(toEncrypt)); @@ -364,8 +346,6 @@ class EstablishState { _state == State.IB_GOT_RI_SIZE || _state == State.IB_GOT_RI) && src.hasRemaining()) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix()+"Encrypted bytes available (" + src.hasRemaining() + ")"); // Collect a 16-byte block while (_curEncryptedOffset < AES_SIZE && src.hasRemaining()) { _curEncrypted[_curEncryptedOffset++] = src.get(); @@ -375,8 +355,6 @@ class EstablishState { if (_curEncryptedOffset >= AES_SIZE) { _context.aes().decrypt(_curEncrypted, 0, _curDecrypted, 0, _dh.getSessionKey(), _prevEncrypted, 0, AES_SIZE); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix() + "full block read and decrypted: "); byte swap[] = _prevEncrypted; _prevEncrypted = _curEncrypted; @@ -406,8 +384,6 @@ class EstablishState { } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error(prefix()+"Error writing to the baos?", ioe); } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix()+"subsequent block decrypted (" + _sz_aliceIdent_tsA_padding_aliceSig.size() + ")"); if (_state == State.IB_GOT_RI_SIZE && _sz_aliceIdent_tsA_padding_aliceSig.size() >= 2 + _aliceIdentSize) { @@ -487,7 +463,6 @@ class EstablishState { while (_state == State.OB_SENT_X && src.hasRemaining()) { byte c = src.get(); _Y[_received++] = c; - //if (_log.shouldLog(Log.DEBUG)) _log.debug("recv x" + (int)c + " received=" + _received); if (_received >= XY_SIZE) { try { _dh.setPeerPublicValue(_Y); @@ -510,8 +485,6 @@ class EstablishState { _received++; byte c = src.get(); _e_hXY_tsB[i] = c; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix() + "recv _e_hXY_tsB " + (int)c + " received=" + _received); if (i+1 >= HXY_TSB_PAD_SIZE) { if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully"); byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE]; @@ -521,8 +494,6 @@ class EstablishState { System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE); byte[] h = SimpleByteCache.acquire(HXY_SIZE); _context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix() + "h(XY)=" + h.toBase64()); if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) { SimpleByteCache.release(h); _context.statManager().addRateData("ntcp.invalidHXY", 1); @@ -576,16 +547,8 @@ class EstablishState { DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA); DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB); // hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32) - //System.arraycopy(hXY_tsB, hXY_tsB.length-12, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, 12); - //byte sigPad[] = new byte[padSig]; - //_context.random().nextBytes(sigPad); - //System.arraycopy(sigPad, 0, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, padSig); Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey()); - //if (_log.shouldLog(Log.DEBUG)) { - // _log.debug(prefix()+"signing " + Base64.encode(preSign)); - //} - byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray(); // handle variable signature size int min = 2 + ident.length + 4 + sig.length(); @@ -605,11 +568,6 @@ class EstablishState { _context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(), _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length); - //if (_log.shouldLog(Log.DEBUG)) { - //_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt)); - //_log.debug(prefix() + "encrypted response to Bob: " + Base64.encode(_prevEncrypted)); - //} - // send 'er off (when the bw limiter says, etc) changeState(State.OB_SENT_RI); _transport.getPumper().wantsWrite(_con, _prevEncrypted); } @@ -641,14 +599,11 @@ class EstablishState { src.hasRemaining() + " off=" + off + " recv=" + _received + ")"); } while (_state == State.OB_SENT_RI && src.hasRemaining()) { - //if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"recv bobSig received=" + _received); _e_bobSig[off++] = src.get(); _received++; if (off >= _e_bobSig.length) { changeState(State.OB_GOT_SIG); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix() + "received E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev): " + Base64.encode(_e_bobSig)); byte bobSig[] = new byte[_e_bobSig.length]; _context.aes().decrypt(_e_bobSig, 0, bobSig, 0, _dh.getSessionKey(), _e_hXY_tsB, HXY_TSB_PAD_SIZE - AES_SIZE, _e_bobSig.length); @@ -759,8 +714,6 @@ class EstablishState { */ private void readAliceRouterIdentity() { byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(prefix()+"decrypted sz(etc) data: " + Base64.encode(b)); try { int sz = _aliceIdentSize; @@ -825,10 +778,6 @@ class EstablishState { //baos.write(b, 2+sz+4, b.length-2-sz-4-Signature.SIGNATURE_BYTES); byte toVerify[] = baos.toByteArray(); - //if (_log.shouldLog(Log.DEBUG)) { - // _log.debug(prefix()+"checking " + Base64.encode(toVerify, 0, AES_SIZE)); - // //_log.debug(prefix()+"check pad " + Base64.encode(b, 2+sz+4, 12)); - //} // handle variable signature size SigType type = _aliceIdent.getSigningPublicKey().getType(); @@ -1052,63 +1001,6 @@ class EstablishState { return buf.toString(); } - /** - * a check info connection will receive 256 bytes containing: - * - 32 bytes of uninterpreted, ignored data - * - 1 byte size - * - that many bytes making up the local router's IP address (as reached by the remote side) - * - 2 byte port number that the local router was reached on - * - 4 byte i2p network time as known by the remote side (seconds since the epoch) - * - uninterpreted padding data, up to byte 223 - * - xor of the local router's identity hash and the SHA256 of bytes 32 through bytes 223 - * - * @return should always be false since nobody ever sends a check info message - * - */ -/***** - private static boolean isCheckInfo(I2PAppContext ctx, Hash us, byte first256[]) { - Log log = ctx.logManager().getLog(EstablishState.class); - int off = 32; // ignore the first 32 bytes - - byte[] xor = SimpleByteCache.acquire(Hash.HASH_LENGTH); - ctx.sha().calculateHash(first256, off, first256.length-32-off, xor, 0); - xor32(us.getData(), xor); - //if (log.shouldLog(Log.DEBUG)) - // log.debug("check hash: " + h.toBase64() + " xor: " + Base64.encode(xor)); - if (DataHelper.eq(xor, 0, first256, first256.length-32, 32)) { - SimpleByteCache.release(xor); - // ok, data is as expected - // parse our IP/port/etc out of the first256 - int ipSize = (int)DataHelper.fromLong(first256, off, 1); - off++; - byte ip[] = new byte[ipSize]; - System.arraycopy(first256, off, ip, 0, ipSize); - try { - InetAddress ourIP = InetAddress.getByAddress(ip); - off += ipSize; - int port = (int)DataHelper.fromLong(first256, off, 2); - off += 2; - long now = DataHelper.fromLong(first256, off, 4); - off += 4; - long skewSeconds = (ctx.clock().now()/1000)-now; - if (log.shouldLog(Log.INFO)) - log.info("Check info received: our IP: " + ourIP + " our port: " + port - + " skew: " + skewSeconds + " s"); - } catch (UnknownHostException uhe) { - // ipSize is invalid - if (log.shouldLog(Log.WARN)) - log.warn("Invalid IP received on check connection (size: " + ipSize + ")"); - } - return true; - } else { - SimpleByteCache.release(xor); - if (log.shouldLog(Log.DEBUG)) - log.debug("Not a checkInfo connection"); - return false; - } - } -*****/ - /** * @since 0.9.8 */ @@ -1145,47 +1037,6 @@ class EstablishState { @Override public String toString() { return "FailedEstablishState: ";} } - /** @deprecated unused */ -/********* - public static void checkHost(String args[]) { - if (args.length != 3) { - System.err.println("Usage: EstablishState ipOrHostname portNum peerHashBase64"); - return; - } - try { - I2PAppContext ctx = I2PAppContext.getGlobalContext(); - String host = args[0]; - int port = Integer.parseInt(args[1]); - byte peer[] = Base64.decode(args[2]); - Socket s = new Socket(host, port); - OutputStream out = s.getOutputStream(); - byte toSend[] = new byte[256]; - ctx.random().nextBytes(toSend); - int off = 32; - byte ip[] = s.getInetAddress().getAddress(); - DataHelper.toLong(toSend, off, 1, ip.length); - off++; - System.arraycopy(ip, 0, toSend, off, ip.length); - off += ip.length; - DataHelper.toLong(toSend, off, 2, port); - off += 2; - long now = ctx.clock().now()/1000; - DataHelper.toLong(toSend, off, 4, now); - off += 4; - Hash h = ctx.sha().calculateHash(toSend, 32, toSend.length-32-32); - DataHelper.xor(peer, 0, h.getData(), 0, toSend, toSend.length-32, peer.length); - System.out.println("check hash: " + h.toBase64()); - - out.write(toSend); - out.flush(); - try { Thread.sleep(1000); } catch (InterruptedException ie) {} - s.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } -*******/ - /******* public static void main(String args[]) { if (args.length == 3) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 8524cee6747ac86deca5ce2e9fe6ae4bb8c2d265..eab07cbec5068d71b4904b8439bc334d9d352ea9 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -181,12 +181,8 @@ class EventPumper implements Runnable { runDelayedEvents(); try { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("before select..."); int count = _selector.select(SELECTOR_LOOP_DELAY); if (count > 0) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("select returned " + count); Set<SelectionKey> selected = _selector.selectedKeys(); //_context.statManager().addRateData("ntcp.pumperKeysPerLoop", selected.size()); processKeys(selected); @@ -397,11 +393,9 @@ class EventPumper implements Runnable { processConnect(key); } if (read) { - //_context.statManager().addRateData("ntcp.read", 1, 0); processRead(key); } if (write) { - //_context.statManager().addRateData("ntcp.write", 1, 0); processWrite(key); } //if (!(accept || connect || read || write)) { @@ -429,9 +423,6 @@ class EventPumper implements Runnable { _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1); con.queuedWrite(buf, req); } else { - // fully allocated - //if (_log.shouldLog(Log.INFO)) - // _log.info("fully allocated write on " + con + " for " + data.length); con.write(buf); } } @@ -475,12 +466,6 @@ class EventPumper implements Runnable { else rv = ByteBuffer.allocate(BUF_SIZE); _numBufs++; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); - //_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0); - } else { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("acquiring existing read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); } return rv; } @@ -491,10 +476,6 @@ class EventPumper implements Runnable { * High-frequency path in thread. */ public static void releaseBuf(ByteBuffer buf) { - //if (false) return; - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); - // double check if (buf.capacity() < BUF_SIZE) { I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception()); @@ -516,13 +497,9 @@ class EventPumper implements Runnable { } } } - //if (cached && _log.shouldLog(Log.DEBUG)) - // _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live"); } private void processAccept(SelectionKey key) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("processing accept"); ServerSocketChannel servChan = (ServerSocketChannel)key.attachment(); try { SocketChannel chan = servChan.accept(); @@ -542,8 +519,6 @@ class EventPumper implements Runnable { if (_context.blocklist().isBlocklisted(ip)) { if (_log.shouldLog(Log.WARN)) _log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress()); - // need to add this stat first - // _context.statManager().addRateData("ntcp.connectBlocklisted", 1, 0); try { chan.close(); } catch (IOException ioe) { } return; } @@ -559,14 +534,11 @@ class EventPumper implements Runnable { return; } - // BUGFIX for firewalls. --Sponge if (shouldSetKeepAlive(chan)) chan.socket().setKeepAlive(true); SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ); new NTCPConnection(_context, _transport, chan, ckey); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("new NTCP connection established: " +con); } catch (IOException ioe) { _log.error("Error accepting", ioe); } @@ -580,7 +552,6 @@ class EventPumper implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("processing connect for " + con + ": connected? " + connected); if (connected) { - // BUGFIX for firewalls. --Sponge if (shouldSetKeepAlive(chan)) chan.socket().setKeepAlive(true); con.setKey(key); @@ -595,7 +566,6 @@ class EventPumper implements Runnable { if (_log.shouldLog(Log.INFO)) _log.info("Failed outbound " + con, ioe); con.closeOnTimeout("connect failed", ioe); - //_context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE); _transport.markUnreachable(con.getRemotePeer().calculateHash()); _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1); } catch (NoConnectionPendingException ncpe) { @@ -630,7 +600,6 @@ class EventPumper implements Runnable { try { int read = con.getChannel().read(buf); if (read < 0) { - //_context.statManager().addRateData("ntcp.readEOF", 1); if (con.isInbound() && con.getMessagesReceived() <= 0) { InetAddress addr = con.getChannel().socket().getInetAddress(); int count; @@ -653,8 +622,6 @@ class EventPumper implements Runnable { con.close(); releaseBuf(buf); } else if (read == 0) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("nothing to read for " + con + ", but stay interested"); // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_READ); releaseBuf(buf); @@ -679,14 +646,9 @@ class EventPumper implements Runnable { if (req.getPendingRequested() > 0) { // rare since we generally don't throttle inbound key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore"); _context.statManager().addRateData("ntcp.queuedRecv", read); con.queuedRecv(buf, req); } else { - // fully allocated - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("not bw throttled reading for " + con); // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_READ); con.recv(buf); @@ -747,51 +709,31 @@ class EventPumper implements Runnable { * High-frequency path in thread. */ private void processWrite(SelectionKey key) { - //int totalWritten = 0; - //int buffers = 0; - //long before = System.currentTimeMillis(); NTCPConnection con = (NTCPConnection)key.attachment(); try { while (true) { ByteBuffer buf = con.getNextWriteBuf(); if (buf != null) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("writing " + buf.remaining()+"..."); if (buf.remaining() <= 0) { - //long beforeRem = System.currentTimeMillis(); con.removeWriteBuf(buf); - //long afterRem = System.currentTimeMillis(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("buffer was already fully written and removed after " + (afterRem-beforeRem) + "..."); - //buffers++; continue; } int written = con.getChannel().write(buf); //totalWritten += written; if (written == 0) { if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) { - //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains..."); // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } else { - //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains..."); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); } break; } else if (buf.remaining() > 0) { - //if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining..."); // stay interested //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); break; } else { - //long beforeRem = System.currentTimeMillis(); con.removeWriteBuf(buf); - //long afterRem = System.currentTimeMillis(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("buffer "+ buffers+"/"+written+"/"+totalWritten+" fully written after " + - // (beforeRem-before) + ", then removed after " + (afterRem-beforeRem) + "..."); - //releaseBuf(buf); - //buffers++; //if (buffer time is too much, add OP_WRITe to the interest ops and break?) // LOOP } @@ -811,10 +753,6 @@ class EventPumper implements Runnable { _context.statManager().addRateData("ntcp.writeError", 1); con.close(); } - //long after = System.currentTimeMillis(); - //if (_log.shouldLog(Log.INFO)) - // _log.info("Wrote " + totalWritten + " in " + buffers + " buffers on " + con - // + " after " + (after-before)); } /** @@ -900,7 +838,6 @@ class EventPumper implements Runnable { boolean connected = con.getChannel().connect(saddr); if (connected) { // Never happens, we use nonblocking - //_context.statManager().addRateData("ntcp.connectImmediate", 1); key.interestOps(SelectionKey.OP_READ); processConnect(key); } @@ -909,24 +846,12 @@ class EventPumper implements Runnable { _log.warn("error connecting to " + Addresses.toString(naddr.getIP(), naddr.getPort()), ioe); _context.statManager().addRateData("ntcp.connectFailedIOE", 1); _transport.markUnreachable(con.getRemotePeer().calculateHash()); - //if (ntcpOnly(con)) { - // _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage()); - // con.close(false); - //} else { - // _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE); - con.close(true); - //} + con.close(true); } catch (UnresolvedAddressException uae) { if (_log.shouldLog(Log.WARN)) _log.warn("unresolved address connecting", uae); _context.statManager().addRateData("ntcp.connectFailedUnresolved", 1); _transport.markUnreachable(con.getRemotePeer().calculateHash()); - //if (ntcpOnly(con)) { - // _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage()); - // con.close(false); - //} else { - // _context.banlist().banlistRouter(con.getRemotePeer().calculateHash(), "unable to connect/resolve: " + uae.getMessage(), NTCPTransport.STYLE); - con.close(true); - //} + con.close(true); } catch (CancelledKeyException cke) { con.close(false); } @@ -941,22 +866,7 @@ class EventPumper implements Runnable { _lastExpired = now; } } - - /** - * If the other peer only supports ntcp, we should banlist them when we can't reach 'em, - * but if they support other transports (eg ssu) we should allow those transports to be - * tried as well. - */ -/**** - private boolean ntcpOnly(NTCPConnection con) { - RouterIdentity ident = con.getRemotePeer(); - if (ident == null) return true; - RouterInfo info = _context.netDb().lookupRouterInfoLocally(ident.calculateHash()); - if (info == null) return true; - return info.getAddresses().size() == 1; - } -****/ - + private long _lastExpired; private void expireTimedOut() { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 99e70c8c5dfb815afe5a4ad188d1ecf89cc069ec..5c0f3992bf595df0eb04c7e661cca05b05659351 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -305,8 +305,6 @@ public class NTCPConnection implements Closeable { _clockSkew = clockSkew; _prevWriteEnd = prevWriteEnd; System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); _establishedOn = _context.clock().now(); NTCPConnection rv = _transport.inboundEstablished(this); _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); @@ -468,64 +466,20 @@ public class NTCPConnection implements Closeable { * toss the message onto the connection's send queue */ public void send(OutNetMessage msg) { - /**** - always enqueue, let the queue do the dropping - - if (tooBacklogged()) { - boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu - boolean successful = false; - _consecutiveBacklog++; - _transport.afterSend(msg, successful, allowRequeue, msg.getLifetime()); - if (_consecutiveBacklog > 10) { // waaay too backlogged - boolean wantsWrite = false; - try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (RuntimeException e) {} - if (_log.shouldLog(Log.WARN)) { - int blocks = _writeBufs.size(); - _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash()); - } - _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime()); - close(); - } - _context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog); - return; - } - _consecutiveBacklog = 0; - ****/ - //if (FAST_LARGE) _outbound.offer(msg); - //int enqueued = _outbound.size(); - // although stat description says ahead of this one, not including this one... - //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued); boolean noOutbound = (getCurrentOutbound() == null); - //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (isEstablished() && noOutbound) _transport.getWriter().wantsWrite(this, "enqueued"); } -/**** - private long queueTime() { - OutNetMessage msg = _currentOutbound; - if (msg == null) { - msg = _outbound.peek(); - if (msg == null) - return 0; - } - return msg.getSendTime(); // does not include any of the pre-send(...) preparation - } -****/ - public boolean isBacklogged() { return _outbound.isBacklogged(); } public boolean tooBacklogged() { - //long queueTime = queueTime(); - //if (queueTime <= 0) return false; - // perhaps we could take into account the size of the queued messages too, our // current transmission rate, and how much time is left before the new message's expiration? // ok, maybe later... if (getUptime() < 10*1000) // allow some slack just after establishment return false; - //if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... if (_outbound.isBacklogged()) { // bloody arbitrary. well, its half the average message lifetime... int size = _outbound.size(); if (_log.shouldLog(Log.WARN)) { @@ -538,12 +492,7 @@ public class NTCPConnection implements Closeable { + ", writeBufs: " + writeBufs + " on " + toString()); } catch (RuntimeException e) {} // java.nio.channels.CancelledKeyException } - //_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime); return true; - //} else if (size > 32) { // another arbitrary limit. - // if (_log.shouldLog(Log.ERROR)) - // _log.error("Too backlogged: queue size is " + size + " and the lifetime of the head is " + queueTime); - // return true; } else { return false; } @@ -554,22 +503,6 @@ public class NTCPConnection implements Closeable { */ public void enqueueInfoMessage() { int priority = INFO_PRIORITY; - //if (!_isInbound) { - // Workaround for bug at Bob's end. - // This probably isn't helpful because Bob puts the store on the job queue. - // Prior to 0.9.12, Bob would only send his RI if he had our RI after - // the first received message, so make sure it is first in our queue. - // As of 0.9.12 this is fixed and Bob will always send his RI. - // RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash()); - // if (target != null) { - // String v = target.getOption("router.version"); - // if (v == null || VersionComparator.comp(v, FIXED_RI_VERSION) < 0) { - // priority = OutNetMessage.PRIORITY_HIGHEST; - // } - // } else { - // priority = OutNetMessage.PRIORITY_HIGHEST; - // } - //} if (_log.shouldLog(Log.INFO)) _log.info("SENDING INFO message pri. " + priority + ": " + toString()); DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); @@ -577,52 +510,9 @@ public class NTCPConnection implements Closeable { // We are injecting directly, so we can use a null target. OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, priority, null); infoMsg.beginSend(); - //_context.statManager().addRateData("ntcp.infoMessageEnqueued", 1); send(infoMsg); } - //private static final int PEERS_TO_FLOOD = 3; - - /** - * to prevent people from losing track of the floodfill peers completely, lets periodically - * send those we are connected to references to the floodfill peers that we know - * - * Do we really need this anymore??? Peers shouldn't lose track anymore, and if they do, - * FloodOnlyLookupJob should recover. - * The bandwidth isn't so much, but it is a lot of extra data at connection startup, which - * hurts latency of new connections. - */ -/********** - private void enqueueFloodfillMessage(RouterInfo target) { - FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)_context.netDb(); - List peers = fac.getFloodfillPeers(); - Collections.shuffle(peers); - for (int i = 0; i < peers.size() && i < PEERS_TO_FLOOD; i++) { - Hash peer = (Hash)peers.get(i); - - // we already sent our own info, and no need to tell them about themselves - if (peer.equals(_context.routerHash()) || peer.equals(target.calculateHash())) - continue; - - RouterInfo info = fac.lookupRouterInfoLocally(peer); - if (info == null) - continue; - - OutNetMessage infoMsg = new OutNetMessage(_context); - infoMsg.setExpiration(_context.clock().now()+10*1000); - DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); - dsm.setKey(peer); - dsm.setRouterInfo(info); - infoMsg.setMessage(dsm); - infoMsg.setPriority(100); - infoMsg.setTarget(target); - infoMsg.beginSend(); - _context.statManager().addRateData("ntcp.floodInfoMessageEnqueued", 1, 0); - send(infoMsg); - } - } -***********/ - /** * We are Alice. * @@ -644,7 +534,6 @@ public class NTCPConnection implements Closeable { _establishedOn = _context.clock().now(); _establishState = EstablishState.VERIFIED; _transport.markReachable(getRemotePeer().calculateHash(), false); - //_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); boolean msgs = !_outbound.isEmpty(); _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); @@ -652,23 +541,6 @@ public class NTCPConnection implements Closeable { _transport.getWriter().wantsWrite(this, "outbound established"); } - /** - // Time vs space tradeoff: - // on slow GCing jvms, the mallocs in the following preparation can cause the - // write to get congested, taking up a substantial portion of the Writer's - // time (and hence, slowing down the transmission to the peer). we could - // however do the preparation (up to but not including the aes.encrypt) - // as part of the .send(OutNetMessage) above, which runs on less congested - // threads (whatever calls OutNetMessagePool.add, which can be the jobqueue, - // tunnel builders, simpletimers, etc). that would increase the Writer's - // efficiency (speeding up the transmission to the peer) but would require - // more memory to hold the serialized preparations of all queued messages, not - // just the currently transmitting one. - // - // hmm. - */ - private static final boolean FAST_LARGE = true; // otherwise, SLOW_SMALL - /** * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. @@ -677,111 +549,9 @@ public class NTCPConnection implements Closeable { * */ synchronized void prepareNextWrite(PrepBuffer prep) { - //if (FAST_LARGE) prepareNextWriteFast(prep); - //else - // prepareNextWriteSmall(); } -/********** nobody's tried this one in years - private void prepareNextWriteSmall() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); - if (!_isInbound && !_established) { - if (_establishState == null) { - _establishState = new EstablishState(_context, _transport, this); - _establishState.prepareOutbound(); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("prepare next write, but we have already prepared the first outbound and we are not yet established..." + toString()); - } - return; - } - - if (_nextMetaTime <= System.currentTimeMillis()) { - sendMeta(); - _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); - } - - OutNetMessage msg = null; - synchronized (_outbound) { - if (_currentOutbound != null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued"); - return; - } - //throw new RuntimeException("We should not be preparing a write while we still have one pending"); - if (!_outbound.isEmpty()) { - msg = (OutNetMessage)_outbound.remove(0); - _currentOutbound = msg; - } else { - return; - } - } - - msg.beginTransmission(); - msg.beginPrepare(); - long begin = System.currentTimeMillis(); - // prepare the message as a binary array, then encrypt it w/ a checksum - // and add it to the _writeBufs - // E(sizeof(data)+data+pad+crc, sessionKey, prevEncrypted) - I2NPMessage m = msg.getMessage(); - int sz = m.getMessageSize(); - int min = 2 + sz + 4; - int rem = min % 16; - int padding = 0; - if (rem > 0) - padding = 16 - rem; - - byte unencrypted[] = new byte[min+padding]; - byte base[] = m.toByteArray(); - DataHelper.toLong(unencrypted, 0, 2, sz); - System.arraycopy(base, 0, unencrypted, 2, base.length); - if (padding > 0) { - byte pad[] = new byte[padding]; - _context.random().nextBytes(pad); - System.arraycopy(pad, 0, unencrypted, 2+sz, padding); - } - - long serialized = System.currentTimeMillis(); - Adler32 crc = new Adler32(); - crc.reset(); - crc.update(unencrypted, 0, unencrypted.length-4); - long val = crc.getValue(); - DataHelper.toLong(unencrypted, unencrypted.length-4, 4, val); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound message " + _messagesWritten + " has crc " + val); - - long crced = System.currentTimeMillis(); - byte encrypted[] = new byte[unencrypted.length]; - _context.aes().encrypt(unencrypted, 0, encrypted, 0, _sessionKey, _prevWriteEnd, 0, unencrypted.length); - System.arraycopy(encrypted, encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length); - long encryptedTime = System.currentTimeMillis(); - msg.prepared(); - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("prepared outbound " + System.identityHashCode(msg) - + " serialize=" + (serialized-begin) - + " crc=" + (crced-serialized) - + " encrypted=" + (encryptedTime-crced) - + " prepared=" + (encryptedTime-begin)); - } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Encrypting " + msg + " [" + System.identityHashCode(msg) + "] crc=" + crc.getValue() + "\nas: " - // + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: " - // + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16)); - _transport.getPumper().wantsWrite(this, encrypted); - - // for every 6-12 hours that we are connected to a peer, send them - // our updated netDb info (they may not accept it and instead query - // the floodfill netDb servers, but they may...) - if (_nextInfoTime <= System.currentTimeMillis()) { - enqueueInfoMessage(); - _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); - } - } -**********/ - /** * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. @@ -816,60 +586,16 @@ public class NTCPConnection implements Closeable { _log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued"); return; } -/**** - //throw new RuntimeException("We should not be preparing a write while we still have one pending"); - if (queueTime() > 3*1000) { // don't stall low-priority messages -****/ msg = _outbound.poll(); if (msg == null) return; -/**** - } else { - // FIXME - // This is a linear search to implement a priority queue, O(n**2) - // Also race with unsynchronized removal in close() above - // Either implement a real (concurrent?) priority queue or just comment out all of this, - // as it isn't clear how effective the priorities on a per-connection basis are. - int slot = 0; // only for logging - Iterator<OutNetMessage> it = _outbound.iterator(); - for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound - OutNetMessage mmsg = it.next(); - if (msg == null || mmsg.getPriority() > msg.getPriority()) { - msg = mmsg; - slot = i; - } - } - if (msg == null) - return; - // if (_outbound.indexOf(msg) > 0) - // _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size()); - if (_log.shouldLog(Log.INFO)) - _log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + slot); - boolean removed = _outbound.remove(msg); - if ((!removed) && _log.shouldLog(Log.WARN)) - _log.warn("Already removed??? " + msg.getMessage().getType()); - } -****/ _currentOutbound = msg; } - //long begin = System.currentTimeMillis(); bufferedPrepare(msg,buf); _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength); System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length); - //long encryptedTime = System.currentTimeMillis(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Encrypting " + msg + " [" + System.identityHashCode(msg) + "] crc=" + crc.getValue() + "\nas: " - // + Base64.encode(encrypted, 0, 16) + "...\ndecrypted: " - // + Base64.encode(unencrypted, 0, 16) + "..." + "\nIV=" + Base64.encode(_prevWriteEnd, 0, 16)); _transport.getPumper().wantsWrite(this, buf.encrypted); - //long wantsTime = System.currentTimeMillis(); - //long releaseTime = System.currentTimeMillis(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("prepared outbound " + System.identityHashCode(msg) - // + " encrypted=" + (encryptedTime-begin) - // + " wantsWrite=" + (wantsTime-encryptedTime) - // + " releaseBuf=" + (releaseTime-wantsTime)); // for every 6-12 hours that we are connected to a peer, send them // our updated netDb info (they may not accept it and instead query @@ -889,15 +615,9 @@ public class NTCPConnection implements Closeable { * @param buf PrepBuffer to use as scratch space */ private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) { - //if (!_isInbound && !_established) - // return; - //long begin = System.currentTimeMillis(); - //long alloc = System.currentTimeMillis(); - I2NPMessage m = msg.getMessage(); buf.baseLength = m.toByteArray(buf.base); int sz = buf.baseLength; - //int sz = m.getMessageSize(); int min = 2 + sz + 4; int rem = min % 16; int padding = 0; @@ -911,7 +631,6 @@ public class NTCPConnection implements Closeable { _context.random().nextBytes(buf.unencrypted, 2+sz, padding); } - //long serialized = System.currentTimeMillis(); buf.crc.update(buf.unencrypted, 0, buf.unencryptedLength-4); long val = buf.crc.getValue(); @@ -926,11 +645,6 @@ public class NTCPConnection implements Closeable { // 3) change EventPumper.wantsWrite() to take a ByteBuffer arg // 4) in EventPumper.processWrite(), release the byte buffer buf.encrypted = new byte[buf.unencryptedLength]; - - //long crced = System.currentTimeMillis(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin) - // + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized)); } public static class PrepBuffer { @@ -984,7 +698,6 @@ public class NTCPConnection implements Closeable { // longer interested in reading from the network), but we aren't // throttled anymore, so we should resume being interested in reading _transport.getPumper().wantsRead(NTCPConnection.this); - //_transport.getReader().wantsRead(this); } } @@ -1046,7 +759,6 @@ public class NTCPConnection implements Closeable { */ public void recv(ByteBuffer buf) { _bytesReceived += buf.remaining(); - //buf.flip(); _readBufs.offer(buf); _transport.getReader().wantsRead(this); updateStats(); @@ -1057,9 +769,7 @@ public class NTCPConnection implements Closeable { * been fully allocated for the bandwidth limiter. */ public void write(ByteBuffer buf) { - //if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)"); _writeBufs.offer(buf); - //if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); _transport.getPumper().wantsWrite(this); } @@ -1122,12 +832,6 @@ public class NTCPConnection implements Closeable { if (getOutboundQueueSize() > 0) // push through the bw limiter to reach _writeBufs _transport.getWriter().wantsWrite(this, "write completed"); - // this is not necessary, EventPumper.processWrite() handles this - // and it just causes unnecessary selector.wakeup() and looping - //boolean bufsRemain = !_writeBufs.isEmpty(); - //if (bufsRemain) // send asap - // _transport.getPumper().wantsWrite(this); - updateStats(); } @@ -1139,8 +843,6 @@ public class NTCPConnection implements Closeable { private long _lastBytesSent; private float _sendBps; private float _recvBps; - //private float _sendBps15s; - //private float _recvBps15s; public float getSendRate() { return _sendBps; } public float getRecvRate() { return _recvBps; } @@ -1165,18 +867,6 @@ public class NTCPConnection implements Closeable { _sendBps = (0.9f)*_sendBps + (0.1f)*(sent*1000f)/time; _recvBps = (0.9f)*_recvBps + (0.1f)*((float)recv*1000)/time; - - // Maintain an approximate average with a 15-second halflife - // Weights (0.955 and 0.045) are tuned so that transition between two values (e.g. 0..10) - // would reach their midpoint (e.g. 5) in 15s - //_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time; - //_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time; - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Rates updated to " - // + _sendBps + '/' + _recvBps + "Bps in/out " - // //+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s after " - // + sent + '/' + recv + " in " + DataHelper.formatDuration(time)); } } @@ -1194,8 +884,6 @@ public class NTCPConnection implements Closeable { * as reader will call EventPumper.releaseBuf(). */ synchronized void recvEncryptedI2NP(ByteBuffer buf) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("receive encrypted i2np: " + buf.remaining()); // hasArray() is false for direct buffers, at least on my system... if (_curReadBlockIndex == 0 && buf.hasArray()) { // fast way @@ -1212,16 +900,12 @@ public class NTCPConnection implements Closeable { buf.get(_curReadBlock, _curReadBlockIndex, want); _curReadBlockIndex += want; } - //_curReadBlock[_curReadBlockIndex++] = buf.get(); if (_curReadBlockIndex >= BLOCK_SIZE) { // cbc _context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0); - //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE); for (int i = 0; i < BLOCK_SIZE; i++) { _decryptBlockBuf[i] ^= _prevReadBlock[i]; } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")"); boolean ok = recvUnencryptedI2NP(); if (!ok) { if (_log.shouldLog(Log.INFO)) @@ -1258,14 +942,11 @@ public class NTCPConnection implements Closeable { for ( ; pos < end && !_closed.get(); pos += BLOCK_SIZE) { _context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0); if (first) { - // XOR with _prevReadBlock the first time... - //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE); for (int i = 0; i < BLOCK_SIZE; i++) { _decryptBlockBuf[i] ^= _prevReadBlock[i]; } first = false; } else { - //DataHelper.xor(_decryptBlockBuf, 0, array, pos - BLOCK_SIZE, _decryptBlockBuf, 0, BLOCK_SIZE); int start = pos - BLOCK_SIZE; for (int i = 0; i < BLOCK_SIZE; i++) { _decryptBlockBuf[i] ^= array[start + i]; @@ -1371,7 +1052,6 @@ public class NTCPConnection implements Closeable { _log.debug("Sending NTCP metadata"); _sendingMeta = true; _transport.getPumper().wantsWrite(this, encrypted); - // enqueueInfoMessage(); // this often? } private static final int MAX_HANDLERS = 4; @@ -1392,9 +1072,6 @@ public class NTCPConnection implements Closeable { _i2npHandlers.offer(handler); } - - //public long getReadTime() { return _curReadState.getReadTime(); } - private static ByteArray acquireReadBuf() { return _dataReadBufs.acquire(); } @@ -1469,17 +1146,6 @@ public class NTCPConnection implements Closeable { } } - /**** - public long getReadTime() { - long now = System.currentTimeMillis(); - long readTime = now - _stateBegin; - if (readTime >= now) - return -1; - else - return readTime; - } - ****/ - /** @param buf 16 bytes */ private void receiveInitial(byte buf[]) { _size = (int)DataHelper.fromLong(buf, 0, 2); @@ -1518,22 +1184,15 @@ public class NTCPConnection implements Closeable { receiveLastBlock(buf); } else { _crc.update(buf); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("update read state with another block (remaining: " + remaining + ")"); } } /** @param buf 16 bytes */ private void receiveLastBlock(byte buf[]) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("block remaining in the last block: " + (buf.length-blockUsed)); - // on the last block _expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4); _crc.update(buf, 0, buf.length-4); long val = _crc.getValue(); - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size); if (val == _expectedCrc) { try { I2NPMessageHandler h = acquireHandler(_context); @@ -1569,21 +1228,11 @@ public class NTCPConnection implements Closeable { ime); } _context.statManager().addRateData("ntcp.corruptI2NPIME", 1); - // Don't close the con, possible attack vector, not necessarily the peer's fault, - // and should be recoverable - // handler and databuf are lost if we do this - //close(); - //return; } } else { if (_log.shouldLog(Log.WARN)) _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks); _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1); - // This probably can't be spoofed from somebody else, but do we really need to close it? - // This is rare. - //close(); - // databuf is lost if we do this - //return; } // get it ready for the next I2NP message init();