SSU2: Fixes part 7

Clean up and optimize ack handling
Log tweaks and javadocs
This commit is contained in:
zzz
2022-03-10 10:27:19 -05:00
parent 97736cef1c
commit 393ee71ad9
2 changed files with 157 additions and 44 deletions

View File

@@ -457,11 +457,15 @@ public class PeerState {
/**
* The AES key used to verify packets, set only after the connection is
* established.
*
* SSU 1 only.
*/
SessionKey getCurrentMACKey() { return _currentMACKey; }
/**
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*
* SSU 1 only.
*/
SessionKey getCurrentCipherKey() { return _currentCipherKey; }
@@ -469,6 +473,8 @@ public class PeerState {
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*
* SSU 1 only.
*
* @return null always, rekeying unimplemented
*/
SessionKey getNextMACKey() { return _nextMACKey; }
@@ -478,6 +484,8 @@ public class PeerState {
* rekeying the connection, or null if we are not in the process
* of rekeying.
*
* SSU 1 only.
*
* @return null always, rekeying unimplemented
*/
SessionKey getNextCipherKey() { return _nextCipherKey; }
@@ -882,6 +890,8 @@ public class PeerState {
* even if there is room,
* or the packets will have way too much overhead.
*
* SSU 1 only.
*
* @return a new list, do as you like with it
*/
List<Long> getCurrentFullACKs() {
@@ -901,6 +911,8 @@ public class PeerState {
* even if there is room,
* or the packets will have way too much overhead.
*
* SSU 1 only.
*
* @return a new list, do as you like with it
* @since 0.8.12 was included in getCurrentFullACKs()
*/
@@ -930,6 +942,8 @@ public class PeerState {
/**
* The ack was sent.
* Side effect - sets _lastACKSend
*
* SSU 1 only.
*/
void removeACKMessage(Long messageId) {
boolean removed = _currentACKs.remove(messageId);
@@ -956,6 +970,8 @@ public class PeerState {
* Side effect - sets _lastACKSend to now if rv is non-empty.
* Side effect - sets _wantACKSendSince to 0 if _currentACKs is now empty.
*
* SSU 1 only.
*
* @return non-null, possibly empty
*/
private List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
@@ -1047,6 +1063,8 @@ public class PeerState {
}
/**
* SSU 1 only.
*
* @param rv out parameter, populated with true partial ACKBitfields.
* no full bitfields are included.
*/
@@ -1084,6 +1102,8 @@ public class PeerState {
/**
* A dummy "partial" ack which represents a full ACK of a message
*
* SSU 1 only.
*/
private static class FullACKBitfield implements ACKBitfield {
private final long _msgId;
@@ -1344,6 +1364,9 @@ public class PeerState {
/**
* All acks have been sent.
*
* SSU 1 only.
*
* @since 0.9.52
*/
synchronized void clearWantedACKSendSince() {
@@ -1359,12 +1382,16 @@ public class PeerState {
* So just use a fixed threshold of half the resend acks, so that if the
* packet is lost the acks have a decent chance of getting retransmitted.
* Used only by ACKSender.
*
* SSU 1 only.
*/
boolean unsentACKThresholdReached() {
return _currentACKs.size() >= MAX_RESEND_ACKS / 2;
}
/**
* SSU 1 only.
*
* @return how many bytes available for acks in an ack-only packet, == MTU - 83
* Max of 1020
*/
@@ -1791,6 +1818,8 @@ public class PeerState {
* A full ACK was received.
* TODO if messages awaiting ack were a HashMap&lt;Long, OutboundMessageState&gt; this would be faster.
*
* SSU 1 only.
*
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if the message was acked for the first time
*/
@@ -1839,15 +1868,12 @@ public class PeerState {
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
// null for PS2
if (_ackedMessages != null) {
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
// this adjusts the rtt/rto/window/etc
messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued);
} else if (_ackedMessages != null) { // null for PS2
} else {
// dupack, likely
Long seq;
synchronized(_ackedMessages) {
@@ -1867,6 +1893,8 @@ public class PeerState {
/**
* A partial ACK was received. This is much less common than full ACKs.
*
* SSU 1 only.
*
* @param highestSeqNumAcked in/out param, will modify if this seq. number is higher
* @return true if any fragment of the message was completely acked for the first time
*/
@@ -1922,12 +1950,12 @@ public class PeerState {
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
if (_log.shouldDebug())
_log.debug("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
_log.debug("Received partial ack of " + messageId + " by " + _remotePeer
+ " newly-acked: " + ackedSize
+ ", now complete for: " + state);
} else {
if (_log.shouldInfo())
_log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
if (_log.shouldDebug())
_log.debug("Received partial ack of " + messageId + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends"
+ " complete? false"
+ " newly-acked: " + ackedSize
@@ -1975,6 +2003,102 @@ public class PeerState {
}
}
/**
* An ACK of a fragment was received.
*
* SSU 2 only.
*
* @return true if this fragment of the message was acked for the first time
*/
protected boolean acked(PacketBuilder.Fragment f) {
if (_dead)
return false;
final OutboundMessageState state = f.state;
boolean isComplete;
int ackedSize;
synchronized(state) {
ackedSize = state.getUnackedSize();
if (ackedSize <= 0)
return false;
isComplete = state.acked(f.num);
if (!isComplete)
ackedSize -= state.getUnackedSize();
}
if (ackedSize <= 0)
return false;
boolean anyPending;
synchronized (_outboundMessages) {
if (isComplete) {
long sn = state.getSeqNum();
boolean found = false;
// we don't do _outboundMessages.remove() so we can use the cached iterator
// and break out early
for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
OutboundMessageState state2 = iter.next();
if (state == state2) {
iter.remove();
found = true;
break;
} else if (state2.getSeqNum() > sn) {
// _outboundMessages is ordered, so once we get to a msg
// with a higher sequence number, we can stop
break;
}
}
if (!found) {
// shouldn't happen except on race
if (_log.shouldWarn())
_log.warn("Acked but not found in outbound messages: " + state);
return false;
}
}
anyPending = !_outboundMessages.isEmpty();
}
int numSends = state.getMaxSends();
_context.statManager().addRateData("udp.partialACKReceived", 1);
long lifetime = state.getLifetime();
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", lifetime);
if (state.getFragmentCount() > 1)
_context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount());
_context.statManager().addRateData("udp.sendConfirmVolley", numSends);
_transport.succeeded(state);
if (_log.shouldDebug()) {
if (state.getFragmentCount() > 1) {
_log.debug("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " newly-acked: " + ackedSize
+ ", now complete for: " + state);
} else {
_log.debug("Received ack of " + state.getMessageId() + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends");
}
}
} else {
if (_log.shouldDebug())
_log.debug("Received partial ack of " + state.getMessageId() + " by " + _remotePeer
+ " after " + lifetime + " and " + numSends + " sends"
+ " complete? false"
+ " newly-acked: " + ackedSize
+ " fragment: " + f.num
+ " for: " + state);
}
state.clearNACKs();
boolean anyQueued;
if (anyPending) {
// locked_messageACKed will nudge()
anyQueued = false;
} else {
synchronized (_outboundQueue) {
anyQueued = !_outboundQueue.isEmpty();
}
}
// this adjusts the rtt/rto/window/etc
messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued);
return true;
}
/**
* Enter or leave fast retransmit mode, and adjust
* SST and window variables accordingly.
@@ -2070,6 +2194,8 @@ public class PeerState {
/**
* Transfer the basic activity/state from the old peer to the current peer
*
* SSU 1 only.
*
* @param oldPeer non-null
*/
void loadFrom(PeerState oldPeer) {
@@ -2133,6 +2259,9 @@ public class PeerState {
/**
* A message ID and a timestamp. Used for the resend ACKS.
*
* SSU 1 only.
*
* @since 0.9.17
*/
private static class ResendACK {
@@ -2148,6 +2277,9 @@ public class PeerState {
/**
* Message ID to sequence number.
* Insertion order. Caller must synch.
*
* SSU 1 only.
*
* @since 0.9.49
*/
private static class AckedMessages extends LinkedHashMap<Integer, Long> {
@@ -2160,6 +2292,9 @@ public class PeerState {
/**
* A timer to send an ack-only packet.
*
* SSU 1 only.
*
* @since 0.9.52
*/
private class ACKTimer extends SimpleTimer2.TimedEvent {
@@ -2249,7 +2384,10 @@ public class PeerState {
if (_remotePeer != null)
buf.append(" ").append(_remotePeer.toBase64().substring(0,6));
buf.append(_isInbound? " IB " : " OB ");
if (getVersion() == 2)
buf.append(_isInbound? " IB2 " : " OB2 ");
else
buf.append(_isInbound? " IB " : " OB ");
long now = _context.clock().now();
buf.append(" recvAge: ").append(now-_lastReceiveTime);
buf.append(" sendAge: ").append(now-_lastSendFullyTime);

View File

@@ -527,46 +527,21 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
}
if (_log.shouldDebug())
_log.debug("New ACK of pkt " + pktNum + " containing " + fragments.size() + " fragments on " + this);
ModifiableLong highestSeqNumAcked = new ModifiableLong(-1);
long highest = -1;
for (PacketBuilder.Fragment f : fragments) {
OutboundMessageState state = f.state;
if (state.isComplete()) {
if (_log.shouldWarn())
_log.warn("New ACK but state complete? " + state);
continue;
}
int num = f.num;
if (!state.needsSending(num)) {
if (acked(f)) {
if (_log.shouldDebug())
_log.debug("New ACK of fragment " + f.num + " of " + state);
} else {
// will happen with retransmission as a different packet number
if (_log.shouldWarn())
_log.warn("New ACK for fragment " + num + " but already acked? " + state);
continue;
}
int ackedSize = state.getUnackedSize();
boolean complete = state.acked(f.num);
if (complete) {
if (_log.shouldDebug())
_log.debug("Received ACK of fragment " + num + " of " + state +
", now complete");
acked(state.getMessageId(), highestSeqNumAcked);
if (ackedSize > 0) {
// TODO acked() will have an ackedSize of 0, so do it here also
messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false);
}
} else {
ackedSize -= state.getUnackedSize();
if (_log.shouldDebug())
_log.debug("Received ACK of fragment " + num + " of " + state +
", still incomplete");
if (ackedSize > 0) {
state.clearNACKs();
// this adjusts the rtt/rto/window/etc
// flags TODO
messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false);
}
_log.warn("Dup ACK of fragment " + f.num + " of " + state);
}
long sn = state.getSeqNum();
if (sn > highest)
highest = sn;
}
long highest = highestSeqNumAcked.value;
if (highest >= 0)
highestSeqNumAcked(highest);
}