SSU2: Track dup packets received

Log tweaks
This commit is contained in:
zzz
2022-03-09 04:37:36 -05:00
parent a7a5b06b5c
commit e594b9532c
8 changed files with 70 additions and 27 deletions

View File

@@ -247,6 +247,21 @@ class InboundMessageState implements CDQEntry {
return true;
}
/**
* Do we have this fragment?
*
* SSU 2 only.
*
* @param fragmentNum the fragment number
* @return true if we have the fragment
* @since 0.9.54
*/
public boolean hasFragment(int fragmentNum) {
if (fragmentNum >= _fragments.length)
return false;
return _fragments[fragmentNum] != null;
}
/**
* May not be valid after released.
* Probably doesn't need to be synced by caller, given the order of

View File

@@ -196,6 +196,8 @@ class PacketBuilder2 {
int sz = block.getTotalLength();
off += sz;
sizeWritten += sz;
if (_log.shouldDebug())
_log.debug("Sending acks " + block + " to " + peer);
}
}

View File

@@ -187,7 +187,7 @@ public class PeerState {
private final AtomicBoolean _fastRetransmit = new AtomicBoolean();
/** how many dup packets were received within the last RETRANSMISSION_PERIOD_WIDTH packets */
private int _packetsReceivedDuplicate;
protected int _packetsReceivedDuplicate;
private int _packetsReceived;
private boolean _mayDisconnect;

View File

@@ -232,8 +232,9 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
byte[] getRcvHeaderEncryptKey2() { return _rcvHeaderEncryptKey2; }
SSU2Bitfield getReceivedMessages() {
if (_log.shouldDebug())
_log.debug("Sending acks " + _receivedMessages + " on " + this);
// logged in PacketBuilder2
//if (_log.shouldDebug())
// _log.debug("Sending acks " + _receivedMessages + " on " + this);
return _receivedMessages;
}
SSU2Bitfield getAckedMessages() { return _ackedMessages; }
@@ -286,13 +287,16 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
_rcvCha.setNonce(n);
// decrypt in-place
_rcvCha.decryptWithAd(header.data, data, off + SHORT_HEADER_SIZE, data, off + SHORT_HEADER_SIZE, len - SHORT_HEADER_SIZE);
//if (_log.shouldDebug())
// _log.debug("Packet " + n + " after full decryption:\n" + HexDump.dump(data, off, len - MAC_LEN));
if (_receivedMessages.set(n)) {
if (_log.shouldWarn())
_log.warn("dup pkt rcvd " + n + " on " + this);
return;
}
//if (_log.shouldDebug())
// _log.debug("Packet " + n + " after full decryption:\n" + HexDump.dump(data, off, len - MAC_LEN));
if (_receivedMessages.set(n)) {
synchronized(this) {
_packetsReceivedDuplicate++;
}
if (_log.shouldWarn())
_log.warn("dup pkt rcvd: " + n + " on " + this);
return;
}
int payloadLen = len - (SHORT_HEADER_SIZE + MAC_LEN);
if (_log.shouldInfo())
@@ -365,6 +369,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
InboundMessageState state;
boolean messageComplete = false;
boolean messageExpired = false;
boolean messageDup = false;
synchronized (_inboundMessages) {
state = _inboundMessages.get(messageId);
@@ -372,22 +377,34 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast);
_inboundMessages.put(messageId, state);
} else {
boolean fragmentOK = state.receiveFragment(data, off, len, frag, isLast);
if (!fragmentOK)
return;
if (state.isComplete()) {
messageComplete = true;
_inboundMessages.remove(messageId);
} else if (state.isExpired()) {
messageExpired = true;
_inboundMessages.remove(messageId);
messageDup = state.hasFragment(frag);
if (!messageDup) {
boolean fragmentOK = state.receiveFragment(data, off, len, frag, isLast);
if (!fragmentOK)
return;
if (state.isComplete()) {
messageComplete = true;
_inboundMessages.remove(messageId);
} else if (state.isExpired()) {
messageExpired = true;
_inboundMessages.remove(messageId);
}
}
}
}
if (messageDup) {
synchronized(this) {
_packetsReceivedDuplicate++;
}
if (_log.shouldWarn())
_log.warn("dup fragment rcvd: " + frag + " for " + state);
return;
}
if (messageComplete) {
messageFullyReceived(messageId, state.getCompleteSize());
if (_log.shouldDebug())
messageFullyReceived(messageId, state.getCompleteSize());
if (_log.shouldDebug())
_log.debug("Message received completely! " + state);
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
@@ -609,7 +626,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
}
UDPPacket ack = _transport.getBuilder2().buildACK(PeerState2.this);
if (_log.shouldDebug())
_log.debug("Sending acks to " + PeerState2.this);
_log.debug("ACKTimer sending acks to " + PeerState2.this);
_transport.send(ack);
}
}

View File

@@ -308,14 +308,15 @@ class SSU2Bitfield {
}
}
}
sb.append(" RAW: ").append(thru).append(" A:").append(acnt);
sb.append(" (RAW: ").append(thru).append(" A:").append(acnt);
if (ranges != null) {
for (int i = 0; i < rangeCount * 2; i += 2) {
sb.append(" N:").append(ranges[i] & 0xff);
sb.append(" A:").append(ranges[i + 1] & 0xff);
}
}
return sb.toString();
}
sb.append(')');
return sb.toString();
}
@Override

View File

@@ -582,6 +582,11 @@ class SSU2Payload {
System.arraycopy(r, 0, tgt, off, rc * 2);
return off + (rc * 2);
}
@Override
public String toString() {
return SSU2Bitfield.toString(t, a, r, rc);
}
}
public static class AddressBlock extends Block {

View File

@@ -412,7 +412,10 @@ class UDPPacket implements CDPQEntry {
@Override
public String toString() {
verifyNotReleased();
synchronized(this) {
if (_released)
return "RELEASED PACKET";
}
StringBuilder buf = new StringBuilder(256);
buf.append(_packet.getLength());
buf.append(" byte pkt with ");

View File

@@ -193,8 +193,8 @@ class UDPSender {
public void add(UDPPacket packet) {
if (packet == null || !_keepRunning) return;
int psz = packet.getPacket().getLength();
if (psz > PeerState.MAX_MTU) {
_log.error("Dropping large UDP packet " + psz + " bytes: " + packet);
if (psz > PeerState2.MAX_MTU) {
_log.error("Dropping large UDP packet " + psz + " bytes: " + packet, new Exception());
return;
}
if (_dummy) {