diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java new file mode 100644 index 0000000000000000000000000000000000000000..8d19c62498fedae7de6426558925dacd3f6739ff --- /dev/null +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPSendFinisher.java @@ -0,0 +1,89 @@ +package net.i2p.router.transport.ntcp; + +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; + +import net.i2p.I2PAppContext; +import net.i2p.router.OutNetMessage; +import net.i2p.util.Log; + +/** + * Previously, NTCP was using SimpleTimer with a delay of 0, which + * was a real abuse. + * + * Here we use the non-scheduled, lockless ThreadPoolExecutor with + * a fixed pool size and an unbounded queue. + * + * The old implementation was having problems with lock contention; + * this should work a lot better - and not clog up the SimpleTimer queue. + * + * @author zzz + */ +public class NTCPSendFinisher { + private static final int THREADS = 4; + private I2PAppContext _context; + private NTCPTransport _transport; + private Log _log; + private int _count; + private ThreadPoolExecutor _executor; + + public NTCPSendFinisher(I2PAppContext context, NTCPTransport transport) { + _context = context; + _log = _context.logManager().getLog(NTCPSendFinisher.class); + _transport = transport; + } + + public void start() { + _count = 0; + _executor = new CustomThreadPoolExecutor(); + } + + public void stop() { + _executor.shutdownNow(); + } + + public void add(OutNetMessage msg) { + _executor.execute(new RunnableEvent(msg)); + } + + // not really needed for now but in case we want to add some hooks like afterExecute() + private class CustomThreadPoolExecutor extends ThreadPoolExecutor { + public CustomThreadPoolExecutor() { + // use unbounded queue, so maximumPoolSize and keepAliveTime have no effect + super(THREADS, THREADS, 1000, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), new CustomThreadFactory()); + } + } + + private class CustomThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName("NTCPSendFinisher " + (++_count) + '/' + THREADS); + rv.setDaemon(true); + return rv; + } + } + + /** + * Call afterSend() for the message + */ + private class RunnableEvent implements Runnable { + private OutNetMessage _msg; + + public RunnableEvent(OutNetMessage msg) { + _msg = msg; + } + + public void run() { + try { + _transport.afterSend(_msg, true, false, _msg.getSendTime()); + } catch (Throwable t) { + _log.log(Log.CRIT, " wtf, afterSend borked", t); + } + } + } +} + diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 4b5573917eb7f0c6afba5d4ecf22f015216476cd..c23245bae41edab88fb81c2903697474fb314b41 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -27,7 +27,6 @@ import net.i2p.router.transport.Transport; import net.i2p.router.transport.TransportBid; import net.i2p.router.transport.TransportImpl; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; /** * @@ -50,7 +49,7 @@ public class NTCPTransport extends TransportImpl { private List _establishing; private List _sent; - private SendFinisher _finisher; + private NTCPSendFinisher _finisher; public NTCPTransport(RouterContext ctx) { super(ctx); @@ -124,7 +123,7 @@ public class NTCPTransport extends TransportImpl { _conByIdent = new HashMap(64); _sent = new ArrayList(4); - _finisher = new SendFinisher(); + _finisher = new NTCPSendFinisher(ctx, this); _pumper = new EventPumper(ctx, this); _reader = new Reader(ctx); @@ -310,27 +309,8 @@ public class NTCPTransport extends TransportImpl { return countActivePeers() < getMaxConnections() * 4 / 5; } + /** queue up afterSend call, which can take some time w/ jobs, etc */ void sendComplete(OutNetMessage msg) { _finisher.add(msg); } - /** async afterSend call, which can take some time w/ jobs, etc */ - private class SendFinisher implements SimpleTimer.TimedEvent { - public void add(OutNetMessage msg) { - synchronized (_sent) { _sent.add(msg); } - SimpleTimer.getInstance().addEvent(SendFinisher.this, 0); - } - public void timeReached() { - int pending = 0; - OutNetMessage msg = null; - synchronized (_sent) { - pending = _sent.size()-1; - if (pending >= 0) - msg = (OutNetMessage)_sent.remove(0); - } - if (msg != null) - afterSend(msg, true, false, msg.getSendTime()); - if (pending > 0) - SimpleTimer.getInstance().addEvent(SendFinisher.this, 0); - } - } private boolean isEstablished(RouterIdentity peer) { return isEstablished(peer.calculateHash()); @@ -412,6 +392,7 @@ public class NTCPTransport extends TransportImpl { public RouterAddress startListening() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting ntcp transport listening"); + _finisher.start(); _pumper.startPumping(); _reader.startReading(NUM_CONCURRENT_READERS); @@ -423,6 +404,7 @@ public class NTCPTransport extends TransportImpl { public RouterAddress restartListening(RouterAddress addr) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Restarting ntcp transport listening"); + _finisher.start(); _pumper.startPumping(); _reader.startReading(NUM_CONCURRENT_READERS); @@ -551,6 +533,7 @@ public class NTCPTransport extends TransportImpl { _pumper.stopPumping(); _writer.stopWriting(); _reader.stopReading(); + _finisher.stop(); Map cons = null; synchronized (_conLock) { cons = new HashMap(_conByIdent);