From c76402a160f6f1f08aa1b8fcf8c3cdcc53225e0e Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Sat, 8 Apr 2006 06:15:43 +0000
Subject: [PATCH] 2006-04-08  jrandom     * Process inbound tunnel requests
 more efficiently     * Proactively drop inbound tunnel requests if the queue
 before we'd       process it in is too long (dynamically adjusted by cpu
 load)     * Adjust the tunnel rejection throttle to reject requeusts when we
 have to       proactively drop too many requests.     * Display the number of
 pending inbound tunnel join requests on the router       console (as the
 "handle backlog")     * Include a few more stats in the default set of graphs

 .../net/i2p/router/web/    |  4 +
 .../src/net/i2p/router/web/ |  7 ++
 apps/routerconsole/jsp/summary.jsp            |  1 +
 history.txt                                   | 12 ++-
 .../src/net/i2p/router/     |  4 +-
 .../net/i2p/router/   |  4 +
 .../i2p/router/tunnel/pool/ | 47 ++++++++--
 .../i2p/router/tunnel/pool/  | 93 ++++++++++++++++---
 .../router/tunnel/pool/ | 10 +-
 9 files changed, 152 insertions(+), 30 deletions(-)

diff --git a/apps/routerconsole/java/src/net/i2p/router/web/ b/apps/routerconsole/java/src/net/i2p/router/web/
index aa38562b7c..07fa08fc1d 100644
--- a/apps/routerconsole/java/src/net/i2p/router/web/
+++ b/apps/routerconsole/java/src/net/i2p/router/web/
@@ -45,6 +45,10 @@ public class StatSummarizer implements Runnable {
                                                     ",router.activePeers.60000" +
                                                     ",router.activeSendPeers.60000" +
                                                     ",tunnel.acceptLoad.60000" +
+                                                    ",tunnel.dropLoadProactive.60000" +
+                                                    ",tunnel.buildExploratorySuccess.60000" +
+                                                    ",tunnel.buildExploratoryReject.60000" +
+                                                    ",tunnel.buildExploratoryExpire.60000" +
                                                     ",client.sendAckTime.60000" +
                                                     ",client.dispatchNoACK.60000" +
                                                     ",transport.sendMessageFailureLifetime.60000" +
diff --git a/apps/routerconsole/java/src/net/i2p/router/web/ b/apps/routerconsole/java/src/net/i2p/router/web/
index 98d0ef6cda..a3de9cc477 100644
--- a/apps/routerconsole/java/src/net/i2p/router/web/
+++ b/apps/routerconsole/java/src/net/i2p/router/web/
@@ -493,6 +493,13 @@ public class SummaryHelper {
         return _context.throttle().getTunnelLag() + "ms";
+    public String getInboundBacklog() {
+        if (_context == null)
+            return "0";
+        return String.valueOf(_context.tunnelManager().getInboundBuildQueueSize());
+    }
     public boolean updateAvailable() { 
         return NewsFetcher.getInstance(_context).updateAvailable();
diff --git a/apps/routerconsole/jsp/summary.jsp b/apps/routerconsole/jsp/summary.jsp
index 72a5e8f204..38a4d0fc87 100644
--- a/apps/routerconsole/jsp/summary.jsp
+++ b/apps/routerconsole/jsp/summary.jsp
@@ -83,6 +83,7 @@
  <b>Job lag:</b> <jsp:getProperty name="helper" property="jobLag" /><br />
  <b>Message delay:</b> <jsp:getProperty name="helper" property="messageDelay" /><br />
  <b>Tunnel lag:</b> <jsp:getProperty name="helper" property="tunnelLag" /><br />
+ <b>Handle backlog:</b> <jsp:getProperty name="helper" property="inboundBacklog" /><br />
  <hr />
diff --git a/history.txt b/history.txt
index 29efff5f18..40fa5467dc 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,14 @@
-$Id: history.txt,v 1.447 2006/04/06 05:33:46 jrandom Exp $
+$Id: history.txt,v 1.448 2006/04/06 20:26:33 jrandom Exp $
+2006-04-08  jrandom
+    * Process inbound tunnel requests more efficiently
+    * Proactively drop inbound tunnel requests if the queue before we'd
+      process it in is too long (dynamically adjusted by cpu load)
+    * Adjust the tunnel rejection throttle to reject requeusts when we have to
+      proactively drop too many requests.
+    * Display the number of pending inbound tunnel join requests on the router
+      console (as the "handle backlog")
+    * Include a few more stats in the default set of graphs
 2006-04-06  jrandom
     * Fix for a bug in the new irc ping/pong filter (thanks Complication!)
diff --git a/router/java/src/net/i2p/router/ b/router/java/src/net/i2p/router/
index 41709cce37..ed3dcfbd95 100644
--- a/router/java/src/net/i2p/router/
+++ b/router/java/src/net/i2p/router/
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.387 $ $Date: 2006/04/06 05:33:47 $";
+    public final static String ID = "$Revision: 1.388 $ $Date: 2006/04/06 20:26:33 $";
     public final static String VERSION = "";
-    public final static long BUILD = 2;
+    public final static long BUILD = 3;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
         System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/ b/router/java/src/net/i2p/router/
index 052af69b8a..ad4d1fad57 100644
--- a/router/java/src/net/i2p/router/
+++ b/router/java/src/net/i2p/router/
@@ -56,6 +56,9 @@ public interface TunnelManagerFacade extends Service {
     /** When does the last tunnel we are participating in expire? */
     public long getLastParticipatingExpiration();
+    /** count how many inbound tunnel requests we have received but not yet processed */
+    public int getInboundBuildQueueSize();
      * the client connected (or updated their settings), so make sure we have
      * the tunnels for them, and whenever necessary, ask them to authorize 
@@ -97,6 +100,7 @@ class DummyTunnelManagerFacade implements TunnelManagerFacade {
     public void setOutboundSettings(TunnelPoolSettings settings) {}
     public void setInboundSettings(Hash client, TunnelPoolSettings settings) {}
     public void setOutboundSettings(Hash client, TunnelPoolSettings settings) {}
+    public int getInboundBuildQueueSize() { return 0; }
     public void renderStatusHTML(Writer out) throws IOException {}
     public void restart() {}
diff --git a/router/java/src/net/i2p/router/tunnel/pool/ b/router/java/src/net/i2p/router/tunnel/pool/
index 80e67e824c..9d20b384a6 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/
+++ b/router/java/src/net/i2p/router/tunnel/pool/
@@ -41,6 +41,7 @@ class BuildExecutor implements Runnable {
         _context.statManager().createRateStat("tunnel.buildClientReject", "How often a client tunnel is rejected", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.buildRequestTime", "How long it takes to build a tunnel request", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.buildRequestZeroHopTime", "How long it takes to build a zero hop tunnel", "Tunnels", new long[] { 60*1000, 10*60*1000 });
+        _context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _repoll = false;
         _handler = new BuildHandler(ctx, this);
@@ -117,11 +118,19 @@ class BuildExecutor implements Runnable {
         List wanted = new ArrayList(8);
         List pools = new ArrayList(8);
-        boolean pendingRemaining = false;
+        int pendingRemaining = 0;
+        long loopBegin = 0;
+        long beforeHandleInboundReplies = 0;
+        long afterHandleInboundReplies = 0;
+        long afterBuildZeroHop = 0;
+        long afterBuildReal = 0;
+        long afterHandleInbound = 0;
         while (!_manager.isShutdown()){
+            loopBegin = System.currentTimeMillis();
             try {
-                _repoll = pendingRemaining; // resets repoll to false unless there are inbound requeusts pending
+                _repoll = pendingRemaining > 0; // resets repoll to false unless there are inbound requeusts pending
                 for (int i = 0; i < pools.size(); i++) {
                     TunnelPool pool = (TunnelPool)pools.get(i);
@@ -130,7 +139,9 @@ class BuildExecutor implements Runnable {
+                beforeHandleInboundReplies = System.currentTimeMillis();
+                afterHandleInboundReplies = System.currentTimeMillis();
                 // allowed() also expires timed out requests (for new style requests)
                 int allowed = allowed();
@@ -140,17 +151,22 @@ class BuildExecutor implements Runnable {
                 // zero hop ones can run inline
                 allowed = buildZeroHopTunnels(wanted, allowed);
+                afterBuildZeroHop = System.currentTimeMillis();
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Zero hops built, Allowed: " + allowed + " wanted: " + wanted);
+                int realBuilt = 0;
                 TunnelManagerFacade mgr = _context.tunnelManager();
                 if ( (mgr == null) || (mgr.selectInboundTunnel() == null) || (mgr.selectOutboundTunnel() == null) ) {
                     // we don't have either inbound or outbound tunnels, so don't bother trying to build
                     // non-zero-hop tunnels
                     synchronized (_currentlyBuilding) {
-                        if (!_repoll)
-                            _currentlyBuilding.wait(5*1000+_context.random().nextInt(5*1000));
+                        if (!_repoll) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug("No tunnel to build with (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
+                            _currentlyBuilding.wait(1*1000+_context.random().nextInt(1*1000));
+                        }
                 } else {
                     if ( (allowed > 0) && (wanted.size() > 0) ) {
@@ -173,6 +189,7 @@ class BuildExecutor implements Runnable {
                                 buildTunnel(pool, cfg);
+                                realBuilt++;
                                 // 0hops are taken care of above, these are nonstandard 0hops
                                 //if (cfg.getLength() <= 1)
                                 //    i--; //0hop, we can keep going, as there's no worry about throttling
@@ -184,13 +201,13 @@ class BuildExecutor implements Runnable {
                     } else {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("Nothin' doin, wait for a while");
                         try {
                             synchronized (_currentlyBuilding) {
                                 if (!_repoll) {
+                                    if (_log.shouldLog(Log.DEBUG))
+                                        _log.debug("Nothin' doin (allowed=" + allowed + ", wanted=" + wanted.size() + ", pending=" + pendingRemaining + "), wait for a while");
                                     //if (allowed <= 0)
-                                        _currentlyBuilding.wait(_context.random().nextInt(5*1000));
+                                        _currentlyBuilding.wait(_context.random().nextInt(1*1000));
                                     //else // wanted <= 0
                                     //    _currentlyBuilding.wait(_context.random().nextInt(30*1000));
@@ -201,8 +218,23 @@ class BuildExecutor implements Runnable {
+                afterBuildReal = System.currentTimeMillis();
                 pendingRemaining = _handler.handleInboundRequests();
+                afterHandleInbound = System.currentTimeMillis();
+                if (pendingRemaining > 0)
+                    _context.statManager().addRateData("tunnel.pendingRemaining", pendingRemaining, afterHandleInbound-afterBuildReal);
+                if (_log.shouldLog(Log.INFO))
+          "build loop complete, tot=" + (afterHandleInbound-loopBegin) + 
+                              " inReply=" + (afterHandleInboundReplies-beforeHandleInboundReplies) +
+                              " zeroHop=" + (afterBuildZeroHop-afterHandleInboundReplies) +
+                              " real=" + (afterBuildReal-afterBuildZeroHop) +
+                              " in=" + (afterHandleInbound-afterBuildReal) + 
+                              " built=" + realBuilt +
+                              " pending=" + pendingRemaining);
             } catch (Exception e) {
@@ -300,4 +332,5 @@ class BuildExecutor implements Runnable {
     List locked_getCurrentlyBuilding() { return _currentlyBuilding; }
+    public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); }
diff --git a/router/java/src/net/i2p/router/tunnel/pool/ b/router/java/src/net/i2p/router/tunnel/pool/
index 267c7e74fe..2ce132fae8 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/
+++ b/router/java/src/net/i2p/router/tunnel/pool/
@@ -6,6 +6,8 @@ import*;
 import net.i2p.router.*;
 import net.i2p.router.tunnel.*;
 import net.i2p.router.peermanager.TunnelHistory;
+import net.i2p.stat.Rate;
+import net.i2p.stat.RateStat;
 import net.i2p.util.Log;
@@ -46,6 +48,7 @@ class BuildHandler {
         _context.statManager().createRateStat("tunnel.dropLoad", "How long we had to wait before finally giving up on an inbound request (period is queue count)?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.dropLoadDelay", "How long we had to wait before finally giving up on an inbound request?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.dropLoadBacklog", "How many requests were pending when they were so lagged that we had to drop a new inbound request??", "Tunnels", new long[] { 60*1000, 10*60*1000 });
+        _context.statManager().createRateStat("tunnel.dropLoadProactive", "What the estimated queue time was when we dropped an inbound request (period is num pending)", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.handleRemaining", "How many pending inbound requests were left on the queue after one pass?", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.receiveRejectionProbabalistic", "How often we are rejected probabalistically?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
@@ -64,10 +67,10 @@ class BuildHandler {
     private static final int NEXT_HOP_LOOKUP_TIMEOUT = 5*1000;
-     * Blocking call to handle a few of the pending inbound requests, returning true if
-     * there are remaining requeusts we skipped over
+     * Blocking call to handle a few of the pending inbound requests, returning how many
+     * requests remain after this pass
-    boolean handleInboundRequests() {
+    int handleInboundRequests() {
         int dropExpired = 0;
         List handled = null;
         synchronized (_inboundBuildMessages) {
@@ -98,7 +101,7 @@ class BuildHandler {
                     // now pull off the oldest requests first (we're doing a tail-drop
                     // when adding)
-                    for (int i = 0; i < toHandle; i++)
+                    for (int i = 0; i < toHandle && _inboundBuildMessages.size() > 0; i++)
@@ -139,7 +142,7 @@ class BuildHandler {
             int remaining = _inboundBuildMessages.size();
             if (remaining > 0)
                 _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
-            return remaining > 0;
+            return remaining;
@@ -364,6 +367,30 @@ class BuildHandler {
+    /**
+     * If we are dropping lots of requests before even trying to handle them,
+     * I suppose you could call us "overloaded"
+     */
+    private final static int MAX_PROACTIVE_DROPS = 120;
+    private int countProactiveDrops() {
+        int dropped = 0;
+        dropped += countEvents("tunnel.dropLoadProactive", 60*1000);
+        dropped += countEvents("tunnel.dropLoad", 60*1000);
+        dropped += countEvents("tunnel.dropLoadBacklog", 60*1000);
+        dropped += countEvents("tunnel.dropLoadDelay", 60*1000);
+        return dropped;
+    }
+    private int countEvents(String stat, long period) {
+        RateStat rs = _context.statManager().getRate(stat);
+        if (rs != null) {
+            Rate r = rs.getRate(period);
+            if (r != null)
+                return (int)r.getCurrentEventCount();
+        }
+        return 0;
+    }
     private void handleReq(RouterInfo nextPeerInfo, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
         long ourId = req.readReceiveTunnelId();
         long nextId = req.readNextTunnelId();
@@ -384,17 +411,21 @@ class BuildHandler {
         //if ( (response == 0) && (_context.random().nextInt(50) <= 1) )
         //    response = TunnelHistory.TUNNEL_REJECT_PROBABALISTIC_REJECT;
+        int proactiveDrops = countProactiveDrops();
         long recvDelay = System.currentTimeMillis()-state.recvTime;
-        if ( (response == 0) && (recvDelay > BuildRequestor.REQUEST_TIMEOUT/2) ) {
-            _context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay, recvDelay);
-            response = TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
+        if ( (response == 0) && ( (recvDelay > BuildRequestor.REQUEST_TIMEOUT/2) || (proactiveDrops > MAX_PROACTIVE_DROPS) ) ) {
+            _context.statManager().addRateData("tunnel.rejectOverloaded", recvDelay, proactiveDrops);
+            if (true || (proactiveDrops < MAX_PROACTIVE_DROPS*2))
+                response = TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD;
+            else
+                response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
         } else if (response == 0) {
             _context.statManager().addRateData("tunnel.acceptLoad", recvDelay, recvDelay);
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId
-                       + " after " + recvDelay + " with " + response 
+                       + " after " + recvDelay + "/" + proactiveDrops + " with " + response 
                        + " from " + (state.fromHash != null ? state.fromHash.toBase64() : 
                                      state.from != null ? state.from.calculateHash().toBase64() : "tunnel"));
@@ -505,6 +536,12 @@ class BuildHandler {
+    public int getInboundBuildQueueSize() {
+        synchronized (_inboundBuildMessages) {
+            return _inboundBuildMessages.size();
+        }
+    }
     private static final boolean HANDLE_REPLIES_INLINE = true;
     private class TunnelBuildMessageHandlerJobBuilder implements HandlerJobBuilder {
@@ -551,22 +588,26 @@ class BuildHandler {
                     synchronized (_inboundBuildMessages) {
                         boolean removed = false;
                         int dropped = 0;
-                        while (_inboundBuildMessages.size() > 0) {
-                            BuildMessageState cur = (BuildMessageState)_inboundBuildMessages.get(_inboundBuildMessages.size()-1);
+                        for (int i = 0; i < _inboundBuildMessages.size(); i++) {
+                            BuildMessageState cur = (BuildMessageState)_inboundBuildMessages.get(i);
                             long age = System.currentTimeMillis() - cur.recvTime;
                             if (age >= BuildRequestor.REQUEST_TIMEOUT) {
-                                _inboundBuildMessages.remove(_inboundBuildMessages.size()-1);
+                                _inboundBuildMessages.remove(i);
+                                i--;
                                 _context.statManager().addRateData("tunnel.dropLoad", age, _inboundBuildMessages.size());
-                            } else {
-                                break;
                         if (dropped > 0) {
                             // if the queue is backlogged, stop adding new messages
                             _context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size());
                         } else {
-                            _inboundBuildMessages.add(new BuildMessageState(receivedMessage, from, fromHash));
+                            int queueTime = estimateQueueTime(_inboundBuildMessages.size());
+                            if (queueTime > BuildRequestor.REQUEST_TIMEOUT/2) {
+                                _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());
+                            } else {
+                                _inboundBuildMessages.add(new BuildMessageState(receivedMessage, from, fromHash));
+                            }
@@ -576,6 +617,28 @@ class BuildHandler {
+    private int estimateQueueTime(int numPendingMessages) {
+        int decryptTime = 200;
+        RateStat rs = _context.statManager().getRate("crypto.elGamal.decrypt");
+        if (rs != null) {
+            Rate r = rs.getRate(60*1000);
+            double avg = 0;
+            if (r != null)
+                avg = r.getAverageValue();
+            if (avg > 0) {
+                decryptTime = (int)avg;
+            } else {
+                avg = rs.getLifetimeAverageValue();
+                if (avg > 0)
+                    decryptTime = (int)avg;
+            }
+        }
+        int estimatedQueueTime = numPendingMessages * decryptTime;
+        estimatedQueueTime *= 2; // lets leave some cpu to spare, 'eh?
+        return estimatedQueueTime;
+    }
     private class TunnelBuildReplyMessageHandlerJobBuilder implements HandlerJobBuilder {
         public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
             if (_log.shouldLog(Log.DEBUG))
diff --git a/router/java/src/net/i2p/router/tunnel/pool/ b/router/java/src/net/i2p/router/tunnel/pool/
index 55bf0580c4..0a1578fd31 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/
+++ b/router/java/src/net/i2p/router/tunnel/pool/
@@ -8,10 +8,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import net.i2p.stat.RateStat;
 import net.i2p.router.*;
@@ -390,6 +387,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {
     void tunnelFailed() { _executor.repoll(); }
     BuildExecutor getExecutor() { return _executor; }
     boolean isShutdown() { return _isShutdown; }
+    public int getInboundBuildQueueSize() { return _executor.getInboundBuildQueueSize(); }
     public void renderStatusHTML(Writer out) throws IOException {
         out.write("<h2><a name=\"exploratory\">Exploratory tunnels</a> (<a href=\"/configtunnels.jsp#exploratory\">config</a>):</h2>\n");
@@ -499,7 +499,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
                 if (_context.routerHash().equals(peer))
                     out.write("<td><i>" + peer.toBase64().substring(0,4) + (id == null ? "" : ":" + id) + "</i></td>");
-                    out.write("<td>" + peer.toBase64().substring(0,4) + (id == null ? "" : ":" + id) + "</td>");
+                    out.write("<td>" + peer.toBase64().substring(0,4) + (id == null ? "" : ":" + id) + "</td>");                