From 9aaf95ca98e9652c84ed2254c1bfc01a8590156c Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 4 Dec 2010 18:54:06 +0000 Subject: [PATCH] * Threads: - Reduce thread pool sizes based on memory and/or bandwidth limits - Tweak some thread names for clarity --- .../src/org/klomp/snark/PeerCoordinator.java | 2 +- .../src/net/i2p/util/SimpleScheduler.java | 10 +++-- core/java/src/net/i2p/util/SimpleTimer.java | 12 ++++-- core/java/src/net/i2p/util/SimpleTimer2.java | 10 +++-- router/java/src/net/i2p/router/JobQueue.java | 4 +- .../transport/ntcp/NTCPSendFinisher.java | 26 +++++++----- .../router/transport/ntcp/NTCPTransport.java | 40 +++++++++++++------ .../net/i2p/router/transport/ntcp/Reader.java | 14 +++---- .../net/i2p/router/transport/ntcp/Writer.java | 14 +++---- .../router/transport/udp/MessageReceiver.java | 25 ++++++++---- .../router/transport/udp/PacketHandler.java | 28 +++++++++---- .../router/tunnel/TunnelGatewayPumper.java | 12 ++++-- 12 files changed, 129 insertions(+), 68 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 2a6eb79af6..bebc4588ff 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -428,7 +428,7 @@ public class PeerCoordinator implements PeerListener peer.runConnection(_util, listener, bitfield); } }; - String threadName = peer.toString(); + String threadName = "Snark peer " + peer.toString(); new I2PAppThread(r, threadName).start(); return true; } diff --git a/core/java/src/net/i2p/util/SimpleScheduler.java b/core/java/src/net/i2p/util/SimpleScheduler.java index ee7d36e99a..2b1ddf44f0 100644 --- a/core/java/src/net/i2p/util/SimpleScheduler.java +++ b/core/java/src/net/i2p/util/SimpleScheduler.java @@ -28,12 +28,14 @@ import net.i2p.I2PAppContext; public class SimpleScheduler { private static final SimpleScheduler _instance = new SimpleScheduler(); public static SimpleScheduler getInstance() { return _instance; } - private static final int THREADS = 4; + private static final int MIN_THREADS = 2; + private static final int MAX_THREADS = 4; private I2PAppContext _context; private Log _log; private ScheduledThreadPoolExecutor _executor; private String _name; private int _count; + private final int _threads; protected SimpleScheduler() { this("SimpleScheduler"); } protected SimpleScheduler(String name) { @@ -41,7 +43,9 @@ public class SimpleScheduler { _log = _context.logManager().getLog(SimpleScheduler.class); _name = name; _count = 0; - _executor = new ScheduledThreadPoolExecutor(THREADS, new CustomThreadFactory()); + long maxMemory = Runtime.getRuntime().maxMemory(); + _threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); + _executor = new ScheduledThreadPoolExecutor(_threads, new CustomThreadFactory()); _executor.prestartAllCoreThreads(); } @@ -90,7 +94,7 @@ public class SimpleScheduler { private class CustomThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread rv = Executors.defaultThreadFactory().newThread(r); - rv.setName(_name + ' ' + (++_count) + '/' + THREADS); + rv.setName(_name + ' ' + (++_count) + '/' + _threads); // Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates! // String name = rv.getThreadGroup().getName(); // if(!name.equals("main")) { diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 6a8b855e30..0b54307119 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -18,14 +18,16 @@ import net.i2p.I2PAppContext; public class SimpleTimer { private static final SimpleTimer _instance = new SimpleTimer(); public static SimpleTimer getInstance() { return _instance; } - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; /** event time (Long) to event (TimedEvent) mapping */ private final TreeMap _events; /** event (TimedEvent) to event time (Long) mapping */ private Map _eventTimes; private final List _readyEvents; private SimpleStore runn; + private static final int MIN_THREADS = 2; + private static final int MAX_THREADS = 4; protected SimpleTimer() { this("SimpleTimer"); } protected SimpleTimer(String name) { @@ -39,9 +41,11 @@ public class SimpleTimer { runner.setName(name); runner.setDaemon(true); runner.start(); - for (int i = 0; i < 3; i++) { + long maxMemory = Runtime.getRuntime().maxMemory(); + int threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); + for (int i = 1; i <= threads ; i++) { I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); - executor.setName(name + "Executor " + i); + executor.setName(name + "Executor " + i + '/' + threads); executor.setDaemon(true); executor.start(); } diff --git a/core/java/src/net/i2p/util/SimpleTimer2.java b/core/java/src/net/i2p/util/SimpleTimer2.java index b2af33cf2b..bda41e6211 100644 --- a/core/java/src/net/i2p/util/SimpleTimer2.java +++ b/core/java/src/net/i2p/util/SimpleTimer2.java @@ -27,12 +27,14 @@ import net.i2p.I2PAppContext; public class SimpleTimer2 { private static final SimpleTimer2 _instance = new SimpleTimer2(); public static SimpleTimer2 getInstance() { return _instance; } - private static final int THREADS = 4; + private static final int MIN_THREADS = 2; + private static final int MAX_THREADS = 4; private I2PAppContext _context; private static Log _log; // static so TimedEvent can use it private ScheduledThreadPoolExecutor _executor; private String _name; private int _count; + private final int _threads; protected SimpleTimer2() { this("SimpleTimer2"); } protected SimpleTimer2(String name) { @@ -40,7 +42,9 @@ public class SimpleTimer2 { _log = _context.logManager().getLog(SimpleTimer2.class); _name = name; _count = 0; - _executor = new CustomScheduledThreadPoolExecutor(THREADS, new CustomThreadFactory()); + long maxMemory = Runtime.getRuntime().maxMemory(); + _threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); + _executor = new CustomScheduledThreadPoolExecutor(_threads, new CustomThreadFactory()); _executor.prestartAllCoreThreads(); } @@ -67,7 +71,7 @@ public class SimpleTimer2 { private class CustomThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread rv = Executors.defaultThreadFactory().newThread(r); - rv.setName(_name + ' ' + (++_count) + '/' + THREADS); + rv.setName(_name + ' ' + (++_count) + '/' + _threads); // Uncomment this to test threadgrouping, but we should be all safe now that the constructor preallocates! // String name = rv.getThreadGroup().getName(); // if(!name.equals("main")) { diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index a8b5395b00..42ddadf5ab 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -395,10 +395,8 @@ public class JobQueue { for (int i = _queueRunners.size(); i < numThreads; i++) { JobQueueRunner runner = new JobQueueRunner(_context, i); _queueRunners.put(Integer.valueOf(i), runner); - Thread t = new I2PThread(runner); - t.setName("JobQueue"+(_runnerId++)); + Thread t = new I2PThread(runner, "JobQueue " + (++_runnerId) + '/' + numThreads, false); //t.setPriority(I2PThread.MAX_PRIORITY-1); - t.setDaemon(false); t.start(); } } else if (_queueRunners.size() == numThreads) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java index fd5cf1ac91..374c7a5ba0 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java @@ -24,22 +24,27 @@ import net.i2p.util.Log; * @author zzz */ public class NTCPSendFinisher { - private static final int THREADS = 4; + private static final int MIN_THREADS = 1; + private static final int MAX_THREADS = 4; private final I2PAppContext _context; private final NTCPTransport _transport; private final Log _log; - private int _count; + private static int _count; private ThreadPoolExecutor _executor; + private static int _threads; public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) { _context = context; _log = _context.logManager().getLog(NTCPSendFinisher.class); _transport = transport; + _context.statManager().createRateStat("ntcp.sendFinishTime", "How long to queue and excecute msg.afterSend()", "ntcp", new long[] {5*1000}); } public void start() { _count = 0; - _executor = new CustomThreadPoolExecutor(); + long maxMemory = Runtime.getRuntime().maxMemory(); + _threads = (int) Math.max(MIN_THREADS, Math.min(MAX_THREADS, 1 + (maxMemory / (32*1024*1024)))); + _executor = new CustomThreadPoolExecutor(_threads); } public void stop() { @@ -57,18 +62,18 @@ public class NTCPSendFinisher { } // not really needed for now but in case we want to add some hooks like afterExecute() - private class CustomThreadPoolExecutor extends ThreadPoolExecutor { - public CustomThreadPoolExecutor() { + private static class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor(int num) { // use unbounded queue, so maximumPoolSize and keepAliveTime have no effect - super(THREADS, THREADS, 1000, TimeUnit.MILLISECONDS, + super(num, num, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new CustomThreadFactory()); } } - private class CustomThreadFactory implements ThreadFactory { + private static class CustomThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread rv = Executors.defaultThreadFactory().newThread(r); - rv.setName("NTCPSendFinisher " + (++_count) + '/' + THREADS); + rv.setName("NTCPSendFinisher " + (++_count) + '/' + _threads); rv.setDaemon(true); return rv; } @@ -78,15 +83,18 @@ public class NTCPSendFinisher { * Call afterSend() for the message */ private class RunnableEvent implements Runnable { - private OutNetMessage _msg; + private final OutNetMessage _msg; + private final long _queued; public RunnableEvent(OutNetMessage msg) { _msg = msg; + _queued = _context.clock().now(); } public void run() { try { _transport.afterSend(_msg, true, false, _msg.getSendTime()); + _context.statManager().addRateData("ntcp.sendFinishTime", _context.clock().now() - _queued, 0); } catch (Throwable t) { _log.log(Log.CRIT, " wtf, afterSend borked", t); } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 3d6d91f516..31060ba411 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -433,8 +433,10 @@ public class NTCPTransport extends TransportImpl { return skews; } - private static final int NUM_CONCURRENT_READERS = 3; - private static final int NUM_CONCURRENT_WRITERS = 3; + private static final int MIN_CONCURRENT_READERS = 2; // unless < 32MB + private static final int MIN_CONCURRENT_WRITERS = 2; // unless < 32MB + private static final int MAX_CONCURRENT_READERS = 4; + private static final int MAX_CONCURRENT_WRITERS = 4; /** * Called by TransportManager. @@ -449,12 +451,8 @@ public class NTCPTransport extends TransportImpl { if (_pumper.isAlive()) return _myAddress != null ? _myAddress.toRouterAddress() : null; if (_log.shouldLog(Log.WARN)) _log.warn("Starting ntcp transport listening"); - _finisher.start(); - _pumper.startPumping(); - - _reader.startReading(NUM_CONCURRENT_READERS); - _writer.startWriting(NUM_CONCURRENT_WRITERS); + startIt(); configureLocalAddress(); return bindAddress(); } @@ -471,12 +469,8 @@ public class NTCPTransport extends TransportImpl { if (_pumper.isAlive()) return _myAddress != null ? _myAddress.toRouterAddress() : null; if (_log.shouldLog(Log.WARN)) _log.warn("Restarting ntcp transport listening"); - _finisher.start(); - _pumper.startPumping(); - - _reader.startReading(NUM_CONCURRENT_READERS); - _writer.startWriting(NUM_CONCURRENT_WRITERS); + startIt(); if (addr == null) _myAddress = null; else @@ -484,6 +478,28 @@ public class NTCPTransport extends TransportImpl { return bindAddress(); } + /** + * Start up. Caller must synchronize. + * @since 0.8.3 + */ + private void startIt() { + _finisher.start(); + _pumper.startPumping(); + + long maxMemory = Runtime.getRuntime().maxMemory(); + int nr, nw; + if (maxMemory < 32*1024*1024) { + nr = nw = 1; + } else if (maxMemory < 64*1024*1024) { + nr = nw = 2; + } else { + nr = Math.max(MIN_CONCURRENT_READERS, Math.min(MAX_CONCURRENT_READERS, _context.bandwidthLimiter().getInboundKBytesPerSecond() / 20)); + nw = Math.max(MIN_CONCURRENT_WRITERS, Math.min(MAX_CONCURRENT_WRITERS, _context.bandwidthLimiter().getOutboundKBytesPerSecond() / 20)); + } + _reader.startReading(nr); + _writer.startWriting(nw); + } + public boolean isAlive() { return _pumper.isAlive(); } diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java index c1029b26e5..9694815453 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Reader.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java @@ -15,13 +15,13 @@ import net.i2p.util.Log; * */ class Reader { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; // TODO change to LBQ ?? private final List<NTCPConnection> _pendingConnections; - private List<NTCPConnection> _liveReads; - private List<NTCPConnection> _readAfterLive; - private List<Runner> _runners; + private final List<NTCPConnection> _liveReads; + private final List<NTCPConnection> _readAfterLive; + private final List<Runner> _runners; public Reader(RouterContext ctx) { _context = ctx; @@ -33,9 +33,9 @@ class Reader { } public void startReading(int numReaders) { - for (int i = 0; i < numReaders; i++) { + for (int i = 1; i <= numReaders; i++) { Runner r = new Runner(); - I2PThread t = new I2PThread(r, "NTCP read " + i, true); + I2PThread t = new I2PThread(r, "NTCP reader " + i + '/' + numReaders, true); _runners.add(r); t.start(); } diff --git a/router/java/src/net/i2p/router/transport/ntcp/Writer.java b/router/java/src/net/i2p/router/transport/ntcp/Writer.java index ca676c572c..260569df75 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Writer.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Writer.java @@ -14,12 +14,12 @@ import net.i2p.util.Log; * */ class Writer { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; private final List<NTCPConnection> _pendingConnections; - private List<NTCPConnection> _liveWrites; - private List<NTCPConnection> _writeAfterLive; - private List<Runner> _runners; + private final List<NTCPConnection> _liveWrites; + private final List<NTCPConnection> _writeAfterLive; + private final List<Runner> _runners; public Writer(RouterContext ctx) { _context = ctx; @@ -31,9 +31,9 @@ class Writer { } public void startWriting(int numWriters) { - for (int i = 0; i < numWriters; i++) { + for (int i = 1; i <=numWriters; i++) { Runner r = new Runner(); - I2PThread t = new I2PThread(r, "NTCP write " + i, true); + I2PThread t = new I2PThread(r, "NTCP writer " + i + '/' + numWriters, true); _runners.add(r); t.start(); } diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 08b6088c4f..4988a06851 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -27,7 +27,9 @@ class MessageReceiver { private final BlockingQueue<InboundMessageState> _completeMessages; private boolean _alive; //private ByteCache _cache; - private static final int THREADS = 5; + private static final int MIN_THREADS = 2; // unless < 32MB + private static final int MAX_THREADS = 5; + private final int _threadCount; private static final long POISON_IMS = -99999999999l; public MessageReceiver(RouterContext ctx, UDPTransport transport) { @@ -35,10 +37,19 @@ class MessageReceiver { _log = ctx.logManager().getLog(MessageReceiver.class); _transport = transport; _completeMessages = new LinkedBlockingQueue(); + + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory < 32*1024*1024) + _threadCount = 1; + else if (maxMemory < 64*1024*1024) + _threadCount = 2; + else + _threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20)); + // the runners run forever, no need to have a cache //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES); @@ -49,8 +60,8 @@ class MessageReceiver { public void startup() { _alive = true; - for (int i = 0; i < THREADS; i++) { - I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true); + for (int i = 0; i < _threadCount; i++) { + I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + (i+1) + '/' + _threadCount, true); t.start(); } } @@ -64,7 +75,7 @@ class MessageReceiver { public void shutdown() { _alive = false; _completeMessages.clear(); - for (int i = 0; i < THREADS; i++) { + for (int i = 0; i < _threadCount; i++) { InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null); _completeMessages.offer(ims); } @@ -119,8 +130,8 @@ class MessageReceiver { if (message != null) { long before = System.currentTimeMillis(); - if (remaining > 0) - _context.statManager().addRateData("udp.inboundRemaining", remaining, 0); + //if (remaining > 0) + // _context.statManager().addRateData("udp.inboundRemaining", remaining, 0); int size = message.getCompleteSize(); if (_log.shouldLog(Log.INFO)) _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime()); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index d130a4c838..2c0138228d 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -31,11 +31,13 @@ class PacketHandler { private boolean _keepReading; private final Handler[] _handlers; - private static final int NUM_HANDLERS = 5; + private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB + private static final int MAX_NUM_HANDLERS = 5; /** let packets be up to 30s slow */ private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; - PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) {// LINT -- Exporting non-public type through public API + PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, + InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) { _context = ctx; _log = ctx.logManager().getLog(PacketHandler.class); _transport = transport; @@ -44,10 +46,20 @@ class PacketHandler { _inbound = inbound; _testManager = testManager; _introManager = introManager; - _handlers = new Handler[NUM_HANDLERS]; - for (int i = 0; i < NUM_HANDLERS; i++) { + + long maxMemory = Runtime.getRuntime().maxMemory(); + int num_handlers; + if (maxMemory < 32*1024*1024) + num_handlers = 1; + else if (maxMemory < 64*1024*1024) + num_handlers = 2; + else + num_handlers = Math.max(MIN_NUM_HANDLERS, Math.min(MAX_NUM_HANDLERS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20)); + _handlers = new Handler[num_handlers]; + for (int i = 0; i < num_handlers; i++) { _handlers[i] = new Handler(); } + _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", UDPTransport.RATES); @@ -79,8 +91,8 @@ class PacketHandler { public void startup() { _keepReading = true; - for (int i = 0; i < NUM_HANDLERS; i++) { - I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true); + for (int i = 0; i < _handlers.length; i++) { + I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + (i+1) + '/' + _handlers.length, true); t.start(); } } @@ -91,8 +103,8 @@ class PacketHandler { String getHandlerStatus() { StringBuilder rv = new StringBuilder(); - rv.append("Handlers: ").append(NUM_HANDLERS); - for (int i = 0; i < NUM_HANDLERS; i++) { + rv.append("Handlers: ").append(_handlers.length); + for (int i = 0; i < _handlers.length; i++) { Handler handler = _handlers[i]; rv.append(" handler ").append(i).append(" state: ").append(handler._state); } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index 05db0b0ce4..7f29f5743d 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -16,22 +16,26 @@ public class TunnelGatewayPumper implements Runnable { private RouterContext _context; private final BlockingQueue<PumpedTunnelGateway> _wantsPumping; private boolean _stop; - private static final int PUMPERS = 4; + private static final int MIN_PUMPERS = 1; + private static final int MAX_PUMPERS = 4; + private final int _pumpers; /** Creates a new instance of TunnelGatewayPumper */ public TunnelGatewayPumper(RouterContext ctx) { _context = ctx; _wantsPumping = new LinkedBlockingQueue(); _stop = false; - for (int i = 0; i < PUMPERS; i++) - new I2PThread(this, "Tunnel GW pumper " + i + '/' + PUMPERS, true).start(); + long maxMemory = Runtime.getRuntime().maxMemory(); + _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024)))); + for (int i = 0; i < _pumpers; i++) + new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true).start(); } public void stopPumping() { _stop=true; _wantsPumping.clear(); PumpedTunnelGateway poison = new PoisonPTG(_context); - for (int i = 0; i < PUMPERS; i++) + for (int i = 0; i < _pumpers; i++) _wantsPumping.offer(poison); for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) { try { -- GitLab