- More synchronization fixes
   - Reduce chance of dup acks in a single message
   - Reduce max unsent acks to 50
   - Use last ack time in ping decision too
   - Reduce ack delay
This commit is contained in:
zzz
2012-10-05 13:08:05 +00:00
parent 090d59fcb7
commit b20e298f6e
3 changed files with 96 additions and 43 deletions

View File

@@ -28,7 +28,7 @@ class ACKSender implements Runnable {
private static final long POISON_PS = -9999999999l;
/** how frequently do we want to send ACKs to a peer? */
static final int ACK_FREQUENCY = 500;
static final int ACK_FREQUENCY = 350;
public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@@ -73,12 +73,12 @@ class ACKSender implements Runnable {
_peersToACK.clear();
}
private long ackFrequency(long timeSinceACK, long rtt) {
private static long ackFrequency(long timeSinceACK, long rtt) {
// if we are actively pumping lots of data to them, we can depend upon
// the unsentACKThreshold to figure out when to send an ACK instead of
// using the timer, so we can set the timeout/frequency higher
if (timeSinceACK < 2*1000)
return Math.max(rtt/2, 500);
return Math.max(rtt/2, ACK_FREQUENCY);
else
return ACK_FREQUENCY;
}
@@ -162,15 +162,15 @@ class ACKSender implements Runnable {
}
if (!ackBitfields.isEmpty()) {
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0);
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size());
if (remaining > 0)
_context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
_context.statManager().addRateData("udp.sendACKRemaining", remaining);
// set above before the break
//now = _context.clock().now();
if (lastSend < 0)
lastSend = now - 1;
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
//_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0);
//_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size());
UDPPacket ack = _builder.buildACK(peer, ackBitfields);
ack.markType(1);
ack.setFragmentCount(-1);
@@ -193,7 +193,7 @@ class ACKSender implements Runnable {
ackPeer(peer);
}
} else {
_context.statManager().addRateData("udp.abortACK", 1, 0);
_context.statManager().addRateData("udp.abortACK", 1);
}
}
}

View File

@@ -185,14 +185,15 @@ class IntroductionManager {
*/
public void pingIntroducers() {
// Try to keep the connection up for two hours after we made anybody an introducer
long pingCutoff = _context.clock().now() - (105 * 60 * 1000);
long inactivityCutoff = _context.clock().now() - UDPTransport.MIN_EXPIRE_TIMEOUT;
long now = _context.clock().now();
long pingCutoff = now - (105 * 60 * 1000);
long inactivityCutoff = now - UDPTransport.MIN_EXPIRE_TIMEOUT;
for (PeerState cur : _inbound) {
if (cur.getIntroducerTime() > pingCutoff &&
cur.getLastSendTime() < inactivityCutoff) {
if (_log.shouldLog(Log.INFO))
_log.info("Pinging introducer: " + cur);
cur.setLastSendTime(_context.clock().now());
cur.setLastSendTime(now);
_transport.send(_builder.buildPing(cur));
}
}

View File

@@ -406,13 +406,20 @@ class PeerState {
public long getLastReceiveTime() { return _lastReceiveTime; }
/** how many seconds have we sent packets without any ACKs received? */
public int getConsecutiveFailedSends() { return _consecutiveFailedSends; }
/** have we received a packet with the ECN bit set in the current second? */
/**
* have we received a packet with the ECN bit set in the current second?
* @return false always
* @deprecated unused, ECNs are never sent, always returns false
*/
public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; }
/**
* have all of the packets received in the current second requested that
* the previous second's ACKs be sent?
*/
//public boolean getRemoteWantsPreviousACKs() { return _remoteWantsPreviousACKs; }
/** how many bytes should we send to the peer in a second */
public int getSendWindowBytes() { return _sendWindowBytes; }
/** how many bytes can we send to the peer in the current second */
@@ -548,10 +555,12 @@ class PeerState {
public void setLastPingTime(long when) { _lastPingTime = when; }
/**
* Latest of last sent and last ping
* Latest of last sent, last ACK, last ping
* @since 0.9.3
*/
public long getLastSendOrPingTime() { return Math.max(_lastSendTime, _lastPingTime); }
public long getLastSendOrPingTime() {
return Math.max(Math.max(_lastSendTime, _lastACKSend), _lastPingTime);
}
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
@@ -754,7 +763,8 @@ class PeerState {
/**
* either they told us to back off, or we had to resend to get
* the data through.
*
* Caller should synch on this
* @return true if window shrunk, but nobody uses the return value
*/
private boolean congestionOccurred() {
long now = _context.clock().now();
@@ -762,8 +772,6 @@ class PeerState {
return false; // only shrink once every few seconds
_lastCongestionOccurred = now;
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
int congestionAt = _sendWindowBytes;
//if (true)
// _sendWindowBytes -= 10000;
@@ -819,7 +827,10 @@ class PeerState {
return randomResends;
}
/** the ack was sent */
/**
* The ack was sent.
* Side effect - sets _lastACKSend
*/
public void removeACKMessage(Long messageId) {
boolean removed = _currentACKs.remove(messageId);
if (removed) {
@@ -840,15 +851,15 @@ class PeerState {
/**
* The max number of acks we save to send as duplicates
*/
private static final int MAX_RESEND_ACKS = 16;
private static final int MAX_RESEND_ACKS = 64;
/**
* The max number of duplicate acks sent in each ack-only messge.
* Doesn't really matter, we have plenty of room...
* @since 0.7.13
*/
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS;
private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3;
/** for small MTU */
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS;
private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5;
/**
* grab a list of ACKBitfield instances, some of which may fully
@@ -867,11 +878,11 @@ class PeerState {
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
* So this is only for ACK-only packets, so all the size limiting is useless.
* FIXME.
* Side effect - sets _lastACKSend if rv is non-empty
*
* @return non-null, possibly empty
*/
public List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
List<ACKBitfield> rv = new ArrayList(MAX_RESEND_ACKS);
int bytesRemaining = countMaxACKData();
// Limit the overhead of all the resent acks when using small MTU
@@ -883,8 +894,10 @@ class PeerState {
maxResendAcks = MAX_RESEND_ACKS_SMALL;
else
maxResendAcks = MAX_RESEND_ACKS_LARGE;
List<Long> randomResends = new ArrayList(_currentACKsResend);
List<ACKBitfield> rv = new ArrayList(maxResendAcks);
// save to add to currentACKsResend later so we don't include twice
List<Long> currentACKsRemoved = new ArrayList(_currentACKs.size());
// As explained above, we include the acks in any order
// since we are unlikely to get backed up -
// just take them using the Set iterator.
@@ -894,12 +907,13 @@ class PeerState {
iter.remove();
long id = val.longValue();
rv.add(new FullACKBitfield(id));
_currentACKsResend.offer(val);
currentACKsRemoved.add(val);
bytesRemaining -= 4;
}
if (_currentACKs.isEmpty())
_wantACKSendSince = -1;
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
List<Long> randomResends = new ArrayList(_currentACKsResend);
// now repeat by putting in some old ACKs
// randomly selected from the Resend queue.
// Maybe we should only resend each one a certain number of times...
@@ -920,6 +934,9 @@ class PeerState {
bytesRemaining -= 4;
//}
}
for (Long val : currentACKsRemoved) {
_currentACKsResend.offer(val);
}
}
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
@@ -947,7 +964,8 @@ class PeerState {
}
}
_lastACKSend = _context.clock().now();
if (!rv.isEmpty())
_lastACKSend = _context.clock().now();
//if (rv == null)
// rv = Collections.EMPTY_LIST;
if (partialIncluded > 0)
@@ -1015,8 +1033,11 @@ class PeerState {
public String toString() { return "Full ACK of " + _msgId; }
}
/** we sent a message which was ACKed containing the given # of bytes */
public void messageACKed(int bytesACKed, long lifetime, int numSends) {
/**
* We sent a message which was ACKed containing the given # of bytes.
* Caller should synch on this
*/
private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
@@ -1030,15 +1051,15 @@ class PeerState {
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
} else {
if (false) {
_sendWindowBytes += 16; // why 16?
} else {
//if (false) {
// _sendWindowBytes += 16; // why 16?
//} else {
float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1));
float v = _context.random().nextFloat();
if (v < 0) v = 0-v;
if (v <= prob)
_sendWindowBytes += bytesACKed; //512; // bytesACKed;
}
//}
}
} else {
int allow = _concurrentMessagesAllowed - 1;
@@ -1051,21 +1072,31 @@ class PeerState {
_lastReceiveTime = _context.clock().now();
_lastSendFullyTime = _lastReceiveTime;
if (true) {
//if (true) {
if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
_sendWindowBytesRemaining += bytesACKed;
else
_sendWindowBytesRemaining = _sendWindowBytes;
}
//}
_messagesSent++;
if (numSends < 2) {
synchronized (this) {
// caller synchs
//synchronized (this) {
recalculateTimeouts(lifetime);
adjustMTU();
}
//}
}
else if (_log.shouldLog(Log.INFO))
}
/**
* We sent a message which was ACKed containing the given # of bytes.
*/
private void messageACKed(int bytesACKed, long lifetime, int numSends) {
synchronized(this) {
locked_messageACKed(bytesACKed, lifetime, numSends);
}
if (numSends >= 2 && _log.shouldLog(Log.INFO))
_log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
_context.statManager().addRateData("udp.sendBps", _sendBps, lifetime);
@@ -1144,9 +1175,10 @@ class PeerState {
_packetsRetransmitted = packets;
}
*****/
congestionOccurred();
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
synchronized (this) {
congestionOccurred();
adjustMTU();
}
//_rto *= 2;
@@ -1208,10 +1240,14 @@ class PeerState {
}
/**
* we received a backoff request, so cut our send window
* We received a backoff request, so cut our send window.
* NOTE: ECN sending is unimplemented, this is never called.
*/
public void ECNReceived() {
congestionOccurred();
synchronized(this) {
congestionOccurred();
}
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps);
_currentSecondECNReceived = true;
_lastReceiveTime = _context.clock().now();
}
@@ -1222,17 +1258,33 @@ class PeerState {
/** when did we last send an ACK to the peer? */
public long getLastACKSend() { return _lastACKSend; }
/** @deprecated unused */
public void setLastACKSend(long when) { _lastACKSend = when; }
public long getWantedACKSendSince() { return _wantACKSendSince; }
/**
* Are we out of room to send all the current unsent acks in a single packet?
* This is a huge threshold (134 for small MTU and 255 for large MTU)
* that is rarely if ever exceeded in practice.
* So just use a fixed threshold of half the resend acks, so that if the
* packet is lost the acks have a decent chance of getting retransmitted.
* Used only by ACKSender.
*/
public boolean unsentACKThresholdReached() {
int threshold = countMaxACKData() / 4;
return _currentACKs.size() >= threshold;
//int threshold = countMaxACKData() / 4;
//return _currentACKs.size() >= threshold;
return _currentACKs.size() >= MAX_RESEND_ACKS / 2;
}
/** @return MTU - 83 */
/**
* @return how many bytes available for acks in an ack-only packet, == MTU - 83
* Max of 1020
*/
private int countMaxACKData() {
return _mtu
return Math.min(PacketBuilder.ABSOLUTE_MAX_ACKS * 4,
_mtu
- PacketBuilder.IP_HEADER_SIZE
- PacketBuilder.UDP_HEADER_SIZE
- UDPPacket.IV_SIZE
@@ -1241,7 +1293,7 @@ class PeerState {
- 4 // timestamp
- 1 // data flag
- 1 // # ACKs
- 16; // padding safety
- 16); // padding safety
}
private int minRTO() {