SSU: PacketHandler cleanup (prep for SSU2)

This commit is contained in:
zzz
2022-02-22 10:22:16 -05:00
parent f0ad921fd2
commit 80535875ad

View File

@@ -82,8 +82,6 @@ class PacketHandler {
_handlers[i] = new Handler();
}
//_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInvalidUnkown", "How old the packet we dropped due to invalidity (unkown type) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInvalidReestablish", "How old the packet we dropped due to invalidity (doesn't use existing key, not an establishment) was", "udp", UDPTransport.RATES);
@@ -94,22 +92,10 @@ class PacketHandler {
_context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.destroyedInvalidSkew", "Destroyed session due to bad skew", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataKnown", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataKnownAck", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataUnknown", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receivePacketSize.dataUnknownAck", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.test", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayIntro", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.receivePacketSize.relayResponse", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES);
}
public synchronized void startup() {
@@ -130,7 +116,7 @@ class PacketHandler {
rv.append("Handlers: ").append(_handlers.length);
for (int i = 0; i < _handlers.length; i++) {
Handler handler = _handlers[i];
rv.append(" handler ").append(i).append(" state: ").append(handler._state);
rv.append(" handler ").append(i);
}
return rv.toString();
}
@@ -193,163 +179,108 @@ class PacketHandler {
return packet.validate(key, _transport.getHMAC());
}
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
private static final short OUTBOUND_FALLBACK = 1;
/** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */
private static final short INBOUND_FALLBACK = 2;
/** the packet is not from anyone we know */
private static final short NEW_PEER = 3;
private enum PeerType {
/** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */
OUTBOUND_FALLBACK,
/** the packet is from a peer we are establishing an inbound con to, but failed validation, so fallback */
INBOUND_FALLBACK,
/** the packet is not from anyone we know */
NEW_PEER
}
private class Handler implements Runnable {
private final UDPPacketReader _reader;
// TODO comment out all uses of _state
public /* volatile */ int _state;
public Handler() {
_reader = new UDPPacketReader(_context);
}
public void run() {
_state = 1;
while (_keepReading) {
_state = 2;
UDPPacket packet = receiveNext();
_state = 3;
if (packet == null) break; // keepReading is probably false, or bind failed...
packet.received();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received: " + packet);
_state = 4;
//long queueTime = packet.getLifetime();
//long handleStart = _context.clock().now();
try {
_state = 5;
handlePacket(_reader, packet);
_state = 6;
} catch (RuntimeException e) {
_state = 7;
if (_log.shouldLog(Log.ERROR))
_log.error("Crazy error handling a packet: " + packet, e);
}
//long handleTime = _context.clock().now() - handleStart;
//packet.afterHandling();
//_context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime());
//_context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime());
_state = 8;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Done receiving: " + packet);
/********
if (handleTime > 1000) {
if (_log.shouldLog(Log.WARN))
_log.warn("Took " + handleTime + " to process the packet "
+ packet + ": " + _reader);
}
long enqueueTime = packet.getEnqueueTime();
long recvTime = packet.getReceivedTime();
long beforeValidateTime = packet.getBeforeValidate();
long afterValidateTime = packet.getAfterValidate();
int validateCount = packet.getValidateCount();
long timeToDequeue = recvTime - enqueueTime;
long timeToValidate = 0;
long authTime = 0;
if (afterValidateTime > 0) {
timeToValidate = afterValidateTime - enqueueTime;
authTime = afterValidateTime - beforeValidateTime;
}
if (timeToDequeue > 50)
_context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue);
if (authTime > 50)
_context.statManager().addRateData("udp.packetAuthRecvTime", authTime, beforeValidateTime-recvTime);
if (afterValidateTime > 0) {
_context.statManager().addRateData("udp.packetVerifyTime", timeToValidate, authTime);
if (timeToValidate > 50)
_context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToValidate, authTime);
}
if (validateCount > 1)
_context.statManager().addRateData("udp.packetValidateMultipleCount", validateCount, timeToValidate);
else if (validateCount <= 0)
_context.statManager().addRateData("udp.packetNoValidationLifetime", packet.getLifetime(), 0);
********/
// back to the cache with thee!
packet.release();
_state = 9;
}
}
//}
/**
* Initial handling, called for every packet
* Find the state and call the correct receivePacket() variant
* Initial handling, called for every packet.
* Find the state and call the correct receivePacket() variant.
*
* Classify the packet by source IP/port, into 4 groups:
*<ol>
*<li>Established session
*<li>Pending inbound establishement
*<li>Pending outbound establishement
*<li>No established or pending session found
*</ol>
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet) {
_state = 10;
RemoteHostId rem = packet.getRemoteHost();
PeerState state = _transport.getPeerState(rem);
if (state == null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Packet received is not for a connected peer");
_state = 11;
InboundEstablishState est = _establisher.getInboundState(rem);
if (est != null) {
// Group 2: Inbound Establishment
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an inbound establishment");
_state = 12;
receivePacket(reader, packet, est);
} else {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Packet received is not for an inbound establishment");
_state = 13;
OutboundEstablishState oest = _establisher.getOutboundState(rem);
if (oest != null) {
// Group 3: Outbound Establishment
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an outbound establishment");
_state = 14;
receivePacket(reader, packet, oest);
} else {
// Group 4: New conn or needs fallback
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received is not for an inbound or outbound establishment");
// ok, not already known establishment, try as a new one
// Last chance for success, using our intro key
_state = 15;
receivePacket(reader, packet, NEW_PEER);
receivePacket(reader, packet, PeerType.NEW_PEER);
}
}
} else {
// Group 1: Established
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received IS for an existing peer");
_state = 16;
receivePacket(reader, packet, state);
}
}
/**
* Established conn
* Group 1: Established conn
* Decrypt and validate the packet then call handlePacket()
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) {
_state = 17;
AuthType auth = AuthType.NONE;
boolean isValid = validate(packet, state.getCurrentMACKey());
if (!isValid) {
_state = 18;
if (state.getNextMACKey() != null)
isValid = validate(packet, state.getNextMACKey());
if (!isValid) {
_state = 19;
if (_log.shouldLog(Log.INFO))
_log.info("Failed validation with existing con, trying as new con: " + packet);
isValid = validate(packet, _transport.getIntroKey());
if (isValid) {
_state = 20;
// this is a stray packet from an inbound establishment
// process, so try our intro key
// (after an outbound establishment process, there wouldn't
@@ -360,12 +291,10 @@ class PacketHandler {
packet.decrypt(_transport.getIntroKey());
auth = AuthType.INTRO;
} else {
_state = 21;
InboundEstablishState est = _establisher.getInboundState(packet.getRemoteHost());
if (est != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet from an existing peer IS for an inbound establishment");
_state = 22;
receivePacket(reader, packet, est, false);
} else {
if (_log.shouldLog(Log.WARN))
@@ -375,31 +304,26 @@ class PacketHandler {
return;
}
} else {
_state = 23;
packet.decrypt(state.getNextCipherKey());
auth = AuthType.SESSION;
}
} else {
_state = 24;
packet.decrypt(state.getCurrentCipherKey());
auth = AuthType.SESSION;
}
_state = 25;
handlePacket(reader, packet, state, null, null, auth);
_state = 26;
}
/**
* New conn or failed validation - we have no Session Key.
* Group 4: New conn or failed validation - we have no Session Key.
* Here we attempt to validate the packet with our intro key,
* then decrypt the packet with our intro key,
* then call handlePacket().
*
* @param peerType OUTBOUND_FALLBACK, INBOUND_FALLBACK, or NEW_PEER
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, short peerType) {
_state = 27;
private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerType peerType) {
boolean isValid = validate(packet, _transport.getIntroKey());
if (!isValid) {
// Note that the vast majority of these are NOT corrupted packets, but
@@ -481,7 +405,6 @@ class PacketHandler {
_context.statManager().addRateData("udp.droppedInvalidEstablish.new", packet.getLifetime(), packet.getTimeSinceReceived());
break;
}
_state = 28;
return;
} else {
if (_log.shouldLog(Log.DEBUG))
@@ -493,10 +416,8 @@ class PacketHandler {
// 96 byte Relay Request
// 60 byte Relay Response
// 80 byte Peer Test
_state = 29;
packet.decrypt(_transport.getIntroKey());
handlePacket(reader, packet, null, null, null, AuthType.INTRO);
_state = 30;
}
/**
@@ -507,14 +428,13 @@ class PacketHandler {
}
/**
* Inbound establishing conn
* Group 2: Inbound establishing conn
* Decrypt and validate the packet then call handlePacket()
*
* @param state non-null
* @param allowFallback if it isn't valid for this establishment state, try as a non-establishment packet
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state, boolean allowFallback) {
_state = 31;
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128);
buf.append("Attempting to receive a packet on a known inbound state: ");
@@ -530,34 +450,30 @@ class PacketHandler {
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received for inbound con: " + packet);
_state = 32;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, null, null, AuthType.SESSION);
return;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received for inbound con, falling back: " + packet);
_state = 33;
}
}
if (allowFallback) {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
_state = 34;
receivePacket(reader, packet, INBOUND_FALLBACK);
receivePacket(reader, packet, PeerType.INBOUND_FALLBACK);
} else {
_context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime());
}
}
/**
* Outbound establishing conn
* Group 3: Outbound establishing conn
* Decrypt and validate the packet then call handlePacket()
*
* @param state non-null
*/
private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) {
_state = 35;
if (_log.shouldLog(Log.DEBUG)) {
StringBuilder buf = new StringBuilder(128);
buf.append("Attempting to receive a packet on a known outbound state: ");
@@ -569,16 +485,13 @@ class PacketHandler {
boolean isValid = false;
if (state.getMACKey() != null) {
_state = 36;
isValid = validate(packet, state.getMACKey());
if (isValid) {
// this should be the Session Confirmed packet
if (_log.shouldLog(Log.INFO))
_log.info("Valid introduction packet received for outbound established con: " + packet);
_state = 37;
packet.decrypt(state.getCipherKey());
handlePacket(reader, packet, null, state, null, AuthType.SESSION);
_state = 38;
return;
}
}
@@ -588,11 +501,9 @@ class PacketHandler {
if (isValid) {
if (_log.shouldLog(Log.INFO))
_log.info("Valid packet received for " + state + " with Bob's intro key: " + packet);
_state = 39;
packet.decrypt(state.getIntroKey());
// the only packet we should be getting with Bob's intro key is Session Created
handlePacket(reader, packet, null, state, null, AuthType.BOBINTRO);
_state = 40;
return;
} else {
if (_log.shouldLog(Log.WARN))
@@ -601,9 +512,7 @@ class PacketHandler {
// ok, we couldn't handle it with the established stuff, so fall back
// on earlier state packets
_state = 41;
receivePacket(reader, packet, OUTBOUND_FALLBACK);
_state = 42;
receivePacket(reader, packet, PeerType.OUTBOUND_FALLBACK);
}
/**
@@ -619,9 +528,7 @@ class PacketHandler {
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state,
OutboundEstablishState outState, InboundEstablishState inState,
AuthType auth) {
_state = 43;
reader.initialize(packet);
_state = 44;
long recvOn = packet.getBegin();
long sendOn = reader.readTimestamp() * 1000;
// Positive when we are ahead of them
@@ -706,33 +613,26 @@ class PacketHandler {
//InetAddress fromHost = packet.getPacket().getAddress();
//int fromPort = packet.getPacket().getPort();
//RemoteHostId from = new RemoteHostId(fromHost.getAddress(), fromPort);
_state = 45;
RemoteHostId from = packet.getRemoteHost();
_state = 46;
switch (type) {
case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST:
_state = 47;
if (auth == AuthType.BOBINTRO) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
break;
}
_establisher.receiveSessionRequest(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.sessionRequest", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED:
_state = 48;
if (auth != AuthType.SESSION) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
break;
}
_establisher.receiveSessionConfirmed(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.sessionConfirmed", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED:
_state = 49;
// this is the only type that allows BOBINTRO
if (auth != AuthType.BOBINTRO && auth != AuthType.SESSION) {
if (_log.shouldLog(Log.WARN))
@@ -740,10 +640,8 @@ class PacketHandler {
break;
}
_establisher.receiveSessionCreated(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.sessionCreated", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_DATA:
_state = 50;
if (auth != AuthType.SESSION) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
@@ -784,7 +682,6 @@ class PacketHandler {
} catch (DataFormatException dfe) {}
break;
case UDPPacket.PAYLOAD_TYPE_TEST:
_state = 51;
if (auth == AuthType.BOBINTRO) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
@@ -793,7 +690,6 @@ class PacketHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received test packet: " + reader + " from " + from);
_testManager.receiveTest(from, state, auth == AuthType.SESSION, reader);
//_context.statManager().addRateData("udp.receivePacketSize.test", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_REQUEST:
if (auth == AuthType.BOBINTRO) {
@@ -804,7 +700,6 @@ class PacketHandler {
if (_log.shouldLog(Log.INFO))
_log.info("Received relay request packet: " + reader + " from " + from);
_introManager.receiveRelayRequest(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.relayRequest", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_INTRO:
if (auth != AuthType.SESSION) {
@@ -815,7 +710,6 @@ class PacketHandler {
if (_log.shouldLog(Log.INFO))
_log.info("Received relay intro packet: " + reader + " from " + from);
_introManager.receiveRelayIntro(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.relayIntro", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_RELAY_RESPONSE:
if (auth == AuthType.BOBINTRO) {
@@ -826,10 +720,8 @@ class PacketHandler {
if (_log.shouldLog(Log.INFO))
_log.info("Received relay response packet: " + reader + " from " + from);
_establisher.receiveRelayResponse(from, reader);
//_context.statManager().addRateData("udp.receivePacketSize.relayResponse", packet.getPacket().getLength(), packet.getLifetime());
break;
case UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY:
_state = 53;
if (auth == AuthType.BOBINTRO) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
@@ -843,7 +735,6 @@ class PacketHandler {
_establisher.receiveSessionDestroy(from); // drops
break;
default:
_state = 52;
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping type " + type + " auth " + auth + ": " + packet);
_context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime());