From f232775161e91cb8d42558cca6c2e36216f7eb3d Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 19 Sep 2012 19:00:06 +0000 Subject: [PATCH] CoDel for build handler --- .../i2p/router/tunnel/pool/BuildHandler.java | 59 +++++++++++++------ 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index c3f1c92b01..16b2031335 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -1,7 +1,7 @@ package net.i2p.router.tunnel.pool; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.BlockingQueue; import net.i2p.data.Base64; import net.i2p.data.ByteArray; @@ -28,6 +28,8 @@ import net.i2p.router.tunnel.BuildMessageProcessor; import net.i2p.router.tunnel.BuildReplyHandler; import net.i2p.router.tunnel.HopConfig; import net.i2p.router.tunnel.TunnelDispatcher; +import net.i2p.router.util.CDQEntry; +import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.stat.Rate; import net.i2p.stat.RateStat; import net.i2p.util.Log; @@ -51,7 +53,7 @@ class BuildHandler implements Runnable { private final BuildExecutor _exec; private final Job _buildMessageHandlerJob; private final Job _buildReplyMessageHandlerJob; - private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages; + private final BlockingQueue<BuildMessageState> _inboundBuildMessages; private final BuildMessageProcessor _processor; private final ParticipatingThrottler _throttler; private volatile boolean _isRunning; @@ -79,7 +81,7 @@ class BuildHandler implements Runnable { _exec = exec; // Queue size = 12 * share BW / 48K int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48)); - _inboundBuildMessages = new LinkedBlockingQueue(sz); + _inboundBuildMessages = new CoDelBlockingQueue(ctx, "BuildHandler", sz); _context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.reject.20", "How often we reject a tunnel because of transient overload", "Tunnels", new long[] { 60*1000, 10*60*1000 }); @@ -137,7 +139,7 @@ class BuildHandler implements Runnable { public synchronized void shutdown(int numThreads) { _isRunning = false; _inboundBuildMessages.clear(); - BuildMessageState poison = new BuildMessageState(null, null, null); + BuildMessageState poison = new BuildMessageState(_context, null, null, null); for (int i = 0; i < numThreads; i++) { _inboundBuildMessages.offer(poison); } @@ -765,6 +767,7 @@ class BuildHandler implements Runnable { _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0); } else { int sz = _inboundBuildMessages.size(); + // Can probably remove this check, since CoDel is in use BuildMessageState cur = _inboundBuildMessages.peek(); boolean accept = true; if (cur != null) { @@ -773,21 +776,21 @@ class BuildHandler implements Runnable { _context.statManager().addRateData("tunnel.dropLoad", age, sz); _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load")); // if the queue is backlogged, stop adding new messages - _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz); accept = false; } } if (accept) { - int queueTime = estimateQueueTime(sz); - float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3); - pDrop = (float)Math.pow(pDrop, 16); // steeeep - float f = _context.random().nextFloat(); + // This is expensive and rarely seen, use CoDel instead + //int queueTime = estimateQueueTime(sz); + //float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3); + //pDrop = (float)Math.pow(pDrop, 16); // steeeep + //float f = _context.random().nextFloat(); //if ( (pDrop > f) && (allowProactiveDrop()) ) { - if (pDrop > f) { - _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time")); - _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz); - } else { - accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash)); + //if (pDrop > f) { + // _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time")); + // _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz); + //} else { + accept = _inboundBuildMessages.offer(new BuildMessageState(_context, receivedMessage, from, fromHash)); if (accept) { // wake up the Executor to call handleInboundRequests() _exec.repoll(); @@ -795,7 +798,7 @@ class BuildHandler implements Runnable { _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load")); _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz); } - } + //} } } } @@ -812,6 +815,7 @@ class BuildHandler implements Runnable { } ****/ +/**** private int estimateQueueTime(int numPendingMessages) { int decryptTime = 200; RateStat rs = _context.statManager().getRate("tunnel.decryptRequestTime"); @@ -832,8 +836,9 @@ class BuildHandler implements Runnable { estimatedQueueTime *= 1.2f; // lets leave some cpu to spare, 'eh? return (int)estimatedQueueTime; } +****/ - + /** */ private class TunnelBuildReplyMessageHandlerJobBuilder implements HandlerJobBuilder { public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) { if (_log.shouldLog(Log.DEBUG)) @@ -845,16 +850,32 @@ class BuildHandler implements Runnable { } /** normal inbound requests from other people */ - private static class BuildMessageState { + private static class BuildMessageState implements CDQEntry { + private final RouterContext _ctx; final TunnelBuildMessage msg; final RouterIdentity from; final Hash fromHash; final long recvTime; - public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) { + + public BuildMessageState(RouterContext ctx, I2NPMessage m, RouterIdentity f, Hash h) { + _ctx = ctx; msg = (TunnelBuildMessage)m; from = f; fromHash = h; - recvTime = System.currentTimeMillis(); + recvTime = ctx.clock().now(); + } + + public void setEnqueueTime(long time) { + // set at instantiation, which is just before enqueueing + } + + public long getEnqueueTime() { + return recvTime; + } + + public void drop() { + _ctx.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time")); + _ctx.statManager().addRateData("tunnel.dropLoadProactive", _ctx.clock().now() - recvTime); } } -- GitLab