diff --git a/history.txt b/history.txt index 6812caecc..fb18ce907 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ +2020-12-17 zzz + * SSU: Send subset of fragments (ticket #2427) + 2020-12-16 zzz + * NetDB: Fix rare floodfill monitor NPE * SSU: Fix occasional high CPU usage 2020-12-11 zzz diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f6e6df3a5..4e28a2c8d 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 4; + public final static long BUILD = 5; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index c94f2d1ce..cfce2f6fc 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -160,7 +160,7 @@ class OutboundMessageFragments { // will throw IAE if peer == null OutboundMessageState state = new OutboundMessageState(_context, msg, peer); peer.add(state); - add(peer, state.fragmentSize(0)); + add(peer, state.getMinSendSize()); } catch (IllegalArgumentException iae) { _transport.failed(msg, "Peer disconnected quickly"); return; @@ -177,7 +177,7 @@ class OutboundMessageFragments { if (peer == null) throw new RuntimeException("null peer for " + state); peer.add(state); - add(peer, state.fragmentSize(0)); + add(peer, state.getMinSendSize()); //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); } @@ -194,7 +194,7 @@ class OutboundMessageFragments { for (int i = 0; i < sz; i++) { OutboundMessageState state = states.get(i); peer.add(state); - int fsz = state.fragmentSize(0); + int fsz = state.getMinSendSize(); if (fsz < min) min = fsz; } @@ -402,16 +402,9 @@ class OutboundMessageFragments { // build the list of fragments to send List toSend = new ArrayList(8); for (OutboundMessageState state : states) { - int fragments = state.getFragmentCount(); - int queued = 0; - for (int i = 0; i < fragments; i++) { - if (state.needsSending(i)) { - toSend.add(new Fragment(state, i)); - queued++; - } - } + int queued = state.push(toSend); // per-state stats - if (queued > 0 && state.getPushCount() > 1) { + if (queued > 0 && state.getMaxSends() > 1) { peer.messageRetransmitted(queued); // _packetsRetransmitted += toSend; // lifetime for the transport _context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted()); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index f010e4dd3..a1c3b2e54 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -1,11 +1,12 @@ package net.i2p.router.transport.udp; -import java.util.Date; +import java.util.List; import net.i2p.I2PAppContext; import net.i2p.data.Base64; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.OutNetMessage; +import net.i2p.router.transport.udp.PacketBuilder.Fragment; import net.i2p.router.util.CDPQEntry; import net.i2p.util.Log; @@ -30,12 +31,16 @@ class OutboundMessageState implements CDPQEntry { /** bitmask, 0 if acked, all 0 = complete */ private long _fragmentAcks; private final int _numFragments; + /** sends for each fragment, or null if only one fragment */ + private final byte _fragmentSends[]; private final long _startedOn; private int _pushCount; private int _maxSends; // we can't use the ones in _message since it is null for injections private long _enqueueTime; private long _seqNum; + /** how many bytes push() is allowed to send */ + private int _allowedSendBytes; public static final int MAX_MSG_SIZE = 32 * 1024; @@ -95,6 +100,7 @@ class OutboundMessageState implements CDPQEntry { _numFragments = numFragments; // all 1's where we care _fragmentAcks = _numFragments < 64 ? mask(_numFragments) - 1L : -1L; + _fragmentSends = (numFragments > 1) ? new byte[numFragments] : null; } /** @@ -143,6 +149,105 @@ class OutboundMessageState implements CDPQEntry { return rv; } + /** + * Is any fragment unsent? + * + * @since 0.9.49 + */ + public synchronized boolean hasUnsentFragments() { + if (isComplete()) + return false; + if (_numFragments == 1) + return _maxSends == 0; + for (int i = _numFragments - 1; i >= 0; i--) { + if (_fragmentSends[i] == 0) + return true; + } + return false; + } + + /** + * The min send count of unacked fragments. + * Only call if not complete and _numFragments greater than 1. + * Caller must synch. + * + * @since 0.9.49 + */ + private int getMinSendCount() { + int rv = 127; + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) { + int count = _fragmentSends[i]; + if (count < rv) + rv = count; + } + } + return rv; + } + + /** + * The minimum number of bytes we can send, which is the smallest unacked fragment we will send next. + * + * @return 0 to total size + * @since 0.9.49 + */ + public synchronized int getMinSendSize() { + if (isComplete()) + return 0; + if (_numFragments == 1) + return _messageBuf.length; + if (_pushCount == 0) + return fragmentSize(_numFragments - 1); + int minSendCount = getMinSendCount(); + int rv = _fragmentSize; + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i) && _fragmentSends[i] == minSendCount) { + int sz = fragmentSize(i); + if (sz < rv) { + rv = sz; + } + } + } + return rv; + } + + /** + * How many bytes we can send under the max given. + * Side effect: if applicable, amount to send will be saved for the push() call. + * Note: With multiple fragments, this will allocate only the fragments with the lowest push count. + * Example: If push counts are 1 1 1 0 0, this will only return the size of the last two fragments, + * even if any of the first three need to be retransmitted. + * + * @param max the maximum number of bytes we can send + * @return 0 to max bytes + * @since 0.9.49 + */ + public synchronized int getSendSize(int max) { + if (isComplete()) + return 0; + if (_numFragments == 1) + return _messageBuf.length <= max ? _messageBuf.length : 0; + // find the fragments we've sent the least + int minSendCount = getMinSendCount(); + // Allow only the fragments we've sent the least + int rv = 0; + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i) && _fragmentSends[i] == minSendCount) { + int sz = fragmentSize(i); + int tot = rv + sz; + if (tot <= max) { + rv = tot; + } else { + if (_log.shouldInfo()) + _log.info("Send window limited to " + (max - rv) + ", not sending fragment " + i + " for " + toString()); + } + } + } + if (rv > 0) + _allowedSendBytes = rv; + return rv; + } + public synchronized boolean needsSending(int fragment) { return (_fragmentAcks & mask(fragment)) != 0; } @@ -165,27 +270,79 @@ class OutboundMessageState implements CDPQEntry { } /** - * The max number of sends for any fragment, which is the - * same as the push count, at least as it's coded now. + * The max number of sends for any fragment. + * As of 0.9.49, may be less than getPushCount() if we pushed only some fragments */ public synchronized int getMaxSends() { return _maxSends; } /** - * The number of times we've pushed some fragments, which is the - * same as the max sends, at least as it's coded now. + * The number of times we've pushed some fragments. + * As of 0.9.49, may be greater than getMaxSends() if we pushed only some fragments. */ public synchronized int getPushCount() { return _pushCount; } /** - * Note that we have pushed the message fragments. - * Increments push count (and max sends... why?) - * @return true if this is the first push + * Add fragments up to the number of bytes allowed by setAllowedSendBytes() + * Side effects: Clears setAllowedSendBytes. Increments pushCount. Increments maxSends if applicable. + * Note: With multiple fragments, this will send only the fragments with the lowest push count. + * Example: If push counts are 1 1 1 0 0, this will only send the last two fragments, + * even if any of the first three need to be retransmitted. + * + * @param toSend out parameter + * @return the number of Fragments added + * @since 0.9.49 */ - public synchronized boolean push() { - boolean rv = _pushCount == 0; - // these will never be different... - _pushCount++; - _maxSends = _pushCount; + public synchronized int push(List toSend) { + int rv = 0; + if (_allowedSendBytes <= 0 || _numFragments == 1) { + // easy way + // send all, or only one fragment + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) { + toSend.add(new Fragment(this, i)); + rv++; + if (_fragmentSends != null) { + _fragmentSends[i]++; + if (_fragmentSends[i] > _maxSends) + _maxSends = _fragmentSends[i]; + } + } + } + if (_fragmentSends == null) + _maxSends++; + } else { + // hard way. + // send the fragments we've sent the least, up to the max size + int minSendCount = getMinSendCount(); + int sent = 0; + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) { + int count = _fragmentSends[i]; + if (count == minSendCount) { + int sz = fragmentSize(i); + if (sz <= _allowedSendBytes - sent) { + sent += sz; + toSend.add(new Fragment(this, i)); + rv++; + _fragmentSends[i]++; + if (_fragmentSends[i] > _maxSends) + _maxSends = _fragmentSends[i]; + if (sent >= _allowedSendBytes) + break; + } + } + } + } + } + if (rv > 0) { + _pushCount++; + if (_log.shouldDebug()) + _log.debug("Pushed " + rv + " fragments for " + toString()); + } else { + if (_log.shouldDebug()) + _log.debug("Nothing pushed??? allowedSendBytes=" + _allowedSendBytes + " for " + toString()); + } + _allowedSendBytes = 0; return rv; } @@ -296,15 +453,28 @@ class OutboundMessageState implements CDPQEntry { StringBuilder buf = new StringBuilder(256); buf.append("OB Message ").append(_i2npMessage.getUniqueId()); buf.append(" type ").append(_i2npMessage.getType()); - buf.append(" with ").append(_numFragments).append(" fragments"); - buf.append(" of size ").append(_messageBuf.length); + buf.append(" size ").append(_messageBuf.length); + if (_numFragments > 1) + buf.append(" fragments: ").append(_numFragments); buf.append(" volleys: ").append(_maxSends); buf.append(" lifetime: ").append(getLifetime()); if (!isComplete()) { - buf.append(" pending fragments: "); - for (int i = 0; i < _numFragments; i++) { - if (needsSending(i)) - buf.append(i).append(' '); + if (_fragmentSends != null) { + buf.append(" unacked fragments: "); + for (int i = 0; i < _numFragments; i++) { + if (needsSending(i)) + buf.append(i).append(' '); + } + buf.append("sizes: "); + for (int i = 0; i < _numFragments; i++) { + buf.append(fragmentSize(i)).append(' '); + } + buf.append("send counts: "); + for (int i = 0; i < _numFragments; i++) { + buf.append(_fragmentSends[i]).append(' '); + } + } else { + buf.append(" unacked"); } } //buf.append(" to: ").append(_peer.toString()); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 380df52c9..77a952b03 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -609,39 +609,34 @@ public class PeerState { * * Caller should synch */ - private boolean allocateSendingBytes(int size, int messagePushCount, long now) { + private boolean allocateSendingBytes(OutboundMessageState state, long now, boolean resetWindow) { long duration = now - _lastSendRefill; - if (duration >= 1000) { + if (resetWindow || duration >= 1000) { _sendWindowBytesRemaining = _sendWindowBytes; - _sendBytes += size; + if (duration <= 0) + duration = 10; _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration))); _sendBytes = 0; _lastSendRefill = now; } + int messagePushCount = state.getPushCount(); + if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) { + _consecutiveRejections++; + _context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections); + return false; + } + if (_sendWindowBytesRemaining <= 0) + return false; - // Ticket 2505 - // We always send all unacked fragments for a message, - // because we don't have any mechanism in OutboundMessageFragments - // to track the next send time for fragments individually. - // Huge messages that are larger than the window size could never - // get sent and block the outbound queue forever. - // So we let it through when the window is empty (full window remaining). - if (size <= _sendWindowBytesRemaining || - (size > _sendWindowBytes && _sendWindowBytesRemaining >= _sendWindowBytes)) { - // move this check to getSendWindowBytesRemaining() ? - if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) { - _consecutiveRejections++; - _context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections); - return false; - } else if (messagePushCount == 0) { + int size = state.getSendSize(_sendWindowBytesRemaining); + if (size > 0) { + if (messagePushCount == 0) { _context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed); if (_consecutiveRejections > 0) _context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size()); _consecutiveRejections = 0; } _sendWindowBytesRemaining -= size; - if (_sendWindowBytesRemaining < 0) - _sendWindowBytesRemaining = 0; _sendBytes += size; _lastSendTime = now; return true; @@ -853,8 +848,8 @@ public class PeerState { randomResends.add(rack.id); } else { iter.remove(); - if (_log.shouldLog(Log.INFO)) - _log.info("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) + + if (_log.shouldDebug()) + _log.debug("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) + " ago, now " + _currentACKsResend.size() + " resend acks"); } } @@ -875,8 +870,8 @@ public class PeerState { // acks already in _currentACKsResend. _currentACKsResend.offer(new ResendACK(messageId, _context.clock().now())); // trim happens in getCurrentResendACKs above - if (_log.shouldLog(Log.INFO)) - _log.info("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " + + if (_log.shouldDebug()) + _log.debug("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " + _currentACKsResend.size() + " resend acks"); } // should we only do this if removed? @@ -1124,8 +1119,8 @@ public class PeerState { synchronized(this) { locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued); } - if (numSends >= 2 && _log.shouldLog(Log.INFO)) - _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); + if (numSends >= 2 && _log.shouldDebug()) + _log.debug(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _context.statManager().addRateData("udp.sendBps", _sendBps); } @@ -1442,7 +1437,7 @@ public class PeerState { _context.statManager().addRateData("udp.sendFailed", state.getPushCount()); if (failed == null) failed = new ArrayList(4); failed.add(state); - } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { + } else if (state.getMaxSends() > OutboundMessageFragments.MAX_VOLLEYS) { iter.remove(); _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount()); if (failed == null) failed = new ArrayList(4); @@ -1470,7 +1465,7 @@ public class PeerState { // it can not have an OutNetMessage if the source is the // final after establishment message if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send a direct message: " + state); + _log.warn("Unable to send a direct message: " + state + " to: " + this); } } @@ -1496,10 +1491,11 @@ public class PeerState { if (rv != null && !rv.isEmpty()) { synchronized(this) { long old = _retransmitTimer; - if (_retransmitTimer == 0) + if (_retransmitTimer == 0) { _retransmitTimer = now + getRTO(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer); + } } } else if (canSendOld) { // failsafe - push out or cancel timer to prevent looping @@ -1529,7 +1525,7 @@ public class PeerState { synchronized (_outboundMessages) { if (canSendOld) { for (OutboundMessageState state : _outboundMessages) { - boolean should = locked_shouldSend(state, now); + boolean should = locked_shouldSend(state, now, true); if (should) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId()); @@ -1554,6 +1550,31 @@ public class PeerState { } } return rv; + } else if (!_outboundMessages.isEmpty()) { + // send some unsent fragments of pending messages, if any + for (OutboundMessageState state : _outboundMessages) { + if (!state.hasUnsentFragments()) + continue; + boolean should = locked_shouldSend(state, now, false); + if (should) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Allocate sending more fragments to " + _remotePeer + ": " + state.getMessageId()); + if (rv == null) + rv = new ArrayList(_concurrentMessagesAllowed); + rv.add(state); + } else { + // no more bandwidth available + if (_log.shouldLog(Log.DEBUG)) { + if (rv == null) + _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() + + " / " + _outboundQueue.size() + " remaining"); + else + _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size()); + } + return rv; + } + } + // fall through to new messages } // Peek at head of _outboundQueue and see if we can send it. // If so, pull it off, put it in _outbundMessages, test @@ -1561,7 +1582,7 @@ public class PeerState { OutboundMessageState state; synchronized (_outboundQueue) { while ((state = _outboundQueue.peek()) != null && - locked_shouldSend(state, now)) { + locked_shouldSend(state, now, false)) { // This is guaranted to be the same as what we got in peek(), // due to locking and because we aren't using the dropping CDPBQ. // If we do switch to CDPBQ, @@ -1643,22 +1664,21 @@ public class PeerState { /** * Caller should synch */ - private boolean locked_shouldSend(OutboundMessageState state, long now) { - int size = state.getUnackedSize(); - if (allocateSendingBytes(size, state.getPushCount(), now)) { + private boolean locked_shouldSend(OutboundMessageState state, long now, boolean resetWindow) { + if (allocateSendingBytes(state, now, resetWindow)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(_remotePeer + " Allocation of " + size + " allowed with " + _log.debug(_remotePeer + " Allocation allowed with " + getSendWindowBytesRemaining() + "/" + getSendWindowBytes() + " remaining" + " for message " + state.getMessageId() + ": " + state); - if (state.push()) + if (state.getPushCount() == 0) _messagesSent++; return true; } else { _context.statManager().addRateData("udp.sendRejected", state.getPushCount()); if (_log.shouldLog(Log.INFO)) - _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + _log.info(_remotePeer + " Allocation rejected w/ wsize=" + getSendWindowBytes() + " available=" + getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); return false; @@ -1695,10 +1715,11 @@ public class PeerState { if (state != null) { int numSends = state.getMaxSends(); - if (_log.shouldLog(Log.INFO)) - _log.info("Received ack of " + messageId + " by " + _remotePeer - + " after " + state.getLifetime() + " and " + numSends + " sends"); - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime()); + long lifetime = state.getLifetime(); + if (_log.shouldDebug()) + _log.debug("Received ack of " + messageId + " by " + _remotePeer + + " after " + lifetime + " and " + numSends + " sends"); + _context.statManager().addRateData("udp.sendConfirmTime", lifetime); if (state.getFragmentCount() > 1) _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount()); _context.statManager().addRateData("udp.sendConfirmVolley", numSends); @@ -1713,8 +1734,7 @@ public class PeerState { } } // this adjusts the rtt/rto/window/etc - messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending, anyQueued); - + messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued); } else { // dupack, likely //if (_log.shouldLog(Log.DEBUG)) @@ -1740,14 +1760,18 @@ public class PeerState { OutboundMessageState state = null; boolean isComplete = false; boolean anyPending; + int ackedSize = 0; synchronized (_outboundMessages) { for (Iterator iter = _outboundMessages.iterator(); iter.hasNext(); ) { state = iter.next(); if (state.getMessageId() == messageId) { + ackedSize = state.getUnackedSize(); boolean complete = state.acked(bitfield); if (complete) { isComplete = true; iter.remove(); + } else { + ackedSize -= state.getUnackedSize(); } break; } else if (state.getPushCount() <= 0) { @@ -1768,18 +1792,23 @@ public class PeerState { int numACKed = bitfield.ackCount(); _context.statManager().addRateData("udp.partialACKReceived", numACKed); + long lifetime = state.getLifetime(); if (_log.shouldLog(Log.INFO)) _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer - + " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? " - + isComplete + ": " + state); + + " after " + lifetime + " and " + numSends + " sends" + + " complete? " + isComplete + + " newly-acked: " + ackedSize + + ' ' + bitfield + + " for: " + state); if (isComplete) { - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime()); + _context.statManager().addRateData("udp.sendConfirmTime", lifetime); if (state.getFragmentCount() > 1) _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount()); _context.statManager().addRateData("udp.sendConfirmVolley", numSends); _transport.succeeded(state); - + } + if (ackedSize > 0) { boolean anyQueued; if (anyPending) { // locked_messageACKed will nudge() @@ -1790,7 +1819,7 @@ public class PeerState { } } // this adjusts the rtt/rto/window/etc - messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending, anyQueued); + messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued); } return isComplete; } else {