From 702da0a929e2ec3a10d375234f5bbe1b04d898bf Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 24 Jul 2011 12:16:35 +0000 Subject: [PATCH] * UDP: - Don't delay in OutboundMessageFragments at the end of the loop if we have more to send now, this should speed things up - More cleanups --- .../udp/InboundMessageFragments.java | 2 +- .../transport/udp/InboundMessageState.java | 22 ++++++++-------- .../udp/OutboundMessageFragments.java | 25 +++++++++---------- .../transport/udp/OutboundMessageState.java | 2 ++ .../i2p/router/transport/udp/PeerState.java | 16 +++++------- 5 files changed, 33 insertions(+), 34 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 113a9e2585..9d68f898b5 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -158,7 +158,7 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ state.releaseResources(); if (_log.shouldLog(Log.WARN)) _log.warn("Message expired while only being partially read: " + state); - _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); + _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); } else if (partialACK) { // not expired but not yet complete... lets queue up a partial ACK if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index 9261dc70a3..0c5fe9359a 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -12,20 +12,20 @@ import net.i2p.util.Log; * */ class InboundMessageState { - private RouterContext _context; - private Log _log; - private long _messageId; - private Hash _from; + private final RouterContext _context; + private final Log _log; + private final long _messageId; + private final Hash _from; /** * indexed array of fragments for the message, where not yet * received fragments are null. */ - private ByteArray _fragments[]; + private final ByteArray _fragments[]; /** * what is the last fragment in the message (or -1 if not yet known) */ private int _lastFragment; - private long _receiveBegin; + private final long _receiveBegin; private int _completeSize; private boolean _released; @@ -153,10 +153,12 @@ class InboundMessageState { } public void releaseResources() { - if (_fragments != null) - for (int i = 0; i < _fragments.length; i++) + for (int i = 0; i < _fragments.length; i++) { + if (_fragments[i] != null) { _fragmentCache.release(_fragments[i]); - //_fragments = null; + _fragments[i] = null; + } + } _released = true; } @@ -178,7 +180,7 @@ class InboundMessageState { buf.append(" completely received with "); buf.append(getCompleteSize()).append(" bytes"); } else { - for (int i = 0; (_fragments != null) && (i < _fragments.length); i++) { + for (int i = 0; i < _lastFragment; i++) { buf.append(" fragment ").append(i); if (_fragments[i] != null) buf.append(": known at size ").append(_fragments[i].getValid()); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 3fb3a63f0b..26ce9c44af 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -261,7 +261,7 @@ class OutboundMessageFragments { // Keep track of how many we've looked at, since we don't start the iterator at the beginning. int peersProcessed = 0; while (_alive && (state == null) ) { - int nextSendDelay = -1; + int nextSendDelay = Integer.MAX_VALUE; // no, not every time - O(n**2) - do just before waiting below //finishMessages(); @@ -295,10 +295,10 @@ class OutboundMessageFragments { // we've gone all the way around, time to sleep break; } else { - // Update the minimum delay for all peers (getNextDelay() returns 1 for "now") + // Update the minimum delay for all peers // which will be used if we found nothing to send across all peers int delay = peer.getNextDelay(); - if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) + if (delay < nextSendDelay) nextSendDelay = delay; peer = null; } @@ -309,23 +309,22 @@ class OutboundMessageFragments { peer.getRemotePeer().toBase64()); // if we've gone all the way through the loop, wait - if (state == null && peersProcessed >= _activePeers.size()) { + // ... unless nextSendDelay says we have more ready now + if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) { + _isWaiting = true; peersProcessed = 0; // why? we do this in the loop one at a time //finishMessages(); + // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says + // use max of 1 second so finishMessages() and/or PeerState.finishMessages() + // gets called regularly + int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT); if (_log.shouldLog(Log.DEBUG)) - _log.debug("wait for " + nextSendDelay); + _log.debug("wait for " + toWait); // wait.. or somethin' - // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says - _isWaiting = true; synchronized (_activePeers) { try { - // use max of 1 second so finishMessages() and/or PeerState.finishMessages() - // gets called regularly - if (nextSendDelay > 0) - _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), MAX_WAIT)); - else - _activePeers.wait(MAX_WAIT); + _activePeers.wait(toWait); } catch (InterruptedException ie) { // noop if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 3a81ffd9ce..362ce4f4ff 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -206,6 +206,7 @@ class OutboundMessageState { sends[i] = (short)-1; boolean rv = isComplete(); + /**** if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY); //_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO); @@ -218,6 +219,7 @@ class OutboundMessageState { // _nextSendTime = now + 100; //_nextSendTime = now; } + ****/ return rv; } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index a81f9bcbb6..438162e6d1 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1248,26 +1248,22 @@ class PeerState { } /** - * return how long to wait before sending, or -1 if we have nothing to send + * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. + * If ready now, will return 0 or a negative value. */ public int getNextDelay() { - int rv = -1; + int rv = Integer.MAX_VALUE; + if (_dead) return rv; long now = _context.clock().now(); List<OutboundMessageState> msgs = _outboundMessages; - if (_dead) return -1; synchronized (msgs) { if (_retransmitter != null) { rv = (int)(_retransmitter.getNextSendTime() - now); - if (rv <= 0) - return 1; - else - return rv; + return rv; } for (OutboundMessageState state : msgs) { int delay = (int)(state.getNextSendTime() - now); - if (delay <= 0) - delay = 1; - if ( (rv <= 0) || (delay < rv) ) + if (delay < rv) rv = delay; } } -- GitLab