diff --git a/apps/addressbook/java/src/net/i2p/addressbook/DaemonThread.java b/apps/addressbook/java/src/net/i2p/addressbook/DaemonThread.java index ae0447336085fbc914a52e57ffc1e825a873cf06..fe74c7a83a6412088cc4c4ba20099146765112b3 100644 --- a/apps/addressbook/java/src/net/i2p/addressbook/DaemonThread.java +++ b/apps/addressbook/java/src/net/i2p/addressbook/DaemonThread.java @@ -32,7 +32,7 @@ import net.i2p.client.naming.NamingServiceUpdater; * @author Ragnarok * */ -class DaemonThread extends Thread implements NamingServiceUpdater { +public class DaemonThread extends Thread implements NamingServiceUpdater { private String[] args; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java index 5d7b6d66db947808fde63b67e859c85dbec3191d..f119ed751594fe63c129a5f0b3714931b03c02fe 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Random; -import java.util.TimerTask; import net.i2p.I2PAppContext; @@ -32,7 +31,7 @@ import net.i2p.I2PAppContext; * TimerTask that checks for good/bad up/downloader. Works together * with the PeerCoordinator to select which Peers get (un)choked. */ -class PeerCheckerTask extends TimerTask +class PeerCheckerTask implements Runnable { private static final long KILOPERSECOND = 1024*(PeerCoordinator.CHECK_PERIOD/1000); @@ -54,8 +53,6 @@ class PeerCheckerTask extends TimerTask List<Peer> peerList = coordinator.peerList(); if (peerList.isEmpty() || coordinator.halted()) { coordinator.setRateHistory(0, 0); - if (coordinator.halted()) - cancel(); return; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java index 3ee8932f61f1653d5ba9dc3143fdd5c833001ee7..db835f730c51556e833544d21540b4641d2136c7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java @@ -21,7 +21,6 @@ package org.klomp.snark; import java.util.Iterator; -import java.util.TimerTask; import net.i2p.data.DataHelper; @@ -29,7 +28,7 @@ import net.i2p.data.DataHelper; * TimerTask that monitors the peers and total up/download speeds. * Works together with the main Snark class to report periodical statistics. */ -class PeerMonitorTask extends TimerTask +class PeerMonitorTask implements Runnable { final static long MONITOR_PERIOD = 10 * 1000; // Ten seconds. private static final long KILOPERSECOND = 1024 * (MONITOR_PERIOD / 1000); diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index c43679ed398e13a38ea4fe30a0c79a76ad73933e..0f294de0bbee87689374ed2050640467b61c0789 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -32,8 +32,6 @@ import java.util.List; import java.util.Properties; import java.util.Random; import java.util.StringTokenizer; -import java.util.Timer; -import java.util.TimerTask; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PServerSocket; diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 695cd7b7a313597cd322c213e57e959094f21b73..e6e1ba1c012bbae3171753160ab2e84f59a90115 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -48,14 +48,14 @@ public class SnarkManager implements Snark.CompleteListener { private final Object _addSnarkLock; private /* FIXME final FIXME */ File _configFile; private Properties _config; - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; private final List _messages; - private I2PSnarkUtil _util; + private final I2PSnarkUtil _util; private PeerCoordinatorSet _peerCoordinatorSet; private ConnectionAcceptor _connectionAcceptor; private Thread _monitor; - private boolean _running; + private volatile boolean _running; public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost"; public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort"; @@ -1089,7 +1089,7 @@ public class SnarkManager implements Snark.CompleteListener { // although the user will see the default until then getBWLimit(); boolean doMagnets = true; - while (true) { + while (_running) { File dir = getDataDir(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Directory Monitor loop over " + dir.getAbsolutePath()); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 7fc808643b078992e2b2fb65e70d6eb610ef9fca..8ede2033202de5a3a2fd52fc6ab0a12d81ae5186 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -233,7 +233,7 @@ class HTTPResponseOutputStream extends FilterOutputStream { // there after the accept(). // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. try { - I2PTunnelClientBase._executor.execute(new Pusher(pi, out)); + I2PTunnelClientBase.getClientExecutor().execute(new Pusher(pi, out)); } catch (RejectedExecutionException ree) { // shouldn't happen throw ree; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java index 293703ac9c9786580ad4a1e6c6b691099d95d2e8..069199ef14d8a6d978abcd7834cdd2074221d460 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java @@ -73,12 +73,12 @@ import net.i2p.util.Log; * Todo: Most events are not listened to elsewhere, so error propagation is poor */ public class I2PTunnel implements Logging, EventDispatcher { - private Log _log; - private EventDispatcherImpl _event; - private I2PAppContext _context; + private final Log _log; + private final EventDispatcherImpl _event; + private final I2PAppContext _context; private static long __tunnelId = 0; - private long _tunnelId; - private Properties _clientOptions; + private final long _tunnelId; + private final Properties _clientOptions; private final List<I2PSession> _sessions; public static final int PACKET_DELAY = 100; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index ae723661c649a98fdf7394627a1cf210b3a2eb15..a1ffbc83013be6b22016322f5dfa43d672eda94c 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -83,11 +83,8 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna * Extending classes may use it for other purposes. * Not for use by servers, as there is no limit on threads. */ - static final Executor _executor; + private static volatile ThreadPoolExecutor _executor; private static int _executorThreadCount; - static { - _executor = new CustomThreadPoolExecutor(); - } public I2PTunnelClientBase(int localPort, Logging l, I2PSocketManager sktMgr, I2PTunnel tunnel, EventDispatcher notifyThis, long clientId ) @@ -107,6 +104,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); + synchronized (I2PTunnelClientBase.class) { + if (_executor == null) + _executor = new CustomThreadPoolExecutor(); + } + Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort); listenerReady = false; t.start(); @@ -160,6 +162,11 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); + synchronized (I2PTunnelClientBase.class) { + if (_executor == null) + _executor = new CustomThreadPoolExecutor(); + } + // normalize path so we can find it if (pkf != null) { File keyFile = new File(pkf); @@ -551,6 +558,30 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } + /** + * @return may be null if no class has been instantiated + * @since 0.8.8 + */ + static ThreadPoolExecutor getClientExecutor() { + return _executor; + } + + /** + * @since 0.8.8 + */ + static void killClientExecutor() { + synchronized (I2PTunnelClientBase.class) { + if (_executor != null) { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + _executor = null; + } + // kill the shared client, so that on restart in android + // we won't latch onto the old one + socketManager = null; + } + } + /** * Manage the connection just opened on the specified socket * @@ -558,8 +589,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna */ protected void manageConnection(Socket s) { if (s == null) return; + ThreadPoolExecutor tpe = _executor; + if (tpe == null) { + _log.error("No executor for socket!"); + try { + s.close(); + } catch (IOException ioe) {} + return; + } try { - _executor.execute(new BlockingRunner(s)); + tpe.execute(new BlockingRunner(s)); } catch (RejectedExecutionException ree) { // should never happen, we have an unbounded pool and never stop the executor try { @@ -635,7 +674,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } //l.log("Client closed."); } - return true; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index babdba976de36619bbcdcbb3cda169d257f5880f..6947b520f7a2e1e98a98dae8e2d095ed830db7d3 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -67,6 +67,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected I2PTunnelTask task = null; protected boolean bidir = false; + private ThreadPoolExecutor _executor; private int DEFAULT_LOCALPORT = 4488; protected int localPort = DEFAULT_LOCALPORT; @@ -259,6 +260,10 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } //l.log("Server shut down."); open = false; + if (_usePool && _executor != null) { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + } return true; } } @@ -283,7 +288,6 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { */ public void run() { I2PServerSocket i2pS_S = sockMgr.getServerSocket(); - ThreadPoolExecutor executor = null; if (_log.shouldLog(Log.WARN)) { if (_usePool) _log.warn("Starting executor with " + getHandlerCount() + " threads max"); @@ -291,7 +295,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { _log.warn("Threads disabled, running blockingHandles inline"); } if (_usePool) { - executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort); + _executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort); } while (open) { try { @@ -299,7 +303,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (i2ps == null) throw new I2PException("I2PServerSocket closed"); if (_usePool) { try { - executor.execute(new Handler(i2ps)); + _executor.execute(new Handler(i2ps)); } catch (RejectedExecutionException ree) { try { i2ps.close(); @@ -328,8 +332,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { // ignored, we never set the timeout } } - if (executor != null) - executor.shutdownNow(); + if (_executor != null) + _executor.shutdownNow(); } /** diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java index f1e9f7108dac21555fae15d887d0b0d2176a9074..600464f87302e0ef7acc1ba38a8276b3017c43c4 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java @@ -26,7 +26,7 @@ import net.i2p.util.OrderedProperties; * Warning - this is a singleton. Todo: fix */ public class TunnelControllerGroup { - private final Log _log; + private Log _log; private static TunnelControllerGroup _instance; static final String DEFAULT_CONFIG_FILE = "i2ptunnel.config"; @@ -55,6 +55,7 @@ public class TunnelControllerGroup { _configFile = configFile; _sessions = new HashMap(4); loadControllers(_configFile); + I2PAppContext.getGlobalContext().addShutdownTask(new Shutdown()); } public static void main(String args[]) { @@ -71,6 +72,34 @@ public class TunnelControllerGroup { } } } + + /** + * Warning - destroys the singleton! + * @since 0.8.8 + */ + private static class Shutdown implements Runnable { + public void run() { + shutdown(); + } + } + + /** + * Warning - destroys the singleton! + * Caller must root a new context before calling instance() or main() again. + * Agressively kill and null everything to reduce memory usage in the JVM + * after stopping, and to recognize what must be reinitialized on restart (Android) + * + * @since 0.8.8 + */ + public static void shutdown() { + synchronized (TunnelControllerGroup.class) { + if (_instance == null) return; + _instance.unloadControllers(); + _instance._log = null; + _instance = null; + } + I2PTunnelClientBase.killClientExecutor(); + } /** * Load up all of the tunnels configured in the given file (but do not start diff --git a/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java b/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java index 1f1cd4098d55ff7ac55949dcc2ccb47c66596f09..a11b8adc52c1c6bf9068e6aaf094fe25a7be5fc8 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/NewsFetcher.java @@ -37,6 +37,8 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { private File _newsFile; private File _tempFile; private static NewsFetcher _instance; + private volatile boolean _isRunning; + //public static final synchronized NewsFetcher getInstance() { return _instance; } public static final synchronized NewsFetcher getInstance(I2PAppContext ctx) { if (_instance != null) @@ -64,8 +66,14 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { _tempFile = new File(_context.getTempDir(), TEMP_NEWS_FILE); updateLastFetched(); _updateVersion = ""; + _isRunning = true; } + /** @since 0.8.8 */ + void shutdown() { + _isRunning = false; + } + private void updateLastFetched() { if (_newsFile.exists()) { if (_lastUpdated == 0) @@ -108,7 +116,7 @@ public class NewsFetcher implements Runnable, EepGet.StatusListener { public void run() { try { Thread.sleep(INITIAL_DELAY + _context.random().nextLong(INITIAL_DELAY)); } catch (InterruptedException ie) {} - while (true) { + while (_isRunning) { if (!_updateAvailable) checkForUpdates(); if (shouldFetchNews()) { fetchNews(); diff --git a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java index be46bdca0c477a34b75b9f4acd2a43cff6b90c0b..a436e3e83bcf8448ec3d743f97f395d3ce4987a4 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/RouterConsoleRunner.java @@ -342,10 +342,10 @@ public class RouterConsoleRunner { } NewsFetcher fetcher = NewsFetcher.getInstance(I2PAppContext.getGlobalContext()); - Thread t = new I2PAppThread(fetcher, "NewsFetcher", true); - t.start(); + Thread newsThread = new I2PAppThread(fetcher, "NewsFetcher", true); + newsThread.start(); - t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true); + Thread t = new I2PAppThread(new StatSummarizer(), "StatSummarizer", true); t.start(); List<RouterContext> contexts = RouterContext.listContexts(); @@ -356,6 +356,9 @@ public class RouterConsoleRunner { t.start(); ctx.addShutdownTask(new PluginStopper(ctx)); } + ctx.addShutdownTask(new NewsShutdown(fetcher, newsThread)); + // stat summarizer registers its own hook + ctx.addShutdownTask(new ServerShutdown()); } } @@ -495,16 +498,31 @@ public class RouterConsoleRunner { } } -/******* - public void stopConsole() { - try { - _server.stop(); - } catch (InterruptedException ie) { - ie.printStackTrace(); + /** @since 0.8.8 */ + private class ServerShutdown implements Runnable { + public void run() { + try { + _server.stop(); + } catch (InterruptedException ie) {} } } -********/ + /** @since 0.8.8 */ + private static class NewsShutdown implements Runnable { + private final NewsFetcher _fetcher; + private final Thread _newsThread; + + public NewsShutdown(NewsFetcher fetcher, Thread t) { + _fetcher = fetcher; + _newsThread = t; + } + + public void run() { + _fetcher.shutdown(); + _newsThread.interrupt(); + } + } + public static Properties webAppProperties() { return webAppProperties(I2PAppContext.getGlobalContext().getConfigDir().getAbsolutePath()); } diff --git a/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java index 961ff539514179c8c752f3e7abf1936b3fa4a504..6ca88b29367fb188bcaf94b16bd5b5967a67b1a9 100644 --- a/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java +++ b/core/java/src/gnu/crypto/prng/AsyncFortunaStandalone.java @@ -2,6 +2,7 @@ package gnu.crypto.prng; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -10,6 +11,12 @@ import net.i2p.util.Log; * fortuna instance that tries to avoid blocking if at all possible by using separate * filled buffer segments rather than one buffer (and blocking when that buffer's data * has been eaten) + * + * Note that this class is not fully Thread safe! + * The following methods must be synchronized externally, they are not + * sycned here or in super(): + * addRandomByte(), addRandomBytes(), nextByte(), nextBytes(), seed() + * */ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnable { /** @@ -19,25 +26,23 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl private static final int DEFAULT_BUFFERS = 2; private static final int DEFAULT_BUFSIZE = 256*1024; private final int _bufferCount; - private final byte asyncBuffers[][]; - private final int status[]; - private int nextBuf = 0; + private final int _bufferSize; + /** the lock */ + private final Object asyncBuffers = new Object(); private final I2PAppContext _context; private final Log _log; + private volatile boolean _isRunning; + private Thread _refillThread; + private final LinkedBlockingQueue<AsyncBuffer> _fullBuffers; + private final LinkedBlockingQueue<AsyncBuffer> _emptyBuffers; + private AsyncBuffer _currentBuffer; - private static final int STATUS_NEED_FILL = 0; - private static final int STATUS_FILLING = 1; - private static final int STATUS_FILLED = 2; - private static final int STATUS_LIVE = 3; - public AsyncFortunaStandalone(I2PAppContext context) { super(); _bufferCount = Math.max(context.getProperty("prng.buffers", DEFAULT_BUFFERS), 2); - int bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024); - asyncBuffers = new byte[_bufferCount][bufferSize]; - status = new int[_bufferCount]; - for (int i = 0; i < _bufferCount; i++) - status[i] = STATUS_NEED_FILL; + _bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024); + _emptyBuffers = new LinkedBlockingQueue(_bufferCount); + _fullBuffers = new LinkedBlockingQueue(_bufferCount); _context = context; context.statManager().createRequiredRateStat("prng.bufferWaitTime", "Delay for random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } ); context.statManager().createRequiredRateStat("prng.bufferFillTime", "Time to fill random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } ); @@ -45,10 +50,27 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl } public void startup() { - Thread refillThread = new Thread(this, "PRNG"); - refillThread.setDaemon(true); - refillThread.setPriority(Thread.MIN_PRIORITY+1); - refillThread.start(); + for (int i = 0; i < _bufferCount; i++) + _emptyBuffers.offer(new AsyncBuffer(_bufferSize)); + _isRunning = true; + _refillThread = new Thread(this, "PRNG"); + _refillThread.setDaemon(true); + _refillThread.setPriority(Thread.MIN_PRIORITY+1); + _refillThread.start(); + } + + /** + * Note - methods may hang or NPE or throw IllegalStateExceptions after this + * @since 0.8.8 + */ + public void shutdown() { + _isRunning = false; + _emptyBuffers.clear(); + _fullBuffers.clear(); + _refillThread.interrupt(); + // unsynchronized to avoid hanging, may NPE elsewhere + _currentBuffer = null; + buffer = null; } /** the seed is only propogated once the prng is started with startup() */ @@ -63,80 +85,67 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl @Override protected void allocBuffer() {} + private static class AsyncBuffer { + public final byte[] buffer; + + public AsyncBuffer(int size) { + buffer = new byte[size]; + } + } + /** * make the next available filled buffer current, scheduling any unfilled * buffers for refill, and blocking until at least one buffer is ready */ protected void rotateBuffer() { synchronized (asyncBuffers) { - // wait until we get some filled + AsyncBuffer old = _currentBuffer; + if (old != null) + _emptyBuffers.offer(old); long before = System.currentTimeMillis(); - long waited = 0; - while (status[nextBuf] != STATUS_FILLED) { - //System.out.println(Thread.currentThread().getName() + ": Next PRNG buffer " - // + nextBuf + " isn't ready (" + status[nextBuf] + ")"); - //new Exception("source").printStackTrace(); - asyncBuffers.notifyAll(); + AsyncBuffer nextBuffer = null; + + while (nextBuffer == null) { + if (!_isRunning) + throw new IllegalStateException("shutdown"); try { - asyncBuffers.wait(); - } catch (InterruptedException ie) {} - waited = System.currentTimeMillis()-before; + nextBuffer = _fullBuffers.take(); + } catch (InterruptedException ie) { + continue; + } } + long waited = System.currentTimeMillis()-before; _context.statManager().addRateData("prng.bufferWaitTime", waited, 0); if (waited > 10*1000 && _log.shouldLog(Log.WARN)) _log.warn(Thread.currentThread().getName() + ": Took " + waited + "ms for a full PRNG buffer to be found"); - //System.out.println(Thread.currentThread().getName() + ": Switching to prng buffer " + nextBuf); - buffer = asyncBuffers[nextBuf]; - status[nextBuf] = STATUS_LIVE; - int prev=nextBuf-1; - if (prev<0) - prev = _bufferCount-1; - if (status[prev] == STATUS_LIVE) - status[prev] = STATUS_NEED_FILL; - nextBuf++; - if (nextBuf >= _bufferCount) - nextBuf = 0; - asyncBuffers.notify(); + _currentBuffer = nextBuffer; + buffer = nextBuffer.buffer; } } + /** + * The refiller thread + */ public void run() { - while (true) { - int toFill = -1; + while (_isRunning) { + AsyncBuffer aBuff = null; try { - synchronized (asyncBuffers) { - for (int i = 0; i < _bufferCount; i++) { - if (status[i] == STATUS_NEED_FILL) { - status[i] = STATUS_FILLING; - toFill = i; - break; - } - } - if (toFill == -1) { - //System.out.println(Thread.currentThread().getName() + ": All pending buffers full"); - asyncBuffers.wait(); - } - } - } catch (InterruptedException ie) {} + aBuff = _emptyBuffers.take(); + } catch (InterruptedException ie) { + continue; + } - if (toFill != -1) { - //System.out.println(Thread.currentThread().getName() + ": Filling prng buffer " + toFill); long before = System.currentTimeMillis(); - doFill(asyncBuffers[toFill]); + doFill(aBuff.buffer); long after = System.currentTimeMillis(); - synchronized (asyncBuffers) { - status[toFill] = STATUS_FILLED; - //System.out.println(Thread.currentThread().getName() + ": Prng buffer " + toFill + " filled after " + (after-before)); - asyncBuffers.notifyAll(); - } + _fullBuffers.offer(aBuff); _context.statManager().addRateData("prng.bufferFillTime", after - before, 0); Thread.yield(); long waitTime = (after-before)*5; if (waitTime <= 0) // somehow postman saw waitTime show up as negative waitTime = 50; try { Thread.sleep(waitTime); } catch (InterruptedException ie) {} - } } } diff --git a/core/java/src/net/i2p/I2PAppContext.java b/core/java/src/net/i2p/I2PAppContext.java index fa352ada6acd6283dc3e1a838c7af5bf567bc1c2..146d4e0e41fab7ea8240ec05e8c73ef150654f9b 100644 --- a/core/java/src/net/i2p/I2PAppContext.java +++ b/core/java/src/net/i2p/I2PAppContext.java @@ -1,6 +1,7 @@ package net.i2p; import java.io.File; +import java.util.Collections; import java.util.HashSet; import java.util.Properties; import java.util.Random; @@ -100,7 +101,7 @@ public class I2PAppContext { private volatile boolean _randomInitialized; private volatile boolean _keyGeneratorInitialized; protected volatile boolean _keyRingInitialized; // used in RouterContext - private Set<Runnable> _shutdownTasks; + protected final Set<Runnable> _shutdownTasks; private File _baseDir; private File _configDir; private File _routerDir; @@ -114,6 +115,10 @@ public class I2PAppContext { * Pull the default context, creating a new one if necessary, else using * the first one created. * + * Warning - do not save the returned value, or the value of any methods below, + * in a static field, or you will get the old context if a new router is + * started in the same JVM after the first is shut down, + * e.g. on Android. */ public static I2PAppContext getGlobalContext() { // skip the global lock - _gAC must be volatile @@ -164,8 +169,12 @@ public class I2PAppContext { private I2PAppContext(boolean doInit, Properties envProps) { if (doInit) { synchronized (I2PAppContext.class) { - if (_globalAppContext == null) + if (_globalAppContext == null) { _globalAppContext = this; + } else { + System.out.println("Warning - New context not replacing old one, you now have a second one"); + (new Exception("I did it")).printStackTrace(); + } } } _overrideProps = new I2PProperties(); @@ -185,7 +194,7 @@ public class I2PAppContext { _elGamalAESEngineInitialized = false; _logManagerInitialized = false; _keyRingInitialized = false; - _shutdownTasks = new ConcurrentHashSet(0); + _shutdownTasks = new ConcurrentHashSet(32); initializeDirs(); } @@ -843,12 +852,24 @@ public class I2PAppContext { } } + /** + * WARNING - Shutdown tasks are not executed in an I2PAppContext. + * You must be in a RouterContext for the tasks to be executed + * at shutdown. + * This method moved from Router in 0.7.1 so that clients + * may use it without depending on router.jar. + * @since 0.7.1 + */ public void addShutdownTask(Runnable task) { _shutdownTasks.add(task); } + /** + * @return an unmodifiable Set + * @since 0.7.1 + */ public Set<Runnable> getShutdownTasks() { - return new HashSet(_shutdownTasks); + return Collections.unmodifiableSet(_shutdownTasks); } /** diff --git a/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java b/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java index 160cc1354bb54a5d5e2c3948f65574b7eae05aaa..072fa50b91c5ccb980ade5305b287989462222aa 100644 --- a/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java +++ b/core/java/src/net/i2p/crypto/DHSessionKeyBuilder.java @@ -47,13 +47,14 @@ import net.i2p.util.RandomSource; * @author jrandom */ public class DHSessionKeyBuilder { - private static final I2PAppContext _context = I2PAppContext.getGlobalContext(); - private static final Log _log; + private static I2PAppContext _context = I2PAppContext.getGlobalContext(); + private static Log _log; private static final int MIN_NUM_BUILDERS; private static final int MAX_NUM_BUILDERS; private static final int CALC_DELAY; private static final LinkedBlockingQueue<DHSessionKeyBuilder> _builders; - private static final Thread _precalcThread; + private static Thread _precalcThread; + private static volatile boolean _isRunning; // the data of importance private BigInteger _myPrivateValue; @@ -96,14 +97,46 @@ public class DHSessionKeyBuilder { if (_log.shouldLog(Log.DEBUG)) _log.debug("DH Precalc (minimum: " + MIN_NUM_BUILDERS + " max: " + MAX_NUM_BUILDERS + ", delay: " + CALC_DELAY + ")"); + startPrecalc(); + } - _precalcThread = new I2PThread(new DHSessionKeyBuilderPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS)); - _precalcThread.setName("DH Precalc"); - _precalcThread.setDaemon(true); + /** + * Caller must synch on class + * @since 0.8.8 + */ + private static void startPrecalc() { + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(DHSessionKeyBuilder.class); + _precalcThread = new I2PThread(new DHSessionKeyBuilderPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS), + "DH Precalc", true); _precalcThread.setPriority(Thread.MIN_PRIORITY); + _isRunning = true; _precalcThread.start(); } + /** + * Note that this stops the singleton precalc thread. + * You don't want to do this if there are multiple routers in the JVM. + * Fix this if you care. See Router.shutdown(). + * @since 0.8.8 + */ + public static void shutdown() { + _isRunning = false; + _precalcThread.interrupt(); + _builders.clear(); + } + + /** + * Only required if shutdown() previously called. + * @since 0.8.8 + */ + public static void restart() { + synchronized(DHSessionKeyBuilder.class) { + if (!_isRunning) + startPrecalc(); + } + } + /** * Construct a new DH key builder * or pulls a prebuilt one from the queue. @@ -475,7 +508,7 @@ public class DHSessionKeyBuilder { } public void run() { - while (true) { + while (_isRunning) { int curSize = 0; long start = System.currentTimeMillis(); diff --git a/core/java/src/net/i2p/crypto/ElGamalEngine.java b/core/java/src/net/i2p/crypto/ElGamalEngine.java index 29f5df35a0eb0bec59f118036eb4aac6e2cad206..ba5d09f5bee7936ddf2749d2bb9f8bf791b3883c 100644 --- a/core/java/src/net/i2p/crypto/ElGamalEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalEngine.java @@ -78,6 +78,24 @@ public class ElGamalEngine { } + /** + * Note that this stops the singleton precalc thread. + * You don't want to do this if there are multiple routers in the JVM. + * Fix this if you care. See Router.shutdown(). + * @since 0.8.8 + */ + public void shutdown() { + YKGenerator.shutdown(); + } + + /** + * Only required if shutdown() previously called. + * @since 0.8.8 + */ + public static void restart() { + YKGenerator.restart(); + } + private final static BigInteger _two = new NativeBigInteger(1, new byte[] { 0x02}); private BigInteger[] getNextYK() { diff --git a/core/java/src/net/i2p/crypto/YKGenerator.java b/core/java/src/net/i2p/crypto/YKGenerator.java index ef8546e2f0f415225669fc213101fbf3488ababe..9a261a5db897d0548e151b99be135eec527f1bc6 100644 --- a/core/java/src/net/i2p/crypto/YKGenerator.java +++ b/core/java/src/net/i2p/crypto/YKGenerator.java @@ -42,8 +42,9 @@ class YKGenerator { private static final int MAX_NUM_BUILDERS; private static final int CALC_DELAY; private static final LinkedBlockingQueue<BigInteger[]> _values; - private static final Thread _precalcThread; - private static final I2PAppContext ctx; + private static Thread _precalcThread; + private static I2PAppContext ctx; + private static volatile boolean _isRunning; public final static String PROP_YK_PRECALC_MIN = "crypto.yk.precalc.min"; public final static String PROP_YK_PRECALC_MAX = "crypto.yk.precalc.max"; @@ -75,16 +76,47 @@ class YKGenerator { // _log.debug("ElGamal YK Precalc (minimum: " + MIN_NUM_BUILDERS + " max: " + MAX_NUM_BUILDERS + ", delay: " // + CALC_DELAY + ")"); + startPrecalc(); + } + + /** + * Caller must synch on class + * @since 0.8.8 + */ + private static void startPrecalc() { + ctx = I2PAppContext.getGlobalContext(); ctx.statManager().createRateStat("crypto.YKUsed", "Need a YK from the queue", "Encryption", new long[] { 60*60*1000 }); ctx.statManager().createRateStat("crypto.YKEmpty", "YK queue empty", "Encryption", new long[] { 60*60*1000 }); - - _precalcThread = new I2PThread(new YKPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS)); - _precalcThread.setName("YK Precalc"); - _precalcThread.setDaemon(true); + _precalcThread = new I2PThread(new YKPrecalcRunner(MIN_NUM_BUILDERS, MAX_NUM_BUILDERS), + "YK Precalc", true); _precalcThread.setPriority(Thread.MIN_PRIORITY); + _isRunning = true; _precalcThread.start(); } + /** + * Note that this stops the singleton precalc thread. + * You don't want to do this if there are multiple routers in the JVM. + * Fix this if you care. See Router.shutdown(). + * @since 0.8.8 + */ + public static void shutdown() { + _isRunning = false; + _precalcThread.interrupt(); + _values.clear(); + } + + /** + * Only required if shutdown() previously called. + * @since 0.8.8 + */ + public static void restart() { + synchronized(YKGenerator.class) { + if (!_isRunning) + startPrecalc(); + } + } + private static final int getSize() { return _values.size(); } @@ -161,7 +193,7 @@ class YKGenerator { } public void run() { - while (true) { + while (_isRunning) { int curSize = 0; //long start = Clock.getInstance().now(); int startSize = getSize(); @@ -172,7 +204,7 @@ class YKGenerator { _checkDelay += 1000; curSize = startSize; if (curSize < _minSize) { - for (int i = curSize; i < _maxSize; i++) { + for (int i = curSize; i < _maxSize && _isRunning; i++) { //long begin = Clock.getInstance().now(); if (!addValues(generateYK())) break; diff --git a/core/java/src/net/i2p/data/SDSCache.java b/core/java/src/net/i2p/data/SDSCache.java index 8698f6a78b7d80c9ce89b4d18f480099412f4025..8902ea1c2520b4006a6dc8c83a7f94aa9008e757 100644 --- a/core/java/src/net/i2p/data/SDSCache.java +++ b/core/java/src/net/i2p/data/SDSCache.java @@ -83,6 +83,18 @@ public class SDSCache<V extends SimpleDataStructure> { _log.debug("New SDSCache for " + rvClass + " data size: " + len + " max: " + size + " max mem: " + (len * size)); I2PAppContext.getGlobalContext().statManager().createRateStat(_statName, "Hit rate", "Router", new long[] { 10*60*1000 }); + I2PAppContext.getGlobalContext().addShutdownTask(new Shutdown()); + } + + /** + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + synchronized(_cache) { + _cache.clear(); + } + } } /** diff --git a/core/java/src/net/i2p/time/Timestamper.java b/core/java/src/net/i2p/time/Timestamper.java index 2c798bab6b2bb75b9671b3951d6b17118fe81246..829d7a68a4127145e1c34f788a23b363f7a3a0a3 100644 --- a/core/java/src/net/i2p/time/Timestamper.java +++ b/core/java/src/net/i2p/time/Timestamper.java @@ -29,6 +29,8 @@ public class Timestamper implements Runnable { private boolean _daemon; private boolean _initialized; private boolean _wellSynced; + private volatile boolean _isRunning; + private Thread _timestamperThread; private static final int MIN_QUERY_FREQUENCY = 5*60*1000; private static final int DEFAULT_QUERY_FREQUENCY = 5*60*1000; @@ -106,10 +108,11 @@ public class Timestamper implements Runnable { } private void startTimestamper() { - I2PThread t = new I2PThread(this, "Timestamper"); - t.setPriority(I2PThread.MIN_PRIORITY); - t.setDaemon(_daemon); - t.start(); + _timestamperThread = new I2PThread(this, "Timestamper", _daemon); + _timestamperThread.setPriority(I2PThread.MIN_PRIORITY); + _isRunning = true; + _timestamperThread.start(); + _context.addShutdownTask(new Shutdown()); } public void waitForInitialization() { @@ -121,6 +124,15 @@ public class Timestamper implements Runnable { } catch (InterruptedException ie) {} } + /** @since 0.8.8 */ + private class Shutdown implements Runnable { + public void run() { + _isRunning = false; + if (_timestamperThread != null) + _timestamperThread.interrupt(); + } + } + public void run() { try { Thread.sleep(1000); } catch (InterruptedException ie) {} _log = _context.logManager().getLog(Timestamper.class); @@ -128,7 +140,7 @@ public class Timestamper implements Runnable { _log.info("Starting timestamper"); boolean lastFailed = false; try { - while (true) { + while (_isRunning) { updateConfig(); if (!_disabled) { // first the servers for our country, if we know what country we're in... diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index ff564b2656b659decc000a2a43d03732957d9d3f..73e4f6523f0949028f4f89e14b02337c764b1169 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -107,8 +107,18 @@ public class DecayingBloomFilter { context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)", "log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)", "Router", new long[] { Math.max(60*1000, durationMs) }); + context.addShutdownTask(new Shutdown()); } + /** + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + clear(); + } + } + public long getCurrentDuplicateCount() { return _currentDuplicates; } public int getInsertedCount() { diff --git a/core/java/src/net/i2p/util/DecayingHashSet.java b/core/java/src/net/i2p/util/DecayingHashSet.java index f090cf727b61989ca9bec1d4f205ba48767fba17..4a10f994e29eac1360f46790070fcd3a645aec4e 100644 --- a/core/java/src/net/i2p/util/DecayingHashSet.java +++ b/core/java/src/net/i2p/util/DecayingHashSet.java @@ -93,8 +93,18 @@ public class DecayingHashSet extends DecayingBloomFilter { "Size", "Router", new long[] { Math.max(60*1000, durationMs) }); context.statManager().createRateStat("router.decayingHashSet." + name + ".dups", "1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) }); + context.addShutdownTask(new Shutdown()); } + /** + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + clear(); + } + } + /** unsynchronized but only used for logging elsewhere */ @Override public int getInsertedCount() { diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index 45622a002aede3f83bcacb0e9bc7877d743df5e5..57f4abe026da1f8d04fb6dd8b674613ccde97917 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -842,7 +842,7 @@ public class EepGet { if (val.indexOf("chunked") != -1) _encodingChunked = true; } else if (key.equalsIgnoreCase("Content-Type")) { - _contentType=val; + _contentType=val.trim(); } else if (key.equalsIgnoreCase("Location")) { _redirectLocation=val.trim(); } else { diff --git a/core/java/src/net/i2p/util/FortunaRandomSource.java b/core/java/src/net/i2p/util/FortunaRandomSource.java index 9061d3a570d585812f03f676ff62c16efd5b4e4b..1e201cb2c8524acc20fdbbce8e288dc13f00190a 100644 --- a/core/java/src/net/i2p/util/FortunaRandomSource.java +++ b/core/java/src/net/i2p/util/FortunaRandomSource.java @@ -44,6 +44,14 @@ public class FortunaRandomSource extends RandomSource implements EntropyHarveste _haveNextGaussian = false; } + /** + * Note - methods may hang or NPE or throw IllegalStateExceptions after this + * @since 0.8.8 + */ + public void shutdown() { + _fortuna.shutdown(); + } + @Override public synchronized void setSeed(byte buf[]) { _fortuna.addRandomBytes(buf); diff --git a/core/java/src/net/i2p/util/I2PThread.java b/core/java/src/net/i2p/util/I2PThread.java index 90c95d3818d70b3a3c17c566a5a6190cf5ade57d..3757003909844bc225f619d7722b9a7fd621ae2f 100644 --- a/core/java/src/net/i2p/util/I2PThread.java +++ b/core/java/src/net/i2p/util/I2PThread.java @@ -20,7 +20,11 @@ import java.util.concurrent.CopyOnWriteArraySet; * */ public class I2PThread extends Thread { - private static volatile Log _log; + /** + * Non-static to avoid refs to old context in Android. + * Probably should just remove all the logging though. + */ + private volatile Log _log; private static final Set _listeners = new CopyOnWriteArraySet(); private String _name; private Exception _createdBy; @@ -61,8 +65,9 @@ public class I2PThread extends Thread { _createdBy = new Exception("Created by"); } - private static void log(int level, String msg) { log(level, msg, null); } - private static void log(int level, String msg, Throwable t) { + private void log(int level, String msg) { log(level, msg, null); } + + private void log(int level, String msg, Throwable t) { // we cant assume log is created if (_log == null) _log = new Log(I2PThread.class); if (_log.shouldLog(level)) @@ -85,7 +90,9 @@ public class I2PThread extends Thread { if (t instanceof OutOfMemoryError) fireOOM((OutOfMemoryError)t); } - log(Log.INFO, "Thread finished normally: " + _name); + // This creates a new I2PAppContext after it was deleted + // in Router.finalShutdown() via RouterContext.killGlobalContext() + //log(Log.INFO, "Thread finished normally: " + _name); } @Override diff --git a/core/java/src/net/i2p/util/LogConsoleBuffer.java b/core/java/src/net/i2p/util/LogConsoleBuffer.java index 2fe2708490a96ec5353691e7b68a303f88d44f38..c189f540c84d64ef0e03ae08805669666afd181f 100644 --- a/core/java/src/net/i2p/util/LogConsoleBuffer.java +++ b/core/java/src/net/i2p/util/LogConsoleBuffer.java @@ -2,61 +2,79 @@ package net.i2p.util; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; /** - * Offer a glimpse into the last few console messages generated - * + * Offer a glimpse into the last few console messages generated. + * Maintains two buffers, one normal and one critical. */ public class LogConsoleBuffer { - private I2PAppContext _context; - private final List<String> _buffer; - private final List<String> _critBuffer; + private final int lim; + private final LinkedBlockingQueue<String> _buffer; + private final LinkedBlockingQueue<String> _critBuffer; + /** + * Uses default limit from LogManager. + * As of 0.8.7, limit is not checked at runtime. + * + * @param context unused + */ public LogConsoleBuffer(I2PAppContext context) { - _context = context; - _buffer = new ArrayList(); - _critBuffer = new ArrayList(); + this(LogManager.DEFAULT_CONSOLEBUFFERSIZE); + } + + /** + * @param limit max size of each buffer + * In theory the limit is configurable, but it isn't in the UI, + * so set it at construction. + * + * @since 0.8.7 + */ + public LogConsoleBuffer(int limit) { + lim = Math.min(limit, 4); + // Add some extra room to minimize the chance of losing a message, + // since we are doing offer() below. + _buffer = new LinkedBlockingQueue(limit + 4); + _critBuffer = new LinkedBlockingQueue(limit + 4); } void add(String msg) { - int lim = _context.logManager().getConsoleBufferSize(); - synchronized (_buffer) { while (_buffer.size() >= lim) - _buffer.remove(0); - _buffer.add(msg); - } + _buffer.poll(); + _buffer.offer(msg); } + + /** + * Only adds to the critical buffer, not to both. + * + */ void addCritical(String msg) { - int lim = _context.logManager().getConsoleBufferSize(); - synchronized (_critBuffer) { while (_critBuffer.size() >= lim) - _critBuffer.remove(0); - _critBuffer.add(msg); - } + _critBuffer.poll(); + _critBuffer.offer(msg); } /** - * Retrieve the currently bufferd messages, earlier values were generated... + * Retrieve the currently buffered messages, earlier values were generated... * earlier. All values are strings with no formatting (as they are written * in the logs) * + * @return oldest first */ public List<String> getMostRecentMessages() { - synchronized (_buffer) { return new ArrayList(_buffer); - } } + /** - * Retrieve the currently bufferd crutucak messages, earlier values were generated... + * Retrieve the currently buffered critical messages, earlier values were generated... * earlier. All values are strings with no formatting (as they are written * in the logs) * + * @return oldest first */ public List<String> getMostRecentCriticalMessages() { - synchronized (_critBuffer) { return new ArrayList(_critBuffer); - } } } diff --git a/core/java/src/net/i2p/util/LogManager.java b/core/java/src/net/i2p/util/LogManager.java index b7d280491f6b7375c0e584acac41381a19f4e4f4..81b1148b552a38ad040126a8aa4532a6a1138856 100644 --- a/core/java/src/net/i2p/util/LogManager.java +++ b/core/java/src/net/i2p/util/LogManager.java @@ -129,7 +129,7 @@ public class LogManager { _log = getLog(LogManager.class); String location = context.getProperty(CONFIG_LOCATION_PROP, CONFIG_LOCATION_DEFAULT); setConfig(location); - _consoleBuffer = new LogConsoleBuffer(context); + _consoleBuffer = new LogConsoleBuffer(_consoleBufferSize); // If we aren't in the router context, delay creating the LogWriter until required, // so it doesn't create a log directory and log files unless there is output. // In the router context, we have to rotate to a new log file at startup or the logs.jsp @@ -656,6 +656,9 @@ public class LogManager { // this could generate out-of-order messages _writer.flushRecords(false); _writer.stopWriting(); + synchronized (_writer) { + _writer.notifyAll(); + } } } diff --git a/core/java/src/net/i2p/util/LogWriter.java b/core/java/src/net/i2p/util/LogWriter.java index c141cc24343ee861576d7e0d89d3a96b535ab0da..1714d170047c22a2ef97a3812162769af2dae01d 100644 --- a/core/java/src/net/i2p/util/LogWriter.java +++ b/core/java/src/net/i2p/util/LogWriter.java @@ -36,7 +36,7 @@ class LogWriter implements Runnable { private File _currentFile; private final LogManager _manager; - private boolean _write; + private volatile boolean _write; private static final int MAX_DISKFULL_MESSAGES = 8; private int _diskFullMessageCount; @@ -55,7 +55,8 @@ class LogWriter implements Runnable { rotateFile(); while (_write) { flushRecords(); - rereadConfig(); + if (_write) + rereadConfig(); } //System.err.println("Done writing"); } catch (Exception e) { diff --git a/core/java/src/net/i2p/util/RandomSource.java b/core/java/src/net/i2p/util/RandomSource.java index fe042915212baa352031616a60fba01f91e9f9ca..558c01297c65b397e7aa75d87d18c00640145c59 100644 --- a/core/java/src/net/i2p/util/RandomSource.java +++ b/core/java/src/net/i2p/util/RandomSource.java @@ -28,6 +28,12 @@ public class RandomSource extends SecureRandom implements EntropyHarvester { private final EntropyHarvester _entropyHarvester; protected final I2PAppContext _context; + /** + * Deprecated - do not instantiate this directly, as you won't get the + * good one (Fortuna). Use getInstance() or + * I2PAppContext.getGlobalContext().random() to get the FortunaRandomSource + * instance. + */ public RandomSource(I2PAppContext context) { super(); _context = context; @@ -202,10 +208,4 @@ public class RandomSource extends SecureRandom implements EntropyHarvester { rs.saveSeed(); } } - - // noop - private static class DummyEntropyHarvester implements EntropyHarvester { - public void feedEntropy(String source, long data, int bitoffset, int bits) {} - public void feedEntropy(String source, byte[] data, int offset, int len) {} - } } diff --git a/core/java/src/net/i2p/util/SimpleScheduler.java b/core/java/src/net/i2p/util/SimpleScheduler.java index 61e2e66b347bcb3e823e8a4bf815ef6a9234bfec..728199e6589b65f0e1fda115195e38903ac38529 100644 --- a/core/java/src/net/i2p/util/SimpleScheduler.java +++ b/core/java/src/net/i2p/util/SimpleScheduler.java @@ -2,6 +2,7 @@ package net.i2p.util; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadFactory; @@ -48,12 +49,25 @@ public class SimpleScheduler { _threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); _executor = new ScheduledThreadPoolExecutor(_threads, new CustomThreadFactory()); _executor.prestartAllCoreThreads(); + // don't bother saving ref to remove hook if somebody else calls stop + _context.addShutdownTask(new Shutdown()); } /** - * Removes the SimpleScheduler. + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + stop(); + } + } + + /** + * Stops the SimpleScheduler. + * Subsequent executions should not throw a RejectedExecutionException. */ public void stop() { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); _executor.shutdownNow(); } diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index b19b5d691ac542a585a19a8a783878de3c7ad2f1..ea6069f16f0d984cfab86cce75bd2ec3351ecaed 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -53,16 +53,32 @@ public class SimpleTimer { executor.setDaemon(true); executor.start(); } + _context.addShutdownTask(new Shutdown()); } + /** + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + removeSimpleTimer(); + } + } + /** * Removes the SimpleTimer. */ public void removeSimpleTimer() { synchronized(_events) { runn.setAnswer(false); + _events.clear(); + _eventTimes.clear(); _events.notifyAll(); } + synchronized (_readyEvents) { + _readyEvents.clear(); + _readyEvents.notifyAll(); + } } /** diff --git a/core/java/src/net/i2p/util/SimpleTimer2.java b/core/java/src/net/i2p/util/SimpleTimer2.java index 955f8faa3a3b034c218c523232aa07d8d23a8477..84793c085af756b60fdbde61a28e400ac68d1392 100644 --- a/core/java/src/net/i2p/util/SimpleTimer2.java +++ b/core/java/src/net/i2p/util/SimpleTimer2.java @@ -3,6 +3,7 @@ package net.i2p.util; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadFactory; @@ -48,12 +49,25 @@ public class SimpleTimer2 { _threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); _executor = new CustomScheduledThreadPoolExecutor(_threads, new CustomThreadFactory()); _executor.prestartAllCoreThreads(); + // don't bother saving ref to remove hook if somebody else calls stop + _context.addShutdownTask(new Shutdown()); } /** - * Removes the SimpleTimer. + * @since 0.8.8 + */ + private class Shutdown implements Runnable { + public void run() { + stop(); + } + } + + /** + * Stops the SimpleTimer. + * Subsequent executions should not throw a RejectedExecutionException. */ public void stop() { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); _executor.shutdownNow(); } diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java index 00493c9b47e9a882b292f301aef37bd81101b0ad..a230eb929ab1463e0460b938c7203a16064ed53a 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageHandler.java @@ -22,8 +22,8 @@ import net.i2p.util.Log; * */ public class I2NPMessageHandler { - private Log _log; - private I2PAppContext _context; + private final Log _log; + private final I2PAppContext _context; private long _lastReadBegin; private long _lastReadEnd; private int _lastSize; diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index eebb05f7e4ebdbf4c1fd618fdf7deaa76444535e..19990a7c84dc3cc32e5ef5d4e2c483fa90638e41 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -28,8 +28,8 @@ import net.i2p.util.SimpleByteCache; * @author jrandom */ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPMessage { - private Log _log; - protected I2PAppContext _context; + private final Log _log; + protected final I2PAppContext _context; private long _expiration; private long _uniqueId; diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 7f5cb008fe2fb53dbc4dbab50b393059f7c1afc4..4dcdc186ab1ffdfb6c312982a77c7c70c71e6bd6 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -280,14 +280,22 @@ public class JobQueue { void shutdown() { _alive = false; - _timedJobs.clear(); - _readyJobs.clear(); + synchronized (_jobLock) { + _timedJobs.clear(); + _readyJobs.clear(); + _jobLock.notifyAll(); + } // The JobQueueRunners are NOT daemons, // so they must be stopped. Job poison = new PoisonJob(); - for (int i = 0; i < _queueRunners.size(); i++) + for (JobQueueRunner runner : _queueRunners.values()) { + runner.stopRunning(); _readyJobs.offer(poison); - + // TODO interrupt thread for each runner + } + _queueRunners.clear(); + _jobStats.clear(); + _runnerId = 0; /******** if (_log.shouldLog(Log.WARN)) { diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 5df671c3ec47129a389fdcd16966a68439478aaf..233bada97ab06dc465320310e1d19eeddd2a6164 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -45,10 +45,12 @@ import net.i2p.stat.RateStat; import net.i2p.stat.StatManager; import net.i2p.util.ByteCache; import net.i2p.util.FileUtil; +import net.i2p.util.FortunaRandomSource; import net.i2p.util.I2PAppThread; import net.i2p.util.I2PThread; import net.i2p.util.Log; import net.i2p.util.SecureFileOutputStream; +import net.i2p.util.SimpleByteCache; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; @@ -73,6 +75,8 @@ public class Router { private I2PThread.OOMEventListener _oomListener; private ShutdownHook _shutdownHook; private final I2PThread _gracefulShutdownDetector; + private final RouterWatchdog _watchdog; + private final Thread _watchdogThread; public final static String PROP_CONFIG_FILE = "router.configLocation"; @@ -187,6 +191,19 @@ public class Router { // Save this in the context for the logger and apps that need it envProps.setProperty("i2p.systemTimeZone", originalTimeZoneID); + // Make darn sure we don't have a leftover I2PAppContext in the same JVM + // e.g. on Android - see finalShutdown() also + List<RouterContext> contexts = RouterContext.getContexts(); + if (contexts.isEmpty()) { + RouterContext.killGlobalContext(); + } else if (System.getProperty("java.vendor").contains("Android")) { + System.err.println("Warning: Killing " + contexts.size() + " other routers in this JVM"); + contexts.clear(); + RouterContext.killGlobalContext(); + } else { + System.err.println("Warning: " + contexts.size() + " other routers in this JVM"); + } + // The important thing that happens here is the directory paths are set and created // i2p.dir.router defaults to i2p.dir.config // i2p.dir.app defaults to i2p.dir.router @@ -257,7 +274,7 @@ public class Router { _killVMOnEnd = true; _oomListener = new I2PThread.OOMEventListener() { public void outOfMemory(OutOfMemoryError oom) { - ByteCache.clearAll(); + clearCaches(); _log.log(Log.CRIT, "Thread ran out of memory", oom); for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs try { @@ -275,11 +292,18 @@ public class Router { _gracefulShutdownDetector = new I2PAppThread(new GracefulShutdown(), "Graceful shutdown hook", true); _gracefulShutdownDetector.start(); - Thread watchdog = new I2PAppThread(new RouterWatchdog(_context), "RouterWatchdog", true); - watchdog.start(); + _watchdog = new RouterWatchdog(_context); + _watchdogThread = new I2PAppThread(_watchdog, "RouterWatchdog", true); + _watchdogThread.start(); } + /** @since 0.8.8 */ + private static final void clearCaches() { + ByteCache.clearAll(); + SimpleByteCache.clearAll(); + } + /** * Configure the router to kill the JVM when the router shuts down, as well * as whether to explicitly halt the JVM during the hard fail process. @@ -616,12 +640,15 @@ public class Router { public void rebuildNewIdentity() { killKeys(); for (Runnable task : _context.getShutdownTasks()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Running shutdown task " + task.getClass()); try { task.run(); } catch (Throwable t) { _log.log(Log.CRIT, "Error running shutdown task", t); } } + _context.removeShutdownTasks(); // hard and ugly if (System.getProperty("wrapper.version") != null) _log.log(Log.CRIT, "Restarting with new router identity"); @@ -632,7 +659,10 @@ public class Router { private void warmupCrypto() { _context.random().nextBoolean(); - new DHSessionKeyBuilder(); // load the class so it starts the precalc process + // Use restart() to refire the static refiller threads, in case + // we are restarting the router in the same JVM (Android) + DHSessionKeyBuilder.restart(); + _context.elGamalEngine().restart(); } private void startupQueue() { @@ -938,12 +968,15 @@ public class Router { // Run the shutdown hooks first in case they want to send some goodbye messages // Maybe we need a delay after this too? for (Runnable task : _context.getShutdownTasks()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Running shutdown task " + task.getClass()); try { task.run(); } catch (Throwable t) { _log.log(Log.CRIT, "Error running shutdown task", t); } } + _context.removeShutdownTasks(); try { _context.clientManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the client manager", t); } try { _context.namingService().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the naming service", t); } try { _context.jobQueue().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the job queue", t); } @@ -953,13 +986,37 @@ public class Router { try { _context.tunnelDispatcher().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the tunnel dispatcher", t); } try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); } try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } + try { _context.bandwidthLimiter().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); } try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); } try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); } try { _context.inNetMessagePool().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the inbound net pool", t); } //try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); } _context.deleteTempDir(); - RouterContext.listContexts().remove(_context); + List<RouterContext> contexts = RouterContext.getContexts(); + contexts.remove(_context); + + // shut down I2PAppContext tasks here + + // If there are multiple routers in the JVM, we don't want to do this + // to the DH or YK tasks, as they are singletons. + if (contexts.isEmpty()) { + try { + DHSessionKeyBuilder.shutdown(); + } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting DH", t); } + try { + _context.elGamalEngine().shutdown(); + } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting elGamal", t); } + } else { + _log.logAlways(Log.WARN, "Warning - " + contexts.size() + " routers remaining in this JVM, not releasing all resources"); + } + try { + ((FortunaRandomSource)_context.random()).shutdown(); + } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting random()", t); } + + // logManager shut down in finalShutdown() + _watchdog.shutdown(); + _watchdogThread.interrupt(); finalShutdown(exitCode); } @@ -970,6 +1027,7 @@ public class Router { private static final boolean ALLOW_DYNAMIC_KEYS = false; private void finalShutdown(int exitCode) { + clearCaches(); _log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ ); try { _context.logManager().shutdown(); } catch (Throwable t) { } if (ALLOW_DYNAMIC_KEYS) { @@ -979,6 +1037,20 @@ public class Router { File f = getPingFile(); f.delete(); + if (RouterContext.getContexts().isEmpty()) + RouterContext.killGlobalContext(); + + // Since 0.8.8, mainly for Android + for (Runnable task : _context.getFinalShutdownTasks()) { + System.err.println("Running final shutdown task " + task.getClass()); + try { + task.run(); + } catch (Throwable t) { + System.err.println("Running final shutdown task " + t); + } + } + _context.getFinalShutdownTasks().clear(); + if (_killVMOnEnd) { try { Thread.sleep(1000); } catch (InterruptedException ie) {} Runtime.getRuntime().halt(exitCode); @@ -1537,7 +1609,7 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent { long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); getContext().statManager().addRateData("router.memoryUsed", used, 0); if (_maxMemory - used < LOW_MEMORY_THRESHOLD) - ByteCache.clearAll(); + clearCaches(); getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME); diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java index c7d54f88d21d60eb1b5541d5ef81b581f39c7226..77d33b72566d854b37cc75a9944d18b5cf4a4417 100644 --- a/router/java/src/net/i2p/router/RouterContext.java +++ b/router/java/src/net/i2p/router/RouterContext.java @@ -1,8 +1,11 @@ package net.i2p.router; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; import net.i2p.data.Hash; @@ -55,6 +58,7 @@ public class RouterContext extends I2PAppContext { private MessageValidator _messageValidator; private MessageStateMonitor _messageStateMonitor; private RouterThrottle _throttle; + private final Set<Runnable> _finalShutdownTasks; private static List<RouterContext> _contexts = new ArrayList(1); @@ -67,7 +71,10 @@ public class RouterContext extends I2PAppContext { // to init everything. Caller MUST call initAll() afterwards. // Sorry, this breaks some main() unit tests out there. //initAll(); + if (!_contexts.isEmpty()) + System.err.println("Warning - More than one router in this JVM"); _contexts.add(this); + _finalShutdownTasks = new CopyOnWriteArraySet(); } /** @@ -165,11 +172,37 @@ public class RouterContext extends I2PAppContext { /** * Retrieve the list of router contexts currently instantiated in this JVM. * This will always contain only one item (except when a simulation per the - * MultiRouter is going on), and the list should only be modified when a new + * MultiRouter is going on). + * + * @return an unmodifiable list (as of 0.8.8). May be null or empty. + */ + public static List<RouterContext> listContexts() { + return Collections.unmodifiableList(_contexts); + } + + /** + * Same as listContexts() but package private and modifiable. + * The list should only be modified when a new * context is created or a router is shut down. * + * @since 0.8.8 + */ + static List<RouterContext> getContexts() { + return _contexts; + } + + /** + * Kill the global I2PAppContext, so it isn't still around + * when we restart in the same JVM (Android). + * Only do this if there are no other routers in the JVM. + * + * @since 0.8.8 */ - public static List<RouterContext> listContexts() { return _contexts; } + static void killGlobalContext() { + synchronized (I2PAppContext.class) { + _globalAppContext = null; + } + } /** what router is this context working for? */ public Router router() { return _router; } @@ -402,6 +435,32 @@ public class RouterContext extends I2PAppContext { } } + /** + * @since 0.8.8 + */ + void removeShutdownTasks() { + _shutdownTasks.clear(); + } + + /** + * The last thing to be called before router shutdown. + * No context resources, including logging, will be available. + * Only for external threads in the same JVM needing to know when + * the shutdown is complete, like Android. + * @since 0.8.8 + */ + public void addFinalShutdownTask(Runnable task) { + _finalShutdownTasks.add(task); + } + + /** + * @return the Set + * @since 0.8.8 + */ + Set<Runnable> getFinalShutdownTasks() { + return _finalShutdownTasks; + } + /** * Use this instead of context instanceof RouterContext * @return true diff --git a/router/java/src/net/i2p/router/RouterWatchdog.java b/router/java/src/net/i2p/router/RouterWatchdog.java index a06a3c1c5c8b1ee66f83e11df7e4c1cd6ac2c1ec..f0d291755c86f6e9cb60cfbbd4c5de8bdaaa83fd 100644 --- a/router/java/src/net/i2p/router/RouterWatchdog.java +++ b/router/java/src/net/i2p/router/RouterWatchdog.java @@ -15,14 +15,21 @@ class RouterWatchdog implements Runnable { private final Log _log; private final RouterContext _context; private int _consecutiveErrors; + private volatile boolean _isRunning; private static final long MAX_JOB_RUN_LAG = 60*1000; public RouterWatchdog(RouterContext ctx) { _context = ctx; _log = ctx.logManager().getLog(RouterWatchdog.class); + _isRunning = true; } + /** @since 0.8.8 */ + public void shutdown() { + _isRunning = false; + } + public boolean verifyJobQueueLiveliness() { long when = _context.jobQueue().getLastJobBegin(); if (when < 0) @@ -109,7 +116,7 @@ class RouterWatchdog implements Runnable { } public void run() { - while (true) { + while (_isRunning) { try { Thread.sleep(60*1000); } catch (InterruptedException ie) {} monitorRouter(); } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java index ad938e7ed37ba65a9df2c4e8a3f5939c973dc25a..5941919144d5cef77654d1517d05affb02cbfa0e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java @@ -359,6 +359,9 @@ class PersistentDataStore extends TransientDataStore { if (routerInfoFiles.length > 5) _alreadyWarned = false; for (int i = 0; i < routerInfoFiles.length; i++) { + // drop out if the router gets killed right after startup + if (!_context.router().isAlive()) + break; Hash key = getRouterInfoHash(routerInfoFiles[i].getName()); if ( (key != null) && (!isKnown(key)) ) { // Run it inline so we don't clog up the job queue, esp. at startup diff --git a/router/java/src/net/i2p/router/peermanager/PeerManager.java b/router/java/src/net/i2p/router/peermanager/PeerManager.java index 8a7ea723d131d02e889619055715409937533e46..f27a4c46f13e6e87bd1953ee1eede28158e92720 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerManager.java +++ b/router/java/src/net/i2p/router/peermanager/PeerManager.java @@ -52,6 +52,11 @@ class PeerManager { private static final long REORGANIZE_TIME_MEDIUM = 123*1000; private static final long REORGANIZE_TIME_LONG = 551*1000; + /** + * Warning - this loads all the profiles in the constructor. + * This may take a long time - 30 seconds or more. + * Instantiate this in a Job or Thread. + */ public PeerManager(RouterContext context) { _context = context; _log = context.logManager().getLog(PeerManager.class); @@ -99,6 +104,14 @@ class PeerManager { } } + /** @since 0.8.8 */ + void clearProfiles() { + _organizer.clearProfiles(); + _capabilitiesByPeer.clear(); + for (int i = 0; i < _peersByCapability.length; i++) + _peersByCapability[i].clear(); + } + Set selectPeers() { return _organizer.selectAllPeers(); } @@ -111,6 +124,9 @@ class PeerManager { _persistenceHelper.writeProfile(prof); } + /** + * This may take a long time - 30 seconds or more + */ void loadProfiles() { Set<PeerProfile> profiles = _persistenceHelper.readProfiles(); for (Iterator<PeerProfile> iter = profiles.iterator(); iter.hasNext();) { diff --git a/router/java/src/net/i2p/router/peermanager/PeerManagerFacadeImpl.java b/router/java/src/net/i2p/router/peermanager/PeerManagerFacadeImpl.java index e208daefe541a08502e53dde0467881a12069517..840b993bebca5b006e21aca2df476ce1c6f7a984 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/peermanager/PeerManagerFacadeImpl.java @@ -47,8 +47,10 @@ public class PeerManagerFacadeImpl implements PeerManagerFacade { public void shutdown() { _log.info("Shutting down the peer manager"); _testJob.stopTesting(); - if (_manager != null) + if (_manager != null) { _manager.storeProfiles(); + _manager.clearProfiles(); + } } public void restart() { diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 43080246d5ba9c516dbcaf304149a2c7867cd9fc..155f88bbcc533a10de941cb6d74191e4e37dca6e 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -227,6 +227,19 @@ public class ProfileOrganizer { public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); } public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); } + /** @since 0.8.8 */ + void clearProfiles() { + getReadLock(); + try { + _failingPeers.clear(); + _fastPeers.clear(); + _highCapacityPeers.clear(); + _notFailingPeers.clear(); + _notFailingPeersList.clear(); + _wellIntegratedPeers.clear(); + } finally { releaseReadLock(); } + } + /** * if a peer sends us more than 5 replies in a searchReply that we cannot * fetch, stop listening to them. diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index bea77fcf475dae4b7527b9f0bfe7d0b62907b477..b8f8a95908b87ce4305202fb75f2b5dbda6375aa 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -64,6 +64,7 @@ public class FIFOBandwidthLimiter { /** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */ private final AtomicLong _totalWastedOutboundBytes = new AtomicLong(); private final FIFOBandwidthRefiller _refiller; + private final Thread _refillerThread; private long _lastTotalSent; private long _lastTotalReceived; @@ -91,9 +92,9 @@ public class FIFOBandwidthLimiter { _lastTotalReceived = _totalAllocatedInboundBytes.get(); _lastStatsUpdated = now(); _refiller = new FIFOBandwidthRefiller(_context, this); - I2PThread t = new I2PThread(_refiller, "BWRefiller", true); - t.setPriority(I2PThread.NORM_PRIORITY-1); - t.start(); + _refillerThread = new I2PThread(_refiller, "BWRefiller", true); + _refillerThread.setPriority(I2PThread.NORM_PRIORITY-1); + _refillerThread.start(); } //public long getAvailableInboundBytes() { return _availableInboundBytes; } @@ -122,6 +123,19 @@ public class FIFOBandwidthLimiter { public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); } public void reinitialize() { + clear(); + _refiller.reinitialize(); + } + + /** @since 0.8.8 */ + public void shutdown() { + _refiller.shutdown(); + _refillerThread.interrupt(); + clear(); + } + + /** @since 0.8.8 */ + private void clear() { _pendingInboundRequests.clear(); _pendingOutboundRequests.clear(); _availableInbound.set(0); @@ -134,7 +148,6 @@ public class FIFOBandwidthLimiter { _unavailableOutboundBurst.set(0); _inboundUnlimited = false; _outboundUnlimited = false; - _refiller.reinitialize(); } public Request createRequest() { return new SimpleRequest(); } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index bc8dd18520d070240311e869528fdb888f6b6b7f..7dd9d4128d021e408ff04e926739e68f32f3cc3c 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -24,6 +24,7 @@ public class FIFOBandwidthRefiller implements Runnable { private long _lastCheckConfigTime; /** how frequently do we check the config for updates? */ private long _configCheckPeriodMs = 60*1000; + private volatile boolean _isRunning; public static final String PROP_INBOUND_BANDWIDTH = "i2np.bandwidth.inboundKBytesPerSecond"; public static final String PROP_OUTBOUND_BANDWIDTH = "i2np.bandwidth.outboundKBytesPerSecond"; @@ -67,12 +68,19 @@ public class FIFOBandwidthRefiller implements Runnable { _context = context; _log = context.logManager().getLog(FIFOBandwidthRefiller.class); reinitialize(); + _isRunning = true; } + + /** @since 0.8.8 */ + public void shutdown() { + _isRunning = false; + } + public void run() { // bootstrap 'em with nothing _lastRefillTime = _limiter.now(); List<FIFOBandwidthLimiter.Request> buffer = new ArrayList(2); - while (true) { + while (_isRunning) { long now = _limiter.now(); if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { checkConfig(); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index a24b3d9728c0e0374b689f5ca0a3641a7175f3c9..36752a04535ff85ee1c46d12e4f06a57be3551b5 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -1089,6 +1089,10 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } private static final int MAX_HANDLERS = 4; + + /** + * FIXME static queue mixes handlers from different contexts in multirouter JVM + */ private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS); private final static I2NPMessageHandler acquireHandler(RouterContext ctx) { @@ -1129,6 +1133,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _dataReadBufs.offer(buf); } + /** @since 0.8.8 */ + static void releaseResources() { + _i2npHandlers.clear(); + _dataReadBufs.clear(); + synchronized(_bufs) { + _bufs.clear(); + } + } + /** * sizeof(data)+data+pad+crc. * diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 326f659effbf2b8fdf193edb155af88edc86c88a..8d943fd0716373f0db8b0fc11ac9ddd905370572 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -702,6 +702,7 @@ public class NTCPTransport extends TransportImpl { NTCPConnection con = (NTCPConnection)iter.next(); con.close(); } + NTCPConnection.releaseResources(); // will this work? replaceAddress(null); } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 824f4a9285eab2e3a3d65c5e5c4e626621832bfd..f8b387d685755dfbeedf14cca1bdaff5eb8e4f58 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -113,6 +113,11 @@ class PacketHandler { return rv.toString(); } + /** @since 0.8.8 */ + int getHandlerCount() { + return _handlers.length; + } + /** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */ private static final short OUTBOUND_FALLBACK = 1; /** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */ diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index f207819277b367e84b1254cd3af03d76092b7ca1..b59c40d4b82698691826dde6400c2ef653adb549 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -147,6 +147,7 @@ class UDPEndpoint { /** * Blocking call to receive the next inbound UDP packet from any peer. + * @return null if we have shut down */ public UDPPacket receive() { if (_receiver == null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index ded71f8a2abe350980aa06ca08cefa5bf2ca0ff6..647a5639f2e2a12cf385313cb1fc62b236ab7885 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -58,9 +58,11 @@ class UDPReceiver { public void shutdown() { _keepRunning = false; _inboundQueue.clear(); - UDPPacket poison = UDPPacket.acquire(_context, false); - poison.setMessageType(TYPE_POISON); - _inboundQueue.offer(poison); + for (int i = 0; i < _transport.getPacketHandlerCount(); i++) { + UDPPacket poison = UDPPacket.acquire(_context, false); + poison.setMessageType(TYPE_POISON); + _inboundQueue.offer(poison); + } for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) { try { Thread.sleep(i * 50); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index b8f1d35ee4b38d8268533f68164256be884e1f2f..b49906c43f8ac947476c2205800011cd4862bb8f 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1367,6 +1367,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return ""; } + /** @since 0.8.8 */ + int getPacketHandlerCount() { + PacketHandler handler = _handler; + if (handler != null) + return handler.getHandlerCount(); + else + return 0; + } + private static final int DROP_INACTIVITY_TIME = 60*1000; public void failed(OutboundMessageState msg) { failed(msg, true); } diff --git a/router/java/src/net/i2p/router/util/RandomIterator.java b/router/java/src/net/i2p/router/util/RandomIterator.java index db8a560b80a3125727953a9864a52f6edfa6dde0..b7da15e9b779d590daaac4118842c4a2e36aa456 100644 --- a/router/java/src/net/i2p/router/util/RandomIterator.java +++ b/router/java/src/net/i2p/router/util/RandomIterator.java @@ -84,7 +84,7 @@ public class RandomIterator<E> implements Iterator<E> { * <a href="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" title="http://www.qbrundage.com/michaelb/pubs/essays/random_number_generation" target="_blank">http://www.qbrundage.com/michaelb/pubs/e…</a> * for some implementations, which are faster than java.util.Random. */ - private static final Random rand = RandomSource.getInstance(); + private final Random rand = RandomSource.getInstance(); /** Used to narrow the range to take random indexes from */ private int lower, upper; diff --git a/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java b/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java index 80e58967838ba31c3b2fd9072426c931922efc3b..21f4f13a4bf1e5ee57b337e3d44e54186efa1604 100644 --- a/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java +++ b/router/java/src/org/cybergarage/upnp/ssdp/HTTPMUSocket.java @@ -120,7 +120,9 @@ public class HTTPMUSocket return true; try { - ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf); + // I2P close it instead of leaving group so the thread dies + //ssdpMultiSock.leaveGroup(ssdpMultiGroup, ssdpMultiIf); + ssdpMultiSock.close(); ssdpMultiSock = null; } catch (Exception e) { diff --git a/router/java/src/org/cybergarage/util/ThreadCore.java b/router/java/src/org/cybergarage/util/ThreadCore.java index b03f2f0c35993b8c2b6d4b1cbac1150439fd11ed..f2a49ddbebfd76d87b4aec7924157193d008e0fe 100644 --- a/router/java/src/org/cybergarage/util/ThreadCore.java +++ b/router/java/src/org/cybergarage/util/ThreadCore.java @@ -65,6 +65,8 @@ public class ThreadCore implements Runnable //threadObject.destroy(); //threadObject.stop(); setThreadObject(null); + // I2P break Disposer out of sleep() + threadObject.interrupt(); } }