From 5e67008d264ae3ec7b33aa33c6ea01b96d2aa62a Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 5 Dec 2014 15:12:51 +0000 Subject: [PATCH] I2PTunnel: Reduce i2ptunnel threads, more thread pooling. Big savings is on client side (two less threads per connection) - Move client pool from static inI2PTunnelClientBase to TCG. - Use client pool for some server threads - Run some things inline that were formerly threads - Client-side I2PTunnelRunner thread used to do nothing but start 2 more threads; now it runs one inline (like we do for server-side HTTP) - Javadocs and cleanups Was originally intended to reduce load for high-traffic servers but most of the savings for now is on the client side. Ref: http://zzz.i2p/topics/1741 Todo: Figure out how to run the HTTP client-side gunzipper inline too Todo: More server-side improvements --- Client side: before: 4-5 threads, 1-2 pooled I2PTunnel Client Runner (BlockingRunner from client pool) starts I2PTunnelRunner or I2PTunnelHTTPClientRunner and exits starts StreamForwarder toI2P and waits starts StreamForwarder fromI2P and waits starts HTTPResponseOutputStream (HTTP gunzip only) (from client pool) now: 2-3 threads, 1-2 pooled I2PTunnel Client Runner (BlockingRunner from client pool) runs I2PTunnelRunner or I2PTunnelHTTPClientRunner inline starts StreamForwarder toI2P and waits runs StreamForwarder fromI2P inline starts HTTPResponseOutputStream (HTTP gunzip only) (from client pool) --- Server side: before: 1-4 threads, 0-1 pooled Server Handler Pool (Handler from server pool) execpt for standard server, blockingHandle() inline in acceptor starts I2PTunnelRunner or CompressedRequestor and exits starts StreamForwarder toI2P and waits (inline for HTTP) starts StreamForwarder fromI2P and waits (except not for HTTP GET) now: 1-4 threads, 0-2 pooled Server Handler Pool (Handler from server pool) execpt for standard server, blockingHandle() inline in acceptor starts I2PTunnelRunner or CompressedRequestor and exits (using client pool) starts StreamForwarder toI2P and waits (inline for HTTP) starts StreamForwarder fromI2P and waits (except not for HTTP GET) --- .../i2ptunnel/HTTPResponseOutputStream.java | 30 ++++-- .../net/i2p/i2ptunnel/I2PTunnelClient.java | 6 +- .../i2p/i2ptunnel/I2PTunnelClientBase.java | 96 +++++-------------- .../i2p/i2ptunnel/I2PTunnelConnectClient.java | 4 +- .../i2p/i2ptunnel/I2PTunnelHTTPClient.java | 14 ++- .../i2ptunnel/I2PTunnelHTTPClientRunner.java | 3 +- .../i2p/i2ptunnel/I2PTunnelHTTPServer.java | 12 +-- .../net/i2p/i2ptunnel/I2PTunnelIRCClient.java | 7 +- .../net/i2p/i2ptunnel/I2PTunnelIRCServer.java | 4 +- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 12 +-- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 26 ++++- .../net/i2p/i2ptunnel/TunnelController.java | 2 +- .../i2p/i2ptunnel/TunnelControllerGroup.java | 80 +++++++++++++++- .../i2p/i2ptunnel/irc/I2PTunnelDCCClient.java | 4 +- .../i2p/i2ptunnel/irc/I2PTunnelDCCServer.java | 4 +- .../i2ptunnel/socks/I2PSOCKSIRCTunnel.java | 9 +- .../i2p/i2ptunnel/socks/I2PSOCKSTunnel.java | 4 +- 17 files changed, 203 insertions(+), 114 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 244e76c6e0..bc1104d83b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -21,6 +21,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; import net.i2p.util.BigPipedInputStream; import net.i2p.util.ByteCache; +import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.ReusableGZIPInputStream; @@ -251,16 +252,27 @@ class HTTPResponseOutputStream extends FilterOutputStream { //out.flush(); PipedInputStream pi = BigPipedInputStream.getInstance(); PipedOutputStream po = new PipedOutputStream(pi); - // Run in the client thread pool, as there should be an unused thread - // there after the accept(). - // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. - try { - I2PTunnelClientBase.getClientExecutor().execute(new Pusher(pi, out)); - } catch (RejectedExecutionException ree) { - // shouldn't happen - throw ree; - } + Runnable r = new Pusher(pi, out); out = po; + // TODO we should be able to do this inline somehow + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + // Run in the client thread pool, as there should be an unused thread + // there after the accept(). + // Overridden in I2PTunnelHTTPServer, where it does not use the client pool. + try { + tcg.getClientExecutor().execute(r); + } catch (RejectedExecutionException ree) { + // shouldn't happen + throw ree; + } + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + Thread t = new I2PAppThread(r, "Pusher"); + t.start(); + } } private class Pusher implements Runnable { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java index 2c448d946a..6a552719ec 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClient.java @@ -122,9 +122,11 @@ public class I2PTunnelClient extends I2PTunnelClientBase { int port = addr.getPort(); i2ps = createI2PSocket(clientDest, port); i2ps.setReadTimeout(readTimeout); - Thread t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets, + I2PTunnelRunner t = new I2PTunnelRunner(s, i2ps, sockLock, null, null, mySockets, (I2PTunnelRunner.FailCallback) null); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (Exception ex) { if (_log.shouldLog(Log.INFO)) _log.info("Error connecting", ex); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 673928552f..030e4a0b63 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -16,12 +16,8 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; import javax.net.ssl.SSLServerSocket; @@ -77,18 +73,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna // true if we are chained from a server. private boolean chained; - /** how long to wait before dropping an idle thread */ - private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; - - /** - * We keep a static pool of socket handlers for all clients, - * as there is no need for isolation on the client side. - * Extending classes may use it for other purposes. - * Not for use by servers, as there is no limit on threads. - */ - private static volatile ThreadPoolExecutor _executor; - private static int _executorThreadCount; - private static final Object _executorLock = new Object(); + private volatile ThreadPoolExecutor _executor; public static final String PROP_USE_SSL = I2PTunnelServer.PROP_USE_SSL; @@ -116,11 +101,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); - synchronized (_executorLock) { - if (_executor == null) - _executor = new CustomThreadPoolExecutor(); - } - Thread t = new I2PAppThread(this, "Client " + tunnel.listenHost + ':' + localPort); t.start(); open = true; @@ -184,11 +164,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna _context.statManager().createRateStat("i2ptunnel.client.buildRunTime", "How long it takes to run a queued socket into an i2ptunnel runner?", "I2PTunnel", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _log = _context.logManager().getLog(getClass()); - synchronized (_executorLock) { - if (_executor == null) - _executor = new CustomThreadPoolExecutor(); - } - // normalize path so we can find it if (pkf != null) { File keyFile = new File(pkf); @@ -361,6 +336,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna return socketManager; } + /** + * Kill the shared client, so that on restart in android + * we won't latch onto the old one + * + * @since 0.9.18 + */ + protected static synchronized void killSharedClient() { + socketManager = null; + } + /** * This may take a LONG time. * @@ -653,6 +638,16 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + _executor = tcg.getClientExecutor(); + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + // Never shut down. + _executor = new TunnelControllerGroup.CustomThreadPoolExecutor(); + } while (open) { Socket s = ss.accept(); manageConnection(s); @@ -672,30 +667,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * @return may be null if no class has been instantiated - * @since 0.8.8 - */ - static ThreadPoolExecutor getClientExecutor() { - return _executor; - } - - /** - * @since 0.8.8 - */ - static void killClientExecutor() { - synchronized (_executorLock) { - if (_executor != null) { - _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); - _executor.shutdownNow(); - _executor = null; - } - // kill the shared client, so that on restart in android - // we won't latch onto the old one - socketManager = null; - } - } - /** * Manage the connection just opened on the specified socket * @@ -721,26 +692,6 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna } } - /** - * Not really needed for now but in case we want to add some hooks like afterExecute(). - */ - private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { - public CustomThreadPoolExecutor() { - super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), new CustomThreadFactory()); - } - } - - /** just to set the name and set Daemon */ - private static class CustomThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread rv = Executors.defaultThreadFactory().newThread(r); - rv.setName("I2PTunnel Client Runner " + (++_executorThreadCount)); - rv.setDaemon(true); - return rv; - } - } - /** * Blocking runner, used during the connection establishment */ @@ -822,7 +773,10 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna /** * Manage a connection in a separate thread. This only works if - * you do not override manageConnection() + * you do not override manageConnection(). + * + * This is run in a thread from an unlimited-size thread pool, + * so it may block or run indefinitely. */ protected abstract void clientConnectionRun(Socket s); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java index 7690e2c650..5e8207a26f 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelConnectClient.java @@ -292,7 +292,9 @@ public class I2PTunnelConnectClient extends I2PTunnelHTTPClientBase implements R response = SUCCESS_RESPONSE; OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (IOException ex) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); handleClientException(ex, out, targetRequest, usingWWWProxy, currentProxy, requestId); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index 625ae863aa..4b869243d2 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -972,7 +972,9 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn response = null; } Thread t = new I2PTunnelOutproxyRunner(s, outSocket, sockLock, data, response, onTimeout); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); return; } @@ -1091,6 +1093,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn sktOpts.setPort(remotePort); I2PSocket i2ps = createI2PSocket(clientDest, sktOpts); OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId); + Thread t; if (method.toUpperCase(Locale.US).equals("CONNECT")) { byte[] data; byte[] response; @@ -1101,13 +1104,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn data = null; response = SUCCESS_RESPONSE; } - Thread t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); - t.start(); + t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, onTimeout); } else { byte[] data = newRequest.toString().getBytes("ISO-8859-1"); - Thread t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout); - t.start(); + t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout); } + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch(IOException ex) { if(_log.shouldLog(Log.INFO)) { _log.info(getPrefix(requestId) + "Error trying to connect", ex); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java index e12b839d32..51425cbd01 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java @@ -86,7 +86,8 @@ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner { // ignore } t1.join(30*1000); - t2.join(30*1000); + // t2 = fromI2P now run inline + //t2.join(30*1000); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index ac1d2d39a7..d4d06c4240 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -302,16 +302,16 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { if (_log.shouldLog(Log.DEBUG)) _log.debug("Modified header: [" + modifiedHeader + "]"); + Runnable t; if (allowGZIP && useGZIP) { - I2PAppThread req = new I2PAppThread( - new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log), - Thread.currentThread().getName()+".hc"); - req.start(); + t = new CompressedRequestor(s, socket, modifiedHeader, getTunnel().getContext(), _log); } else { - Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), + t = new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null, (I2PTunnelRunner.FailCallback) null); - t.start(); } + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); long afterHandle = getTunnel().getContext().clock().now(); long timeToHandle = afterHandle - afterAccept; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index 3915ae6885..344c721971 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -136,8 +136,11 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase { DCCHelper dcc = _dccEnabled ? new DCC(s.getLocalAddress().getAddress()) : null; Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true); - out.start(); + //Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + _clientId + " out", true); + Runnable out = new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc); + // we are called from an unlimited thread pool, so run inline + //out.start(); + out.run(); } catch (Exception ex) { // generally NoRouteToHostException if (_log.shouldLog(Log.WARN)) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java index f39df586f0..f82c154173 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java @@ -140,7 +140,9 @@ public class I2PTunnelIRCServer extends I2PTunnelServer implements Runnable { Socket s = getSocket(socket.getPeerDestination().calculateHash(), socket.getLocalPort()); Thread t = new I2PTunnelRunner(s, socket, slock, null, modifiedRegistration.getBytes(), null, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); } catch (SocketException ex) { try { // Send a response so the user doesn't just see a disconnect diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index d4bd1d455f..d0dfc74ece 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -62,8 +62,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr private long totalSent; private long totalReceived; - private static final AtomicLong __forwarderId = new AtomicLong(); - /** * For use in new constructor * @since 0.9.14 @@ -268,9 +266,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE); StreamForwarder toI2P = new StreamForwarder(in, i2pout, true); StreamForwarder fromI2P = new StreamForwarder(i2pin, out, false); - // TODO can we run one of these inline and save a thread? toI2P.start(); - fromI2P.start(); + // We are already a thread, so run the second one inline + //fromI2P.start(); + fromI2P.run(); synchronized (finishLock) { while (!finished) { finishLock.wait(); @@ -384,7 +383,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr // ignore } t1.join(30*1000); - t2.join(30*1000); + // t2 = fromI2P now run inline + //t2.join(30*1000); } /** @@ -426,7 +426,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr _toI2P = toI2P; direction = (toI2P ? "toI2P" : "fromI2P"); _cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE); - setName("StreamForwarder " + _runnerId + '.' + __forwarderId.incrementAndGet()); + setName("StreamForwarder " + _runnerId + '.' + direction); } @Override diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 71d2643b22..db1f0d07e0 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -80,6 +80,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected I2PTunnelTask task; protected boolean bidir; private ThreadPoolExecutor _executor; + protected volatile ThreadPoolExecutor _clientExecutor; private final Map<Integer, InetSocketAddress> _socketMap = new ConcurrentHashMap<Integer, InetSocketAddress>(4); /** unused? port should always be specified */ @@ -470,6 +471,16 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_usePool) { _executor = new CustomThreadPoolExecutor(getHandlerCount(), "ServerHandler pool " + remoteHost + ':' + remotePort); } + TunnelControllerGroup tcg = TunnelControllerGroup.getInstance(); + if (tcg != null) { + _clientExecutor = tcg.getClientExecutor(); + } else { + // Fallback in case TCG.getInstance() is null, never instantiated + // and we were not started by TCG. + // Maybe a plugin loaded before TCG? Should be rare. + // Never shut down. + _clientExecutor = new TunnelControllerGroup.CustomThreadPoolExecutor(); + } while (open) { try { I2PServerSocket ci2pss = i2pss; @@ -563,6 +574,17 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } } + /** + * This is run in a thread from a limited-size thread pool via Handler.run(), + * except for a standard server (this class, no extension, as determined in getUsePool()), + * it is run directly in the acceptor thread (see run()). + * + * In either case, this method and any overrides must spawn a thread and return quickly. + * If blocking while reading the headers (as in HTTP and IRC), the thread pool + * may be exhausted. + * + * See PROP_USE_POOL, DEFAULT_USE_POOL, PROP_HANDLER_COUNT, DEFAULT_HANDLER_COUNT + */ protected void blockingHandle(I2PSocket socket) { if (_log.shouldLog(Log.INFO)) _log.info("Incoming connection to '" + toString() + "' port " + socket.getLocalPort() + @@ -577,7 +599,9 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { afterSocket = getTunnel().getContext().clock().now(); Thread t = new I2PTunnelRunner(s, socket, slock, null, null, null, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); long afterHandle = getTunnel().getContext().clock().now(); long timeToHandle = afterHandle - afterAccept; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java index cb68319494..ad5256fa65 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java @@ -425,7 +425,7 @@ public class TunnelController implements Logging { // We use _sessions AND the tunnel sessions as // _sessions will be null for delay-open tunnels - see acquire(). // We want the current sessions. - Set<I2PSession> sessions = new HashSet(_tunnel.getSessions()); + Set<I2PSession> sessions = new HashSet<I2PSession>(_tunnel.getSessions()); if (_sessions != null) sessions.addAll(_sessions); return sessions; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java index af8818abdc..4c28e1b035 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelControllerGroup.java @@ -9,6 +9,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.app.*; @@ -48,6 +55,21 @@ public class TunnelControllerGroup implements ClientApp { */ private final Map<I2PSession, Set<TunnelController>> _sessions; + /** + * We keep a pool of socket handlers for all clients, + * as there is no need for isolation on the client side. + * Extending classes may use it for other purposes. + * + * May also be used by servers, carefully, + * as there is no limit on threads. + */ + private ThreadPoolExecutor _executor; + private static final AtomicLong _executorThreadCount = new AtomicLong(); + private final Object _executorLock = new Object(); + /** how long to wait before dropping an idle thread */ + private static final long HANDLER_KEEPALIVE_MS = 2*60*1000; + + /** * In I2PAppContext will instantiate if necessary and always return non-null. * As of 0.9.4, when in RouterContext, will return null (except in Android) @@ -206,8 +228,7 @@ public class TunnelControllerGroup implements ClientApp { if (_instance == this) _instance = null; } -/// fixme static - I2PTunnelClientBase.killClientExecutor(); + killClientExecutor(); changeState(STOPPED); } @@ -500,4 +521,59 @@ public class TunnelControllerGroup implements ClientApp { } } } + + /** + * @return non-null + * @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18 + */ + ThreadPoolExecutor getClientExecutor() { + synchronized (_executorLock) { + if (_executor == null) + _executor = new CustomThreadPoolExecutor(); + } + return _executor; + } + + /** + * @since 0.8.8 Moved from I2PTunnelClientBase in 0.9.18 + */ + private void killClientExecutor() { + synchronized (_executorLock) { + if (_executor != null) { + _executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + _executor.shutdownNow(); + _executor = null; + } + } + // kill the shared client, so that on restart in android + // we won't latch onto the old one + I2PTunnelClientBase.killSharedClient(); + } + + /** + * Not really needed for now but in case we want to add some hooks like afterExecute(). + * Package private for fallback in case TCG.getInstance() is null, never instantiated + * but a plugin still needs it... should be rare. + * + * @since 0.9.18 Moved from I2PTunnelClientBase + */ + static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + super(0, Integer.MAX_VALUE, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS, + new SynchronousQueue<Runnable>(), new CustomThreadFactory()); + } + } + + /** + * Just to set the name and set Daemon + * @since 0.9.18 Moved from I2PTunnelClientBase + */ + private static class CustomThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("I2PTunnel Client Runner " + _executorThreadCount.incrementAndGet()); + rv.setDaemon(true); + return rv; + } + } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java index e226621038..fcca71cb9a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java @@ -76,7 +76,9 @@ public class I2PTunnelDCCClient extends I2PTunnelClientBase { try { i2ps = createI2PSocket(dest, opts); Thread t = new Runner(s, i2ps); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (Exception ex) { _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex); closeSocket(s); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java index 4b1b6b8f56..cab6513adc 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java @@ -111,7 +111,9 @@ public class I2PTunnelDCCServer extends I2PTunnelServer { _sockList.add(socket); Thread t = new I2PTunnelRunner(s, socket, slock, null, null, _sockList, (I2PTunnelRunner.FailCallback) null); - t.start(); + // run in the unlimited client pool + //t.start(); + _clientExecutor.execute(t); local.socket = socket; local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE; _active.put(Integer.valueOf(myPort), local); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java index b4363afe0f..8484ef0c23 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java @@ -55,9 +55,12 @@ public class I2PSOCKSIRCTunnel extends I2PSOCKSTunnel { Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log), "SOCKS IRC Client " + id + " in", true); in.start(); - Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), - "SOCKS IRC Client " + id + " out", true); - out.start(); + //Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log), + // "SOCKS IRC Client " + id + " out", true); + Runnable out = new IrcOutboundFilter(clientSock, destSock, expectedPong, _log); + // we are called from an unlimited thread pool, so run inline + //out.start(); + out.run(); } catch (SOCKSException e) { if (_log.shouldLog(Log.WARN)) _log.warn("Error from SOCKS connection", e); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java index 89674c5929..498d75a181 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSTunnel.java @@ -56,7 +56,9 @@ public class I2PSOCKSTunnel extends I2PTunnelClientBase { I2PSocket destSock = serv.getDestinationI2PSocket(this); Thread t = new I2PTunnelRunner(clientSock, destSock, sockLock, null, null, mySockets, (I2PTunnelRunner.FailCallback) null); - t.start(); + // we are called from an unlimited thread pool, so run inline + //t.start(); + t.run(); } catch (SOCKSException e) { if (_log.shouldLog(Log.WARN)) _log.warn("Error from SOCKS connection", e); -- GitLab