diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 244e76c6e045050d540e0538e70d51b090892f17..bc1104d83b8d4cb47cac4b4e85491fd9c6a4ff69 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -21,6 +21,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; import net.i2p.util.BigPipedInputStream; import net.i2p.util.ByteCache; +import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.ReusableGZIPInputStream; @@ -251,16 +252,27 @@ class HTTPResponseOutputStream extends FilterOutputStream { //out.flush(); PipedInputStream pi = BigPipedInputStream.getInstance(); PipedOutputStream po = new PipedOutputStream(pi); - // Run in the client thread pool, as there should be an unused thread - // there after the accept(). - // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. - try { - I2PTunnelClientBase.getClientExecutor().execute(new Pusher(pi, out)); - } catch (RejectedExecutionException ree) { - // shouldn't happen - throw ree; - } + Runnable r = new Pusher(pi, out); out = po; + // TODO we should be able to do this inline somehow + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + // Run in the client thread pool, as there should be an unused thread + // there after the accept(). + // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. + try { + tcg.getClientExecutor().execute(r); + } catch (RejectedExecutionException ree) { + // shouldn't happen + throw ree; + } + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + Thread t = new I2PAppThread(r, "Pusher"); + t.start(); + } } private class Pusher implements Runnable { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java index 2c448d946a6c6e558e06ac09c6832b63ad0eec61..6a552719ec1d0cf3bf7339a9bc4d4a47d396c1fd 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java @@ -122,9 +122,11 @@ public class I2PTunnelClient extends I2PTunnelClientBase { int port = addr.getPort(); i2ps = createI2PSocket(clientDest, port); i2ps.setReadTimeout(readTimeout); - Thread t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets, + I2PTunnelRunner t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets, (I2PTunnelRunner.FailCallback) null); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (Exception ex) { if (_log.shouldLog(Log.INFO)) _log.info("Error connecting", ex); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 673928552f588f8d3bc8a92c8b496717a764df86..030e4a0b637da4aa3d4554a4831b50fd638225c2 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -16,12 +16,8 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import javax.net.ssl.SSLServerSocket; @@ -77,18 +73,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna // true if we are chained from a server. private boolean chained; - /** how long to wait before dropping an idle thread */ - private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; - - /** - * We keep a static pool of socket handlers for all clients, - * as there is no need for isolation on the client side. - * Extending classes may use it for other purposes. - * Not for use by servers, as there is no limit on threads. - */ - private static volatile ThreadPoolExecutor _executor; - private static int _executorThreadCount; - private static final Object _executorLock = new Object(); + private volatile ThreadPoolExecutor _executor; public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL; @@ -116,11 +101,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); - synchronized (_executorLock) { - if (_executor == null) - _executor = new CustomThreadPoolExecutor(); - } - Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort); t.start(); open = true; @@ -184,11 +164,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); - synchronized (_executorLock) { - if (_executor == null) - _executor = new CustomThreadPoolExecutor(); - } - // normalize path so we can find it if (pkf != null) { File keyFile = new File(pkf); @@ -361,6 +336,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna return socketManager; } + /** + * Kill the shared client, so that on restart in android + * we won't latch onto the old one + * + * @since 0.9.18 + */ + protected static synchronized void killSharedClient() { + socketManager = null; + } + /** * This may take a LONG time. * @@ -653,6 +638,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + _executor = tcg.getClientExecutor(); + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + // Never shut down. + _executor = new TunnelControllerGroup.CustomThreadPoolExecutor(); + } while (open) { Socket s = ss.accept(); manageConnection(s); @@ -672,30 +667,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * @return may be null if no class has been instantiated - * @since 0.8.8 - */ - static ThreadPoolExecutor getClientExecutor() { - return _executor; - } - - /** - * @since 0.8.8 - */ - static void killClientExecutor() { - synchronized (_executorLock) { - if (_executor != null) { - _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); - _executor.shutdownNow(); - _executor = null; - } - // kill the shared client, so that on restart in android - // we won't latch onto the old one - socketManager = null; - } - } - /** * Manage the connection just opened on the specified socket * @@ -721,26 +692,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * Not really needed for now but in case we want to add some hooks like afterExecute(). - */ - private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { - public CustomThreadPoolExecutor() { - super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), new CustomThreadFactory()); - } - } - - /** just to set the name and set Daemon */ - private static class CustomThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread rv = Executors.defaultThreadFactory().newThread(r); - rv.setName("I2PTunnel Client Runner " + (++_executorThreadCount)); - rv.setDaemon(true); - return rv; - } - } - /** * Blocking runner, used during the connection establishment */ @@ -822,7 +773,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna /** * Manage a connection in a separate thread. This only works if - * you do not override manageConnection() + * you do not override manageConnection(). + * + * This is run in a thread from an unlimited-size thread pool, + * so it may block or run indefinitely. */ protected abstract void clientConnectionRun(Socket s); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java index 7690e2c650f514881eeb4e217b74d1f2e64861d3..5e8207a26f9728fa99c57d1e675fc2be725d3111 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java @@ -292,7 +292,9 @@ public class I2PTunnelConnectClient extends I2PTunnelHTTPClientBase implements R response = SUCCESS_RESPONSE; OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (IOException ex) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); handleClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 625ae863aa2f20afb509c53b76580f4fd737746a..4b869243d2616dfd3b4bb964bc9be79684ffb172 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -972,7 +972,9 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn response = null; } Thread t = new I2PTunnelOutproxyRunner(s, outSocket, sockLock, data, response, onTimeout); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); return; } @@ -1091,6 +1093,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn sktOpts.setPort(remotePort); I2PSocket i2ps = createI2PSocket(clientDest, sktOpts); OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); + Thread t; if (method.toUpperCase(Locale.US).equals("CONNECT")) { byte[] data; byte[] response; @@ -1101,13 +1104,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn data = null; response = SUCCESS_RESPONSE; } - Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); - t.start(); + t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); } else { byte[] data = newRequest.toString().getBytes("ISO-8859-1"); - Thread t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout); - t.start(); + t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout); } + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch(IOException ex) { if(_log.shouldLog(Log.INFO)) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java index e12b839d32ac544c99f11b83f95259e16e85234e..51425cbd013ad052eda1d8a509f7a34ef516e5c9 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java @@ -86,7 +86,8 @@ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner { // ignore } t1.join(30*1000); - t2.join(30*1000); + // t2 = fromI2P now run inline + //t2.join(30*1000); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index ac1d2d39a7d5689fff43a510435b2c087a1feffc..d4d06c4240c190a231a38c9489ac07fd0dc6d119 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -302,16 +302,16 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { if (_log.shouldLog(Log.DEBUG)) _log.debug("Modified header: [" + modifiedHeader + "]"); + Runnable t; if (allowGZIP && useGZIP) { - I2PAppThread req = new I2PAppThread( - new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log), - Thread.currentThread().getName()+".hc"); - req.start(); + t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log); } else { - Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), + t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null, (I2PTunnelRunner.FailCallback) null); - t.start(); } + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); long afterHandle = getTunnel().getContext().clock().now(); long timeToHandle = afterHandle - afterAccept; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index 3915ae68850bbec3a02fee007d3ce7b4290ad707..344c721971d42de85651596be45a84567ca8497b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -136,8 +136,11 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase { DCCHelper dcc = _dccEnabled ? new DCC(s.getLocalAddress().getAddress()) : null; Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true); - out.start(); + //Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true); + Runnable out = new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc); + // we are called from an unlimited thread pool, so run inline + //out.start(); + out.run(); } catch (Exception ex) { // generally NoRouteToHostException if (_log.shouldLog(Log.WARN)) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java index f39df586f0cca84b6a89d85761ed7529d3a557bb..f82c154173dba1253dec017a064a34003f5c2c42 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java @@ -140,7 +140,9 @@ public class I2PTunnelIRCServer extends I2PTunnelServer implements Runnable { Socket s = getSocket(socket.getPeerDestination().calculateHash(), socket.getLocalPort()); Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedRegistration.getBytes(), null, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); } catch (SocketException ex) { try { // Send a response so the user doesn't just see a disconnect diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index d4bd1d455f997b9a40cefd8d75cb248add015dbb..d0dfc74ece2e71cc7631a2a83b494d3060c7dbaa 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -62,8 +62,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private long totalSent; private long totalReceived; - private static final AtomicLong __forwarderId = new AtomicLong(); - /** * For use in new constructor * @since 0.9.14 @@ -268,9 +266,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE); StreamForwarder toI2P = new StreamForwarder(in, i2pout, true); StreamForwarder fromI2P = new StreamForwarder(i2pin, out, false); - // TODO can we run one of these inline and save a thread? toI2P.start(); - fromI2P.start(); + // We are already a thread, so run the second one inline + //fromI2P.start(); + fromI2P.run(); synchronized (finishLock) { while (!finished) { finishLock.wait(); @@ -384,7 +383,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr // ignore } t1.join(30*1000); - t2.join(30*1000); + // t2 = fromI2P now run inline + //t2.join(30*1000); } /** @@ -426,7 +426,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr _toI2P = toI2P; direction = (toI2P ? "toI2P" : "fromI2P"); _cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE); - setName("StreamForwarder " + _runnerId + '.' + __forwarderId.incrementAndGet()); + setName("StreamForwarder " + _runnerId + '.' + direction); } @Override diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 71d2643b22c2b75272f7d6d67aa4b9e838df6cb5..db1f0d07e011c08754f52fac2c4d2c0317736d0d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -80,6 +80,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected I2PTunnelTask task; protected boolean bidir; private ThreadPoolExecutor _executor; + protected volatile ThreadPoolExecutor _clientExecutor; private final Map<Integer, InetSocketAddress> _socketMap = new ConcurrentHashMap<Integer, InetSocketAddress>(4); /** unused? port should always be specified */ @@ -470,6 +471,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_usePool) { _executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort); } + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + _clientExecutor = tcg.getClientExecutor(); + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + // Never shut down. + _clientExecutor = new TunnelControllerGroup.CustomThreadPoolExecutor(); + } while (open) { try { I2PServerSocket ci2pss = i2pss; @@ -563,6 +574,17 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } } + /** + * This is run in a thread from a limited-size thread pool via Handler.run(), + * except for a standard server (this class, no extension, as determined in getUsePool()), + * it is run directly in the acceptor thread (see run()). + * + * In either case, this method and any overrides must spawn a thread and return quickly. + * If blocking while reading the headers (as in HTTP and IRC), the thread pool + * may be exhausted. + * + * See PROP_USE_POOL, DEFAULT_USE_POOL, PROP_HANDLER_COUNT, DEFAULT_HANDLER_COUNT + */ protected void blockingHandle(I2PSocket socket) { if (_log.shouldLog(Log.INFO)) _log.info("Incoming connection to '" + toString() + "' port " + socket.getLocalPort() + @@ -577,7 +599,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { afterSocket = getTunnel().getContext().clock().now(); Thread t = new I2PTunnelRunner(s, socket, slock, null, null, null, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); long afterHandle = getTunnel().getContext().clock().now(); long timeToHandle = afterHandle - afterAccept; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java index cb68319494ea48afa781af9ea82b4f1a72e5a215..ad5256fa65b4893ce578d05319d07bb6627bef01 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java @@ -425,7 +425,7 @@ public class TunnelController implements Logging { // We use _sessions AND the tunnel sessions as // _sessions will be null for delay-open tunnels - see acquire(). // We want the current sessions. - Set<I2PSession> sessions = new HashSet(_tunnel.getSessions()); + Set<I2PSession> sessions = new HashSet<I2PSession>(_tunnel.getSessions()); if (_sessions != null) sessions.addAll(_sessions); return sessions; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java index af8818abdc9b7ee0fe8601a8445936922c368449..4c28e1b035e66eada4917755a647e6960aa92e06 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java @@ -9,6 +9,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.app.*; @@ -48,6 +55,21 @@ public class TunnelControllerGroup implements ClientApp { */ private final Map<I2PSession, Set<TunnelController>> _sessions; + /** + * We keep a pool of socket handlers for all clients, + * as there is no need for isolation on the client side. + * Extending classes may use it for other purposes. + * + * May also be used by servers, carefully, + * as there is no limit on threads. + */ + private ThreadPoolExecutor _executor; + private static final AtomicLong _executorThreadCount = new AtomicLong(); + private final Object _executorLock = new Object(); + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; + + /** * In I2PAppContext will instantiate if necessary and always return non-null. * As of 0.9.4, when in RouterContext, will return null (except in Android) @@ -206,8 +228,7 @@ public class TunnelControllerGroup implements ClientApp { if (_instance == this) _instance = null; } -/// fixme static - I2PTunnelClientBase.killClientExecutor(); + killClientExecutor(); changeState(STOPPED); } @@ -500,4 +521,59 @@ public class TunnelControllerGroup implements ClientApp { } } } + + /** + * @return non-null + * @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18 + */ + ThreadPoolExecutor getClientExecutor() { + synchronized (_executorLock) { + if (_executor == null) + _executor = new CustomThreadPoolExecutor(); + } + return _executor; + } + + /** + * @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18 + */ + private void killClientExecutor() { + synchronized (_executorLock) { + if (_executor != null) { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + _executor = null; + } + } + // kill the shared client, so that on restart in android + // we won't latch onto the old one + I2PTunnelClientBase.killSharedClient(); + } + + /** + * Not really needed for now but in case we want to add some hooks like afterExecute(). + * Package private for fallback in case TCG.getInstance() is null, never instantiated + * but a plugin still needs it... should be rare. + * + * @since 0.9.18 Moved from I2PTunnelClientBase + */ + static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new SynchronousQueue<Runnable>(), new CustomThreadFactory()); + } + } + + /** + * Just to set the name and set Daemon + * @since 0.9.18 Moved from I2PTunnelClientBase + */ + private static class CustomThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("I2PTunnel Client Runner " + _executorThreadCount.incrementAndGet()); + rv.setDaemon(true); + return rv; + } + } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java index e226621038415d66bed78cd95a118d431a066fe6..fcca71cb9a5cb90aafc215dc43abe631366ec13d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -76,7 +76,9 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { try { i2ps = createI2PSocket(dest, opts); Thread t = new Runner(s, i2ps); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (Exception ex) { _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex); closeSocket(s); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java index 4b1b6b8f5656f9651cf0b9294c36d8f41736715a..cab6513adca7ed77b771f5c980394b10852dc5eb 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -111,7 +111,9 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { _sockList.add(socket); Thread t = new I2PTunnelRunner(s, socket, slock, null, null, _sockList, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); local.socket = socket; local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; _active.put(Integer.valueOf(myPort), local); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java index b4363afe0fbaed06749f5655e13c0d98a30c1059..8484ef0c23a05321621be57953f84d76c6fb0987 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java @@ -55,9 +55,12 @@ public class I2PSOCKSIRCTunnel extends I2PSOCKSTunnel { Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + id + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), - "SOCKS IRC Client " + id + " out", true); - out.start(); + //Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), + // "SOCKS IRC Client " + id + " out", true); + Runnable out = new IrcOutboundFilter(clientSock, destSock, expectedPong, _log); + // we are called from an unlimited thread pool, so run inline + //out.start(); + out.run(); } catch (SOCKSException e) { if (_log.shouldLog(Log.WARN)) _log.warn("Error from SOCKS connection", e); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java index 89674c5929cbb017bf3ee5c60dd7c880287aee53..498d75a181cd8e0366eb51ba7d9a19cba8ed0c7f 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java @@ -56,7 +56,9 @@ public class I2PSOCKSTunnel extends I2PTunnelClientBase { I2PSocket destSock = serv.getDestinationI2PSocket(this); Thread t = new I2PTunnelRunner(clientSock, destSock, sockLock, null, null, mySockets, (I2PTunnelRunner.FailCallback) null); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (SOCKSException e) { if (_log.shouldLog(Log.WARN)) _log.warn("Error from SOCKS connection", e);