From e3353df8bbee062e5f67178b34fe3939df543653 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 26 Feb 2010 16:52:09 +0000 Subject: [PATCH] * NTCP Transport: - Replace lists with concurrent queues in EventPumper and NTCPConnection to remove global locks - Java 5 cleanup --- .../router/transport/ntcp/EventPumper.java | 122 ++++------ .../router/transport/ntcp/NTCPConnection.java | 227 ++++++++---------- .../net/i2p/router/transport/ntcp/Reader.java | 18 +- .../net/i2p/router/transport/ntcp/Writer.java | 12 +- 4 files changed, 158 insertions(+), 221 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 82ec5719e0..611665c415 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.RouterIdentity; import net.i2p.data.RouterInfo; @@ -33,11 +34,11 @@ public class EventPumper implements Runnable { private Log _log; private volatile boolean _alive; private Selector _selector; - private final List _bufCache; - private final List _wantsRead = new ArrayList(16); - private final List _wantsWrite = new ArrayList(4); - private final List _wantsRegister = new ArrayList(1); - private final List _wantsConRegister = new ArrayList(4); + private final LinkedBlockingQueue<ByteBuffer> _bufCache; + private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue(); + private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue(); + private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue(); + private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue(); private NTCPTransport _transport; private long _expireIdleWriteTime; @@ -54,23 +55,19 @@ public class EventPumper implements Runnable { /** tunnel test is every 30-60s, so this should be longer than, say, 3*45s to allow for drops */ private static final long MIN_EXPIRE_IDLE_TIME = 3*60*1000l; private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l; - + public EventPumper(RouterContext ctx, NTCPTransport transport) { _context = ctx; _log = ctx.logManager().getLog(getClass()); _transport = transport; _alive = false; - _bufCache = new ArrayList(MAX_CACHE_SIZE); + _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; } public synchronized void startPumping() { if (_log.shouldLog(Log.INFO)) _log.info("Starting pumper"); -// _wantsRead = new ArrayList(16); -// _wantsWrite = new ArrayList(4); -// _wantsRegister = new ArrayList(1); -// _wantsConRegister = new ArrayList(4); try { _selector = Selector.open(); _alive = true; @@ -98,13 +95,13 @@ public class EventPumper implements Runnable { public void register(ServerSocketChannel chan) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel"); - synchronized (_wantsRegister) { _wantsRegister.add(chan); } + _wantsRegister.offer(chan); _selector.wakeup(); } public void registerConnect(NTCPConnection con) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection"); _context.statManager().addRateData("ntcp.registerConnect", 1, 0); - synchronized (_wantsConRegister) { _wantsConRegister.add(con); } + _wantsConRegister.offer(con); _selector.wakeup(); } @@ -254,10 +251,10 @@ public class EventPumper implements Runnable { } catch (Exception e) { _log.error("Error closing keys on pumper shutdown", e); } - synchronized (_wantsConRegister) { _wantsConRegister.clear(); } - synchronized (_wantsRead) { _wantsRead.clear(); } - synchronized (_wantsRegister) { _wantsRegister.clear(); } - synchronized (_wantsWrite) { _wantsWrite.clear(); } + _wantsConRegister.clear(); + _wantsRead.clear(); + _wantsRegister.clear(); + _wantsWrite.clear(); } private void processKeys(Set selected) { @@ -322,10 +319,8 @@ public class EventPumper implements Runnable { public void wantsWrite(NTCPConnection con) { if (_log.shouldLog(Log.INFO)) _log.info("Before adding wants to write on " + con); - synchronized (_wantsWrite) { - if (!_wantsWrite.contains(con)) - _wantsWrite.add(con); - } + if (!_wantsWrite.contains(con)) + _wantsWrite.offer(con); if (_log.shouldLog(Log.INFO)) _log.info("Wants to write on " + con); _selector.wakeup(); @@ -333,10 +328,8 @@ public class EventPumper implements Runnable { _log.debug("selector awoken for write"); } public void wantsRead(NTCPConnection con) { - synchronized (_wantsRead) { - if (!_wantsRead.contains(con)) - _wantsRead.add(con); - } + if (!_wantsRead.contains(con)) + _wantsRead.offer(con); if (_log.shouldLog(Log.DEBUG)) _log.debug("wants to read on " + con); _selector.wakeup(); @@ -345,16 +338,16 @@ public class EventPumper implements Runnable { } private static final int MIN_BUFS = 5; + /** + * There's only one pumper, so static is fine, unless multi router + * Is there a better way to do this? + */ private static int NUM_BUFS = 5; private static int __liveBufs = 0; private static int __consecutiveExtra; ByteBuffer acquireBuf() { - if (false) return ByteBuffer.allocate(BUF_SIZE); - ByteBuffer rv = null; - synchronized (_bufCache) { - if (_bufCache.size() > 0) - rv = (ByteBuffer)_bufCache.remove(0); - } + //if (false) return ByteBuffer.allocate(BUF_SIZE); + ByteBuffer rv = (ByteBuffer)_bufCache.poll(); if (rv == null) { rv = ByteBuffer.allocate(BUF_SIZE); NUM_BUFS = ++__liveBufs; @@ -369,27 +362,24 @@ public class EventPumper implements Runnable { } void releaseBuf(ByteBuffer buf) { - if (false) return; + //if (false) return; if (_log.shouldLog(Log.DEBUG)) _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); buf.clear(); - int extra = 0; - boolean cached = false; - synchronized (_bufCache) { - if (_bufCache.size() < NUM_BUFS) { - extra = _bufCache.size(); - _bufCache.add(buf); - cached = true; - if (extra > 5) { - __consecutiveExtra++; - if (__consecutiveExtra >= 20) { - NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS); - __consecutiveExtra = 0; - } + int extra = _bufCache.size(); + boolean cached = extra < NUM_BUFS; + + if (cached) { + _bufCache.offer(buf); + if (extra > 5) { + __consecutiveExtra++; + if (__consecutiveExtra >= 20) { + NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS); + __consecutiveExtra = 0; } - } else { - __liveBufs--; } + } else { + __liveBufs--; } if (cached && _log.shouldLog(Log.DEBUG)) _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live"); @@ -578,14 +568,8 @@ public class EventPumper implements Runnable { } private void runDelayedEvents(List buf) { - synchronized (_wantsRead) { - if (_wantsRead.size() > 0) { - buf.addAll(_wantsRead); - _wantsRead.clear(); - } - } - while (buf.size() > 0) { - NTCPConnection con = (NTCPConnection)buf.remove(0); + NTCPConnection con; + while ((con = _wantsRead.poll()) != null) { SelectionKey key = con.getKey(); try { key.interestOps(key.interestOps() | SelectionKey.OP_READ); @@ -594,14 +578,7 @@ public class EventPumper implements Runnable { } } - synchronized (_wantsWrite) { - if (_wantsWrite.size() > 0) { - buf.addAll(_wantsWrite); - _wantsWrite.clear(); - } - } - while (buf.size() > 0) { - NTCPConnection con = (NTCPConnection)buf.remove(0); + while ((con = _wantsWrite.poll()) != null) { SelectionKey key = con.getKey(); try { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); @@ -610,14 +587,8 @@ public class EventPumper implements Runnable { } } - synchronized (_wantsRegister) { - if (_wantsRegister.size() > 0) { - buf.addAll(_wantsRegister); - _wantsRegister.clear(); - } - } - while (buf.size() > 0) { - ServerSocketChannel chan = (ServerSocketChannel)buf.remove(0); + ServerSocketChannel chan; + while ((chan = _wantsRegister.poll()) != null) { try { SelectionKey key = chan.register(_selector, SelectionKey.OP_ACCEPT); key.attach(chan); @@ -626,14 +597,7 @@ public class EventPumper implements Runnable { } } - synchronized (_wantsConRegister) { - if (_wantsConRegister.size() > 0) { - buf.addAll(_wantsConRegister); - _wantsConRegister.clear(); - } - } - while (buf.size() > 0) { - NTCPConnection con = (NTCPConnection)buf.remove(0); + while ((con = _wantsConRegister.poll()) != null) { try { SelectionKey key = con.getChannel().register(_selector, SelectionKey.OP_CONNECT); key.attach(con); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 42e205671b..9eff966bfb 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -8,6 +8,8 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.Adler32; import net.i2p.data.Base64; @@ -23,6 +25,7 @@ import net.i2p.router.OutNetMessage; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -53,13 +56,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { private SocketChannel _chan; private SelectionKey _conKey; /** list of ByteBuffer containing data we have read and are ready to process, oldest first */ - private final List _readBufs; + private final LinkedBlockingQueue<ByteBuffer> _readBufs; /** * list of ByteBuffers containing fully populated and encrypted data, ready to write, * and already cleared through the bandwidth limiter. */ - private final List _writeBufs; - private final List _bwRequests; + private final LinkedBlockingQueue<ByteBuffer> _writeBufs; + /** Todo: This is only so we can abort() them when we close() ??? */ + private final Set<FIFOBandwidthLimiter.Request> _bwRequests; private boolean _established; private long _establishedOn; private EstablishState _establishState; @@ -72,8 +76,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { /** * pending unprepared OutNetMessage instances */ - private final List _outbound; - /** current prepared OutNetMessage, or null */ + private final LinkedBlockingQueue<OutNetMessage> _outbound; + /** current prepared OutNetMessage, or null - synchronize on _outbound to modify */ private OutNetMessage _currentOutbound; private SessionKey _sessionKey; /** encrypted block of the current I2NP message being read */ @@ -124,10 +128,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _lastReceiveTime = _created; _transport = transport; _chan = chan; - _readBufs = new ArrayList(4); - _writeBufs = new ArrayList(4); - _bwRequests = new ArrayList(2); - _outbound = new ArrayList(4); + _readBufs = new LinkedBlockingQueue(); + _writeBufs = new LinkedBlockingQueue(); + _bwRequests = new ConcurrentHashSet(2); + _outbound = new LinkedBlockingQueue(); _established = false; _isInbound = true; _closed = false; @@ -154,10 +158,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _lastReceiveTime = _created; _transport = transport; _remAddr = remAddr; - _readBufs = new ArrayList(4); - _writeBufs = new ArrayList(4); - _bwRequests = new ArrayList(2); - _outbound = new ArrayList(4); + _readBufs = new LinkedBlockingQueue(); + _writeBufs = new LinkedBlockingQueue(); + _bwRequests = new ConcurrentHashSet(2); + _outbound = new LinkedBlockingQueue(); _established = false; _isInbound = false; _closed = false; @@ -210,12 +214,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public long getMessagesSent() { return _messagesWritten; } public long getMessagesReceived() { return _messagesRead; } public long getOutboundQueueSize() { - synchronized (_outbound) { int queued = _outbound.size(); if (_currentOutbound != null) queued++; return queued; - } } public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; } public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; } @@ -234,29 +236,21 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.removeCon(this); _transport.getReader().connectionClosed(this); _transport.getWriter().connectionClosed(this); - List reqs = null; - synchronized (_bwRequests) { - if (_bwRequests.size() > 0) { - reqs = new ArrayList(_bwRequests); - _bwRequests.clear(); - } - } - if (reqs != null) - for (Iterator iter = reqs.iterator(); iter.hasNext(); ) - ((FIFOBandwidthLimiter.Request)iter.next()).abort(); - List msgs = null; - synchronized (_outbound) { - msgs = new ArrayList(_outbound); - _outbound.clear(); + + for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) { + iter.next().abort(); } - for (int i = 0; i < msgs.size(); i++) { - OutNetMessage msg = (OutNetMessage)msgs.get(i); + _bwRequests.clear(); + + OutNetMessage msg; + while ((msg = _outbound.poll()) != null) { Object buf = msg.releasePreparationBuffer(); if (buf != null) releaseBuf((PrepBuffer)buf); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); } - OutNetMessage msg = _currentOutbound; + + msg = _currentOutbound; if (msg != null) { Object buf = msg.releasePreparationBuffer(); if (buf != null) @@ -277,10 +271,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (_consecutiveBacklog > 10) { // waaay too backlogged boolean wantsWrite = false; try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {} - int blocks = 0; - synchronized (_writeBufs) { blocks = _writeBufs.size(); } - if (_log.shouldLog(Log.WARN)) + if (_log.shouldLog(Log.WARN)) { + int blocks = _writeBufs.size(); _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64()); + } _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime()); close(); } @@ -292,40 +286,29 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (FAST_LARGE) bufferedPrepare(msg); boolean noOutbound = false; - synchronized (_outbound) { - _outbound.add(msg); - enqueued = _outbound.size(); - msg.setQueueSize(enqueued); - noOutbound = (_currentOutbound == null); - } + _outbound.offer(msg); + enqueued = _outbound.size(); + msg.setQueueSize(enqueued); + noOutbound = (_currentOutbound == null); if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (_established && noOutbound) _transport.getWriter().wantsWrite(this, "enqueued"); } private long queueTime() { - long queueTime = 0; - int size = 0; - synchronized (_outbound) { - OutNetMessage msg = _currentOutbound; - size = _outbound.size(); - if ( (msg == null) && (size > 0) ) - msg = (OutNetMessage)_outbound.get(0); + OutNetMessage msg = _currentOutbound; + if (msg == null) { + msg = _outbound.peek(); if (msg == null) return 0; - queueTime = msg.getSendTime(); // does not include any of the pre-send(...) preparation } - return queueTime; + return msg.getSendTime(); // does not include any of the pre-send(...) preparation } + public boolean tooBacklogged() { long queueTime = queueTime(); if (queueTime <= 0) return false; - int size = 0; - boolean currentOutboundSet = false; - synchronized (_outbound) { - size = _outbound.size(); - currentOutboundSet = (_currentOutbound != null); - } + boolean currentOutboundSet = _currentOutbound != null; // perhaps we could take into account the size of the queued messages too, our // current transmission rate, and how much time is left before the new message's expiration? @@ -333,15 +316,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if (getUptime() < 10*1000) // allow some slack just after establishment return false; if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... - int writeBufs = 0; - synchronized (_writeBufs) { writeBufs = _writeBufs.size(); } - if (_log.shouldLog(Log.WARN)) + int size = _outbound.size(); + if (_log.shouldLog(Log.WARN)) { + int writeBufs = _writeBufs.size(); try { _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) + ", currentOut set? " + currentOutboundSet + ", writeBufs: " + writeBufs + " on " + toString()); } catch (Exception e) {} // java.nio.channels.CancelledKeyException + } _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size); return true; //} else if (size > 32) { // another arbitrary limit. @@ -440,10 +424,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _establishState = null; _transport.markReachable(getRemotePeer().calculateHash(), false); //_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); - boolean msgs = false; - synchronized (_outbound) { - msgs = (_outbound.size() > 0); - } + boolean msgs = !_outbound.isEmpty(); _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); if (msgs) @@ -469,6 +450,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. * + * Todo: remove synchronization? + * */ synchronized void prepareNextWrite() { //if (FAST_LARGE) @@ -579,6 +562,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. * + * Todo: remove synchronization? + * */ synchronized void prepareNextWriteFast() { if (_log.shouldLog(Log.DEBUG)) @@ -600,6 +585,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } OutNetMessage msg = null; + // this is synchronized only for _currentOutbound + // Todo: figure out how to remove the synchronization synchronized (_outbound) { if (_currentOutbound != null) { if (_log.shouldLog(Log.INFO)) @@ -608,21 +595,28 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } //throw new RuntimeException("We should not be preparing a write while we still have one pending"); if (queueTime() > 3*1000) { // don't stall low-priority messages - msg = (OutNetMessage)_outbound.remove(0); + msg = _outbound.poll(); + if (msg == null) + return; } else { - Iterator it = _outbound.iterator(); + int slot = 0; // only for logging + Iterator<OutNetMessage> it = _outbound.iterator(); for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound - OutNetMessage mmsg = (OutNetMessage) it.next(); - if (msg == null || mmsg.getPriority() > msg.getPriority()) + OutNetMessage mmsg = it.next(); + if (msg == null || mmsg.getPriority() > msg.getPriority()) { msg = mmsg; + slot = i; + } } if (msg == null) return; // if (_outbound.indexOf(msg) > 0) // _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size()); if (_log.shouldLog(Log.INFO)) - _log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + _outbound.indexOf(msg)); - _outbound.remove(msg); + _log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + slot); + boolean removed = _outbound.remove(msg); + if ((!removed) && _log.shouldLog(Log.ERROR)) + _log.info("Already removed??? " + msg.getMessage().getType()); } _currentOutbound = msg; } @@ -815,17 +809,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { write(buf); } } + private void removeRequest(FIFOBandwidthLimiter.Request req) { - synchronized (_bwRequests) { _bwRequests.remove(req); } + _bwRequests.remove(req); } + private void addRequest(FIFOBandwidthLimiter.Request req) { - synchronized (_bwRequests) { _bwRequests.add(req); } - } - - public int outboundQueueSize() { - synchronized (_writeBufs) { - return _writeBufs.size(); - } + _bwRequests.add(req); } /** @@ -852,10 +842,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { */ public void recv(ByteBuffer buf) { _bytesReceived += buf.remaining(); - synchronized (_readBufs) { //buf.flip(); - _readBufs.add(buf); - } + _readBufs.offer(buf); _transport.getReader().wantsRead(this); updateStats(); } @@ -865,34 +853,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { */ public void write(ByteBuffer buf) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)"); - synchronized (_writeBufs) { - _writeBufs.add(buf); - } + _writeBufs.offer(buf); if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); _transport.getPumper().wantsWrite(this); } + /** @return null if none available */ public ByteBuffer getNextReadBuf() { - synchronized (_readBufs) { - if (_readBufs.size() > 0) - return (ByteBuffer)_readBufs.get(0); - } - return null; + return _readBufs.poll(); } + + /** since getNextReadBuf() removes, this should not be necessary */ public void removeReadBuf(ByteBuffer buf) { - synchronized (_readBufs) { - _readBufs.remove(buf); - } + _readBufs.remove(buf); //_transport.getPumper().releaseBuf(buf); } - public int getWriteBufCount() { synchronized (_writeBufs) { return _writeBufs.size(); } } + public int getWriteBufCount() { return _writeBufs.size(); } + + /** @return null if none available */ public ByteBuffer getNextWriteBuf() { - synchronized (_writeBufs) { - if (_writeBufs.size() > 0) - return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards - } - return null; + return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards } public void removeWriteBuf(ByteBuffer buf) { @@ -900,16 +881,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { OutNetMessage msg = null; boolean bufsRemain = false; boolean clearMessage = false; - synchronized (_writeBufs) { - if (_sendingMeta && (buf.capacity() == _meta.length)) { - _sendingMeta = false; - } else { - clearMessage = true; - } - _writeBufs.remove(buf); - bufsRemain = _writeBufs.size() > 0; + if (_sendingMeta && (buf.capacity() == _meta.length)) { + _sendingMeta = false; + } else { + clearMessage = true; } + _writeBufs.remove(buf); + bufsRemain = !_writeBufs.isEmpty(); if (clearMessage) { + // see synchronization comments in prepareNextWriteFast() synchronized (_outbound) { if (_currentOutbound != null) msg = _currentOutbound; @@ -935,10 +915,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _log.info("I2NP meta message sent completely"); } - boolean msgs = false; - synchronized (_outbound) { - msgs = ((_outbound.size() > 0) || (_currentOutbound != null)); - } + boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null)); if (msgs) // push through the bw limiter to reach _writeBufs _transport.getWriter().wantsWrite(this, "write completed"); if (bufsRemain) // send asap @@ -1113,22 +1090,17 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } private static final int MAX_HANDLERS = 4; - private final static List _i2npHandlers = new ArrayList(MAX_HANDLERS); + private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS); + private final static I2NPMessageHandler acquireHandler(RouterContext ctx) { - I2NPMessageHandler rv = null; - synchronized (_i2npHandlers) { - if (_i2npHandlers.size() > 0) - rv = (I2NPMessageHandler)_i2npHandlers.remove(0); - } + I2NPMessageHandler rv = _i2npHandlers.poll(); if (rv == null) rv = new I2NPMessageHandler(ctx); return rv; } + private static void releaseHandler(I2NPMessageHandler handler) { - synchronized (_i2npHandlers) { - if (_i2npHandlers.size() < MAX_HANDLERS) - _i2npHandlers.add(handler); - } + _i2npHandlers.offer(handler); } @@ -1144,21 +1116,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } private static final int MAX_DATA_READ_BUFS = 16; - private final static List _dataReadBufs = new ArrayList(MAX_DATA_READ_BUFS); + private final static LinkedBlockingQueue<DataBuf> _dataReadBufs = new LinkedBlockingQueue(MAX_DATA_READ_BUFS); + private static DataBuf acquireReadBuf() { - synchronized (_dataReadBufs) { - if (_dataReadBufs.size() > 0) - return (DataBuf)_dataReadBufs.remove(0); - } + DataBuf rv = _dataReadBufs.poll(); + if (rv != null) + return rv; return new DataBuf(); } + private static void releaseReadBuf(DataBuf buf) { buf.bais.reset(); - synchronized (_dataReadBufs) { - if (_dataReadBufs.size() < MAX_DATA_READ_BUFS) - _dataReadBufs.add(buf); - } + _dataReadBufs.offer(buf); } + /** * sizeof(data)+data+pad+crc. * diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java index 2c017b5b6d..c0f77c1a85 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Reader.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java @@ -17,10 +17,10 @@ import net.i2p.util.Log; class Reader { private RouterContext _context; private Log _log; - private final List _pendingConnections; - private List _liveReads; - private List _readAfterLive; - private List _runners; + private final List<NTCPConnection> _pendingConnections; + private List<NTCPConnection> _liveReads; + private List<NTCPConnection> _readAfterLive; + private List<Runner> _runners; public Reader(RouterContext ctx) { _context = ctx; @@ -41,7 +41,7 @@ class Reader { } public void stopReading() { while (_runners.size() > 0) { - Runner r = (Runner)_runners.remove(0); + Runner r = _runners.remove(0); r.stop(); } synchronized (_pendingConnections) { @@ -93,7 +93,7 @@ class Reader { if (_pendingConnections.size() <= 0) { _pendingConnections.wait(); } else { - con = (NTCPConnection)_pendingConnections.remove(0); + con = _pendingConnections.remove(0); _liveReads.add(con); } } @@ -155,7 +155,8 @@ class Reader { con.close(); return; } else if (buf.remaining() <= 0) { - con.removeReadBuf(buf); + // not necessary, getNextReadBuf() removes + //con.removeReadBuf(buf); } if (est.isComplete() && est.getExtraBytes() != null) con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes())); @@ -165,7 +166,8 @@ class Reader { if (_log.shouldLog(Log.DEBUG)) _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)"); con.recvEncryptedI2NP(buf); - con.removeReadBuf(buf); + // not necessary, getNextReadBuf() removes + //con.removeReadBuf(buf); } } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/Writer.java b/router/java/src/net/i2p/router/transport/ntcp/Writer.java index 97823313bc..45efc30553 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Writer.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Writer.java @@ -16,10 +16,10 @@ import net.i2p.util.Log; class Writer { private RouterContext _context; private Log _log; - private final List _pendingConnections; - private List _liveWrites; - private List _writeAfterLive; - private List _runners; + private final List<NTCPConnection> _pendingConnections; + private List<NTCPConnection> _liveWrites; + private List<NTCPConnection> _writeAfterLive; + private List<Runner> _runners; public Writer(RouterContext ctx) { _context = ctx; @@ -40,7 +40,7 @@ class Writer { } public void stopWriting() { while (_runners.size() > 0) { - Runner r = (Runner)_runners.remove(0); + Runner r = _runners.remove(0); r.stop(); } synchronized (_pendingConnections) { @@ -100,7 +100,7 @@ class Writer { _log.debug("Done writing, but nothing pending, so wait"); _pendingConnections.wait(); } else { - con = (NTCPConnection)_pendingConnections.remove(0); + con = _pendingConnections.remove(0); _liveWrites.add(con); if (_log.shouldLog(Log.DEBUG)) _log.debug("Switch to writing on: " + con); -- GitLab