SSU: Don't resend ACKS that are too old (ticket #772)

untested
zzz
2014-10-21 18:37:11 +00:00
parent 239fe518a9
commit bcbda3cd27
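In brief, the change tags each ACK placed on the resend queue with the time it was first sent, and then drops entries older than RESEND_ACK_TIMEOUT (or beyond the MAX_RESEND_ACKS cap) lazily, the next time the resend list is built, instead of retransmitting them indefinitely. The stand-alone sketch below illustrates that idea only; the class and constant names follow the diff, but the free-standing class, the ackSent() helper, and the use of System.currentTimeMillis() in place of the router clock are simplifications for illustration, not the actual PeerState code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified sketch of the timestamped resend-ACK queue introduced in this commit. */
class ResendAckSketch {
    /** A message ID plus the time the ACK was first sent, mirroring PeerState.ResendACK. */
    private static class ResendACK {
        final Long id;
        final long time;
        ResendACK(Long id, long time) { this.id = id; this.time = time; }
    }

    private static final int MAX_RESEND_ACKS = 64;
    private static final long RESEND_ACK_TIMEOUT = 5*60*1000;

    private final Queue<ResendACK> _currentACKsResend = new LinkedBlockingQueue<ResendACK>();

    /** Called once an ACK has gone out; remember it for possible duplicate sends. */
    void ackSent(long messageId) {
        _currentACKsResend.offer(new ResendACK(messageId, System.currentTimeMillis()));
    }

    /**
     * Build the list of ACKs worth resending: entries newer than the timeout,
     * capped at MAX_RESEND_ACKS. Anything older, or over the cap, is removed
     * from the queue as a side effect, so no separate trim pass is needed.
     */
    List<Long> currentResendACKs() {
        List<Long> rv = new ArrayList<Long>();
        long cutoff = System.currentTimeMillis() - RESEND_ACK_TIMEOUT;
        int kept = 0;
        for (Iterator<ResendACK> iter = _currentACKsResend.iterator(); iter.hasNext(); ) {
            ResendACK rack = iter.next();
            if (rack.time > cutoff && kept++ < MAX_RESEND_ACKS)
                rv.add(rack.id);
            else
                iter.remove();   // expired or beyond the cap: forget it
        }
        if (kept > 1)
            Collections.shuffle(rv);
        return rv;
    }
}

The real code differs mainly in that it runs against _context.clock(), logs expirations, and shuffles with the router's random source, as the diff below shows.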


@@ -106,7 +106,7 @@ class PeerState {
* an ACK for. We keep a few of these around to retransmit with _currentACKs,
* hopefully saving some spurious retransmissions
*/
private final Queue<Long> _currentACKsResend;
private final Queue<ResendACK> _currentACKsResend;
/** when did we last send ACKs to the peer? */
private volatile long _lastACKSend;
@@ -306,6 +306,22 @@ class PeerState {
private static final int MAX_RTO = 15*1000;
private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3;
/**
* The max number of acks we save to send as duplicates
*/
private static final int MAX_RESEND_ACKS = 64;
/**
* The max number of duplicate acks sent in each ack-only message.
* Doesn't really matter, we have plenty of room...
* @since 0.7.13
*/
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3;
/** for small MTU */
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5;
private static final long RESEND_ACK_TIMEOUT = 5*60*1000;
public PeerState(RouterContext ctx, UDPTransport transport,
byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound) {
_context = ctx;
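For reference, with integer division these caps work out to MAX_RESEND_ACKS_LARGE = 21 and MAX_RESEND_ACKS_SMALL = 12 duplicate ACKs per ack-only message, and RESEND_ACK_TIMEOUT = 300,000 ms, i.e. five minutes. Apart from the new timeout, the constants are unchanged; they simply move up here from their old location further down in the file (removed in a later hunk).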
@@ -317,7 +333,7 @@ class PeerState {
_lastSendTime = now;
_lastReceiveTime = now;
_currentACKs = new ConcurrentHashSet<Long>();
_currentACKsResend = new LinkedBlockingQueue<Long>();
_currentACKsResend = new LinkedBlockingQueue<ResendACK>();
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
@@ -877,10 +893,25 @@ class PeerState {
* @since 0.8.12 was included in getCurrentFullACKs()
*/
public List<Long> getCurrentResendACKs() {
List<Long> randomResends = new ArrayList<Long>(_currentACKsResend);
Collections.shuffle(randomResends, _context.random());
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Returning " + randomResends.size() + " resend acks");
int sz = _currentACKsResend.size();
List<Long> randomResends = new ArrayList<Long>(sz);
if (sz > 0) {
long cutoff = _context.clock().now() - RESEND_ACK_TIMEOUT;
int i = 0;
for (Iterator<ResendACK> iter = _currentACKsResend.iterator(); iter.hasNext(); ) {
ResendACK rack = iter.next();
if (rack.time > cutoff && i++ < MAX_RESEND_ACKS) {
randomResends.add(rack.id);
} else {
iter.remove();
if (_log.shouldLog(Log.INFO))
_log.info("Expired ack " + rack.id + " sent " + (cutoff + RESEND_ACK_TIMEOUT - rack.time) +
" ago, now " + i + " resend acks");
}
}
if (i > 1)
Collections.shuffle(randomResends, _context.random());
}
return randomResends;
}
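Two details of the new getCurrentResendACKs() are worth calling out. First, expiration and trimming now happen lazily here, as a side effect of building the list, which is why the eager trim loops elsewhere in the class are removed. Second, the in-place iter.remove() is safe because LinkedBlockingQueue's iterator is weakly consistent and supports removal; a minimal stand-alone demonstration (not part of the commit) is:

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class IterRemoveDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<Long> q = new LinkedBlockingQueue<Long>();
        q.offer(1L);
        q.offer(2L);
        q.offer(3L);
        // Weakly consistent iterator: remove() is supported and never throws
        // ConcurrentModificationException, even if other threads offer() concurrently.
        for (Iterator<Long> it = q.iterator(); it.hasNext(); ) {
            if (it.next() == 2L)
                it.remove();
        }
        System.out.println(q);   // prints [1, 3]
    }
}

Note also that the logged age, cutoff + RESEND_ACK_TIMEOUT - rack.time, is simply now - rack.time, i.e. how long ago the ACK was first sent.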
@@ -891,12 +922,10 @@ class PeerState {
public void removeACKMessage(Long messageId) {
boolean removed = _currentACKs.remove(messageId);
if (removed) {
// only add if reoved from current, as this may be called for
// only add if removed from current, as this may be called for
// acks already in _currentACKsResend.
_currentACKsResend.offer(messageId);
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.poll();
_currentACKsResend.offer(new ResendACK(messageId, _context.clock().now()));
// trim happens in getCurrentResendACKs above
if (_log.shouldLog(Log.INFO))
_log.info("Sent ack " + messageId + " now " + _currentACKs.size() + " current and " +
_currentACKsResend.size() + " resend acks");
@@ -905,19 +934,6 @@ class PeerState {
_lastACKSend = _context.clock().now();
}
/**
* The max number of acks we save to send as duplicates
*/
private static final int MAX_RESEND_ACKS = 64;
/**
* The max number of duplicate acks sent in each ack-only messge.
* Doesn't really matter, we have plenty of room...
* @since 0.7.13
*/
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3;
/** for small MTU */
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5;
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.
@@ -970,13 +986,11 @@ class PeerState {
if (_currentACKs.isEmpty())
_wantACKSendSince = -1;
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
List<Long> randomResends = new ArrayList<Long>(_currentACKsResend);
List<Long> randomResends = getCurrentResendACKs();
// now repeat by putting in some old ACKs
// randomly selected from the Resend queue.
// Maybe we should only resend each one a certain number of times...
int oldIndex = Math.min(resendSize, maxResendAcks);
if (oldIndex > 0 && oldIndex < resendSize)
Collections.shuffle(randomResends, _context.random());
iter = randomResends.iterator();
while (bytesRemaining >= 4 && oldIndex-- > 0 && iter.hasNext()) {
Long cur = iter.next();
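The explicit shuffle removed in this hunk is no longer needed: getCurrentResendACKs() already returns an age-filtered and shuffled list, so shuffling a second time here would be redundant.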
@@ -991,13 +1005,18 @@ class PeerState {
bytesRemaining -= 4;
//}
}
for (Long val : currentACKsRemoved) {
_currentACKsResend.offer(val);
if (!currentACKsRemoved.isEmpty()) {
long now = _context.clock().now();
for (Long val : currentACKsRemoved) {
_currentACKsResend.offer(new ResendACK(val, now));
}
// trim happens in getCurrentResendACKs above
}
}
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.poll();
int partialIncluded = 0;
if (bytesRemaining > 4) {
@@ -1956,13 +1975,13 @@ class PeerState {
if (!_dead) {
_currentACKs.addAll(tmp);
}
tmp.clear();
tmp.addAll(oldPeer._currentACKsResend);
List<ResendACK> tmp3 = new ArrayList<ResendACK>();
tmp3.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear();
if (!_dead) {
_currentACKsResend.addAll(tmp);
_currentACKsResend.addAll(tmp3);
}
Map<Long, InboundMessageState> msgs = new HashMap<Long, InboundMessageState>();
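In the migration hunk above, resend ACKs copied over from the old PeerState now keep their original ResendACK timestamps, so entries that were already stale before the transfer will still expire on the new peer.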
@@ -1999,6 +2018,20 @@ class PeerState {
return _transport;
}
/**
* A message ID and a timestamp. Used for the resend ACKS.
* @since 0.9.17
*/
private static class ResendACK {
public final Long id;
public final long time;
public ResendACK(Long id, long time) {
this.id = id;
this.time = time;
}
}
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/*