diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 6741eef9c..023b709cb 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -301,10 +301,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { int blocks = _writeBufs.size(); _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash()); } - _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime()); + _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime()); close(); } - _context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog, msg.getLifetime()); + _context.statManager().addRateData("ntcp.dontSendOnBacklog", _consecutiveBacklog); return; } _consecutiveBacklog = 0; @@ -353,7 +353,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { + ", writeBufs: " + writeBufs + " on " + toString()); } catch (Exception e) {} // java.nio.channels.CancelledKeyException } - _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size); + _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime); return true; //} else if (size > 32) { // another arbitrary limit. // if (_log.shouldLog(Log.ERROR)) @@ -914,7 +914,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } if (msg != null) { _lastSendTime = System.currentTimeMillis(); - _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime(), msg.getSendTime()); + _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime()); if (_log.shouldLog(Log.INFO)) { _log.info("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after " + msg.getSendTime() + "/" @@ -1019,7 +1019,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { boolean ok = recvUnencryptedI2NP(); if (!ok) { _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data"); - _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1, getUptime()); + _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1); return; } byte swap[] = _prevReadBlock; @@ -1040,7 +1040,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _curReadState.receiveBlock(_decryptBlockBuf); if (_curReadState.getSize() > BUFFER_SIZE) { _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString()); - _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize(), getUptime()); + _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize()); close(); return false; } else { @@ -1068,7 +1068,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (read != expected) { if (_log.shouldLog(Log.WARN)) _log.warn("I2NP metadata message had a bad CRC value"); - _context.statManager().addRateData("ntcp.corruptMetaCRC", 1, getUptime()); + _context.statManager().addRateData("ntcp.corruptMetaCRC", 1); close(); return; } else { @@ -1076,11 +1076,11 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (Math.abs(newSkew*1000) > Router.CLOCK_FUDGE_FACTOR) { if (_log.shouldLog(Log.WARN)) _log.warn("Peer's skew jumped too far (from " + _clockSkew + " s to " + newSkew + " s): " + toString()); - _context.statManager().addRateData("ntcp.corruptSkew", newSkew, getUptime()); + _context.statManager().addRateData("ntcp.corruptSkew", newSkew); close(); return; } - _context.statManager().addRateData("ntcp.receiveMeta", newSkew, getUptime()); + _context.statManager().addRateData("ntcp.receiveMeta", newSkew); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s."); _clockSkew = newSkew; @@ -1126,7 +1126,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } - public long getReadTime() { return _curReadState.getReadTime(); } + //public long getReadTime() { return _curReadState.getReadTime(); } private static class DataBuf { final byte data[]; @@ -1164,6 +1164,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } /** + * Read the unencrypted message (16 bytes at a time). + * verify the checksum, and pass it on to + * an I2NPMessageHandler. The unencrypted message is encoded as follows: + * + *
+ * +-------+-------+--//--+---//----+-------+-------+-------+-------+ + * | sizeof(data) | data | padding | adler checksum of sz+data+pad | + * +-------+-------+--//--+---//----+-------+-------+-------+-------+ + *+ * * sizeof(data)+data+pad+crc. * * perhaps to reduce the per-con memory footprint, we can acquire/release @@ -1176,13 +1186,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private DataBuf _dataBuf; private int _nextWrite; private long _expectedCrc; - private Adler32 _crc; + private final Adler32 _crc; private long _stateBegin; private int _blocks; + public ReadState() { _crc = new Adler32(); init(); } + private void init() { _size = -1; _nextWrite = 0; @@ -1194,7 +1206,13 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { releaseReadBuf(_dataBuf); _dataBuf = null; } + public int getSize() { return _size; } + + /** + * Caller must synchronize + * @param buf 16 bytes + */ public void receiveBlock(byte buf[]) { if (_size == -1) { receiveInitial(buf); @@ -1202,6 +1220,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { receiveSubsequent(buf); } } + + /**** public long getReadTime() { long now = System.currentTimeMillis(); long readTime = now - _stateBegin; @@ -1210,6 +1230,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { else return readTime; } + ****/ + + /** @param buf 16 bytes */ private void receiveInitial(byte buf[]) { _stateBegin = System.currentTimeMillis(); _size = (int)DataHelper.fromLong(buf, 0, 2); @@ -1227,6 +1250,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _log.debug("new I2NP message with size: " + _size + " for message " + _messagesRead); } } + + /** @param buf 16 bytes */ private void receiveSubsequent(byte buf[]) { _blocks++; int remaining = _size - _nextWrite; @@ -1249,6 +1274,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // _log.debug("update read state with another block (remaining: " + remaining + ")"); } } + + /** @param buf 16 bytes */ private void receiveLastBlock(byte buf[]) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("block remaining in the last block: " + (buf.length-blockUsed)); @@ -1276,8 +1303,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_log.shouldLog(Log.INFO)) _log.info("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0) + " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes on " + NTCPConnection.this.toString()); - _context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv); - _context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv); + _context.statManager().addRateData("ntcp.receiveTime", timeToRecv); + _context.statManager().addRateData("ntcp.receiveSize", _size); // FIXME move end of try block here. // On the input side, move releaseHandler() and init() to a finally block. @@ -1294,14 +1321,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error parsing I2NP message", ioe); - _context.statManager().addRateData("ntcp.corruptI2NPIOE", 1, getUptime()); + _context.statManager().addRateData("ntcp.corruptI2NPIOE", 1); close(); // handler and databuf are lost return; } catch (I2NPMessageException ime) { if (_log.shouldLog(Log.WARN)) _log.warn("Error parsing I2NP message", ime); - _context.statManager().addRateData("ntcp.corruptI2NPIME", 1, getUptime()); + _context.statManager().addRateData("ntcp.corruptI2NPIME", 1); close(); // handler and databuf are lost return; @@ -1309,7 +1336,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } else { if (_log.shouldLog(Log.WARN)) _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks); - _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1, getUptime()); + _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1); // FIXME should we try to read in the message and keep going? close(); // databuf is lost