From c4fa8fabb2cb7d99a87a7e2b4d7d0a0edcce7e9a Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 28 Oct 2012 12:10:24 +0000 Subject: [PATCH] - Continue work to use priorities in FIFOBandwidthLimiter - Log tweaks --- .../transport/FIFOBandwidthLimiter.java | 64 ++++++------------- .../router/transport/ntcp/EventPumper.java | 2 +- .../transport/udp/EstablishmentManager.java | 4 +- .../transport/udp/OutboundEstablishState.java | 4 +- .../i2p/router/transport/udp/UDPReceiver.java | 4 +- .../i2p/router/transport/udp/UDPSender.java | 4 +- .../router/transport/udp/UDPTransport.java | 1 + 7 files changed, 30 insertions(+), 53 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index 4a8b76f3a7..2e82c55de1 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; +import net.i2p.router.util.PQEntry; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -59,7 +60,6 @@ public class FIFOBandwidthLimiter { private final AtomicLong _totalAllocatedInboundBytes = new AtomicLong(); /** lifetime counter of bytes sent */ private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong(); - private static final AtomicLong __requestId = new AtomicLong(); /** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */ //private final AtomicLong _totalWastedInboundBytes = new AtomicLong(); @@ -190,8 +190,6 @@ public class FIFOBandwidthLimiter { return _refiller.getCurrentParticipatingBandwidth(); } - public Request createRequest() { return new SimpleRequest(); } - /** * Request some bytes. Does not block. */ @@ -199,11 +197,7 @@ public class FIFOBandwidthLimiter { // try to satisfy without grabbing the global lock if (shortcutSatisfyInboundRequest(bytesIn)) return _noop; - return requestInbound(bytesIn, purpose, null, null); - } - - public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) { - SimpleRequest req = new SimpleRequest(bytesIn, purpose, lsnr, attachment); + SimpleRequest req = new SimpleRequest(bytesIn, 0); requestInbound(req, bytesIn, purpose); return req; } @@ -230,15 +224,11 @@ public class FIFOBandwidthLimiter { /** * Request some bytes. Does not block. */ - public Request requestOutbound(int bytesOut, String purpose) { + public Request requestOutbound(int bytesOut, int priority, String purpose) { // try to satisfy without grabbing the global lock if (shortcutSatisfyOutboundRequest(bytesOut)) return _noop; - return requestOutbound(bytesOut, purpose, null, null); - } - - public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) { - SimpleRequest req = new SimpleRequest(bytesOut, purpose, lsnr, attachment); + SimpleRequest req = new SimpleRequest(bytesOut, priority); requestOutbound(req, bytesOut, purpose); return req; } @@ -789,44 +779,24 @@ public class FIFOBandwidthLimiter { private static class SimpleRequest implements Request { private int _allocated; - private int _total; + private final int _total; private long _requestId; private long _requestTime; - private String _target; private int _allocationsSinceWait; private boolean _aborted; private boolean _waited; final List<Request> satisfiedBuffer; private CompleteListener _lsnr; private Object _attachment; + private final int _priority; - public SimpleRequest() { - satisfiedBuffer = new ArrayList(1); - init(0, null); - } - /** * @param target for debugging, to be removed */ - public SimpleRequest(int bytes, String target, CompleteListener lsnr, Object attachment) { + public SimpleRequest(int bytes, int priority) { satisfiedBuffer = new ArrayList(1); - _lsnr = lsnr; - _attachment = attachment; - init(bytes, target); - } - - /** - * @param target for debugging, to be removed - */ - public void init(int bytes, String target) { - _waited = false; _total = bytes; - _allocated = 0; - _aborted = false; - _target = target; - satisfiedBuffer.clear(); - _requestId = __requestId.incrementAndGet(); - _requestTime = System.currentTimeMillis(); + _priority = priority; } public long getRequestTime() { return _requestTime; } @@ -900,17 +870,22 @@ public class FIFOBandwidthLimiter { public void attach(Object obj) { _attachment = obj; } public Object attachment() { return _attachment; } + // PQEntry methods + public int getPriority() { return _priority; }; + public void setSeqNum(long num) { _requestId = num; }; + public long getSeqNum() { return _requestId; }; + @Override public String toString() { - return "Req" + _requestId + " to " + _target + - _allocated + '/' + _total; + return "Req" + _requestId + " priority " + _priority + + _allocated + '/' + _total + " bytes"; } } /** * A bandwidth request, either inbound or outbound. */ - public interface Request { + public interface Request extends PQEntry { /** when was the request made? */ public long getRequestTime(); /** how many bytes were requested? */ @@ -923,8 +898,6 @@ public class FIFOBandwidthLimiter { public void abort(); /** was this request aborted? */ public boolean getAborted(); - /** thar be dragons */ - public void init(int bytes, String target); public void setCompleteListener(CompleteListener lsnr); /** Only supported if the request is not satisfied */ public void attach(Object obj); @@ -947,7 +920,6 @@ public class FIFOBandwidthLimiter { public long getRequestTime() { return 0; } public int getTotalRequested() { return 0; } public void waitForNextAllocation() {} - public void init(int bytes, String target) {} public CompleteListener getCompleteListener() { return null; } public void setCompleteListener(CompleteListener lsnr) { lsnr.complete(NoopRequest.this); @@ -956,5 +928,9 @@ public class FIFOBandwidthLimiter { throw new UnsupportedOperationException("Don't attach to a satisfied request"); } public Object attachment() { return null; } + // PQEntry methods + public int getPriority() { return 0; }; + public void setSeqNum(long num) {}; + public long getSeqNum() { return 0; }; } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 84e8bc7646..522c1ae872 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -375,7 +375,7 @@ class EventPumper implements Runnable { */ public void wantsWrite(NTCPConnection con, byte data[]) { ByteBuffer buf = ByteBuffer.wrap(data); - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf); + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, 0, "NTCP write");//con, buf); if (req.getPendingRequested() > 0) { if (_log.shouldLog(Log.INFO)) _log.info("queued write on " + con + " for " + data.length); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 88e89131fa..6bcba2aa6c 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -824,8 +824,8 @@ class EstablishmentManager { try { state.generateSessionKey(); } catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Peer " + state + " sent us an invalid DH parameter (or were spoofed)", ippe); + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer " + state + " sent us an invalid DH parameter", ippe); _inboundStates.remove(state.getRemoteHostId()); return; } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index 797060c583..9b0a55221c 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -265,8 +265,8 @@ class OutboundEstablishState { try { generateSessionKey(); } catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Peer " + getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe); + if (_log.shouldLog(Log.WARN)) + _log.warn("Peer " + getRemoteHostId() + " sent us an invalid DH parameter", ippe); valid = false; } if (valid) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index c693510c86..72637fef86 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -253,7 +253,6 @@ class UDPReceiver { public void run() { //_socketChanged = false; - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); while (_keepRunning) { //if (_socketChanged) { // Thread.currentThread().setName(_name + "." + _id); @@ -292,7 +291,8 @@ class UDPReceiver { if (size > 0) { //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); - req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); + FIFOBandwidthLimiter.Request req = + _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); while (req.getPendingRequested() > 0) req.waitForNextAllocation(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 1f195d5a4e..885cc7299d 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -191,7 +191,6 @@ class UDPSender { private class Runner implements Runnable { //private volatile boolean _socketChanged; - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); public void run() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Running the UDP sender"); @@ -211,7 +210,8 @@ class UDPSender { // ?? int size2 = packet.getPacket().getLength(); if (size > 0) { //_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender"); - req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); + FIFOBandwidthLimiter.Request req = + _context.bandwidthLimiter().requestOutbound(size, 0, "UDP sender"); while (req.getPendingRequested() > 0) req.waitForNextAllocation(); } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 3874a9d4f8..bdeb1af710 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -2595,6 +2595,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // if old != unsolicited && now - lastUpdated > STATUS_GRACE_PERIOD) // // fall through... + case CommSystemFacade.STATUS_DISCONNECTED: case CommSystemFacade.STATUS_HOSED: _reachabilityStatus = status; _reachabilityStatusLastUpdated = now; -- GitLab