diff --git a/history.txt b/history.txt index fb1cace8b02c672deb5d455d1f3c8f763c9ccdc6..a3ece9d2e651e6407181f8a7b962efb3e114e9a5 100644 --- a/history.txt +++ b/history.txt @@ -2,7 +2,9 @@ * NetDB: - ECIES router support for encrypted lookups and stores (proposal #156) - Reseed after a long downtime - * SSU: Increase socket buffer size (ticket #2781) + * SSU: + - Increase socket buffer size (ticket #2781) + - Redesign of the congestion control (tickets #2412, #2649, #2654, #2713) 2020-10-17 zzz * i2psnark: Remove references to "maggot" links diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a6204817e3d9e640632604f1432a96320e1992ce..f2522cd4ed540000a936a603d4922f4a287ece1f 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 8; + public final static long BUILD = 9; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 03ec420dcc7dc45c74c85bc56a3a88fdd532474b..c94f2d1cedc15cb16724e23d15927cc650394627 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -294,17 +294,17 @@ class OutboundMessageFragments { // race with add() _iterator.remove(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("No more pending messages for " + peer.getRemotePeer()); + _log.debug("No more pending messages for " + p.getRemotePeer()); continue; } peersProcessed++; - states = p.allocateSend(); + states = p.allocateSend(now); if (states != null) { peer = p; // we have something to send and we will be returning it break; } - int delay = p.getNextDelay(); + int delay = p.getNextDelay(now); if (delay < nextSendDelay) nextSendDelay = delay; @@ -371,6 +371,16 @@ class OutboundMessageFragments { return packets; } + /** + * Wakes up the packet pusher thread. + * @since 0.9.48 + */ + void nudge() { + synchronized(_activePeers) { + _activePeers.notify(); + } + } + /** * @return null if state or peer is null */ diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index f5a42ee27a7e6ddcfe79b96832b2a5c5a046cfd1..f010e4dd3a613bd108a2a918a0a9a5e78f4bd9af 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -31,7 +31,6 @@ class OutboundMessageState implements CDPQEntry { private long _fragmentAcks; private final int _numFragments; private final long _startedOn; - private long _nextSendTime; private int _pushCount; private int _maxSends; // we can't use the ones in _message since it is null for injections @@ -77,7 +76,6 @@ class OutboundMessageState implements CDPQEntry { _i2npMessage = msg; _peer = peer; _startedOn = _context.clock().now(); - _nextSendTime = _startedOn; _expiration = _startedOn + EXPIRATION; //_expiration = msg.getExpiration(); @@ -166,9 +164,6 @@ class OutboundMessageState implements CDPQEntry { return isComplete(); } - public long getNextSendTime() { return _nextSendTime; } - public void setNextSendTime(long when) { _nextSendTime = when; } - /** * The max number of sends for any fragment, which is the * same as the push count, at least as it's coded now. diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index d67390e5b87e3e08d0c820c331c101d10ce0f1cc..4b03005d8eac6b76f55aa8c97d3be436ce12bca5 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -219,8 +219,8 @@ public class PeerState { //private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue; private final PriBlockingQueue<OutboundMessageState> _outboundQueue; - /** which outbound message is currently being retransmitted */ - private OutboundMessageState _retransmitter; + /** when the retransmit timer is about to trigger */ + private long _retransmitTimer; private final UDPTransport _transport; @@ -246,9 +246,6 @@ public class PeerState { private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; - /** max number of msgs returned from allocateSend() */ - private static final int MAX_ALLOCATE_SEND = 2; - /** * Was 32 before 0.9.2, but since the streaming lib goes up to 128, * we would just drop our own msgs right away during slow start. @@ -693,15 +690,14 @@ public class PeerState { * * Caller should synch */ - private boolean allocateSendingBytes(int size, int messagePushCount) { - return allocateSendingBytes(size, false, messagePushCount); + private boolean allocateSendingBytes(int size, int messagePushCount, long now) { + return allocateSendingBytes(size, false, messagePushCount, now); } /** * Caller should synch */ - private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { - long now = _context.clock().now(); + private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount, long now) { long duration = now - _lastSendRefill; if (duration >= 1000) { _sendWindowBytesRemaining = _sendWindowBytes; @@ -928,6 +924,14 @@ public class PeerState { _sendWindowBytes = MINIMUM_WINDOW_BYTES; //if (congestionAt/2 < _slowStartThreshold) _slowStartThreshold = congestionAt/2; + + int oldRto = _rto; + long oldTimer = _retransmitTimer - now; + _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 )); + _retransmitTimer = now + _rto; + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now)); + return true; } @@ -1186,7 +1190,7 @@ public class PeerState { * We sent a message which was ACKed containing the given # of bytes. * Caller should synch on this */ - private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) { + private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) { _consecutiveFailedSends = 0; // _lastFailedSendPeriod = -1; if (numSends < 2) { @@ -1231,17 +1235,31 @@ public class PeerState { adjustMTU(); //} } + + if (!anyPending) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " nothing pending, cancelling timer"); + _retransmitTimer = 0; + } else { + // any time new data gets acked, push out the timer + long now = _context.clock().now(); + long oldTimer = _retransmitTimer - now; + _retransmitTimer = now + getRTO(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " ACK, timer: " + oldTimer + " -> " + (_retransmitTimer - now)); + } + _transport.getOMF().nudge(); } /** * We sent a message which was ACKed containing the given # of bytes. */ - private void messageACKed(int bytesACKed, long lifetime, int numSends) { + private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) { synchronized(this) { - locked_messageACKed(bytesACKed, lifetime, numSends); + locked_messageACKed(bytesACKed, lifetime, numSends, anyPending); } if (numSends >= 2 && _log.shouldLog(Log.INFO)) - _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); + _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _context.statManager().addRateData("udp.sendBps", _sendBps); } @@ -1548,7 +1566,6 @@ public class PeerState { List<OutboundMessageState> tempList; synchronized (_outboundMessages) { - _retransmitter = null; tempList = new ArrayList<OutboundMessageState>(_outboundMessages); _outboundMessages.clear(); } @@ -1610,21 +1627,15 @@ public class PeerState { OutboundMessageState state = iter.next(); if (state.isComplete()) { iter.remove(); - if (_retransmitter == state) - _retransmitter = null; if (succeeded == null) succeeded = new ArrayList<OutboundMessageState>(4); succeeded.add(state); } else if (state.isExpired(now)) { iter.remove(); - if (_retransmitter == state) - _retransmitter = null; _context.statManager().addRateData("udp.sendFailed", state.getPushCount()); if (failed == null) failed = new ArrayList<OutboundMessageState>(4); failed.add(state); } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { iter.remove(); - if (state == _retransmitter) - _retransmitter = null; _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount()); if (failed == null) failed = new ArrayList<OutboundMessageState>(4); failed.add(state); @@ -1660,59 +1671,84 @@ public class PeerState { /** * Pick one or more messages we want to send and allocate them out of our window + * Adjusts the retransmit timer if necessary. * High usage - * OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0. - * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times. + * TODO combine finishMessages() and allocateSend() so we don't iterate 2 times. * * @return allocated messages to send (never empty), or null if no messages or no resources */ - List<OutboundMessageState> allocateSend() { + List<OutboundMessageState> allocateSend(long now) { + long retransmitTimer; + synchronized(this) { + retransmitTimer = _retransmitTimer; + } + List<OutboundMessageState> rv = allocateSend2(retransmitTimer > 0 && now >= retransmitTimer, now); + if (rv != null && !rv.isEmpty()) { + synchronized(this) { + long old = _retransmitTimer; + if (_retransmitTimer == 0) + _retransmitTimer = now + getRTO(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer); + } + } + return rv; + } + + /** + * Pick one or more messages to send. This will alloace either old or new messages, but not both. + * @param canSendOld if any already sent messages can be sent. If false, only new messages will be considered + * @param now what time is it now + * @since 0.9.48 + */ + private List<OutboundMessageState> allocateSend2(boolean canSendOld, long now) { if (_dead) return null; List<OutboundMessageState> rv = null; synchronized (_outboundMessages) { - for (OutboundMessageState state : _outboundMessages) { - // We have 3 return values, because if allocateSendingBytes() returns false, - // then we can stop iterating. - ShouldSend should = locked_shouldSend(state); - if (should == ShouldSend.YES) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); - /* - while (iter.hasNext()) { - OutboundMessageState later = (OutboundMessageState)iter.next(); - OutNetMessage msg = later.getMessage(); - if (msg != null) - msg.timestamp("not reached for allocation " + msgs.size() + " other peers"); - } - */ - if (rv == null) - rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND); - rv.add(state); - if (rv.size() >= MAX_ALLOCATE_SEND) + if (canSendOld) { + for (OutboundMessageState state : _outboundMessages) { + boolean should = locked_shouldSend(state, now); + if (should) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); + /* + while (iter.hasNext()) { + OutboundMessageState later = (OutboundMessageState)iter.next(); + OutNetMessage msg = later.getMessage(); + if (msg != null) + msg.timestamp("not reached for allocation " + msgs.size() + " other peers"); + } + */ + if (rv == null) + rv = new ArrayList<OutboundMessageState>(_outboundMessages.size()); + rv.add(state); + if (rv.size() >= _outboundMessages.size() / 2) + return rv; + } else { + // no more bandwidth available + // we don't bother looking for a smaller msg that would fit. + // By not looking further, we keep strict sending order, and that allows + // some efficiency in acked() below. + if (_log.shouldLog(Log.DEBUG)) { + if (rv == null) + _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() + + " / " + _outboundQueue.size() + " remaining"); + else + _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size()); + } return rv; - } else if (should == ShouldSend.NO_BW) { - // no more bandwidth available - // we don't bother looking for a smaller msg that would fit. - // By not looking further, we keep strict sending order, and that allows - // some efficiency in acked() below. - if (rv == null && _log.shouldLog(Log.DEBUG)) - _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() + - " / " + _outboundQueue.size() + " remaining"); - return rv; - } /* else { - OutNetMessage msg = state.getMessage(); - if (msg != null) - msg.timestamp("passed over for allocation with " + msgs.size() + " peers"); - } */ + } + } + return null; } - // Peek at head of _outboundQueue and see if we can send it. // If so, pull it off, put it in _outbundMessages, test // again for bandwidth if necessary, and return it. OutboundMessageState state; synchronized (_outboundQueue) { while ((state = _outboundQueue.peek()) != null && - ShouldSend.YES == locked_shouldSend(state)) { + locked_shouldSend(state, now)) { // This is guaranted to be the same as what we got in peek(), // due to locking and because we aren't using the dropping CDPBQ. // If we do switch to CDPBQ, @@ -1729,9 +1765,9 @@ public class PeerState { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId()); if (rv == null) - rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND); + rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed); rv.add(dequeuedState); - if (rv.size() >= MAX_ALLOCATE_SEND) + if (rv.size() >= _concurrentMessagesAllowed) return rv; } } @@ -1739,39 +1775,27 @@ public class PeerState { } if ( rv == null && _log.shouldLog(Log.DEBUG)) _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + - " / " + _outboundQueue.size() + " remaining"); + " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now())); + return rv; } /** * High usage - * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null. - * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times. + * TODO combine finishMessages(), allocateSend() so we don't iterate 2 times. * + * @param now what time it is now * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. * If ready now, will return 0 or a negative value. */ - int getNextDelay() { + int getNextDelay(long now) { int rv = Integer.MAX_VALUE; if (_dead) return rv; - long now = _context.clock().now(); - synchronized (_outboundMessages) { - if (_retransmitter != null) { - rv = (int)(_retransmitter.getNextSendTime() - now); - return rv; - } - for (OutboundMessageState state : _outboundMessages) { - int delay = (int)(state.getNextSendTime() - now); - // short circuit once we hit something ready to go - if (delay <= 0) - return delay; - if (delay < rv) - rv = delay; - } + synchronized(this) { + if (_retransmitTimer >= now) + return (int) (_retransmitTimer - now); } - // failsafe... is this OK? - if (rv > 100 && !_outboundQueue.isEmpty()) - rv = 100; return rv; } @@ -1782,20 +1806,6 @@ public class PeerState { return _dead || _outboundQueue.isBacklogged(); } - /** - * If set to true, we should throttle retransmissions of all but the first message in - * flight to a peer. If set to false, we will only throttle the initial flight of a - * message to a peer while a retransmission is going on. - */ - private static final boolean THROTTLE_RESENDS = true; - /** - * if true, throttle the initial volley of a message if there is a resend in progress. - * if false, always send the first volley, regardless of retransmissions (but keeping in - * mind bw/cwin throttle, etc) - * - */ - private static final boolean THROTTLE_INITIAL_SEND = true; - /** * Always leave room for this many explicit acks. * Only for data packets. Does not affect ack-only packets. @@ -1817,88 +1827,42 @@ public class PeerState { MIN_ACK_SIZE; } - private enum ShouldSend { YES, NO, NO_BW }; - /** - * Have 3 return values, because if allocateSendingBytes() returns false, - * then allocateSend() can stop iterating - * * Caller should synch */ - private ShouldSend locked_shouldSend(OutboundMessageState state) { - long now = _context.clock().now(); - if (state.getNextSendTime() <= now) { - OutboundMessageState retrans = _retransmitter; - if ( (retrans != null) && ( (retrans.isExpired(now) || retrans.isComplete()) ) ) { - _retransmitter = null; - retrans = null; - } - - if ( (retrans != null) && (retrans != state) ) { - // choke it, since there's already another message retransmitting to this - // peer. - _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted); - int max = state.getMaxSends(); - if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) { - //if (state.getMessage() != null) - // state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); - } else if ( (max <= 0) || (THROTTLE_RESENDS) ) { - //if (state.getMessage() != null) - // state.getMessage().timestamp("choked, with another message retransmitting"); - return ShouldSend.NO; - } else { - //if (state.getMessage() != null) - // state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); - } - } - + private boolean locked_shouldSend(OutboundMessageState state, long now) { int size = state.getUnackedSize(); - if (allocateSendingBytes(size, state.getPushCount())) { + if (allocateSendingBytes(size, state.getPushCount(), now)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Allocation of " + size + " allowed with " + _log.debug(_remotePeer + " Allocation of " + size + " allowed with " + getSendWindowBytesRemaining() + "/" + getSendWindowBytes() + " remaining" + " for message " + state.getMessageId() + ": " + state); - int rto = getRTO(); - if (state.getPushCount() > 0) { - _retransmitter = state; - rto = Math.min(MAX_RTO, rto << state.getPushCount()); // Section 5.5 RFC 6298 - } if (state.push()) _messagesSent++; - // messages with multiple fragments need more time - state.setNextSendTime(now + rto + ((state.getFragmentCount() - 1) * ACKSender.ACK_FREQUENCY)); - //if (peer.getSendWindowBytesRemaining() > 0) // _throttle.unchoke(peer.getRemotePeer()); - return ShouldSend.YES; + return true; } else { _context.statManager().addRateData("udp.sendRejected", state.getPushCount()); //if (state.getMessage() != null) // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); if (_log.shouldLog(Log.INFO)) - _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + " available=" + getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); - state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) + - _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK); - if (_log.shouldLog(Log.INFO)) - _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); //_throttle.choke(peer.getRemotePeer()); //if (state.getMessage() != null) // state.getMessage().timestamp("choked, not enough available, wsize=" // + getSendWindowBytes() + " available=" // + getSendWindowBytesRemaining()); - return ShouldSend.NO_BW; + return false; } - } // nextTime <= now - - return ShouldSend.NO; } /** @@ -1910,6 +1874,7 @@ public class PeerState { boolean acked(long messageId) { if (_dead) return false; OutboundMessageState state = null; + boolean anyPending; synchronized (_outboundMessages) { for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) { state = iter.next(); @@ -1925,8 +1890,7 @@ public class PeerState { state = null; } } - if ( (state != null) && (state == _retransmitter) ) - _retransmitter = null; + anyPending = !_outboundMessages.isEmpty(); } if (state != null) { @@ -1948,7 +1912,7 @@ public class PeerState { _context.statManager().addRateData("udp.sendConfirmVolley", numSends); _transport.succeeded(state); // this adjusts the rtt/rto/window/etc - messageACKed(state.getMessageSize(), state.getLifetime(), numSends); + messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending); //if (getSendWindowBytesRemaining() > 0) // _throttle.unchoke(peer.getRemotePeer()); @@ -1976,6 +1940,7 @@ public class PeerState { OutboundMessageState state = null; boolean isComplete = false; + boolean anyPending; synchronized (_outboundMessages) { for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) { state = iter.next(); @@ -1984,8 +1949,6 @@ public class PeerState { if (complete) { isComplete = true; iter.remove(); - if (state == _retransmitter) - _retransmitter = null; } break; } else if (state.getPushCount() <= 0) { @@ -1997,6 +1960,7 @@ public class PeerState { state = null; } } + anyPending = !_outboundMessages.isEmpty(); } if (state != null) { @@ -2020,7 +1984,7 @@ public class PeerState { _transport.succeeded(state); // this adjusts the rtt/rto/window/etc - messageACKed(state.getMessageSize(), state.getLifetime(), numSends); + messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending); //if (state.getPeer().getSendWindowBytesRemaining() > 0) // _throttle.unchoke(state.getPeer().getRemotePeer()); @@ -2085,13 +2049,10 @@ public class PeerState { synchronized (oldPeer._outboundMessages) { tmp2.addAll(oldPeer._outboundMessages); oldPeer._outboundMessages.clear(); - retransmitter = oldPeer._retransmitter; - oldPeer._retransmitter = null; } if (!_dead) { synchronized (_outboundMessages) { _outboundMessages.addAll(tmp2); - _retransmitter = retransmitter; } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 37617a8761b21f485d3cf379f922f9dd84b93aad..370cbda7fe08b83c7bec1b194a99218b83696f78 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -345,6 +345,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.simpleTimer2().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4); } + + /** + * @returns the instance of OutboundMessageFragments + * @since 0.9.48 + */ + OutboundMessageFragments getOMF() { + return _fragments; + } /** * Pick a port if not previously configured, so that TransportManager may