diff --git a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
index 3d6961f08..a93993edb 100644
--- a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
+++ b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
@@ -107,7 +107,8 @@ public class SummaryHelper {
}
public boolean allowReseed() {
- return (_context.netDb().getKnownRouters() < 30);
+ return (_context.netDb().getKnownRouters() < 30) ||
+ Boolean.valueOf(_context.getProperty("i2p.alwaysAllowReseed", "false")).booleanValue();
}
public int getAllPeers() { return _context.netDb().getKnownRouters(); }
diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java
index ea8454611..d3b7cbb06 100644
--- a/core/java/src/net/i2p/data/DataHelper.java
+++ b/core/java/src/net/i2p/data/DataHelper.java
@@ -822,7 +822,7 @@ public class DataHelper {
return (ms / (60 * 1000)) + "m";
} else if (ms < 3 * 24 * 60 * 60 * 1000) {
return (ms / (60 * 60 * 1000)) + "h";
- } else if (ms > 365 * 24 * 60 * 60 * 1000) {
+ } else if (ms > 365l * 24l * 60l * 60l * 1000l) {
return "n/a";
} else {
return (ms / (24 * 60 * 60 * 1000)) + "d";
diff --git a/history.txt b/history.txt
index bfc35f21f..966363caa 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,15 @@
-$Id: history.txt,v 1.491 2006-07-01 17:44:34 complication Exp $
+$Id: history.txt,v 1.492 2006-07-04 16:17:44 jrandom Exp $
+
+2006-07-14 jrandom
+ * Improve the multitransport shitlisting (thanks Complication!)
+ * Allow routers with a capacity of 16-32KBps to be used in tunnels under
+ the default configuration (thanks for the stats Complication!)
+ * Properly allow older router references to load on startup
+ (thanks bar, Complication, et al!)
+ * Add a new "i2p.alwaysAllowReseed" advanced config property, though
+ hopefully today's changes should make this unnecessary (thanks void!)
+ * Improved NTCP buffering
+ * Close NTCP connections if we are too backlogged when writing to them
2006-07-04 jrandom
* New NIO-based tcp transport (NTCP), enabled by default for outbound
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index e75936cc3..d601e83b5 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.430 $ $Date: 2006-07-01 17:44:36 $";
+ public final static String ID = "$Revision: 1.431 $ $Date: 2006-07-04 16:18:17 $";
public final static String VERSION = "0.6.1.21";
- public final static long BUILD = 2;
+ public final static long BUILD = 3;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java
index ea3adf761..7f8e98ccf 100644
--- a/router/java/src/net/i2p/router/Shitlist.java
+++ b/router/java/src/net/i2p/router/Shitlist.java
@@ -101,7 +101,10 @@ public class Shitlist {
}
}
- _context.netDb().fail(peer);
+ if (transport == null) {
+ // we hate the peer on *any* transport
+ _context.netDb().fail(peer);
+ }
//_context.tunnelManager().peerFailed(peer);
//_context.messageRegistry().peerFailed(peer);
if (!wasAlready)
@@ -192,6 +195,9 @@ public class Shitlist {
buf.append(" (?)");
buf.append(" expiring in ");
buf.append(DataHelper.formatDuration(entry.expireOn-_context.clock().now()));
+ Set transports = entry.transports;
+ if ( (transports != null) && (transports.size() > 0) )
+ buf.append(" on the following transports: ").append(transports);
if (entry.cause != null) {
buf.append("
\n");
buf.append(entry.cause);
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index 3d8b7737b..6ffe4f770 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -117,7 +117,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
* dont accept any dbDtore of a router over 24 hours old (unless we dont
* know anyone or just started up)
*/
- private final static long ROUTER_INFO_EXPIRATION = 24*60*60*1000l;
+ private final static long ROUTER_INFO_EXPIRATION = 3*24*60*60*1000l;
public KademliaNetworkDatabaseFacade(RouterContext context) {
_context = context;
@@ -278,8 +278,14 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
_context.jobQueue().addJob(new DataPublisherJob(_context, this));
// expire old leases
_context.jobQueue().addJob(new ExpireLeasesJob(_context, this));
- // expire some routers in overly full kbuckets
- _context.jobQueue().addJob(new ExpireRoutersJob(_context, this));
+
+ // the ExpireRoutersJob never fired since the tunnel pool manager lied
+ // and said all peers are in use (for no good reason), but this expire
+ // thing was a bit overzealous anyway, since the kbuckets are only
+ // relevent when the network is huuuuuuuuge.
+ //// expire some routers in overly full kbuckets
+ ////_context.jobQueue().addJob(new ExpireRoutersJob(_context, this));
+
if (!_quiet) {
// fill the passive queue periodically
_context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this));
@@ -643,7 +649,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid routerInfo signature! forged router structure! router = " + routerInfo);
return "Invalid routerInfo signature on " + key.toBase64();
- } else if (!routerInfo.isCurrent(ROUTER_INFO_EXPIRATION)) {
+ } else if (!routerInfo.isCurrent(ROUTER_INFO_EXPIRATION) && (_context.router().getUptime() > 60*60*1000) ) {
if (routerInfo.getNetworkId() != Router.NETWORK_ID) {
_context.shitlist().shitlistRouter(key, "Peer is not in our network");
return "Peer is not in our network (" + routerInfo.getNetworkId() + ", wants "
@@ -661,7 +667,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
+ " peers left (curPeer: " + key.toBase64() + " published on "
+ new Date(routerInfo.getPublished()));
}
- } else if (routerInfo.getPublished() > now + Router.CLOCK_FUDGE_FACTOR) {
+ } else if (routerInfo.getPublished() > now + 2*Router.CLOCK_FUDGE_FACTOR) {
long age = routerInfo.getPublished() - _context.clock().now();
if (_log.shouldLog(Log.INFO))
_log.info("Peer " + key.toBase64() + " published their routerInfo in the future?! ["
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java
index d5bbc8372..63ae0f60f 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java
@@ -350,9 +350,10 @@ class PersistentDataStore extends TransientDataStore {
ri.readBytes(fis);
if (ri.getNetworkId() != Router.NETWORK_ID) {
corrupt = true;
- if (_log.shouldLog(Log.WARN))
- _log.warn("The router is from a different network: "
- + ri.getIdentity().calculateHash().toBase64());
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("The router "
+ + ri.getIdentity().calculateHash().toBase64()
+ + " is from a different network");
} else {
try {
_facade.store(ri.getIdentity().getHash(), ri);
@@ -362,14 +363,16 @@ class PersistentDataStore extends TransientDataStore {
}
}
} catch (DataFormatException dfe) {
- _log.warn("Error reading the routerInfo from " + _routerFile.getAbsolutePath(), dfe);
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Error reading the routerInfo from " + _routerFile.getName(), dfe);
corrupt = true;
} finally {
if (fis != null) try { fis.close(); } catch (IOException ioe) {}
}
if (corrupt) _routerFile.delete();
} catch (IOException ioe) {
- _log.warn("Error reading the RouterInfo from " + _routerFile.getAbsolutePath(), ioe);
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Unable to read the router reference in " + _routerFile.getName(), ioe);
}
}
}
diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
index 1925f8a22..cb7f411f9 100644
--- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
+++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
@@ -67,6 +67,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public TransportBid getNextBid(OutNetMessage msg) {
return _manager.getNextBid(msg);
}
+ int getTransportCount() { return _manager.getTransportCount(); }
public void processMessage(OutNetMessage msg) {
//GetBidsJob j = new GetBidsJob(_context, this, msg);
diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java
index 01b81e339..9126494f5 100644
--- a/router/java/src/net/i2p/router/transport/GetBidsJob.java
+++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java
@@ -62,8 +62,11 @@ public class GetBidsJob extends JobImpl {
TransportBid bid = facade.getNextBid(msg);
if (bid == null) {
- if (msg.getFailedTransports().size() == 0) {
+ int failedCount = msg.getFailedTransports().size();
+ if (failedCount == 0) {
context.shitlist().shitlistRouter(to, "We share no common transports with them");
+ } else if (failedCount >= facade.getTransportCount()) {
+ // fail after all transports were unsuccessful
context.netDb().fail(to);
}
fail(context, msg);
diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java
index 08a2ac522..a53518b9d 100644
--- a/router/java/src/net/i2p/router/transport/TransportManager.java
+++ b/router/java/src/net/i2p/router/transport/TransportManager.java
@@ -117,6 +117,8 @@ public class TransportManager implements TransportEventListener {
_transports.clear();
}
+ int getTransportCount() { return _transports.size(); }
+
private boolean isSupported(Set addresses, Transport t) {
for (Iterator iter = addresses.iterator(); iter.hasNext(); ) {
RouterAddress addr = (RouterAddress)iter.next();
diff --git a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java
index 120074d18..8e5b8dce8 100644
--- a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java
+++ b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java
@@ -561,11 +561,14 @@ public class EstablishState {
/** anything left over in the byte buffer after verification is extra */
private void prepareExtra(ByteBuffer buf) {
- _extra = new byte[buf.remaining()];
- buf.get(_extra);
- _received += _extra.length;
+ int remaining = buf.remaining();
+ if (remaining > 0) {
+ _extra = new byte[remaining];
+ buf.get(_extra);
+ _received += remaining;
+ }
if (_log.shouldLog(Log.DEBUG))
- _log.debug(prefix() + "prepare extra " + _extra.length + " (total received: " + _received + ")");
+ _log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
}
/**
diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java
index 14ec38a9c..abbc42151 100644
--- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java
+++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java
@@ -467,7 +467,7 @@ public class EventPumper implements Runnable {
_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage());
con.close(false);
} else {
- _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), NTCPTransport.STYLE, "unable to connect: " + ioe.getMessage());
+ _context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "unable to connect: " + ioe.getMessage(), NTCPTransport.STYLE);
con.close(true);
}
}
diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java
index 07e5910f9..5451d3d5c 100644
--- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java
+++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java
@@ -94,6 +94,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/** unencrypted outbound metadata buffer */
private byte _meta[] = new byte[16];
private boolean _sendingMeta;
+ /** how many consecutive sends were failed due to (estimated) send queue time */
+ private int _consecutiveBacklog;
private static final int META_FREQUENCY = 10*60*1000;
@@ -122,6 +124,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_conKey = key;
_conKey.attach(this);
_sendingMeta = false;
+ _consecutiveBacklog = 0;
transport.establishing(this);
}
/**
@@ -147,6 +150,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_curReadState = new ReadState();
_remotePeer = remotePeer;
_sendingMeta = false;
+ _consecutiveBacklog = 0;
//_establishState = new EstablishState(ctx, transport, this);
transport.establishing(this);
}
@@ -187,10 +191,18 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; }
- public long getOutboundQueueSize() { synchronized (_outbound) { return _outbound.size(); } }
+ public long getOutboundQueueSize() {
+ synchronized (_outbound) {
+ int queued = _outbound.size();
+ if (_currentOutbound != null)
+ queued++;
+ return queued;
+ }
+ }
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
public long getTimeSinceCreated() { return System.currentTimeMillis()-_created; }
+ public int getConsecutiveBacklog() { return _consecutiveBacklog; }
public boolean isClosed() { return _closed; }
public void close() { close(false); }
@@ -232,9 +244,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (tooBacklogged()) {
boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
boolean successful = false;
+ _consecutiveBacklog++;
_transport.afterSend(msg, successful, allowRequeue, msg.getLifetime());
+ if (_consecutiveBacklog > 50) { // waaay too backlogged
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ") sending to " + _remotePeer.calculateHash().toBase64());
+ close();
+ }
return;
}
+ _consecutiveBacklog = 0;
int enqueued = 0;
if (FAST_LARGE)
bufferedPrepare(msg);
@@ -247,8 +266,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_established && _currentOutbound == null)
_transport.getWriter().wantsWrite(this);
}
-
- private boolean tooBacklogged() {
+
+ private long queueTime() {
long queueTime = 0;
int size = 0;
synchronized (_outbound) {
@@ -257,9 +276,18 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if ( (msg == null) && (size > 0) )
msg = (OutNetMessage)_outbound.get(0);
if (msg == null)
- return false;
+ return 0;
queueTime = msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
+ return queueTime;
+ }
+ private boolean tooBacklogged() {
+ long queueTime = queueTime();
+ if (queueTime <= 0) return false;
+ int size = 0;
+ synchronized (_outbound) {
+ size = _outbound.size();
+ }
// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
@@ -665,7 +693,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
synchronized (_writeBufs) {
return _writeBufs.size();
}
- }
+ }
/**
* We have read the data in the buffer, but we can't process it locally yet,
@@ -946,18 +974,51 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
+
+ public long getReadTime() { return _curReadState.getReadTime(); }
+
+ private static class DataBuf {
+ byte data[];
+ ByteArrayInputStream bais;
+ public DataBuf() {
+ data = new byte[16*1024];
+ bais = new ByteArrayInputStream(data);
+ }
+ }
+
+ private static int MAX_DATA_READ_BUFS = 16;
+ private static List _dataReadBufs = new ArrayList(16);
+ private static DataBuf acquireReadBuf() {
+ synchronized (_dataReadBufs) {
+ if (_dataReadBufs.size() > 0)
+ return (DataBuf)_dataReadBufs.remove(0);
+ }
+ return new DataBuf();
+ }
+ private static void releaseReadBuf(DataBuf buf) {
+ buf.bais.reset();
+ synchronized (_dataReadBufs) {
+ if (_dataReadBufs.size() < MAX_DATA_READ_BUFS)
+ _dataReadBufs.add(buf);
+ }
+ }
+ /**
+ * sizeof(data)+data+pad+crc.
+ *
+ * perhaps to reduce the per-con memory footprint, we can acquire/release
+ * the ReadState._data and ._bais when _size is > 0, so there are only
+ * J 16KB buffers for the cons actually transmitting, instead of one per
+ * con (including idle ones)
+ */
private class ReadState {
private int _size;
- private byte _data[];
- private ByteArrayInputStream _bais;
+ private DataBuf _dataBuf;
private int _nextWrite;
private long _expectedCrc;
private Adler32 _crc;
private long _stateBegin;
private int _blocks;
public ReadState() {
- _data = new byte[16*1024];
- _bais = new ByteArrayInputStream(_data);
_crc = new Adler32();
init();
}
@@ -968,7 +1029,9 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_stateBegin = -1;
_blocks = -1;
_crc.reset();
- _bais.reset();
+ if (_dataBuf != null)
+ releaseReadBuf(_dataBuf);
+ _dataBuf = null;
}
public int getSize() { return _size; }
public void receiveBlock(byte buf[]) {
@@ -978,15 +1041,24 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
receiveSubsequent(buf);
}
}
+ public long getReadTime() {
+ long now = System.currentTimeMillis();
+ long readTime = now - _stateBegin;
+ if (readTime >= now)
+ return -1;
+ else
+ return readTime;
+ }
private void receiveInitial(byte buf[]) {
_stateBegin = System.currentTimeMillis();
_size = (int)DataHelper.fromLong(buf, 0, 2);
if (_size == 0) {
readMeta(buf);
init();
- return;
+ return;
} else {
- System.arraycopy(buf, 2, _data, 0, buf.length-2);
+ _dataBuf = acquireReadBuf();
+ System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2);
_nextWrite += buf.length-2;
_crc.update(buf);
_blocks++;
@@ -999,7 +1071,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
int remaining = _size - _nextWrite;
int blockUsed = Math.min(buf.length, remaining);
if (remaining > 0) {
- System.arraycopy(buf, 0, _data, _nextWrite, blockUsed);
+ System.arraycopy(buf, 0, _dataBuf.data, _nextWrite, blockUsed);
_nextWrite += blockUsed;
remaining -= blockUsed;
}
@@ -1037,7 +1109,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// depend upon EOF to stop reading, so its ok that the _bais could
// in theory return more data than _size bytes, since h.readMessage
// stops when it should.
- I2NPMessage read = h.readMessage(_bais);
+ I2NPMessage read = h.readMessage(_dataBuf.bais);
long timeToRecv = System.currentTimeMillis() - _stateBegin;
releaseHandler(h);
if (_log.shouldLog(Log.DEBUG))
@@ -1073,118 +1145,4 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
}
-
- /**
- * sizeof(data)+data+pad+crc
- */
- private class ReadState2 {
- private int _size;
- private byte _dataBegin[];
- private byte _dataRemaining[];
- private int _dataRemainingIndex;
- private long _expectedCrc;
- private Adler32 _crc;
- private long _stateBegin;
- private int _blocks;
- private boolean _wasMeta;
- public ReadState2(byte buf[]) {
- _stateBegin = System.currentTimeMillis();
- _size = (int)DataHelper.fromLong(buf, 0, 2);
- if (_size == 0) {
- readMeta(buf);
- _wasMeta = true;
- return;
- } else {
- _wasMeta = false;
- }
- _dataBegin = new byte[buf.length-2];
- System.arraycopy(buf, 2, _dataBegin, 0, _dataBegin.length);
- _dataRemaining = new byte[_size-_dataBegin.length];
- _dataRemainingIndex = 0;
- _crc = new Adler32();
- _crc.update(buf);
- _blocks++;
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("new read state with size: " + _size + " remaining: " + _dataRemaining.length + " for message " + _messagesRead);
- }
- public int size() { return _size; }
- public int block() { return _blocks; }
- public boolean wasMeta() { return _wasMeta; }
- public void recv(byte buf[]) {
- _blocks++;
- int remaining = _dataRemaining.length-_dataRemainingIndex;
- int blockUsed = Math.min(buf.length, remaining);
- if (remaining > 0) {
- System.arraycopy(buf, 0, _dataRemaining, _dataRemainingIndex, blockUsed);
- _dataRemainingIndex += blockUsed;
- remaining -= blockUsed;
- }
- if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead);
- _crc.update(buf);
- return;
- } else if (remaining <= 0) {
- //if (_log.shouldLog(Log.DEBUG))
- // _log.debug("block remaining in the last block: " + (buf.length-blockUsed));
-
- // on the last block
- _expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4);
- _crc.update(buf, 0, buf.length-4);
- long val = _crc.getValue();
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size
- + " remaining=" + remaining);
- if (val == _expectedCrc) {
- try {
- I2NPMessageHandler h = acquireHandler(_context);
- I2NPMessage read = null;
- if (false) {
- byte msg[] = new byte[_size];
- System.arraycopy(_dataBegin, 0, msg, 0, _dataBegin.length);
- System.arraycopy(_dataRemaining, 0, msg, _dataBegin.length, _dataRemaining.length);
- read = h.readMessage(msg);
- } else {
- read = h.readMessage(new SequenceInputStream(new ByteArrayInputStream(_dataBegin),
- new ByteArrayInputStream(_dataRemaining)));
- }
- long timeToRecv = System.currentTimeMillis() - _stateBegin;
- releaseHandler(h);
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("I2NP message " + _messagesRead + "/" + (read != null ? read.getUniqueId() : 0)
- + " received after " + timeToRecv + " with " + _size +"/"+ (_blocks*16) + " bytes");
- _context.statManager().addRateData("ntcp.receiveTime", timeToRecv, timeToRecv);
- _context.statManager().addRateData("ntcp.receiveSize", _size, timeToRecv);
- if (read != null) {
- _transport.messageReceived(read, _remotePeer, null, timeToRecv, _size);
- if (_messagesRead <= 0)
- enqueueInfoMessage();
- _lastReceiveTime = System.currentTimeMillis();
- _messagesRead++;
- }
- } catch (IOException ioe) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Error parsing I2NP message", ioe);
- close();
- return;
- } catch (I2NPMessageException ime) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Error parsing I2NP message", ime);
- close();
- return;
- }
- } else {
- if (_log.shouldLog(Log.ERROR))
- _log.error("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " remaining=" + remaining + " blocks " + _blocks);
- close();
- return;
- }
- _curReadState = null;
- } else {
- _crc.update(buf);
- //if (_log.shouldLog(Log.DEBUG))
- // _log.debug("update read state with another block (remaining: " + remaining + ")");
- }
- }
- }
}
diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java
index 5f872bdef..e0d7669e4 100644
--- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java
+++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java
@@ -379,7 +379,7 @@ public class NTCPTransport extends TransportImpl {
cons = new HashMap(_conByIdent);
_conByIdent.clear();
}
- for (Iterator iter = cons.keySet().iterator(); iter.hasNext(); ) {
+ for (Iterator iter = cons.values().iterator(); iter.hasNext(); ) {
NTCPConnection con = (NTCPConnection)iter.next();
con.close();
}
@@ -400,18 +400,22 @@ public class NTCPTransport extends TransportImpl {
long sendTotal = 0;
long recvTotal = 0;
int numPeers = 0;
+ int readingPeers = 0;
+ int writingPeers = 0;
StringBuffer buf = new StringBuffer(512);
buf.append("NTCP connections: ").append(peers.size()).append("
\n");
buf.append("
| peer | "); - buf.append("uptime | "); - buf.append("idle | "); - buf.append("sent | "); - buf.append("received | "); - buf.append("out/in | "); - buf.append("out queue | "); - buf.append("skew | "); + buf.append("||
| peer | "); + buf.append("uptime | "); + buf.append("idle | "); + buf.append("sent | "); + buf.append("received | "); + buf.append("out/in | "); + buf.append("out queue | "); + buf.append("backlogged? | "); + buf.append("reading? | "); + buf.append("skew | "); buf.append("").append(con.getMessagesReceived()); buf.append(" | ").append(formatRate(con.getSendRate()/1024)); buf.append("/").append(formatRate(con.getRecvRate()/1024)).append("KBps"); - buf.append(" | ").append(con.getOutboundQueueSize()); + long outQueue = con.getOutboundQueueSize(); + if (outQueue <= 0) { + buf.append(" | No messages"); + } else { + buf.append(" | ").append(outQueue).append(" message"); + if (outQueue > 1) + buf.append("s"); + writingPeers++; + } + buf.append(" | ").append(con.getConsecutiveBacklog() > 0 ? "true" : "false"); + long readTime = con.getReadTime(); + if (readTime <= 0) { + buf.append(" | No"); + } else { + buf.append(" | For ").append(DataHelper.formatDuration(readTime)); + readingPeers++; + } buf.append(" | ").append(DataHelper.formatDuration(con.getClockSkew())); buf.append(" | \n"); out.write(buf.toString()); @@ -433,6 +453,8 @@ public class NTCPTransport extends TransportImpl { } buf.append("