SSU: Send subset of fragments (ticket #2427)
if all fragments will not fit in the window.
Track per-fragment send count.
Reset send window when retransmitting.
Update send window when partial acks received.
Make OMS.getMaxSends() and getPushCount() track different things.
Change OMS.push() to be called by OMF and return the pushed fragments.
Use size of smallest fragment rather than total size to determine if we can send a message now.
This is an improved fix for ticket #2505.
Eliminate repeated calls to OMS.getLifetime().
Log tweaks and reduce log levels.
Improves throughput on lossy connections. Reduces latency for large messages.
This is prep for reducing DEFAULT_SEND_WINDOW_BYTES and W+, which would have exacerbated these issues.
Additional changes to follow, implementing Westwood+, see #2427
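The core idea — send only the unacked fragments with the lowest send count, and only as many as fit in the remaining send window — can be illustrated with a small standalone sketch. This is a simplified illustration with hypothetical names, not the actual OutboundMessageState code; see the getSendSize()/push() changes in the diff below for the real logic.

    import java.util.ArrayList;
    import java.util.List;

    class FragmentSelectionSketch {
        // sizes[i] = size of fragment i in bytes
        // sends[i] = how many times fragment i has been sent so far
        // acked[i] = true once fragment i has been acked
        static List<Integer> selectFragments(int[] sizes, int[] sends, boolean[] acked, int windowRemaining) {
            // find the minimum send count among unacked fragments
            int minSends = Integer.MAX_VALUE;
            for (int i = 0; i < sizes.length; i++) {
                if (!acked[i] && sends[i] < minSends)
                    minSends = sends[i];
            }
            // take only the least-sent unacked fragments, up to the window limit
            List<Integer> toSend = new ArrayList<Integer>();
            int allocated = 0;
            for (int i = 0; i < sizes.length; i++) {
                if (!acked[i] && sends[i] == minSends && allocated + sizes[i] <= windowRemaining) {
                    toSend.add(i);
                    allocated += sizes[i];
                }
            }
            return toSend;
        }
    }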
@@ -1,4 +1,8 @@
2020-12-17 zzz
 * SSU: Send subset of fragments (ticket #2427)

2020-12-16 zzz
 * NetDB: Fix rare floodfill monitor NPE
 * SSU: Fix occasional high CPU usage

2020-12-11 zzz
@@ -18,7 +18,7 @@ public class RouterVersion {
    /** deprecated */
    public final static String ID = "Monotone";
    public final static String VERSION = CoreVersion.VERSION;
    public final static long BUILD = 4;
    public final static long BUILD = 5;

    /** for example "-test" */
    public final static String EXTRA = "";
@@ -160,7 +160,7 @@ class OutboundMessageFragments {
            // will throw IAE if peer == null
            OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
            peer.add(state);
            add(peer, state.fragmentSize(0));
            add(peer, state.getMinSendSize());
        } catch (IllegalArgumentException iae) {
            _transport.failed(msg, "Peer disconnected quickly");
            return;
@@ -177,7 +177,7 @@ class OutboundMessageFragments {
        if (peer == null)
            throw new RuntimeException("null peer for " + state);
        peer.add(state);
        add(peer, state.fragmentSize(0));
        add(peer, state.getMinSendSize());
        //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    }
@@ -194,7 +194,7 @@ class OutboundMessageFragments {
        for (int i = 0; i < sz; i++) {
            OutboundMessageState state = states.get(i);
            peer.add(state);
            int fsz = state.fragmentSize(0);
            int fsz = state.getMinSendSize();
            if (fsz < min)
                min = fsz;
        }
@@ -402,16 +402,9 @@ class OutboundMessageFragments {
        // build the list of fragments to send
        List<Fragment> toSend = new ArrayList<Fragment>(8);
        for (OutboundMessageState state : states) {
            int fragments = state.getFragmentCount();
            int queued = 0;
            for (int i = 0; i < fragments; i++) {
                if (state.needsSending(i)) {
                    toSend.add(new Fragment(state, i));
                    queued++;
                }
            }
            int queued = state.push(toSend);
            // per-state stats
            if (queued > 0 && state.getPushCount() > 1) {
            if (queued > 0 && state.getMaxSends() > 1) {
                peer.messageRetransmitted(queued);
                // _packetsRetransmitted += toSend; // lifetime for the transport
                _context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
@@ -1,11 +1,12 @@
package net.i2p.router.transport.udp;

import java.util.Date;
import java.util.List;

import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.transport.udp.PacketBuilder.Fragment;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.Log;
@@ -30,12 +31,16 @@ class OutboundMessageState implements CDPQEntry {
    /** bitmask, 0 if acked, all 0 = complete */
    private long _fragmentAcks;
    private final int _numFragments;
    /** sends for each fragment, or null if only one fragment */
    private final byte _fragmentSends[];
    private final long _startedOn;
    private int _pushCount;
    private int _maxSends;
    // we can't use the ones in _message since it is null for injections
    private long _enqueueTime;
    private long _seqNum;
    /** how many bytes push() is allowed to send */
    private int _allowedSendBytes;

    public static final int MAX_MSG_SIZE = 32 * 1024;
@@ -95,6 +100,7 @@ class OutboundMessageState implements CDPQEntry {
        _numFragments = numFragments;
        // all 1's where we care
        _fragmentAcks = _numFragments < 64 ? mask(_numFragments) - 1L : -1L;
        _fragmentSends = (numFragments > 1) ? new byte[numFragments] : null;
    }

    /**
@@ -143,6 +149,105 @@ class OutboundMessageState implements CDPQEntry {
        return rv;
    }

    /**
     * Is any fragment unsent?
     *
     * @since 0.9.49
     */
    public synchronized boolean hasUnsentFragments() {
        if (isComplete())
            return false;
        if (_numFragments == 1)
            return _maxSends == 0;
        for (int i = _numFragments - 1; i >= 0; i--) {
            if (_fragmentSends[i] == 0)
                return true;
        }
        return false;
    }

    /**
     * The min send count of unacked fragments.
     * Only call if not complete and _numFragments greater than 1.
     * Caller must synch.
     *
     * @since 0.9.49
     */
    private int getMinSendCount() {
        int rv = 127;
        for (int i = 0; i < _numFragments; i++) {
            if (needsSending(i)) {
                int count = _fragmentSends[i];
                if (count < rv)
                    rv = count;
            }
        }
        return rv;
    }

    /**
     * The minimum number of bytes we can send, which is the smallest unacked fragment we will send next.
     *
     * @return 0 to total size
     * @since 0.9.49
     */
    public synchronized int getMinSendSize() {
        if (isComplete())
            return 0;
        if (_numFragments == 1)
            return _messageBuf.length;
        if (_pushCount == 0)
            return fragmentSize(_numFragments - 1);
        int minSendCount = getMinSendCount();
        int rv = _fragmentSize;
        for (int i = 0; i < _numFragments; i++) {
            if (needsSending(i) && _fragmentSends[i] == minSendCount) {
                int sz = fragmentSize(i);
                if (sz < rv) {
                    rv = sz;
                }
            }
        }
        return rv;
    }

    /**
     * How many bytes we can send under the max given.
     * Side effect: if applicable, amount to send will be saved for the push() call.
     * Note: With multiple fragments, this will allocate only the fragments with the lowest push count.
     * Example: If push counts are 1 1 1 0 0, this will only return the size of the last two fragments,
     * even if any of the first three need to be retransmitted.
     *
     * @param max the maximum number of bytes we can send
     * @return 0 to max bytes
     * @since 0.9.49
     */
    public synchronized int getSendSize(int max) {
        if (isComplete())
            return 0;
        if (_numFragments == 1)
            return _messageBuf.length <= max ? _messageBuf.length : 0;
        // find the fragments we've sent the least
        int minSendCount = getMinSendCount();
        // Allow only the fragments we've sent the least
        int rv = 0;
        for (int i = 0; i < _numFragments; i++) {
            if (needsSending(i) && _fragmentSends[i] == minSendCount) {
                int sz = fragmentSize(i);
                int tot = rv + sz;
                if (tot <= max) {
                    rv = tot;
                } else {
                    if (_log.shouldInfo())
                        _log.info("Send window limited to " + (max - rv) + ", not sending fragment " + i + " for " + toString());
                }
            }
        }
        if (rv > 0)
            _allowedSendBytes = rv;
        return rv;
    }

    public synchronized boolean needsSending(int fragment) {
        return (_fragmentAcks & mask(fragment)) != 0;
    }
@@ -165,27 +270,79 @@ class OutboundMessageState implements CDPQEntry {
    }

    /**
     * The max number of sends for any fragment, which is the
     * same as the push count, at least as it's coded now.
     * The max number of sends for any fragment.
     * As of 0.9.49, may be less than getPushCount() if we pushed only some fragments
     */
    public synchronized int getMaxSends() { return _maxSends; }

    /**
     * The number of times we've pushed some fragments, which is the
     * same as the max sends, at least as it's coded now.
     * The number of times we've pushed some fragments.
     * As of 0.9.49, may be greater than getMaxSends() if we pushed only some fragments.
     */
    public synchronized int getPushCount() { return _pushCount; }

    /**
     * Note that we have pushed the message fragments.
     * Increments push count (and max sends... why?)
     * @return true if this is the first push
     * Add fragments up to the number of bytes allowed by setAllowedSendBytes()
     * Side effects: Clears setAllowedSendBytes. Increments pushCount. Increments maxSends if applicable.
     * Note: With multiple fragments, this will send only the fragments with the lowest push count.
     * Example: If push counts are 1 1 1 0 0, this will only send the last two fragments,
     * even if any of the first three need to be retransmitted.
     *
     * @param toSend out parameter
     * @return the number of Fragments added
     * @since 0.9.49
     */
    public synchronized boolean push() {
        boolean rv = _pushCount == 0;
        // these will never be different...
        _pushCount++;
        _maxSends = _pushCount;
    public synchronized int push(List<Fragment> toSend) {
        int rv = 0;
        if (_allowedSendBytes <= 0 || _numFragments == 1) {
            // easy way
            // send all, or only one fragment
            for (int i = 0; i < _numFragments; i++) {
                if (needsSending(i)) {
                    toSend.add(new Fragment(this, i));
                    rv++;
                    if (_fragmentSends != null) {
                        _fragmentSends[i]++;
                        if (_fragmentSends[i] > _maxSends)
                            _maxSends = _fragmentSends[i];
                    }
                }
            }
            if (_fragmentSends == null)
                _maxSends++;
        } else {
            // hard way.
            // send the fragments we've sent the least, up to the max size
            int minSendCount = getMinSendCount();
            int sent = 0;
            for (int i = 0; i < _numFragments; i++) {
                if (needsSending(i)) {
                    int count = _fragmentSends[i];
                    if (count == minSendCount) {
                        int sz = fragmentSize(i);
                        if (sz <= _allowedSendBytes - sent) {
                            sent += sz;
                            toSend.add(new Fragment(this, i));
                            rv++;
                            _fragmentSends[i]++;
                            if (_fragmentSends[i] > _maxSends)
                                _maxSends = _fragmentSends[i];
                            if (sent >= _allowedSendBytes)
                                break;
                        }
                    }
                }
            }
        }
        if (rv > 0) {
            _pushCount++;
            if (_log.shouldDebug())
                _log.debug("Pushed " + rv + " fragments for " + toString());
        } else {
            if (_log.shouldDebug())
                _log.debug("Nothing pushed??? allowedSendBytes=" + _allowedSendBytes + " for " + toString());
        }
        _allowedSendBytes = 0;
        return rv;
    }
@@ -296,15 +453,28 @@ class OutboundMessageState implements CDPQEntry {
        StringBuilder buf = new StringBuilder(256);
        buf.append("OB Message ").append(_i2npMessage.getUniqueId());
        buf.append(" type ").append(_i2npMessage.getType());
        buf.append(" with ").append(_numFragments).append(" fragments");
        buf.append(" of size ").append(_messageBuf.length);
        buf.append(" size ").append(_messageBuf.length);
        if (_numFragments > 1)
            buf.append(" fragments: ").append(_numFragments);
        buf.append(" volleys: ").append(_maxSends);
        buf.append(" lifetime: ").append(getLifetime());
        if (!isComplete()) {
            buf.append(" pending fragments: ");
            for (int i = 0; i < _numFragments; i++) {
                if (needsSending(i))
                    buf.append(i).append(' ');
            if (_fragmentSends != null) {
                buf.append(" unacked fragments: ");
                for (int i = 0; i < _numFragments; i++) {
                    if (needsSending(i))
                        buf.append(i).append(' ');
                }
                buf.append("sizes: ");
                for (int i = 0; i < _numFragments; i++) {
                    buf.append(fragmentSize(i)).append(' ');
                }
                buf.append("send counts: ");
                for (int i = 0; i < _numFragments; i++) {
                    buf.append(_fragmentSends[i]).append(' ');
                }
            } else {
                buf.append(" unacked");
            }
        }
        //buf.append(" to: ").append(_peer.toString());
@@ -609,39 +609,34 @@ public class PeerState {
     *
     * Caller should synch
     */
    private boolean allocateSendingBytes(int size, int messagePushCount, long now) {
    private boolean allocateSendingBytes(OutboundMessageState state, long now, boolean resetWindow) {
        long duration = now - _lastSendRefill;
        if (duration >= 1000) {
        if (resetWindow || duration >= 1000) {
            _sendWindowBytesRemaining = _sendWindowBytes;
            _sendBytes += size;
            if (duration <= 0)
                duration = 10;
            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
            _sendBytes = 0;
            _lastSendRefill = now;
        }
        int messagePushCount = state.getPushCount();
        if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) {
            _consecutiveRejections++;
            _context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections);
            return false;
        }
        if (_sendWindowBytesRemaining <= 0)
            return false;

        // Ticket 2505
        // We always send all unacked fragments for a message,
        // because we don't have any mechanism in OutboundMessageFragments
        // to track the next send time for fragments individually.
        // Huge messages that are larger than the window size could never
        // get sent and block the outbound queue forever.
        // So we let it through when the window is empty (full window remaining).
        if (size <= _sendWindowBytesRemaining ||
            (size > _sendWindowBytes && _sendWindowBytesRemaining >= _sendWindowBytes)) {
            // move this check to getSendWindowBytesRemaining() ?
            if (messagePushCount == 0 && _outboundMessages.size() > _concurrentMessagesAllowed) {
                _consecutiveRejections++;
                _context.statManager().addRateData("udp.rejectConcurrentActive", _outboundMessages.size(), _consecutiveRejections);
                return false;
            } else if (messagePushCount == 0) {
        int size = state.getSendSize(_sendWindowBytesRemaining);
        if (size > 0) {
            if (messagePushCount == 0) {
                _context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed);
                if (_consecutiveRejections > 0)
                    _context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size());
                _consecutiveRejections = 0;
            }
            _sendWindowBytesRemaining -= size;
            if (_sendWindowBytesRemaining < 0)
                _sendWindowBytesRemaining = 0;
            _sendBytes += size;
            _lastSendTime = now;
            return true;
@@ -853,8 +848,8 @@ public class PeerState {
                randomResends.add(rack.id);
            } else {
                iter.remove();
                if (_log.shouldLog(Log.INFO))
                    _log.info("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) +
                if (_log.shouldDebug())
                    _log.debug("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) +
                               " ago, now " + _currentACKsResend.size() + " resend acks");
            }
        }
@@ -875,8 +870,8 @@ public class PeerState {
            // acks already in _currentACKsResend.
            _currentACKsResend.offer(new ResendACK(messageId, _context.clock().now()));
            // trim happens in getCurrentResendACKs above
            if (_log.shouldLog(Log.INFO))
                _log.info("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " +
            if (_log.shouldDebug())
                _log.debug("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " +
                           _currentACKsResend.size() + " resend acks");
        }
        // should we only do this if removed?
@@ -1124,8 +1119,8 @@ public class PeerState {
        synchronized(this) {
            locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued);
        }
        if (numSends >= 2 && _log.shouldLog(Log.INFO))
            _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
        if (numSends >= 2 && _log.shouldDebug())
            _log.debug(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);

        _context.statManager().addRateData("udp.sendBps", _sendBps);
    }
@@ -1442,7 +1437,7 @@ public class PeerState {
                _context.statManager().addRateData("udp.sendFailed", state.getPushCount());
                if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                failed.add(state);
            } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
            } else if (state.getMaxSends() > OutboundMessageFragments.MAX_VOLLEYS) {
                iter.remove();
                _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
                if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
@@ -1470,7 +1465,7 @@ public class PeerState {
                // it can not have an OutNetMessage if the source is the
                // final after establishment message
                if (_log.shouldLog(Log.WARN))
                    _log.warn("Unable to send a direct message: " + state);
                    _log.warn("Unable to send a direct message: " + state + " to: " + this);
            }
        }
@@ -1496,10 +1491,11 @@ public class PeerState {
        if (rv != null && !rv.isEmpty()) {
            synchronized(this) {
                long old = _retransmitTimer;
                if (_retransmitTimer == 0)
                if (_retransmitTimer == 0) {
                    _retransmitTimer = now + getRTO();
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
                }
            }
        } else if (canSendOld) {
            // failsafe - push out or cancel timer to prevent looping
@@ -1529,7 +1525,7 @@ public class PeerState {
        synchronized (_outboundMessages) {
            if (canSendOld) {
                for (OutboundMessageState state : _outboundMessages) {
                    boolean should = locked_shouldSend(state, now);
                    boolean should = locked_shouldSend(state, now, true);
                    if (should) {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
@@ -1554,6 +1550,31 @@ public class PeerState {
                    }
                }
                return rv;
            } else if (!_outboundMessages.isEmpty()) {
                // send some unsent fragments of pending messages, if any
                for (OutboundMessageState state : _outboundMessages) {
                    if (!state.hasUnsentFragments())
                        continue;
                    boolean should = locked_shouldSend(state, now, false);
                    if (should) {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("Allocate sending more fragments to " + _remotePeer + ": " + state.getMessageId());
                        if (rv == null)
                            rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed);
                        rv.add(state);
                    } else {
                        // no more bandwidth available
                        if (_log.shouldLog(Log.DEBUG)) {
                            if (rv == null)
                                _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
                                           " / " + _outboundQueue.size() + " remaining");
                            else
                                _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
                        }
                        return rv;
                    }
                }
                // fall through to new messages
            }
            // Peek at head of _outboundQueue and see if we can send it.
            // If so, pull it off, put it in _outbundMessages, test
@@ -1561,7 +1582,7 @@ public class PeerState {
            OutboundMessageState state;
            synchronized (_outboundQueue) {
                while ((state = _outboundQueue.peek()) != null &&
                       locked_shouldSend(state, now)) {
                       locked_shouldSend(state, now, false)) {
                    // This is guaranted to be the same as what we got in peek(),
                    // due to locking and because we aren't using the dropping CDPBQ.
                    // If we do switch to CDPBQ,
@@ -1643,22 +1664,21 @@ public class PeerState {
    /**
     * Caller should synch
     */
    private boolean locked_shouldSend(OutboundMessageState state, long now) {
        int size = state.getUnackedSize();
        if (allocateSendingBytes(size, state.getPushCount(), now)) {
    private boolean locked_shouldSend(OutboundMessageState state, long now, boolean resetWindow) {
        if (allocateSendingBytes(state, now, resetWindow)) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(_remotePeer + " Allocation of " + size + " allowed with "
                _log.debug(_remotePeer + " Allocation allowed with "
                           + getSendWindowBytesRemaining()
                           + "/" + getSendWindowBytes()
                           + " remaining"
                           + " for message " + state.getMessageId() + ": " + state);
            if (state.push())
            if (state.getPushCount() == 0)
                _messagesSent++;
            return true;
        } else {
            _context.statManager().addRateData("udp.sendRejected", state.getPushCount());
            if (_log.shouldLog(Log.INFO))
                _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
                _log.info(_remotePeer + " Allocation rejected w/ wsize=" + getSendWindowBytes()
                          + " available=" + getSendWindowBytesRemaining()
                          + " for message " + state.getMessageId() + ": " + state);
            return false;
@@ -1695,10 +1715,11 @@ public class PeerState {

        if (state != null) {
            int numSends = state.getMaxSends();
            if (_log.shouldLog(Log.INFO))
                _log.info("Received ack of " + messageId + " by " + _remotePeer
                          + " after " + state.getLifetime() + " and " + numSends + " sends");
            _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime());
            long lifetime = state.getLifetime();
            if (_log.shouldDebug())
                _log.debug("Received ack of " + messageId + " by " + _remotePeer
                           + " after " + lifetime + " and " + numSends + " sends");
            _context.statManager().addRateData("udp.sendConfirmTime", lifetime);
            if (state.getFragmentCount() > 1)
                _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
            _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
@@ -1713,8 +1734,7 @@ public class PeerState {
                }
            }
            // this adjusts the rtt/rto/window/etc
            messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending, anyQueued);

            messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued);
        } else {
            // dupack, likely
            //if (_log.shouldLog(Log.DEBUG))
@@ -1740,14 +1760,18 @@ public class PeerState {
        OutboundMessageState state = null;
        boolean isComplete = false;
        boolean anyPending;
        int ackedSize = 0;
        synchronized (_outboundMessages) {
            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                state = iter.next();
                if (state.getMessageId() == messageId) {
                    ackedSize = state.getUnackedSize();
                    boolean complete = state.acked(bitfield);
                    if (complete) {
                        isComplete = true;
                        iter.remove();
                    } else {
                        ackedSize -= state.getUnackedSize();
                    }
                    break;
                } else if (state.getPushCount() <= 0) {
@@ -1768,18 +1792,23 @@ public class PeerState {
            int numACKed = bitfield.ackCount();
            _context.statManager().addRateData("udp.partialACKReceived", numACKed);

            long lifetime = state.getLifetime();
            if (_log.shouldLog(Log.INFO))
                _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
                          + " after " + state.getLifetime() + " and " + numSends + " sends: " + bitfield + ": completely removed? "
                          + isComplete + ": " + state);
                          + " after " + lifetime + " and " + numSends + " sends"
                          + " complete? " + isComplete
                          + " newly-acked: " + ackedSize
                          + ' ' + bitfield
                          + " for: " + state);

            if (isComplete) {
                _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime());
                _context.statManager().addRateData("udp.sendConfirmTime", lifetime);
                if (state.getFragmentCount() > 1)
                    _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
                _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
                _transport.succeeded(state);

            }
            if (ackedSize > 0) {
                boolean anyQueued;
                if (anyPending) {
                    // locked_messageACKed will nudge()
@@ -1790,7 +1819,7 @@ public class PeerState {
                }
            }
            // this adjusts the rtt/rto/window/etc
            messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending, anyQueued);
            messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued);
        }
        return isComplete;
    } else {