diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 43a413ca79247c349b9a613b4a9de9283709acd3..bd91c3b7e05a863ab8b163a006b35e3600186be4 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -20,6 +20,7 @@ import java.util.Set; import net.i2p.data.RouterInfo; import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.util.CDPQEntry; import net.i2p.util.Log; /** @@ -27,7 +28,7 @@ import net.i2p.util.Log; * delivery and jobs to be fired off if particular events occur. * */ -public class OutNetMessage { +public class OutNetMessage implements CDPQEntry { private final Log _log; private final RouterContext _context; private RouterInfo _target; @@ -49,6 +50,8 @@ public class OutNetMessage { private long _sendBegin; //private Exception _createdBy; private final long _created; + private long _enqueueTime; + private long _seqNum; /** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */ private HashMap<String, Long> _timestamps; /** @@ -283,6 +286,45 @@ public class OutNetMessage { /** time the transport tries to send the message (including any queueing) */ public long getSendTime() { return _context.clock().now() - _sendBegin; } + /** + * For CDQ + * @since 0.9.3 + */ + public void setEnqueueTime(long now) { + _enqueueTime = now; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public long getEnqueueTime() { + return _enqueueTime; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public void drop() { + } + + /** + * For CDPQ + * @since 0.9.3 + */ + public void setSeqNum(long num) { + _seqNum = num; + } + + /** + * For CDPQ + * @since 0.9.3 + */ + public long getSeqNum() { + return _seqNum; + } + /** * We've done what we need to do with the data from this message, though * we may keep the object around for a while to use its ID, jobs, etc. diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 2f26884b7fbae2a8cb6a0f6a13a2b09c9b779c7e..4ad84d2592bb6a2d170b263d5aae9d02ccdc44c7 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -4,7 +4,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -24,6 +26,7 @@ import net.i2p.router.OutNetMessage; import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.HexDump; import net.i2p.util.Log; @@ -83,7 +86,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { /** * pending unprepared OutNetMessage instances */ - private final Queue<OutNetMessage> _outbound; + private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound; /** * current prepared OutNetMessage, or null - synchronize on _outbound to modify * FIXME why do we need this??? @@ -136,9 +139,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { public static final int BUFFER_SIZE = 16*1024; /** 2 bytes for length and 4 for CRC */ public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4); - - private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW; + private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW; + /** * Create an inbound connected (though not established) NTCP connection * @@ -152,8 +155,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _readBufs = new ConcurrentLinkedQueue(); _writeBufs = new ConcurrentLinkedQueue(); _bwRequests = new ConcurrentHashSet(2); - // TODO possible switch to CLQ but beware non-constant size() - see below - _outbound = new LinkedBlockingQueue(); + _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = true; _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); @@ -177,8 +179,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _readBufs = new ConcurrentLinkedQueue(); _writeBufs = new ConcurrentLinkedQueue(); _bwRequests = new ConcurrentHashSet(8); - // TODO possible switch to CLQ but beware non-constant size() - see below - _outbound = new LinkedBlockingQueue(); + _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32); _isInbound = false; _decryptBlockBuf = new byte[BLOCK_SIZE]; _curReadState = new ReadState(); @@ -297,15 +298,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { EventPumper.releaseBuf(bb); } - OutNetMessage msg; - while ((msg = _outbound.poll()) != null) { + List<OutNetMessage> pending = new ArrayList(); + _outbound.drainAllTo(pending); + for (OutNetMessage msg : pending) { Object buf = msg.releasePreparationBuffer(); if (buf != null) releaseBuf((PrepBuffer)buf); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); } - msg = _currentOutbound; + OutNetMessage msg = _currentOutbound; if (msg != null) { Object buf = msg.releasePreparationBuffer(); if (buf != null) @@ -318,6 +320,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { * toss the message onto the connection's send queue */ public void send(OutNetMessage msg) { + /**** + always enqueue, let the queue do the dropping + if (tooBacklogged()) { boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu boolean successful = false; @@ -337,20 +342,20 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { return; } _consecutiveBacklog = 0; - int enqueued = 0; + ****/ //if (FAST_LARGE) bufferedPrepare(msg); - boolean noOutbound = false; _outbound.offer(msg); - enqueued = _outbound.size(); + //int enqueued = _outbound.size(); // although stat description says ahead of this one, not including this one... - _context.statManager().addRateData("ntcp.sendQueueSize", enqueued); - noOutbound = (_currentOutbound == null); - if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); + //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued); + boolean noOutbound = (_currentOutbound == null); + //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (_established && noOutbound) _transport.getWriter().wantsWrite(this, "enqueued"); } +/**** private long queueTime() { OutNetMessage msg = _currentOutbound; if (msg == null) { @@ -360,29 +365,31 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { } return msg.getSendTime(); // does not include any of the pre-send(...) preparation } +****/ public boolean tooBacklogged() { - long queueTime = queueTime(); - if (queueTime <= 0) return false; - boolean currentOutboundSet = _currentOutbound != null; + //long queueTime = queueTime(); + //if (queueTime <= 0) return false; // perhaps we could take into account the size of the queued messages too, our // current transmission rate, and how much time is left before the new message's expiration? // ok, maybe later... if (getUptime() < 10*1000) // allow some slack just after establishment return false; - if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... + //if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... + if (_outbound.isBacklogged()) { // bloody arbitrary. well, its half the average message lifetime... int size = _outbound.size(); if (_log.shouldLog(Log.WARN)) { int writeBufs = _writeBufs.size(); + boolean currentOutboundSet = _currentOutbound != null; try { - _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size + _log.warn("Too backlogged: size is " + size + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) + ", currentOut set? " + currentOutboundSet + ", writeBufs: " + writeBufs + " on " + toString()); } catch (Exception e) {} // java.nio.channels.CancelledKeyException } - _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime); + //_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime); return true; //} else if (size > 32) { // another arbitrary limit. // if (_log.shouldLog(Log.ERROR)) @@ -651,11 +658,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { _log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued"); return; } +/**** //throw new RuntimeException("We should not be preparing a write while we still have one pending"); if (queueTime() > 3*1000) { // don't stall low-priority messages +****/ msg = _outbound.poll(); if (msg == null) return; +/**** } else { // FIXME // This is a linear search to implement a priority queue, O(n**2) @@ -681,6 +691,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { if ((!removed) && _log.shouldLog(Log.WARN)) _log.warn("Already removed??? " + msg.getMessage().getType()); } +****/ _currentOutbound = msg; } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index f2180df8ce5452cf9f0c97bbbb2bccc223a3b376..9ce77b336418e2b1a4c5e17567d6efb9fa7205ba 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -165,9 +165,9 @@ class OutboundMessageFragments { state.releaseResources(); return; } - int active = peer.add(state); + peer.add(state); add(peer); - _context.statManager().addRateData("udp.outboundActiveCount", active, 0); + //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Error initializing " + msg); @@ -182,9 +182,9 @@ class OutboundMessageFragments { PeerState peer = state.getPeer(); if (peer == null) throw new RuntimeException("wtf, null peer for " + state); - int active = peer.add(state); + peer.add(state); add(peer); - _context.statManager().addRateData("udp.outboundActiveCount", active, 0); + //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); } /** diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 4dc7c87faf3aefdc5c5e3926086a07c2204ab039..49a44873e44c1fcabf680ec0a17e2a753cc2e3bd 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -7,6 +7,7 @@ import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; +import net.i2p.router.util.CDPQEntry; import net.i2p.util.ByteCache; import net.i2p.util.Log; @@ -14,7 +15,7 @@ import net.i2p.util.Log; * Maintain the outbound fragmentation for resending, for a single message. * */ -class OutboundMessageState { +class OutboundMessageState implements CDPQEntry { private final I2PAppContext _context; private final Log _log; /** may be null if we are part of the establishment */ @@ -36,6 +37,9 @@ class OutboundMessageState { /** for tracking use-after-free bugs */ private boolean _released; private Exception _releasedBy; + // we can't use the ones in _message since it is null for injections + private long _enqueueTime; + private long _seqNum; public static final int MAX_MSG_SIZE = 32 * 1024; /** is this enough for a high-bandwidth router? */ @@ -104,6 +108,7 @@ class OutboundMessageState { /** * Called from OutboundMessageFragments + * @param m null if msg is "injected" * @return success */ private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { @@ -128,8 +133,8 @@ class OutboundMessageState { _expiration = _startedOn + EXPIRATION; //_expiration = msg.getExpiration(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); return true; } catch (IllegalStateException ise) { _cache.release(_messageBuf); @@ -368,6 +373,56 @@ class OutboundMessageState { } } + /** + * For CDQ + * @since 0.9.3 + */ + public void setEnqueueTime(long now) { + _enqueueTime = now; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public long getEnqueueTime() { + return _enqueueTime; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public void drop() { + _peer.getTransport().failed(this, false); + releaseResources(); + } + + /** + * For CDPQ + * @since 0.9.3 + */ + public void setSeqNum(long num) { + _seqNum = num; + } + + /** + * For CDPQ + * @since 0.9.3 + */ + public long getSeqNum() { + return _seqNum; + } + + /** + * For CDPQ + * @return OutNetMessage priority or 1000 for injected + * @since 0.9.3 + */ + public int getPriority() { + return _message != null ? _message.getPriority() : 1000; + } + @Override public String toString() { short sends[] = _fragmentSends; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index e938599499857ad12a6e2845c346cfdfd938ae5e..46557950b9c5c021c064903fbff04b5b3c66dffb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -16,6 +16,7 @@ import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; +import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.util.Log; import net.i2p.util.ConcurrentHashSet; @@ -188,8 +189,19 @@ class PeerState { /** list of InboundMessageState for active message */ private final Map<Long, InboundMessageState> _inboundMessages; - /** list of OutboundMessageState */ + + /** + * Mostly messages that have been transmitted and are awaiting acknowledgement, + * although there could be some that have not been sent yet. + */ private final List<OutboundMessageState> _outboundMessages; + + /** + * Priority queue of messages that have not yet been sent. + * They are taken from here and put in _outboundMessages. + */ + private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue; + /** which outbound message is currently being retransmitted */ private OutboundMessageState _retransmitter; @@ -298,6 +310,7 @@ class PeerState { _rttDeviation = _rtt; _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); + _outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32); // all createRateStat() moved to EstablishmentManager _remoteIP = remoteIP; _remotePeer = remotePeer; @@ -726,8 +739,8 @@ class PeerState { public List<Long> getCurrentFullACKs() { // no such element exception seen here List<Long> rv = new ArrayList(_currentACKs); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Returning " + _currentACKs.size() + " current acks"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Returning " + _currentACKs.size() + " current acks"); return rv; } @@ -748,8 +761,8 @@ class PeerState { public List<Long> getCurrentResendACKs() { List<Long> randomResends = new ArrayList(_currentACKsResend); Collections.shuffle(randomResends, _context.random()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Returning " + randomResends.size() + " resend acks"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Returning " + randomResends.size() + " resend acks"); return randomResends; } @@ -1194,24 +1207,26 @@ class PeerState { * TODO priority queue? (we don't implement priorities in SSU now) * TODO backlog / pushback / block instead of dropping? Can't really block here. * TODO SSU does not support isBacklogged() now - * @return total pending messages */ - public int add(OutboundMessageState state) { + public void add(OutboundMessageState state) { if (_dead) { _transport.failed(state, false); - return 0; + return; } state.setPeer(this); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId()); int rv = 0; - boolean fail = false; + // will never fail for CDPQ + boolean fail = !_outboundQueue.offer(state); +/**** synchronized (_outboundMessages) { rv = _outboundMessages.size() + 1; if (rv > MAX_SEND_MSGS_PENDING) { // too many queued messages to one peer? nuh uh. fail = true; rv--; +****/ /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless @@ -1250,17 +1265,17 @@ class PeerState { } *******/ - +/**** } else { _outboundMessages.add(state); } } +****/ if (fail) { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping msg, OB queue full for " + toString()); _transport.failed(state, false); } - return rv; } /** drop all outbound messages */ @@ -1268,19 +1283,17 @@ class PeerState { //if (_dead) return; _dead = true; //_outboundMessages = null; - _retransmitter = null; - int sz = 0; - List<OutboundMessageState> tempList = null; + List<OutboundMessageState> tempList; synchronized (_outboundMessages) { - sz = _outboundMessages.size(); - if (sz > 0) { + _retransmitter = null; tempList = new ArrayList(_outboundMessages); _outboundMessages.clear(); - } } - for (int i = 0; i < sz; i++) - _transport.failed(tempList.get(i), false); + _outboundQueue.drainAllTo(tempList); + for (OutboundMessageState oms : tempList) { + _transport.failed(oms, false); + } // so the ACKSender will drop this peer from its queue _wantACKSendSince = -1; @@ -1291,7 +1304,7 @@ class PeerState { */ public int getOutboundMessageCount() { if (_dead) return 0; - return _outboundMessages.size(); + return _outboundMessages.size() + _outboundQueue.size(); } /** @@ -1305,7 +1318,7 @@ class PeerState { public int finishMessages() { // short circuit, unsynchronized if (_outboundMessages.isEmpty()) - return 0; + return _outboundQueue.size(); if (_dead) { dropOutbound(); @@ -1367,7 +1380,7 @@ class PeerState { state.releaseResources(); } - return rv; + return rv + _outboundQueue.size(); } /** @@ -1387,7 +1400,7 @@ class PeerState { ShouldSend should = locked_shouldSend(state); if (should == ShouldSend.YES) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId()); + _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); /* while (iter.hasNext()) { OutboundMessageState later = (OutboundMessageState)iter.next(); @@ -1402,16 +1415,37 @@ class PeerState { // we don't bother looking for a smaller msg that would fit. // By not looking further, we keep strict sending order, and that allows // some efficiency in acked() below. - break; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() + + " / " + _outboundQueue.size() + " remaining"); + return null; } /* else { OutNetMessage msg = state.getMessage(); if (msg != null) msg.timestamp("passed over for allocation with " + msgs.size() + " peers"); } */ } + // Peek at head of _outboundQueue and see if we can send it. + // If so, pull it off, put it in _outbundMessages, test + // again for bandwidth if necessary, and return it. + OutboundMessageState state = _outboundQueue.peek(); + if (state != null && ShouldSend.YES == locked_shouldSend(state)) { + // we could get a different state, or null, when we poll, + // due to AQM drops, so we test again if necessary + OutboundMessageState dequeuedState = _outboundQueue.poll(); + if (dequeuedState != null) { + _outboundMessages.add(dequeuedState); + if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId()); + return dequeuedState; + } + } + } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining"); + _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + + " / " + _outboundQueue.size() + " remaining"); return null; } @@ -1441,9 +1475,19 @@ class PeerState { rv = delay; } } + // failsafe... is this OK? + if (rv > 100 && !_outboundQueue.isEmpty()) + rv = 100; return rv; } + /** + * @since 0.9.3 + */ + public boolean isBacklogged() { + return _dead || _outboundQueue.isBacklogged(); + } + /** * If set to true, we should throttle retransmissions of all but the first message in * flight to a peer. If set to false, we will only throttle the initial flight of a @@ -1521,8 +1565,8 @@ class PeerState { int size = state.getUnackedSize(); if (allocateSendingBytes(size, state.getPushCount())) { - if (_log.shouldLog(Log.INFO)) - _log.info("Allocation of " + size + " allowed with " + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocation of " + size + " allowed with " + getSendWindowBytesRemaining() + "/" + getSendWindowBytes() + " remaining" @@ -1566,7 +1610,7 @@ class PeerState { /** * A full ACK was received. - * TODO if messages awaiting ack were a HashSet this would be faster. + * TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster. * * @return true if the message was acked for the first time */ @@ -1620,8 +1664,8 @@ class PeerState { state.releaseResources(); } else { // dupack, likely - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received an ACK for a message not pending: " + messageId); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Received an ACK for a message not pending: " + messageId); } return state != null; } @@ -1767,6 +1811,14 @@ class PeerState { } } + /** + * Convenience for OutboundMessageState so it can fail itself + * @since 0.9.3 + */ + public UDPTransport getTransport() { + return _transport; + } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? /* diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 7512d26adbc059abb6423ec97025e6e3196c8d7a..f738c210404ade0d3271c5bf197e9b655d2af953 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1678,7 +1678,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return getPeerState(dest) != null; } + /** + * @since 0.9.3 + */ + @Override + public boolean isBacklogged(Hash dest) { + PeerState peer = _peersByIdent.get(dest); + return peer != null && peer.isBacklogged(); + } + public boolean allowConnection() { + return _peersByIdent.size() < getMaxConnections(); } @@ -2187,6 +2197,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(THINSP).append(peer.getConcurrentSends()); buf.append(THINSP).append(peer.getConcurrentSendWindow()); buf.append(THINSP).append(peer.getConsecutiveSendRejections()); + if (peer.isBacklogged()) + buf.append(' ').append(_("backlogged")); buf.append("</td>"); buf.append("<td class=\"cells\" align=\"right\">");