Split up NTCPConnection's single _bwRequests Set into inbound and outbound,

in prep for making FIFOBandwithLimiter.Request unidirectional
and support priorities (ticket #719)
This commit is contained in:
zzz
2012-10-09 13:36:14 +00:00
parent 0eedc3aa19
commit 3cdfc2d33a

View File

@@ -56,11 +56,13 @@ import net.i2p.util.Log;
*</pre>
*
*/
class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
class NTCPConnection {
private final RouterContext _context;
private final Log _log;
private SocketChannel _chan;
private SelectionKey _conKey;
private final FIFOBandwidthLimiter.CompleteListener _inboundListener;
private final FIFOBandwidthLimiter.CompleteListener _outboundListener;
/**
* queue of ByteBuffer containing data we have read and are ready to process, oldest first
* unbounded and lockless
@@ -73,7 +75,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/
private final Queue<ByteBuffer> _writeBufs;
/** Requests that were not granted immediately */
private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
private final Set<FIFOBandwidthLimiter.Request> _bwInRequests;
private final Set<FIFOBandwidthLimiter.Request> _bwOutRequests;
private boolean _established;
private long _establishedOn;
private EstablishState _establishState;
@@ -154,7 +157,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_chan = chan;
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(2);
_bwInRequests = new ConcurrentHashSet(2);
_bwOutRequests = new ConcurrentHashSet(8);
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = true;
_decryptBlockBuf = new byte[BLOCK_SIZE];
@@ -162,6 +166,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_establishState = new EstablishState(ctx, transport, this);
_conKey = key;
_conKey.attach(this);
_inboundListener = new InboundListener();
_outboundListener = new OutboundListener();
initialize();
}
@@ -178,11 +184,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_remAddr = remAddr;
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(8);
_bwInRequests = new ConcurrentHashSet(2);
_bwOutRequests = new ConcurrentHashSet(8);
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = false;
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
_inboundListener = new InboundListener();
_outboundListener = new OutboundListener();
initialize();
}
@@ -283,12 +292,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.getReader().connectionClosed(this);
_transport.getWriter().connectionClosed(this);
for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) {
iter.next().abort();
for (FIFOBandwidthLimiter.Request req :_bwInRequests) {
req.abort();
// we would like to return read ByteBuffers via EventPumper.releaseBuf(),
// but we can't risk releasing it twice
}
_bwRequests.clear();
_bwInRequests.clear();
for (FIFOBandwidthLimiter.Request req :_bwOutRequests) {
req.abort();
}
_bwOutRequests.clear();
_writeBufs.clear();
ByteBuffer bb;
@@ -848,12 +861,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/**
* The FifoBandwidthLimiter.CompleteListener callback.
* Does the delayed read or write.
* Does the delayed read.
*/
public void complete(FIFOBandwidthLimiter.Request req) {
removeRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (req.getTotalInboundRequested() > 0) {
private class InboundListener implements FIFOBandwidthLimiter.CompleteListener {
public void complete(FIFOBandwidthLimiter.Request req) {
removeIBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (_closed) {
EventPumper.releaseBuf(buf);
return;
@@ -863,20 +876,40 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// our reads used to be bw throttled (during which time we were no
// longer interested in reading from the network), but we aren't
// throttled anymore, so we should resume being interested in reading
_transport.getPumper().wantsRead(this);
_transport.getPumper().wantsRead(NTCPConnection.this);
//_transport.getReader().wantsRead(this);
} else if (req.getTotalOutboundRequested() > 0 && !_closed) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
write(buf);
}
}
private void removeRequest(FIFOBandwidthLimiter.Request req) {
_bwRequests.remove(req);
/**
* The FifoBandwidthLimiter.CompleteListener callback.
* Does the delayed write.
*/
private class OutboundListener implements FIFOBandwidthLimiter.CompleteListener {
public void complete(FIFOBandwidthLimiter.Request req) {
removeOBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (!_closed) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
write(buf);
}
}
}
private void addRequest(FIFOBandwidthLimiter.Request req) {
_bwRequests.add(req);
private void removeIBRequest(FIFOBandwidthLimiter.Request req) {
_bwInRequests.remove(req);
}
private void addIBRequest(FIFOBandwidthLimiter.Request req) {
_bwInRequests.add(req);
}
private void removeOBRequest(FIFOBandwidthLimiter.Request req) {
_bwOutRequests.remove(req);
}
private void addOBRequest(FIFOBandwidthLimiter.Request req) {
_bwOutRequests.add(req);
}
/**
@@ -887,15 +920,15 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/
public void queuedRecv(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
req.attach(buf);
req.setCompleteListener(this);
addRequest(req);
req.setCompleteListener(_inboundListener);
addIBRequest(req);
}
/** ditto for writes */
public void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
req.attach(buf);
req.setCompleteListener(this);
addRequest(req);
req.setCompleteListener(_outboundListener);
addOBRequest(req);
}
/**