diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index 15ccd18fee5007d4088985cea34280dbeea22aaf..4a8b76f3a7772d3cae098aa9d62e92afe7220f7a 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -59,6 +59,7 @@ public class FIFOBandwidthLimiter { private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong(); /** lifetime counter of bytes sent */ private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong(); + private static final AtomicLong __requestId = new AtomicLong(); /** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */ //private final AtomicLong _totalWastedInboundBytes = new AtomicLong(); @@ -202,7 +203,7 @@ public class FIFOBandwidthLimiter { } public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) { - SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose, lsnr, attachment); + SimpleRequest req = new SimpleRequest(bytesIn, purpose, lsnr, attachment); requestInbound(req, bytesIn, purpose); return req; } @@ -237,7 +238,7 @@ public class FIFOBandwidthLimiter { } public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) { - SimpleRequest req = new SimpleRequest(0, bytesOut, purpose, lsnr, attachment); + SimpleRequest req = new SimpleRequest(bytesOut, purpose, lsnr, attachment); requestOutbound(req, bytesOut, purpose); return req; } @@ -489,14 +490,13 @@ public class FIFOBandwidthLimiter { private final void locked_satisfyInboundUnlimited(List<Request> satisfied) { while (!_pendingInboundRequests.isEmpty()) { SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0); - int allocated = req.getPendingInboundRequested(); + int allocated = req.getPendingRequested(); _totalAllocatedInboundBytes.addAndGet(allocated); - req.allocateBytes(allocated, 0); + req.allocateBytes(allocated); satisfied.add(req); long waited = now() - req.getRequestTime(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Granting inbound request " + req.getRequestName() + " fully for " - + req.getTotalInboundRequested() + " bytes (waited " + _log.debug("Granting inbound request " + req + " fully (waited " + waited + "ms) pending " + _pendingInboundRequests.size()); if (waited > 10) @@ -520,8 +520,8 @@ public class FIFOBandwidthLimiter { // connection decided they dont want the data anymore if (_log.shouldLog(Log.DEBUG)) _log.debug("Aborting inbound request to " - + req.getRequestName() + " (total " - + req.getTotalInboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingInboundRequests.size()); _pendingInboundRequests.remove(i); @@ -537,7 +537,7 @@ public class FIFOBandwidthLimiter { continue; } // ok, they are really waiting for us to give them stuff - int requested = req.getPendingInboundRequested(); + int requested = req.getPendingRequested(); int avi = _availableInbound.get(); int allocated; if (avi >= requested) @@ -546,21 +546,21 @@ public class FIFOBandwidthLimiter { allocated = avi; _availableInbound.addAndGet(0 - allocated); _totalAllocatedInboundBytes.addAndGet(allocated); - req.allocateBytes(allocated, 0); + req.allocateBytes(allocated); satisfied.add(req); - if (req.getPendingInboundRequested() > 0) { + if (req.getPendingRequested() > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocating " + allocated + " bytes inbound as a partial grant to " - + req.getRequestName() + " (wanted " - + req.getTotalInboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingInboundRequests.size() + ", longest waited " + locked_getLongestInboundWait() + " in"); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocating " + allocated + " bytes inbound to finish the partial grant to " - + req.getRequestName() + " (total " - + req.getTotalInboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingInboundRequests.size() + ", longest waited " + locked_getLongestInboundWait() + " out"); @@ -607,14 +607,13 @@ public class FIFOBandwidthLimiter { private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) { while (!_pendingOutboundRequests.isEmpty()) { SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0); - int allocated = req.getPendingOutboundRequested(); + int allocated = req.getPendingRequested(); _totalAllocatedOutboundBytes.addAndGet(allocated); - req.allocateBytes(0, allocated); + req.allocateBytes(allocated); satisfied.add(req); long waited = now() - req.getRequestTime(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Granting outbound request " + req.getRequestName() + " fully for " - + req.getTotalOutboundRequested() + " bytes (waited " + _log.debug("Granting outbound request " + req + " fully (waited " + waited + "ms) pending " + _pendingOutboundRequests.size() + ", longest waited " + locked_getLongestOutboundWait() + " out"); @@ -639,8 +638,8 @@ public class FIFOBandwidthLimiter { // connection decided they dont want the data anymore if (_log.shouldLog(Log.DEBUG)) _log.debug("Aborting outbound request to " - + req.getRequestName() + " (total " - + req.getTotalOutboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingOutboundRequests.size()); _pendingOutboundRequests.remove(i); @@ -652,11 +651,11 @@ public class FIFOBandwidthLimiter { // they haven't taken advantage of it yet (most likely they're // IO bound) if (_log.shouldLog(Log.WARN)) - _log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req.getRequestName()); + _log.warn("multiple allocations since wait... ntcp shouldn't do this: " + req); continue; } // ok, they are really waiting for us to give them stuff - int requested = req.getPendingOutboundRequested(); + int requested = req.getPendingRequested(); int avo = _availableOutbound.get(); int allocated; if (avo >= requested) @@ -665,22 +664,22 @@ public class FIFOBandwidthLimiter { allocated = avo; _availableOutbound.addAndGet(0 - allocated); _totalAllocatedOutboundBytes.addAndGet(allocated); - req.allocateBytes(0, allocated); + req.allocateBytes(allocated); satisfied.add(req); - if (req.getPendingOutboundRequested() > 0) { + if (req.getPendingRequested() > 0) { if (req.attachment() != null) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes outbound as a partial grant to " - + req.getRequestName() + " (wanted " - + req.getTotalOutboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingOutboundRequests.size() + ", longest waited " + locked_getLongestOutboundWait() + " out"); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocating " + allocated + " bytes outbound as a partial grant to " - + req.getRequestName() + " (wanted " - + req.getTotalOutboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingOutboundRequests.size() + ", longest waited " + locked_getLongestOutboundWait() + " out"); @@ -688,16 +687,16 @@ public class FIFOBandwidthLimiter { if (req.attachment() != null) { if (_log.shouldLog(Log.INFO)) _log.info("Allocating " + allocated + " bytes outbound to finish the partial grant to " - + req.getRequestName() + " (total " - + req.getTotalOutboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingOutboundRequests.size() + ", longest waited " + locked_getLongestOutboundWait() + " out)"); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocating " + allocated + " bytes outbound to finish the partial grant to " - + req.getRequestName() + " (total " - + req.getTotalOutboundRequested() + " bytes, waited " + + req + + " waited " + waited + "ms) pending " + _pendingOutboundRequests.size() + ", longest waited " + locked_getLongestOutboundWait() + " out)"); @@ -788,12 +787,9 @@ public class FIFOBandwidthLimiter { ******/ } - private static long __requestId = 0; - private final class SimpleRequest implements Request { - private int _inAllocated; - private int _inTotal; - private int _outAllocated; - private int _outTotal; + private static class SimpleRequest implements Request { + private int _allocated; + private int _total; private long _requestId; private long _requestTime; private String _target; @@ -806,119 +802,121 @@ public class FIFOBandwidthLimiter { public SimpleRequest() { satisfiedBuffer = new ArrayList(1); - init(0, 0, null); + init(0, null); } - public SimpleRequest(int in, int out, String target, CompleteListener lsnr, Object attachment) { + + /** + * @param target for debugging, to be removed + */ + public SimpleRequest(int bytes, String target, CompleteListener lsnr, Object attachment) { satisfiedBuffer = new ArrayList(1); _lsnr = lsnr; _attachment = attachment; - init(in, out, target); + init(bytes, target); } - public void init(int in, int out, String target) { + + /** + * @param target for debugging, to be removed + */ + public void init(int bytes, String target) { _waited = false; - _inTotal = in; - _outTotal = out; - _inAllocated = 0; - _outAllocated = 0; + _total = bytes; + _allocated = 0; _aborted = false; _target = target; satisfiedBuffer.clear(); - _requestId = ++__requestId; - _requestTime = now(); + _requestId = __requestId.incrementAndGet(); + _requestTime = System.currentTimeMillis(); } - public String getRequestName() { return "Req" + _requestId + " to " + _target; } + public long getRequestTime() { return _requestTime; } - public int getTotalOutboundRequested() { return _outTotal; } - public int getPendingOutboundRequested() { return _outTotal - _outAllocated; } - public int getTotalInboundRequested() { return _inTotal; } - public int getPendingInboundRequested() { return _inTotal - _inAllocated; } + public int getTotalRequested() { return _total; } + public int getPendingRequested() { return _total - _allocated; } public boolean getAborted() { return _aborted; } public void abort() { _aborted = true; } public CompleteListener getCompleteListener() { return _lsnr; } + public void setCompleteListener(CompleteListener lsnr) { boolean complete = false; - synchronized (SimpleRequest.this) { + synchronized (this) { _lsnr = lsnr; if (isComplete()) { complete = true; } } if (complete && lsnr != null) { - if (_log.shouldLog(Log.INFO)) - _log.info("complete listener set AND completed: " + lsnr); - lsnr.complete(SimpleRequest.this); + //if (_log.shouldLog(Log.INFO)) + // _log.info("complete listener set AND completed: " + lsnr); + lsnr.complete(this); } } - private boolean isComplete() { return (_outAllocated >= _outTotal) && (_inAllocated >= _inTotal); } + private boolean isComplete() { return _allocated >= _total; } public void waitForNextAllocation() { _waited = true; _allocationsSinceWait = 0; boolean complete = false; try { - synchronized (SimpleRequest.this) { + synchronized (this) { if (isComplete()) complete = true; else - SimpleRequest.this.wait(); + wait(); } } catch (InterruptedException ie) {} if (complete && _lsnr != null) - _lsnr.complete(SimpleRequest.this); + _lsnr.complete(this); } + int getAllocationsSinceWait() { return _waited ? _allocationsSinceWait : 0; } - void allocateBytes(int in, int out) { - _inAllocated += in; - _outAllocated += out; + + void allocateBytes(int bytes) { + _allocated += bytes; if (_lsnr == null) _allocationsSinceWait++; - if (isComplete()) { - if (_log.shouldLog(Log.INFO)) - _log.info("allocate " + in +"/"+ out + " completed, listener=" + _lsnr); - } + //if (isComplete()) { + // if (_log.shouldLog(Log.INFO)) + // _log.info("allocate " + bytes + " completed, listener=" + _lsnr); + //} //notifyAllocation(); // handled within the satisfy* methods } + void notifyAllocation() { boolean complete = false; - synchronized (SimpleRequest.this) { + synchronized (this) { if (isComplete()) complete = true; - SimpleRequest.this.notifyAll(); + notifyAll(); } if (complete && _lsnr != null) { - _lsnr.complete(SimpleRequest.this); - if (_log.shouldLog(Log.INFO)) - _log.info("at completion for " + _inTotal + "/" + _outTotal - + ", recvBps=" + _recvBps + "/"+ _recvBps15s + " listener is " + _lsnr); + _lsnr.complete(this); + //if (_log.shouldLog(Log.INFO)) + // _log.info("at completion for " + _total + // + ", recvBps=" + _recvBps + "/"+ _recvBps15s + " listener is " + _lsnr); } } + public void attach(Object obj) { _attachment = obj; } public Object attachment() { return _attachment; } + @Override - public String toString() { return getRequestName(); } + public String toString() { + return "Req" + _requestId + " to " + _target + + _allocated + '/' + _total; + } } /** - * This is somewhat complicated by having both - * inbound and outbound in a single request. - * Making a request unidirectional would - * be a good simplification. - * But NTCP would have to be changed as it puts them on one queue. + * A bandwidth request, either inbound or outbound. */ public interface Request { - /** describe this particular request */ - public String getRequestName(); /** when was the request made? */ public long getRequestTime(); - /** how many outbound bytes were requested? */ - public int getTotalOutboundRequested(); - /** how many outbound bytes were requested and haven't yet been allocated? */ - public int getPendingOutboundRequested(); - /** how many inbound bytes were requested? */ - public int getTotalInboundRequested(); - /** how many inbound bytes were requested and haven't yet been allocated? */ - public int getPendingInboundRequested(); + /** how many bytes were requested? */ + public int getTotalRequested(); + /** how many bytes were requested and haven't yet been allocated? */ + public int getPendingRequested(); /** block until we are allocated some more bytes */ public void waitForNextAllocation(); /** we no longer want the data requested (the connection closed) */ @@ -926,7 +924,7 @@ public class FIFOBandwidthLimiter { /** was this request aborted? */ public boolean getAborted(); /** thar be dragons */ - public void init(int in, int out, String target); + public void init(int bytes, String target); public void setCompleteListener(CompleteListener lsnr); /** Only supported if the request is not satisfied */ public void attach(Object obj); @@ -943,14 +941,13 @@ public class FIFOBandwidthLimiter { private static class NoopRequest implements Request { public void abort() {} public boolean getAborted() { return false; } - public int getPendingInboundRequested() { return 0; } - public int getPendingOutboundRequested() { return 0; } - public String getRequestName() { return "noop"; } + public int getPendingRequested() { return 0; } + @Override + public String toString() { return "noop"; } public long getRequestTime() { return 0; } - public int getTotalInboundRequested() { return 0; } - public int getTotalOutboundRequested() { return 0; } + public int getTotalRequested() { return 0; } public void waitForNextAllocation() {} - public void init(int in, int out, String target) {} + public void init(int bytes, String target) {} public CompleteListener getCompleteListener() { return null; } public void setCompleteListener(CompleteListener lsnr) { lsnr.complete(NoopRequest.this); diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index bbf9e4f1f1b0fe92661c9286940a0ab0220545e1..84e8bc76467a827432d9359f431ab06c34cb92bb 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -376,7 +376,7 @@ class EventPumper implements Runnable { public void wantsWrite(NTCPConnection con, byte data[]) { ByteBuffer buf = ByteBuffer.wrap(data); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf); - if (req.getPendingOutboundRequested() > 0) { + if (req.getPendingRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("queued write on " + con + " for " + data.length); _context.statManager().addRateData("ntcp.wantsQueuedWrite", 1); @@ -584,7 +584,7 @@ class EventPumper implements Runnable { // ZERO COPY. The buffer will be returned in Reader.processRead() buf.flip(); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); - if (req.getPendingInboundRequested() > 0) { + if (req.getPendingRequested() > 0) { // rare since we generally don't throttle inbound key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); //if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 9f6f625a78e5597e63527c16889e6631c51723ca..c693510c86658bbd9d130708ec232245989ff9fb 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -293,7 +293,7 @@ class UDPReceiver { //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); - while (req.getPendingInboundRequested() > 0) + while (req.getPendingRequested() > 0) req.waitForNextAllocation(); int queued = receive(packet); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index e169d3a8cff75d29ca67f385304b4d2a51e2c7a8..1f195d5a4e2c60561037e1e69dcd9be8466517c5 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -212,7 +212,7 @@ class UDPSender { if (size > 0) { //_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender"); req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); - while (req.getPendingOutboundRequested() > 0) + while (req.getPendingRequested() > 0) req.waitForNextAllocation(); }