CoDel for build handler

zzz
2012-09-19 19:00:06 +00:00
parent bd57463d42
commit f232775161


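For context on the change below: CoDelBlockingQueue applies the CoDel ("controlled delay") active-queue-management idea to a BlockingQueue. Entries are timestamped when offered, and on take() the queue measures each entry's sojourn time and starts dropping once that delay has stayed above a small target for a full interval, instead of relying on the old enqueue-side check. That old check computed pDrop = (queueTime / (3 * REQUEST_TIMEOUT))^16, which stays near zero until the queue delay approaches the full cutoff (at half the cutoff it is 0.5^16, roughly 0.000015), which is why the commit calls it "expensive and rarely seen". The following is a minimal sketch of the dequeue-side CoDel logic only; the class name and the TARGET/INTERVAL values are illustrative assumptions, not the actual net.i2p.router.util.CoDelBlockingQueue implementation.

// Minimal sketch of CoDel-style dropping on dequeue (illustrative only; not the
// actual net.i2p.router.util.CoDelBlockingQueue). TARGET and INTERVAL are assumed values.
import java.util.concurrent.LinkedBlockingQueue;

public class CoDelSketch<E> {
    private static final long TARGET = 5;      // ms of acceptable standing queue delay (assumed)
    private static final long INTERVAL = 100;  // ms above TARGET before dropping starts (assumed)

    private static class Entry<T> {
        final T value;
        final long enqueueTime = System.currentTimeMillis();
        Entry(T value) { this.value = value; }
    }

    private final LinkedBlockingQueue<Entry<E>> queue = new LinkedBlockingQueue<Entry<E>>();
    private long firstAboveTime;   // time at which sojourn first exceeded TARGET, plus INTERVAL
    private long dropNext;         // next scheduled drop while in the dropping state
    private int dropCount;         // drops so far in the current dropping state
    private boolean dropping;

    public boolean offer(E value) {
        return queue.offer(new Entry<E>(value));
    }

    /** Take the next element, discarding entries whose queue delay stayed above TARGET. */
    public E take() throws InterruptedException {
        while (true) {
            Entry<E> e = queue.take();
            long now = System.currentTimeMillis();
            long sojourn = now - e.enqueueTime;
            if (sojourn < TARGET || queue.isEmpty()) {
                // delay is acceptable again: leave the dropping state
                firstAboveTime = 0;
                dropping = false;
                return e.value;
            }
            if (firstAboveTime == 0) {
                // first time above TARGET: give the queue one INTERVAL to drain on its own
                firstAboveTime = now + INTERVAL;
                return e.value;
            }
            if (!dropping && now >= firstAboveTime) {
                // delay stayed above TARGET for a full INTERVAL: start dropping
                dropping = true;
                dropCount = 1;
                dropNext = now + INTERVAL;
                continue;   // e is dropped; the real I2P queue would call the entry's drop() here
            }
            if (dropping && now >= dropNext) {
                // still too slow: drop again, faster as dropCount grows (CoDel control law)
                dropCount++;
                dropNext = now + (long) (INTERVAL / Math.sqrt(dropCount));
                continue;   // e is dropped
            }
            return e.value;
        }
    }
}

With dropping handled inside the queue, the old estimateQueueTime()/pDrop proactive drop becomes redundant, which is why the diff below comments it out.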
@@ -1,7 +1,7 @@
 package net.i2p.router.tunnel.pool;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import net.i2p.data.Base64;
 import net.i2p.data.ByteArray;
@@ -28,6 +28,8 @@ import net.i2p.router.tunnel.BuildMessageProcessor;
 import net.i2p.router.tunnel.BuildReplyHandler;
 import net.i2p.router.tunnel.HopConfig;
 import net.i2p.router.tunnel.TunnelDispatcher;
+import net.i2p.router.util.CDQEntry;
+import net.i2p.router.util.CoDelBlockingQueue;
 import net.i2p.stat.Rate;
 import net.i2p.stat.RateStat;
 import net.i2p.util.Log;
@@ -51,7 +53,7 @@ class BuildHandler implements Runnable {
     private final BuildExecutor _exec;
     private final Job _buildMessageHandlerJob;
     private final Job _buildReplyMessageHandlerJob;
-    private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
+    private final BlockingQueue<BuildMessageState> _inboundBuildMessages;
     private final BuildMessageProcessor _processor;
     private final ParticipatingThrottler _throttler;
     private volatile boolean _isRunning;
@@ -79,7 +81,7 @@ class BuildHandler implements Runnable {
         _exec = exec;
         // Queue size = 12 * share BW / 48K
         int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
-        _inboundBuildMessages = new LinkedBlockingQueue(sz);
+        _inboundBuildMessages = new CoDelBlockingQueue(ctx, "BuildHandler", sz);
         _context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("tunnel.reject.20", "How often we reject a tunnel because of transient overload", "Tunnels", new long[] { 60*1000, 10*60*1000 });
@@ -137,7 +139,7 @@ class BuildHandler implements Runnable {
     public synchronized void shutdown(int numThreads) {
         _isRunning = false;
         _inboundBuildMessages.clear();
-        BuildMessageState poison = new BuildMessageState(null, null, null);
+        BuildMessageState poison = new BuildMessageState(_context, null, null, null);
         for (int i = 0; i < numThreads; i++) {
             _inboundBuildMessages.offer(poison);
         }
@@ -765,6 +767,7 @@ class BuildHandler implements Runnable {
                 _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
             } else {
                 int sz = _inboundBuildMessages.size();
+                // Can probably remove this check, since CoDel is in use
                 BuildMessageState cur = _inboundBuildMessages.peek();
                 boolean accept = true;
                 if (cur != null) {
@@ -773,21 +776,21 @@ class BuildHandler implements Runnable {
                         _context.statManager().addRateData("tunnel.dropLoad", age, sz);
                         _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
                         // if the queue is backlogged, stop adding new messages
                         _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
                         accept = false;
                     }
                 }
                 if (accept) {
-                    int queueTime = estimateQueueTime(sz);
-                    float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
-                    pDrop = (float)Math.pow(pDrop, 16); // steeeep
-                    float f = _context.random().nextFloat();
+                    // This is expensive and rarely seen, use CoDel instead
+                    //int queueTime = estimateQueueTime(sz);
+                    //float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
+                    //pDrop = (float)Math.pow(pDrop, 16); // steeeep
+                    //float f = _context.random().nextFloat();
                     //if ( (pDrop > f) && (allowProactiveDrop()) ) {
-                    if (pDrop > f) {
-                        _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
-                        _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
-                    } else {
-                        accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash));
+                    //if (pDrop > f) {
+                    //    _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
+                    //    _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
+                    //} else {
+                        accept = _inboundBuildMessages.offer(new BuildMessageState(_context, receivedMessage, from, fromHash));
                         if (accept) {
                             // wake up the Executor to call handleInboundRequests()
                             _exec.repoll();
@@ -795,7 +798,7 @@ class BuildHandler implements Runnable {
                             _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
                             _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
                         }
-                    }
+                    //}
                 }
             }
         }
@@ -812,6 +815,7 @@ class BuildHandler implements Runnable {
     }
     ****/
+    /****
     private int estimateQueueTime(int numPendingMessages) {
         int decryptTime = 200;
         RateStat rs = _context.statManager().getRate("tunnel.decryptRequestTime");
@@ -832,8 +836,9 @@ class BuildHandler implements Runnable {
         estimatedQueueTime *= 1.2f; // lets leave some cpu to spare, 'eh?
         return (int)estimatedQueueTime;
     }
+    ****/
     /** */
     private class TunnelBuildReplyMessageHandlerJobBuilder implements HandlerJobBuilder {
         public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
             if (_log.shouldLog(Log.DEBUG))
@@ -845,16 +850,32 @@ class BuildHandler implements Runnable {
     }
     /** normal inbound requests from other people */
-    private static class BuildMessageState {
+    private static class BuildMessageState implements CDQEntry {
+        private final RouterContext _ctx;
         final TunnelBuildMessage msg;
         final RouterIdentity from;
         final Hash fromHash;
         final long recvTime;
-        public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) {
+        public BuildMessageState(RouterContext ctx, I2NPMessage m, RouterIdentity f, Hash h) {
+            _ctx = ctx;
             msg = (TunnelBuildMessage)m;
             from = f;
             fromHash = h;
-            recvTime = System.currentTimeMillis();
+            recvTime = ctx.clock().now();
         }
+        public void setEnqueueTime(long time) {
+            // set at instantiation, which is just before enqueueing
+        }
+        public long getEnqueueTime() {
+            return recvTime;
+        }
+        public void drop() {
+            _ctx.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
+            _ctx.statManager().addRateData("tunnel.dropLoadProactive", _ctx.clock().now() - recvTime);
+        }
     }
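The three methods added to BuildMessageState above are the callbacks the CoDel queue relies on. Roughly, and only as inferred from this diff (the real interface is net.i2p.router.util.CDQEntry and may declare more), the contract looks like:

// Shape of the CDQEntry callback contract as implied by the methods above
// (illustrative sketch; see net.i2p.router.util.CDQEntry for the actual interface).
public interface CDQEntrySketch {
    /** stamped by the queue when the entry is enqueued */
    void setEnqueueTime(long time);

    /** read by the queue to compute how long the entry has waited */
    long getEnqueueTime();

    /** called by the queue when it discards the entry instead of delivering it */
    void drop();
}

This is also why BuildMessageState.drop() updates the throttle status and the tunnel.dropLoadProactive stat itself: with CoDel the queue decides which requests to shed and simply calls back into the dropped entry, so the old enqueue-side bookkeeping moves here.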