forked from I2P_Developers/i2p.i2p
Tunnels: Refactor build completion handling
- Add result code to BuildExecutor.buildComplete() and TunnelPool.buildComplete() - Remove BuildExecutor.buildSuccessful(), move to buildComplete() - Move ExpireJob creation to buildComplete() - TunnelPool.buildComplete() now calls addTunnel() - Eliminate some now() calls
This commit is contained in:
@@ -86,7 +86,7 @@ public class AliasedTunnelPool extends TunnelPool {
|
||||
}
|
||||
|
||||
@Override
|
||||
void addTunnel(TunnelInfo info) {
|
||||
protected void addTunnel(TunnelInfo info) {
|
||||
_aliasOf.addTunnel(info);
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ public class AliasedTunnelPool extends TunnelPool {
|
||||
}
|
||||
|
||||
@Override
|
||||
void buildComplete(PooledTunnelCreatorConfig cfg) {}
|
||||
void buildComplete(PooledTunnelCreatorConfig cfg, BuildExecutor.Result result) {}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
||||
@@ -49,6 +49,11 @@ class BuildExecutor implements Runnable {
|
||||
/** accept replies up to a minute after we gave up on them */
|
||||
private static final long GRACE_PERIOD = 60*1000;
|
||||
|
||||
/**
|
||||
* @since 0.9.53
|
||||
*/
|
||||
enum Result { SUCCESS, REJECT, TIMEOUT, BAD_RESPONSE, DUP_ID, OTHER_FAILURE }
|
||||
|
||||
public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(getClass());
|
||||
@@ -145,7 +150,8 @@ class BuildExecutor implements Runnable {
|
||||
allowed = _context.getProperty("router.tunnelConcurrentBuilds", allowed);
|
||||
|
||||
// expire any REALLY old requests
|
||||
long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT - GRACE_PERIOD;
|
||||
long now = _context.clock().now();
|
||||
long expireBefore = now + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT - GRACE_PERIOD;
|
||||
for (Iterator<PooledTunnelCreatorConfig> iter = _recentlyBuildingMap.values().iterator(); iter.hasNext(); ) {
|
||||
PooledTunnelCreatorConfig cfg = iter.next();
|
||||
if (cfg.getExpiration() <= expireBefore) {
|
||||
@@ -157,7 +163,7 @@ class BuildExecutor implements Runnable {
|
||||
List<PooledTunnelCreatorConfig> expired = null;
|
||||
int concurrent = 0;
|
||||
// Todo: Make expiration variable
|
||||
expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
|
||||
expireBefore = now + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
|
||||
for (Iterator<PooledTunnelCreatorConfig> iter = _currentlyBuildingMap.values().iterator(); iter.hasNext(); ) {
|
||||
PooledTunnelCreatorConfig cfg = iter.next();
|
||||
if (cfg.getExpiration() <= expireBefore) {
|
||||
@@ -202,7 +208,7 @@ class BuildExecutor implements Runnable {
|
||||
|
||||
TunnelPool pool = cfg.getTunnelPool();
|
||||
if (pool != null)
|
||||
pool.buildComplete(cfg);
|
||||
pool.buildComplete(cfg, Result.TIMEOUT);
|
||||
if (cfg.getDestination() == null) {
|
||||
_context.statManager().addRateData("tunnel.buildExploratoryExpire", 1);
|
||||
//if (cfg.isInbound())
|
||||
@@ -409,7 +415,7 @@ class BuildExecutor implements Runnable {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("We don't need more fallbacks for " + pool);
|
||||
i--; //0hop, we can keep going, as there's no worry about throttling
|
||||
pool.buildComplete(cfg);
|
||||
pool.buildComplete(cfg, Result.OTHER_FAILURE);
|
||||
continue;
|
||||
}
|
||||
long pTime = System.currentTimeMillis() - bef;
|
||||
@@ -553,19 +559,25 @@ class BuildExecutor implements Runnable {
|
||||
}
|
||||
|
||||
/**
|
||||
* This wakes up the executor, so call this after TunnelPool.addTunnel()
|
||||
* so we don't build too many.
|
||||
* This calls TunnelPool.buildComplete which calls TunnelPool.addTunnel()
|
||||
* on success, and then we wake up the executor.
|
||||
*
|
||||
* On success, this also calls TunnelPoolManager to optionally start a test job,
|
||||
* and queues an ExpireJob.
|
||||
*
|
||||
* @since 0.9.53 added result parameter
|
||||
*/
|
||||
public void buildComplete(PooledTunnelCreatorConfig cfg) {
|
||||
public void buildComplete(PooledTunnelCreatorConfig cfg, Result result) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Build complete for " + cfg, new Exception());
|
||||
cfg.getTunnelPool().buildComplete(cfg);
|
||||
_log.debug("Build complete (" + result + ") for " + cfg);
|
||||
cfg.getTunnelPool().buildComplete(cfg, result);
|
||||
if (cfg.getLength() > 1)
|
||||
removeFromBuilding(cfg.getReplyMessageId());
|
||||
// Only wake up the build thread if it took a reasonable amount of time -
|
||||
// this prevents high CPU usage when there is no network connection
|
||||
// (via BuildRequestor.TunnelBuildFirstHopFailJob)
|
||||
long buildTime = _context.clock().now() + 10*60*1000- cfg.getExpiration();
|
||||
long now = _context.clock().now();
|
||||
long buildTime = now + 10*60*1000 - cfg.getExpiration();
|
||||
if (buildTime > 250) {
|
||||
synchronized (_currentlyBuilding) {
|
||||
_currentlyBuilding.notifyAll();
|
||||
@@ -575,11 +587,16 @@ class BuildExecutor implements Runnable {
|
||||
_log.info("Build complete really fast (" + buildTime + " ms) for tunnel: " + cfg);
|
||||
}
|
||||
|
||||
long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
|
||||
long expireBefore = now + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT;
|
||||
if (cfg.getExpiration() <= expireBefore) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Build complete for expired tunnel: " + cfg);
|
||||
}
|
||||
if (result == Result.SUCCESS) {
|
||||
_manager.buildComplete(cfg);
|
||||
ExpireJob expireJob = new ExpireJob(_context, cfg);
|
||||
_context.jobQueue().addJob(expireJob);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean wasRecentlyBuilding(long replyId) {
|
||||
@@ -588,10 +605,6 @@ class BuildExecutor implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
public void buildSuccessful(PooledTunnelCreatorConfig cfg) {
|
||||
_manager.buildComplete(cfg);
|
||||
}
|
||||
|
||||
public void repoll() {
|
||||
synchronized (_currentlyBuilding) {
|
||||
_repoll = true;
|
||||
|
||||
@@ -35,6 +35,7 @@ import net.i2p.router.networkdb.kademlia.MessageWrapper;
|
||||
import net.i2p.router.peermanager.TunnelHistory;
|
||||
import net.i2p.router.tunnel.HopConfig;
|
||||
import net.i2p.router.tunnel.TunnelDispatcher;
|
||||
import static net.i2p.router.tunnel.pool.BuildExecutor.Result.*;
|
||||
import net.i2p.router.util.CDQEntry;
|
||||
import net.i2p.router.util.CoDelBlockingQueue;
|
||||
import net.i2p.stat.Rate;
|
||||
@@ -279,7 +280,7 @@ class BuildHandler implements Runnable {
|
||||
return;
|
||||
}
|
||||
|
||||
handleRequest(state);
|
||||
handleRequest(state, now);
|
||||
|
||||
//int remaining = _inboundBuildMessages.size();
|
||||
//if (remaining > 0)
|
||||
@@ -330,7 +331,7 @@ class BuildHandler implements Runnable {
|
||||
if (record < 0) {
|
||||
_log.error("Bad status index " + i);
|
||||
// don't leak
|
||||
_exec.buildComplete(cfg);
|
||||
_exec.buildComplete(cfg, BAD_RESPONSE);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -387,15 +388,12 @@ class BuildHandler implements Runnable {
|
||||
// This will happen very rarely. We check for dups when
|
||||
// creating the config, but we don't track IDs for builds in progress.
|
||||
_context.statManager().addRateData("tunnel.ownDupID", 1);
|
||||
_exec.buildComplete(cfg);
|
||||
_exec.buildComplete(cfg, DUP_ID);
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Dup ID for our own tunnel " + cfg);
|
||||
return;
|
||||
}
|
||||
cfg.getTunnelPool().addTunnel(cfg); // self.self.self.foo!
|
||||
// call buildComplete() after addTunnel() so we don't try another build.
|
||||
_exec.buildComplete(cfg);
|
||||
_exec.buildSuccessful(cfg);
|
||||
_exec.buildComplete(cfg, SUCCESS);
|
||||
|
||||
if (cfg.getTunnelPool().getSettings().isExploratory()) {
|
||||
// Notify router that exploratory tunnels are ready
|
||||
@@ -429,15 +427,13 @@ class BuildHandler implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
ExpireJob expireJob = new ExpireJob(_context, cfg);
|
||||
_context.jobQueue().addJob(expireJob);
|
||||
if (cfg.getDestination() == null)
|
||||
_context.statManager().addRateData("tunnel.buildExploratorySuccess", rtt);
|
||||
else
|
||||
_context.statManager().addRateData("tunnel.buildClientSuccess", rtt);
|
||||
} else {
|
||||
// someone is no fun
|
||||
_exec.buildComplete(cfg);
|
||||
_exec.buildComplete(cfg, REJECT);
|
||||
if (cfg.getDestination() == null)
|
||||
_context.statManager().addRateData("tunnel.buildExploratoryReject", rtt);
|
||||
else
|
||||
@@ -448,7 +444,7 @@ class BuildHandler implements Runnable {
|
||||
_log.warn(msg.getUniqueId() + ": Tunnel reply could not be decrypted for tunnel " + cfg);
|
||||
_context.statManager().addRateData("tunnel.corruptBuildReply", 1);
|
||||
// don't leak
|
||||
_exec.buildComplete(cfg);
|
||||
_exec.buildComplete(cfg, BAD_RESPONSE);
|
||||
// TODO blame everybody
|
||||
}
|
||||
}
|
||||
@@ -459,8 +455,8 @@ class BuildHandler implements Runnable {
|
||||
*
|
||||
* @return handle time or -1 if it wasn't completely handled
|
||||
*/
|
||||
private long handleRequest(BuildMessageState state) {
|
||||
long timeSinceReceived = _context.clock().now()-state.recvTime;
|
||||
private long handleRequest(BuildMessageState state, long now) {
|
||||
long timeSinceReceived = now - state.recvTime;
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug(state.msg.getUniqueId() + ": handling request after " + timeSinceReceived);
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import net.i2p.router.networkdb.kademlia.MessageWrapper;
|
||||
import net.i2p.router.networkdb.kademlia.MessageWrapper.OneTimeSession;
|
||||
import net.i2p.router.tunnel.HopConfig;
|
||||
import net.i2p.router.tunnel.TunnelCreatorConfig;
|
||||
import static net.i2p.router.tunnel.pool.BuildExecutor.Result.*;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.VersionComparator;
|
||||
|
||||
@@ -208,7 +209,7 @@ abstract class BuildRequestor {
|
||||
if (pairedTunnel == null) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Tunnel build failed, as we couldn't find a paired tunnel for " + cfg);
|
||||
exec.buildComplete(cfg);
|
||||
exec.buildComplete(cfg, OTHER_FAILURE);
|
||||
// Not even an exploratory tunnel? We are in big trouble.
|
||||
// Let's not spin through here too fast.
|
||||
// But don't let a client tunnel waiting for exploratories slow things down too much,
|
||||
@@ -224,7 +225,7 @@ abstract class BuildRequestor {
|
||||
if (msg == null) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Tunnel build failed, as we couldn't create the tunnel build message for " + cfg);
|
||||
exec.buildComplete(cfg);
|
||||
exec.buildComplete(cfg, OTHER_FAILURE);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -277,7 +278,7 @@ abstract class BuildRequestor {
|
||||
if (peer == null) {
|
||||
if (log.shouldLog(Log.WARN))
|
||||
log.warn("Could not find the next hop to send the outbound request to: " + cfg);
|
||||
exec.buildComplete(cfg);
|
||||
exec.buildComplete(cfg, OTHER_FAILURE);
|
||||
return false;
|
||||
}
|
||||
OutNetMessage outMsg = new OutNetMessage(ctx, msg, ctx.clock().now() + FIRST_HOP_TIMEOUT, PRIORITY, peer);
|
||||
@@ -491,15 +492,12 @@ abstract class BuildRequestor {
|
||||
if (log.shouldLog(Log.DEBUG))
|
||||
log.debug("Build zero hop tunnel " + cfg);
|
||||
|
||||
exec.buildComplete(cfg);
|
||||
boolean ok;
|
||||
if (cfg.isInbound())
|
||||
ctx.tunnelDispatcher().joinInbound(cfg);
|
||||
ok = ctx.tunnelDispatcher().joinInbound(cfg);
|
||||
else
|
||||
ctx.tunnelDispatcher().joinOutbound(cfg);
|
||||
cfg.getTunnelPool().addTunnel(cfg);
|
||||
exec.buildSuccessful(cfg);
|
||||
ExpireJob expireJob = new ExpireJob(ctx, cfg);
|
||||
ctx.jobQueue().addJob(expireJob);
|
||||
ok = ctx.tunnelDispatcher().joinOutbound(cfg);
|
||||
exec.buildComplete(cfg, ok ? SUCCESS : DUP_ID);
|
||||
// can it get much easier?
|
||||
}
|
||||
|
||||
@@ -522,7 +520,7 @@ abstract class BuildRequestor {
|
||||
}
|
||||
public String getName() { return "Timeout contacting first peer for OB tunnel"; }
|
||||
public void runJob() {
|
||||
_exec.buildComplete(_cfg);
|
||||
_exec.buildComplete(_cfg, OTHER_FAILURE);
|
||||
getContext().profileManager().tunnelTimedOut(_cfg.getPeer(1));
|
||||
getContext().statManager().addRateData("tunnel.buildFailFirstHop", 1, 0);
|
||||
// static, no _log
|
||||
|
||||
@@ -449,7 +449,7 @@ public class TunnelPool {
|
||||
/**
|
||||
* Add to the pool.
|
||||
*/
|
||||
void addTunnel(TunnelInfo info) {
|
||||
protected void addTunnel(TunnelInfo info) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(toString() + ": Adding tunnel " + info /* , new Exception("Creator") */ );
|
||||
LeaseSet ls = null;
|
||||
@@ -1197,15 +1197,39 @@ public class TunnelPool {
|
||||
}
|
||||
return cfg;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Remove from the _inprogress list
|
||||
* Remove from the _inprogress list and call addTunnel() if result is SUCCESS.
|
||||
*
|
||||
* @since 0.9.53 added result parameter
|
||||
*/
|
||||
void buildComplete(PooledTunnelCreatorConfig cfg) {
|
||||
void buildComplete(PooledTunnelCreatorConfig cfg, BuildExecutor.Result result) {
|
||||
if (cfg.getTunnelPool() != this) {
|
||||
_log.error("Wrong pool " + cfg + " for " + this, new Exception());
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (_inProgress) { _inProgress.remove(cfg); }
|
||||
//_manager.buildComplete(cfg);
|
||||
|
||||
switch (result) {
|
||||
case SUCCESS:
|
||||
addTunnel(cfg);
|
||||
break;
|
||||
|
||||
case REJECT:
|
||||
case BAD_RESPONSE:
|
||||
case DUP_ID:
|
||||
break;
|
||||
|
||||
case TIMEOUT:
|
||||
break;
|
||||
|
||||
case OTHER_FAILURE:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (_settings.isExploratory()) {
|
||||
|
||||
Reference in New Issue
Block a user