From a52dd57215856790f751a6a7e1dd89952b3fdf70 Mon Sep 17 00:00:00 2001 From: jrandom Date: Tue, 18 Jul 2006 20:08:00 +0000 Subject: [PATCH] * 2006-07-18 0.6.1.22 released 2006-07-18 jrandom * Add a failsafe to the NTCP transport to make sure we keep pumping writes when we should. * Properly reallow 16-32KBps routers in the default config (thanks Complication!) --- .../net/i2p/router/web/StatSummarizer.java | 4 + core/java/src/net/i2p/CoreVersion.java | 4 +- history.txt | 10 +- initialNews.xml | 4 +- installer/install.xml | 2 +- news.xml | 6 +- .../src/net/i2p/router/RouterVersion.java | 6 +- .../router/transport/ntcp/EventPumper.java | 138 ++++++++++++------ .../router/transport/ntcp/NTCPAddress.java | 2 +- .../router/transport/ntcp/NTCPConnection.java | 8 +- .../router/transport/ntcp/NTCPTransport.java | 2 + .../tunnel/pool/TunnelPeerSelector.java | 15 +- 12 files changed, 141 insertions(+), 60 deletions(-) diff --git a/apps/routerconsole/java/src/net/i2p/router/web/StatSummarizer.java b/apps/routerconsole/java/src/net/i2p/router/web/StatSummarizer.java index 9c0e1c794..55c0b4946 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/StatSummarizer.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/StatSummarizer.java @@ -60,6 +60,10 @@ public class StatSummarizer implements Runnable { ",tunnel.buildExploratoryExpire.60000" + ",client.sendAckTime.60000" + ",client.dispatchNoACK.60000" + + ",ntcp.sendTime.60000" + + ",ntcp.transmitTime.60000" + + ",ntcp.sendBacklogTime.60000" + + ",ntcp.receiveTime.60000" + ",transport.sendMessageFailureLifetime.60000" + ",transport.sendProcessingTime.60000"; diff --git a/core/java/src/net/i2p/CoreVersion.java b/core/java/src/net/i2p/CoreVersion.java index d71808f11..b91639b66 100644 --- a/core/java/src/net/i2p/CoreVersion.java +++ b/core/java/src/net/i2p/CoreVersion.java @@ -14,8 +14,8 @@ package net.i2p; * */ public class CoreVersion { - public final static String ID = "$Revision: 1.63 $ $Date: 2006-06-04 17:25:14 $"; - public final static String VERSION = "0.6.1.21"; + public final static String ID = "$Revision: 1.64 $ $Date: 2006-06-13 21:17:43 $"; + public final static String VERSION = "0.6.1.22"; public static void main(String args[]) { System.out.println("I2P Core version: " + VERSION); diff --git a/history.txt b/history.txt index f14734108..b6f5dcddf 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.493 2006-07-14 13:08:44 jrandom Exp $ +$Id: history.txt,v 1.494 2006-07-16 12:20:46 complication Exp $ + +* 2006-07-18 0.6.1.22 released + +2006-07-18 jrandom + * Add a failsafe to the NTCP transport to make sure we keep + pumping writes when we should. + * Properly reallow 16-32KBps routers in the default config + (thanks Complication!) 2006-07-16 Complication * Collect tunnel build agree/reject/expire statistics diff --git a/initialNews.xml b/initialNews.xml index 4c47a0220..e6962eed2 100644 --- a/initialNews.xml +++ b/initialNews.xml @@ -1,5 +1,5 @@ - - + i2p - 0.6.1.21 + 0.6.1.22 diff --git a/news.xml b/news.xml index 87b0b752c..6a03d02a3 100644 --- a/news.xml +++ b/news.xml @@ -1,5 +1,5 @@ - - + • -2006-06-14: 0.6.1.21 released +2006-07-14: 0.6.1.22 released
• 2006-06-13: diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 1d60bd9f6..c92547853 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.432 $ $Date: 2006-07-14 13:08:51 $"; - public final static String VERSION = "0.6.1.21"; - public final static long BUILD = 4; + public final static String ID = "$Revision: 1.433 $ $Date: 2006-07-16 12:20:47 $"; + public final static String VERSION = "0.6.1.22"; + public final static long BUILD = 0; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index abbc42151..0e5bb8d11 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -3,14 +3,7 @@ package net.i2p.router.transport.ntcp; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; -import java.nio.channels.NotYetConnectedException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; +import java.nio.channels.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -39,6 +32,11 @@ public class EventPumper implements Runnable { private static final int BUF_SIZE = 8*1024; private static final int MAX_CACHE_SIZE = 64; + /** + * every 30s or so, iterate across all ntcp connections just to make sure + * we have their interestOps set properly (and to expire any looong idle cons) + */ + private static final long FAILSAFE_ITERATION_FREQ = 60*1000l; public EventPumper(RouterContext ctx, NTCPTransport transport) { _context = ctx; @@ -82,6 +80,7 @@ public class EventPumper implements Runnable { } public void run() { + long lastFailsafeIteration = System.currentTimeMillis(); List bufList = new ArrayList(16); while (_alive && _selector.isOpen()) { try { @@ -107,43 +106,56 @@ public class EventPumper implements Runnable { continue; } - for (Iterator iter = selected.iterator(); iter.hasNext(); ) { + processKeys(selected); + selected.clear(); + + if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) { + // in the *cough* unthinkable possibility that there are bugs in + // the code, lets periodically pass over all NTCP connections and + // make sure that anything which should be able to write has been + // properly marked as such, etc + lastFailsafeIteration = System.currentTimeMillis(); try { - SelectionKey key = (SelectionKey)iter.next(); - int ops = key.readyOps(); - boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0; - boolean connect = (ops & SelectionKey.OP_CONNECT) != 0; - boolean read = (ops & SelectionKey.OP_READ) != 0; - boolean write = (ops & SelectionKey.OP_WRITE) != 0; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ready ops for : " + key - + " accept? " + accept + " connect? " + connect - + " read? " + read - + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0) - + " write? " + write - + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0) - ); - if (accept) { - processAccept(key); + Set all = _selector.keys(); + + int failsafeWrites = 0; + int failsafeCloses = 0; + + long expireIdleWriteTime = 60*60*1000l + _context.random().nextLong(60*60*1000l); + for (Iterator iter = all.iterator(); iter.hasNext(); ) { + try { + SelectionKey key = (SelectionKey)iter.next(); + Object att = key.attachment(); + if (!(att instanceof NTCPConnection)) + continue; // to the next con + NTCPConnection con = (NTCPConnection)att; + + if ( (con.getWriteBufCount() > 0) && + ((key.interestOps() & SelectionKey.OP_WRITE) == 0) ) { + // the data queued to be sent has already passed through + // the bw limiter and really just wants to get shoved + // out the door asap. + key.interestOps(SelectionKey.OP_WRITE | key.interestOps()); + failsafeWrites++; + } + + if ( (con.getTimeSinceSend() > expireIdleWriteTime) && (con.getMessagesSent() > 0) ) { + // we haven't sent anything in a really long time, so lets just close 'er up + con.close(); + failsafeCloses++; + } + } catch (CancelledKeyException cke) { + // cancelled while updating the interest ops. ah well + } + if (failsafeWrites > 0) + _context.statManager().addRateData("ntcp.failsafeWrites", failsafeWrites, 0); + if (failsafeCloses > 0) + _context.statManager().addRateData("ntcp.failsafeCloses", failsafeCloses, 0); } - if (connect) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); - processConnect(key); - } - if (read) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); - processRead(key); - } - if (write) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - processWrite(key); - } - } catch (CancelledKeyException cke) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("key cancelled"); + } catch (ClosedSelectorException cse) { + continue; } } - selected.clear(); } catch (RuntimeException re) { _log.log(Log.CRIT, "Error in the event pumper", re); } @@ -184,6 +196,45 @@ public class EventPumper implements Runnable { synchronized (_wantsWrite) { _wantsWrite.clear(); } } + private void processKeys(Set selected) { + for (Iterator iter = selected.iterator(); iter.hasNext(); ) { + try { + SelectionKey key = (SelectionKey)iter.next(); + int ops = key.readyOps(); + boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0; + boolean connect = (ops & SelectionKey.OP_CONNECT) != 0; + boolean read = (ops & SelectionKey.OP_READ) != 0; + boolean write = (ops & SelectionKey.OP_WRITE) != 0; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("ready ops for : " + key + + " accept? " + accept + " connect? " + connect + + " read? " + read + + "/" + ((key.interestOps()&SelectionKey.OP_READ)!= 0) + + " write? " + write + + "/" + ((key.interestOps()&SelectionKey.OP_WRITE)!= 0) + ); + if (accept) { + processAccept(key); + } + if (connect) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); + processConnect(key); + } + if (read) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + processRead(key); + } + if (write) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + processWrite(key); + } + } catch (CancelledKeyException cke) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("key cancelled"); + } + } + } + public void wantsWrite(NTCPConnection con, byte data[]) { ByteBuffer buf = ByteBuffer.wrap(data); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write", null, null);//con, buf); @@ -305,7 +356,9 @@ public class EventPumper implements Runnable { } } catch (IOException ioe) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Error processing connection", ioe); - } + } catch (NoConnectionPendingException ncpe) { + // ignore + } } private void processRead(SelectionKey key) { @@ -455,6 +508,7 @@ public class EventPumper implements Runnable { con.setKey(key); try { NTCPAddress naddr = con.getRemoteAddress(); + if (naddr.getPort() <= 0) throw new IOException("Invalid NTCP address: " + naddr); InetSocketAddress saddr = new InetSocketAddress(naddr.getHost(), naddr.getPort()); boolean connected = con.getChannel().connect(saddr); if (connected) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPAddress.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPAddress.java index 2bf9fc176..e7ab7c3bd 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPAddress.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPAddress.java @@ -64,7 +64,7 @@ public class NTCPAddress { } else { _host = host.trim(); String port = addr.getOptions().getProperty(PROP_PORT); - if ( (port != null) && (port.trim().length() > 0) ) { + if ( (port != null) && (port.trim().length() > 0) && !("null".equals(port)) ) { try { _port = Integer.parseInt(port.trim()); } catch (NumberFormatException nfe) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 5451d3d5c..179df146a 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -246,9 +246,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { boolean successful = false; _consecutiveBacklog++; _transport.afterSend(msg, successful, allowRequeue, msg.getLifetime()); - if (_consecutiveBacklog > 50) { // waaay too backlogged + if (_consecutiveBacklog > 10) { // waaay too backlogged + boolean wantsWrite = false; + try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {} + int blocks = 0; + synchronized (_writeBufs) { blocks = _writeBufs.size(); } if (_log.shouldLog(Log.ERROR)) - _log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ") sending to " + _remotePeer.calculateHash().toBase64()); + _log.error("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64()); close(); } return; diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index e0d7669e4..36a3cbc23 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -60,6 +60,8 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.receiveTime", "How long it takes to receive an inbound message", "ntcp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("ntcp.receiveSize", "How large the received message was", "ntcp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("ntcp.sendBacklogTime", "How long the head of the send queue has been waiting when we fail to add a new one to the queue (period is the number of messages queued)", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.failsafeWrites", "How many times do we need to proactively add in an extra nio write to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("ntcp.failsafeCloses", "How many times do we need to proactively close an idle connection to a peer at any given failsafe pass?", "ntcp", new long[] { 60*1000, 10*60*1000 }); _establishing = new ArrayList(4); _conLock = new Object(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index 589ec608e..6910a8bb5 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -156,8 +156,7 @@ abstract class TunnelPeerSelector { } else if (filterSlow(ctx, isInbound, isExploratory)) { Log log = ctx.logManager().getLog(TunnelPeerSelector.class); String excludeCaps = ctx.getProperty("router.excludePeerCaps", - String.valueOf(Router.CAPABILITY_BW16) + - String.valueOf(Router.CAPABILITY_BW32)); + String.valueOf(Router.CAPABILITY_BW16)); Set peers = new HashSet(); if (excludeCaps != null) { char excl[] = excludeCaps.toCharArray(); @@ -224,7 +223,17 @@ abstract class TunnelPeerSelector { if (infoAge < 0) { infoAge = 0; } else if (infoAge > 24*60*60*1000) { - peers.add(peer.getIdentity().calculateHash()); + // Only exclude long-unseen peers if we haven't just started up + long DONT_EXCLUDE_PERIOD = 15*60*1000; + if (ctx.router().getUptime() < DONT_EXCLUDE_PERIOD) { + if (log.shouldLog(Log.DEBUG)) + log.debug("Not excluding a long-unseen peer, since we just started up."); + } else { + if (log.shouldLog(Log.DEBUG)) + log.debug("Excluding a long-unseen peer."); + peers.add(peer.getIdentity().calculateHash()); + } + //peers.add(peer.getIdentity().calculateHash()); continue; } else { if (infoAge + uptimeMs < 2*60*60*1000) {