From 49bba109acfd3d59086bb20f009515f09a3835fb Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 23 Jul 2011 23:16:28 +0000 Subject: [PATCH] * UDP: - Complete rewrite of OutboundMessageFragments for concurrent and for efficiency to avoid O(n**2) behavior - Queue a new send immediately after a packet is acked - Cleanups, log tweaks, javadocs, final --- .../i2p/router/transport/udp/ACKSender.java | 11 +- .../transport/udp/EstablishmentManager.java | 13 + .../udp/InboundMessageFragments.java | 48 ++- .../udp/OutboundMessageFragments.java | 299 +++++++++--------- .../transport/udp/OutboundMessageState.java | 2 +- .../router/transport/udp/PacketPusher.java | 8 +- .../i2p/router/transport/udp/PeerState.java | 118 ++++--- .../i2p/router/transport/udp/UDPSender.java | 6 +- .../router/transport/udp/UDPTransport.java | 2 +- 9 files changed, 274 insertions(+), 233 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 33fd401df..7136f8a9d 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -14,13 +14,14 @@ import net.i2p.util.Log; /** * Blocking thread that is given peers by the inboundFragment pool, sending out * any outstanding ACKs. - * + * The ACKs are sent directly to UDPSender, + * bypassing OutboundMessageFragments and PacketPusher. */ class ACKSender implements Runnable { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; - private PacketBuilder _builder; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; + private final PacketBuilder _builder; /** list of peers (PeerState) who we have received data from but not yet ACKed to */ private final BlockingQueue _peersToACK; private boolean _alive; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 24e0efe87..55b0c6074 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -72,6 +72,19 @@ class EstablishmentManager { _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES); + // following are for PeerState + _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES); } public void startup() { diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index f685c9e78..113a9e258 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -18,14 +18,14 @@ import net.i2p.util.Log; * */ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; /** list of message IDs recently received, so we can ignore in flight dups */ private DecayingBloomFilter _recentlyCompletedMessages; - private OutboundMessageFragments _outbound; - private UDPTransport _transport; - private ACKSender _ackSender; - private MessageReceiver _messageReceiver; + private final OutboundMessageFragments _outbound; + private final UDPTransport _transport; + private final ACKSender _ackSender; + private final MessageReceiver _messageReceiver; private boolean _alive; /** decay the recently completed every 20 seconds */ @@ -148,8 +148,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ from.messageFullyReceived(messageId, state.getCompleteSize()); _ackSender.ackPeer(from); - if (_log.shouldLog(Log.INFO)) - _log.info("Message received completely! " + state); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message received completely! " + state); _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); if (state.getFragmentCount() > 0) @@ -174,10 +174,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ return fragments; } + /** + * @return the number of bitfields in the ack? why? + */ private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) { int rv = 0; + boolean newAck = false; if (data.readACKsIncluded()) { - int fragments = 0; int ackCount = data.readACKCount(); if (ackCount > 0) { rv += ackCount; @@ -186,9 +189,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ for (int i = 0; i < ackCount; i++) { long id = data.readACK(i); - if (_log.shouldLog(Log.INFO)) - _log.info("Full ACK of message " + id + " received!"); - fragments += _outbound.acked(id, from.getRemotePeer()); + if (from.acked(id)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer()); + newAck = true; + //} else if (_log.shouldLog(Log.DEBUG)) { + // _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer()); + } } } else { _log.error("Received ACKs with no acks?! " + data); @@ -201,9 +208,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receivePartialACKCount", bitfields.length, 0); for (int i = 0; i < bitfields.length; i++) { - if (_log.shouldLog(Log.INFO)) - _log.info("Partial ACK received: " + bitfields[i]); - _outbound.acked(bitfields[i], from.getRemotePeer()); + if (from.acked(bitfields[i])) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); + newAck = true; + } else if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer()); + } } } } @@ -211,6 +222,13 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ from.ECNReceived(); else from.dataReceived(); + + // Wake up the packet pusher if it is sleeping. + // By calling add(), this also is a failsafe against possible + // races in OutboundMessageFragments. + if (newAck && from.getOutboundMessageCount() > 0) + _outbound.add(from); + return rv; } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 3af1adc2c..3fb3a63f0 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -1,13 +1,16 @@ package net.i2p.router.transport.udp; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Set; import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -23,16 +26,33 @@ import net.i2p.util.Log; * */ class OutboundMessageFragments { - private RouterContext _context; - private Log _log; - private UDPTransport _transport; + private final RouterContext _context; + private final Log _log; + private final UDPTransport _transport; // private ActiveThrottle _throttle; // LINT not used ?? - /** peers we are actively sending messages to */ - private final List _activePeers; + + /** + * Peers we are actively sending messages to. + * We use the iterator so we treat it like a list, + * but we use a HashSet so remove() is fast and + * we don't need to do contains(). + * Even though most (but NOT all) accesses are synchronized, + * we use a ConcurrentHashSet as the iterator is long-lived. + */ + private final Set _activePeers; + + /** + * The long-lived iterator over _activePeers. + */ + private Iterator _iterator; + + /** + * Avoid sync in add() if possible (not 100% reliable) + */ + private boolean _isWaiting; + private boolean _alive; - /** which peer should we build the next packet out of? */ - private int _nextPeer; - private PacketBuilder _builder; + private final PacketBuilder _builder; private long _lastCycleTime = System.currentTimeMillis(); /** if we can handle more messages explicitly, set this to true */ @@ -42,13 +62,14 @@ class OutboundMessageFragments { // private static final int MAX_ACTIVE = 64; // not used. // don't send a packet more than 10 times static final int MAX_VOLLEYS = 10; + private static final int MAX_WAIT = 1000; public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) { _context = ctx; _log = ctx.logManager().getLog(OutboundMessageFragments.class); _transport = transport; // _throttle = throttle; - _activePeers = new ArrayList(256); + _activePeers = new ConcurrentHashSet(256); _builder = new PacketBuilder(ctx, transport); _alive = true; // _allowExcess = false; @@ -59,6 +80,7 @@ class OutboundMessageFragments { _context.statManager().createRateStat("udp.sendFailed", "How many sends a failed message was pushed", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES); @@ -72,20 +94,20 @@ class OutboundMessageFragments { } public void startup() { _alive = true; } + public void shutdown() { _alive = false; + _activePeers.clear(); synchronized (_activePeers) { _activePeers.notifyAll(); } } + void dropPeer(PeerState peer) { if (_log.shouldLog(Log.INFO)) _log.info("Dropping peer " + peer.getRemotePeer().toBase64()); peer.dropOutbound(); - synchronized (_activePeers) { - _activePeers.remove(peer); - _activePeers.notifyAll(); - } + _activePeers.remove(peer); } /** @@ -145,24 +167,12 @@ class OutboundMessageFragments { return; } int active = peer.add(state); - synchronized (_activePeers) { - if (!_activePeers.contains(peer)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); - _activePeers.add(peer); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); - } - _activePeers.notifyAll(); - } - //msg.timestamp("made active along with: " + active); + add(peer); _context.statManager().addRateData("udp.outboundActiveCount", active, 0); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Error initializing " + msg); } - //finishMessages(); } /** @@ -174,94 +184,115 @@ class OutboundMessageFragments { if (peer == null) throw new RuntimeException("wtf, null peer for " + state); int active = peer.add(state); - synchronized (_activePeers) { - if (!_activePeers.contains(peer)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); - if (_activePeers.isEmpty()) - _lastCycleTime = System.currentTimeMillis(); - _activePeers.add(peer); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); - } - _activePeers.notifyAll(); - } + add(peer); _context.statManager().addRateData("udp.outboundActiveCount", active, 0); - // should we finish messages here too? - /* - synchronized (_activeMessages) { - _activeMessages.add(state); - if (_activeMessages.size() == 1) + } + + /** + * Add the peer to the list of peers wanting to transmit something. + * This wakes up the packet pusher if it is sleeping. + * + * Avoid synchronization where possible. + * There are small chances of races. + * There are larger chances of adding the PeerState "behind" where + * the iterator is now... but these issues are the same as before concurrentification. + * + * @since 0.8.9 + */ + public void add(PeerState peer) { + boolean wasEmpty = _activePeers.isEmpty(); + boolean added = _activePeers.add(peer); + if (added) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); + if (wasEmpty) _lastCycleTime = System.currentTimeMillis(); - _activeMessages.notifyAll(); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); + } + _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0); + + // Avoid sync if possible + // no, this doesn't always work. + // Also note that the iterator in getNextVolley may have alreay passed us, + // or not reflect the addition. + if (_isWaiting || wasEmpty) { + synchronized (_activePeers) { + _activePeers.notifyAll(); + } } - */ } /** * Remove any expired or complete messages */ +/**** private void finishMessages() { - int rv = 0; - List peers = null; - synchronized (_activePeers) { - peers = new ArrayList(_activePeers.size()); - for (int i = 0; i < _activePeers.size(); i++) { - PeerState state = _activePeers.get(i); - if (state.getOutboundMessageCount() <= 0) { - _activePeers.remove(i); - i--; - } else { - peers.add(state); - } - } - _activePeers.notifyAll(); - } - for (int i = 0; i < peers.size(); i++) { - PeerState state = (PeerState)peers.get(i); - int remaining = state.finishMessages(); - if (remaining <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No more pending messages for " + state.getRemotePeer().toBase64()); - } - rv += remaining; - } - } - + for (Iterator iter = _activePeers.iterator(); iter.hasNext(); ) { + PeerState state = iter.next(); + if (state.getOutboundMessageCount() <= 0) { + iter.remove(); + } else { + int remaining = state.finishMessages(); + if (remaining <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more pending messages for " + state.getRemotePeer().toBase64()); + iter.remove(); + } + } + } + } +****/ + /** * Fetch all the packets for a message volley, blocking until there is a * message which can be fully transmitted (or the transport is shut down). * The returned array may be sparse, with null packets taking the place of * already ACKed fragments. * + * NOT thread-safe. Called by the PacketPusher thread only. + * + * @return null only on shutdown */ public UDPPacket[] getNextVolley() { PeerState peer = null; OutboundMessageState state = null; + // Keep track of how many we've looked at, since we don't start the iterator at the beginning. + int peersProcessed = 0; while (_alive && (state == null) ) { - long now = _context.clock().now(); int nextSendDelay = -1; - finishMessages(); - try { - synchronized (_activePeers) { - for (int i = 0; i < _activePeers.size(); i++) { - int cur = (i + _nextPeer) % _activePeers.size(); - if (cur == 0) { - // FIXME or delete, these stats aren't much help since they include the sleep time - long ts = System.currentTimeMillis(); - long cycleTime = ts - _lastCycleTime; - _lastCycleTime = ts; - _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size()); - // make longer than the default sleep time below - if (cycleTime > 1100) - _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size()); + // no, not every time - O(n**2) - do just before waiting below + //finishMessages(); + + // do we need a new long-lived iterator? + if (_iterator == null || + ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) { + _iterator = _activePeers.iterator(); + } + + // Go through all the peers that we are actively sending messages to. + // Call finishMessages() for each one, and remove them from the iterator + // if there is nothing left to send. + // Otherwise, return the volley to be sent. + // Otherwise, wait() + while (_iterator.hasNext()) { + peer = _iterator.next(); + int remaining = peer.finishMessages(); + if (remaining <= 0) { + // race with add() + _iterator.remove(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("No more pending messages for " + peer.getRemotePeer().toBase64()); + continue; } - peer = _activePeers.get(i); + peersProcessed++; state = peer.allocateSend(); if (state != null) { // we have something to send and we will be returning it - _nextPeer = i + 1; + break; + } else if (peersProcessed >= _activePeers.size()) { + // we've gone all the way around, time to sleep break; } else { // Update the minimum delay for all peers (getNextDelay() returns 1 for "now") @@ -270,53 +301,70 @@ class OutboundMessageFragments { if ( (nextSendDelay <= 0) || (delay < nextSendDelay) ) nextSendDelay = delay; peer = null; - state = null; } } - if (_log.shouldLog(Log.DEBUG)) + + if (peer != null && _log.shouldLog(Log.DEBUG)) _log.debug("Done looping, next peer we are sending for: " + - (peer != null ? peer.getRemotePeer().toBase64() : "none")); - if (state == null) { + peer.getRemotePeer().toBase64()); + + // if we've gone all the way through the loop, wait + if (state == null && peersProcessed >= _activePeers.size()) { + peersProcessed = 0; + // why? we do this in the loop one at a time + //finishMessages(); if (_log.shouldLog(Log.DEBUG)) _log.debug("wait for " + nextSendDelay); // wait.. or somethin' - // wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says - if (nextSendDelay > 0) - _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000)); - else - _activePeers.wait(1000); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("dont wait: alive=" + _alive + " state = " + state); + // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says + _isWaiting = true; + synchronized (_activePeers) { + try { + // use max of 1 second so finishMessages() and/or PeerState.finishMessages() + // gets called regularly + if (nextSendDelay > 0) + _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), MAX_WAIT)); + else + _activePeers.wait(MAX_WAIT); + } catch (InterruptedException ie) { + // noop + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Woken up while waiting"); + } + } + _isWaiting = false; + //} else { + // if (_log.shouldLog(Log.DEBUG)) + // _log.debug("dont wait: alive=" + _alive + " state = " + state); } - } - } catch (InterruptedException ie) { - // noop - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Woken up while waiting"); - } - } + + } // while alive && state == null if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending " + state); UDPPacket packets[] = preparePackets(state, peer); + + /**** if ( (state != null) && (state.getMessage() != null) ) { int valid = 0; for (int i = 0; packets != null && i < packets.length ; i++) if (packets[i] != null) valid++; - /* state.getMessage().timestamp("sending a volley of " + valid + " lastReceived: " + (_context.clock().now() - peer.getLastReceiveTime()) + " lastSentFully: " + (_context.clock().now() - peer.getLastSendFullyTime())); - */ } + ****/ + return packets; } + /** + * @return null if state or peer is null + */ private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { if ( (state != null) && (peer != null) ) { int fragments = state.getFragmentCount(); @@ -397,37 +445,6 @@ class OutboundMessageFragments { } } - /** - * We received an ACK of the given messageId from the given peer, so if it - * is still unacked, mark it as complete. - * - * @return fragments acked - */ - public int acked(long messageId, Hash ackedBy) { - PeerState peer = _transport.getPeerState(ackedBy); - if (peer != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64()); - return peer.acked(messageId); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("acked [" + messageId + "] by an unknown remote peer? " + ackedBy.toBase64()); - return 0; - } - } - - public void acked(ACKBitfield bitfield, Hash ackedBy) { - PeerState peer = _transport.getPeerState(ackedBy); - if (peer != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64()); - peer.acked(bitfield); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("partial acked [" + bitfield + "] by an unknown remote peer? " + ackedBy.toBase64()); - } - } - public interface ActiveThrottle { public void choke(Hash peer); public void unchoke(Hash peer); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index d0e03ec83..3a81ffd9c 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -12,7 +12,7 @@ import net.i2p.util.ByteCache; import net.i2p.util.Log; /** - * Maintain the outbound fragmentation for resending + * Maintain the outbound fragmentation for resending, for a single message. * */ class OutboundMessageState { diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 50b8d3ba7..7b4d7f3e2 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -11,10 +11,10 @@ import net.i2p.util.Log; */ class PacketPusher implements Runnable { // private RouterContext _context; - private Log _log; - private OutboundMessageFragments _fragments; - private UDPSender _sender; - private boolean _alive; + private final Log _log; + private final OutboundMessageFragments _fragments; + private final UDPSender _sender; + private volatile boolean _alive; public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) { // _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 155003293..a81f9bcbb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -24,8 +24,8 @@ import net.i2p.util.ConcurrentHashSet; * */ class PeerState { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; /** * The peer are we talking to. This should be set as soon as this * state is created if we are initiating a connection, but if we are @@ -192,7 +192,7 @@ class PeerState { /** which outbound message is currently being retransmitted */ private OutboundMessageState _retransmitter; - private UDPTransport _transport; + private final UDPTransport _transport; /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; @@ -268,18 +268,7 @@ class PeerState { _rttDeviation = _rtt; _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); - _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES); + // all createRateStat() moved to EstablishmentManager } private int getDefaultMTU() { @@ -1061,7 +1050,6 @@ class PeerState { if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); List msgs = _outboundMessages; - if (msgs == null) return 0; int rv = 0; boolean fail = false; synchronized (msgs) { @@ -1070,11 +1058,14 @@ class PeerState { // 32 queued messages? to *one* peer? nuh uh. fail = true; rv--; + + /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless + } else if (_retransmitter != null) { long lifetime = _retransmitter.getLifetime(); long totalLifetime = lifetime; for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter - OutboundMessageState cur = (OutboundMessageState)msgs.get(i); + OutboundMessageState cur = msgs.get(i); totalLifetime += cur.getLifetime(); } long remaining = -1; @@ -1103,6 +1094,9 @@ class PeerState { _context.statManager().addRateData("udp.queueAllowTotalLifetime", totalLifetime, lifetime); msgs.add(state); } + + *******/ + } else { msgs.add(state); } @@ -1111,6 +1105,7 @@ class PeerState { _transport.failed(state, false); return rv; } + /** drop all outbound messages */ public void dropOutbound() { //if (_dead) return; @@ -1118,7 +1113,7 @@ class PeerState { List msgs = _outboundMessages; //_outboundMessages = null; _retransmitter = null; - if (msgs != null) { + int sz = 0; List tempList = null; synchronized (msgs) { @@ -1130,21 +1125,17 @@ class PeerState { } for (int i = 0; i < sz; i++) _transport.failed(tempList.get(i), false); - } + // so the ACKSender will drop this peer from its queue _wantACKSendSince = -1; } + /** + * @return number of active outbound messages remaining (unsynchronized) + */ public int getOutboundMessageCount() { - List msgs = _outboundMessages; if (_dead) return 0; - if (msgs != null) { - synchronized (msgs) { - return msgs.size(); - } - } else { - return 0; - } + return _outboundMessages.size(); } /** @@ -1152,39 +1143,37 @@ class PeerState { * @return number of active outbound messages remaining */ public int finishMessages() { - int rv = 0; List msgs = _outboundMessages; + // short circuit, unsynchronized + if (msgs.isEmpty()) + return 0; + if (_dead) { dropOutbound(); return 0; } + + int rv = 0; List succeeded = null; List failed = null; synchronized (msgs) { - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + OutboundMessageState state = iter.next(); if (state.isComplete()) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (_retransmitter == state) _retransmitter = null; if (succeeded == null) succeeded = new ArrayList(4); succeeded.add(state); } else if (state.isExpired()) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (_retransmitter == state) _retransmitter = null; _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); if (failed == null) failed = new ArrayList(4); failed.add(state); } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { - msgs.remove(i); - i--; - size--; + iter.remove(); if (state == _retransmitter) _retransmitter = null; _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); @@ -1232,9 +1221,7 @@ class PeerState { List msgs = _outboundMessages; if (_dead) return null; synchronized (msgs) { - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (OutboundMessageState state : msgs) { if (locked_shouldSend(state)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); @@ -1276,9 +1263,7 @@ class PeerState { else return rv; } - int size = msgs.size(); - for (int i = 0; i < size; i++) { - OutboundMessageState state = msgs.get(i); + for (OutboundMessageState state : msgs) { int delay = (int)(state.getNextSendTime() - now); if (delay <= 0) delay = 1; @@ -1393,16 +1378,20 @@ class PeerState { return false; } - public int acked(long messageId) { + /** + * A full ACK was received. + * + * @return true if the message was acked for the first time + */ + public boolean acked(long messageId) { + if (_dead) return false; OutboundMessageState state = null; List msgs = _outboundMessages; - if (_dead) return 0; synchronized (msgs) { - int sz = msgs.size(); - for (int i = 0; i < sz; i++) { - state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + state = iter.next(); if (state.getMessageId() == messageId) { - msgs.remove(i); + iter.remove(); break; } else { state = null; @@ -1438,22 +1427,25 @@ class PeerState { // _throttle.unchoke(peer.getRemotePeer()); state.releaseResources(); - return numFragments; } else { // dupack, likely if (_log.shouldLog(Log.DEBUG)) _log.debug("Received an ACK for a message not pending: " + messageId); - return 0; } + return state != null; } - public void acked(ACKBitfield bitfield) { + /** + * A partial ACK was received. This is much less common than full ACKs. + * + * @return true if the message was completely acked for the first time + */ + public boolean acked(ACKBitfield bitfield) { if (_dead) - return; + return false; if (bitfield.receivedComplete()) { - acked(bitfield.getMessageId()); - return; + return acked(bitfield.getMessageId()); } List msgs = _outboundMessages; @@ -1461,13 +1453,13 @@ class PeerState { OutboundMessageState state = null; boolean isComplete = false; synchronized (msgs) { - for (int i = 0; i < msgs.size(); i++) { - state = msgs.get(i); + for (Iterator iter = msgs.iterator(); iter.hasNext(); ) { + state = iter.next(); if (state.getMessageId() == bitfield.getMessageId()) { boolean complete = state.acked(bitfield); if (complete) { isComplete = true; - msgs.remove(i); + iter.remove(); if (state == _retransmitter) _retransmitter = null; } @@ -1514,12 +1506,12 @@ class PeerState { //if (state.getMessage() != null) // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); } - return; + return isComplete; } else { // dupack if (_log.shouldLog(Log.DEBUG)) _log.debug("Received an ACK for a message not pending: " + bitfield); - return; + return false; } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 4c7b97bca..bd6bb204c 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -16,13 +16,13 @@ import net.i2p.util.Log; * */ class UDPSender { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; private DatagramSocket _socket; private String _name; private final BlockingQueue _outboundQueue; private boolean _keepRunning; - private Runner _runner; + private final Runner _runner; private static final int TYPE_POISON = 99999; //private static final int MAX_QUEUED = 4; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 5c8d60510..fbb1e6806 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -345,9 +345,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _refiller.shutdown(); if (_handler != null) _handler.shutdown(); - _fragments.shutdown(); if (_pusher != null) _pusher.shutdown(); + _fragments.shutdown(); if (_establisher != null) _establisher.shutdown(); _inboundFragments.shutdown();