diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 33f4eb15d..c302f7baf 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -56,11 +56,13 @@ import net.i2p.util.Log; * * */ -class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { +class NTCPConnection { private final RouterContext _context; private final Log _log; private SocketChannel _chan; private SelectionKey _conKey; + private final FIFOBandwidthLimiter.CompleteListener _inboundListener; + private final FIFOBandwidthLimiter.CompleteListener _outboundListener; /** * queue of ByteBuffer containing data we have read and are ready to process, oldest first * unbounded and lockless @@ -73,7 +75,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { */ private final Queue _writeBufs; /** Requests that were not granted immediately */ - private final Set _bwRequests; + private final Set _bwInRequests; + private final Set _bwOutRequests; private boolean _established; private long _establishedOn; private EstablishState _establishState; @@ -154,7 +157,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _chan = chan; _readBufs = new ConcurrentLinkedQueue(); _writeBufs = new ConcurrentLinkedQueue(); - _bwRequests = new ConcurrentHashSet(2); + _bwInRequests = new ConcurrentHashSet(2); + _bwOutRequests = new ConcurrentHashSet(8); _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = true; _decryptBlockBuf = new byte[BLOCK_SIZE]; @@ -162,6 +166,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _establishState = new EstablishState(ctx, transport, this); _conKey = key; _conKey.attach(this); + _inboundListener = new InboundListener(); + _outboundListener = new OutboundListener(); initialize(); } @@ -178,11 +184,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _remAddr = remAddr; _readBufs = new ConcurrentLinkedQueue(); _writeBufs = new ConcurrentLinkedQueue(); - _bwRequests = new ConcurrentHashSet(8); + _bwInRequests = new ConcurrentHashSet(2); + _bwOutRequests = new ConcurrentHashSet(8); _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = false; _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); + _inboundListener = new InboundListener(); + _outboundListener = new OutboundListener(); initialize(); } @@ -283,12 +292,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _transport.getReader().connectionClosed(this); _transport.getWriter().connectionClosed(this); - for (Iterator iter = _bwRequests.iterator(); iter.hasNext(); ) { - iter.next().abort(); + for (FIFOBandwidthLimiter.Request req :_bwInRequests) { + req.abort(); // we would like to return read ByteBuffers via EventPumper.releaseBuf(), // but we can't risk releasing it twice } - _bwRequests.clear(); + _bwInRequests.clear(); + for (FIFOBandwidthLimiter.Request req :_bwOutRequests) { + req.abort(); + } + _bwOutRequests.clear(); _writeBufs.clear(); ByteBuffer bb; @@ -848,12 +861,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { /** * The FifoBandwidthLimiter.CompleteListener callback. - * Does the delayed read or write. + * Does the delayed read. */ - public void complete(FIFOBandwidthLimiter.Request req) { - removeRequest(req); - ByteBuffer buf = (ByteBuffer)req.attachment(); - if (req.getTotalInboundRequested() > 0) { + private class InboundListener implements FIFOBandwidthLimiter.CompleteListener { + public void complete(FIFOBandwidthLimiter.Request req) { + removeIBRequest(req); + ByteBuffer buf = (ByteBuffer)req.attachment(); if (_closed) { EventPumper.releaseBuf(buf); return; @@ -863,20 +876,40 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { // our reads used to be bw throttled (during which time we were no // longer interested in reading from the network), but we aren't // throttled anymore, so we should resume being interested in reading - _transport.getPumper().wantsRead(this); + _transport.getPumper().wantsRead(NTCPConnection.this); //_transport.getReader().wantsRead(this); - } else if (req.getTotalOutboundRequested() > 0 && !_closed) { - _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); - write(buf); } } - private void removeRequest(FIFOBandwidthLimiter.Request req) { - _bwRequests.remove(req); + /** + * The FifoBandwidthLimiter.CompleteListener callback. + * Does the delayed write. + */ + private class OutboundListener implements FIFOBandwidthLimiter.CompleteListener { + public void complete(FIFOBandwidthLimiter.Request req) { + removeOBRequest(req); + ByteBuffer buf = (ByteBuffer)req.attachment(); + if (!_closed) { + _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); + write(buf); + } + } } - private void addRequest(FIFOBandwidthLimiter.Request req) { - _bwRequests.add(req); + private void removeIBRequest(FIFOBandwidthLimiter.Request req) { + _bwInRequests.remove(req); + } + + private void addIBRequest(FIFOBandwidthLimiter.Request req) { + _bwInRequests.add(req); + } + + private void removeOBRequest(FIFOBandwidthLimiter.Request req) { + _bwOutRequests.remove(req); + } + + private void addOBRequest(FIFOBandwidthLimiter.Request req) { + _bwOutRequests.add(req); } /** @@ -887,15 +920,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { */ public void queuedRecv(ByteBuffer buf, FIFOBandwidthLimiter.Request req) { req.attach(buf); - req.setCompleteListener(this); - addRequest(req); + req.setCompleteListener(_inboundListener); + addIBRequest(req); } /** ditto for writes */ public void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) { req.attach(buf); - req.setCompleteListener(this); - addRequest(req); + req.setCompleteListener(_outboundListener); + addOBRequest(req); } /**