* BuildHandler: Implement restart and shutdown to stop the thread

This commit is contained in:
zzz
2012-03-17 21:58:41 +00:00
parent 0c348ec17e
commit 0a521b7456
4 changed files with 69 additions and 9 deletions

View File

@@ -40,7 +40,7 @@ class BuildExecutor implements Runnable {
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _currentlyBuildingMap;
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
private boolean _isRunning;
private volatile boolean _isRunning;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
/** accept replies up to a minute after we gave up on them */
@@ -84,6 +84,26 @@ class BuildExecutor implements Runnable {
statMgr.createRateStat("tunnel.tierExpireUnknown", "Expired joins from unknown", "Tunnels", new long[] { 60*1000, 10*60*1000 });
}
/**
* @since 0.9
*/
public void restart() {
synchronized (_recentBuildIds) {
_recentBuildIds.clear();
}
_currentlyBuildingMap.clear();
_recentlyBuildingMap.clear();
}
/**
* Cannot be restarted.
* @since 0.9
*/
public void shutdown() {
_isRunning = false;
restart();
}
private int allowed() {
int maxKBps = _context.bandwidthLimiter().getOutboundKBytesPerSecond();
int allowed = maxKBps / 6; // Max. 1 concurrent build per 6 KB/s outbound
@@ -270,7 +290,7 @@ class BuildExecutor implements Runnable {
long afterBuildReal = 0;
long afterHandleInbound = 0;
while (!_manager.isShutdown()){
while (_isRunning && !_manager.isShutdown()){
//loopBegin = System.currentTimeMillis();
try {
_repoll = false; // resets repoll to false unless there are inbound requeusts pending

View File

@@ -54,7 +54,7 @@ class BuildHandler implements Runnable {
private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
private final BuildMessageProcessor _processor;
private final ParticipatingThrottler _throttler;
private boolean _isRunning;
private volatile boolean _isRunning;
/** TODO these may be too high, review and adjust */
private static final int MIN_QUEUE = 18;
@@ -121,13 +121,34 @@ class BuildHandler implements Runnable {
ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
}
/**
* @since 0.9
*/
public void restart() {
_inboundBuildMessages.clear();
}
/**
* Cannot be restarted.
* @param numThreads the number of threads to be shut down
* @since 0.9
*/
public void shutdown(int numThreads) {
_isRunning = false;
_inboundBuildMessages.clear();
BuildMessageState poison = new BuildMessageState(null, null, null);
for (int i = 0; i < numThreads; i++) {
_inboundBuildMessages.offer(poison);
}
}
/**
* Thread to handle inbound requests
* @since 0.8.11
*/
public void run() {
_isRunning = true;
while (!_manager.isShutdown()) {
while (_isRunning && !_manager.isShutdown()) {
try {
handleInboundRequest();
} catch (Exception e) {
@@ -150,6 +171,13 @@ class BuildHandler implements Runnable {
} catch (InterruptedException ie) {
return;
}
// check for poison
if (state.msg == null) {
_isRunning = false;
return;
}
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
if (state.recvTime <= dropBefore) {
if (_log.shouldLog(Log.WARN))

View File

@@ -105,7 +105,7 @@ public class TunnelPool {
void shutdown() {
if (_log.shouldLog(Log.WARN))
_log.warn(toString() + ": Shutdown called", new Exception());
_log.warn(toString() + ": Shutdown called");
_alive = false;
_lastSelectionPeriod = 0;
_lastSelected = null;

View File

@@ -50,6 +50,7 @@ public class TunnelPoolManager implements TunnelManagerFacade {
private final BuildExecutor _executor;
private final BuildHandler _handler;
private boolean _isShutdown;
private final int _numHandlerThreads;
private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l };
private static final int MIN_KBPS_TWO_HANDLERS = 512;
@@ -80,8 +81,8 @@ public class TunnelPoolManager implements TunnelManagerFacade {
numHandlerThreads = 2;
else
numHandlerThreads = 1;
numHandlerThreads = ctx.getProperty("router.buildHandlerThreads", numHandlerThreads);
for (int i = 1; i <= numHandlerThreads; i++) {
_numHandlerThreads = ctx.getProperty("router.buildHandlerThreads", numHandlerThreads);
for (int i = 1; i <= _numHandlerThreads; i++) {
I2PThread hThread = new I2PThread(_handler, "BuildHandler " + i + '/' + numHandlerThreads, true);
hThread.start();
}
@@ -396,7 +397,9 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
public void restart() {
shutdown();
_handler.restart();
_executor.restart();
shutdownExploratory();
startup();
}
@@ -548,12 +551,21 @@ public class TunnelPoolManager implements TunnelManagerFacade {
}
}
/**
* Cannot be restarted
*/
public void shutdown() {
_handler.shutdown(_numHandlerThreads);
_executor.shutdown();
shutdownExploratory();
_isShutdown = true;
}
private void shutdownExploratory() {
if (_inboundExploratory != null)
_inboundExploratory.shutdown();
if (_outboundExploratory != null)
_outboundExploratory.shutdown();
_isShutdown = true;
}
/** list of TunnelPool instances currently in play */