UDP: Pass message priority through to the packets

Change UDP-Sender queue from CoDel to CoDelPriority
No change to CoDel params

UDP msg priorities:
High priority: ack-only, session request/created/confirmed, relay request, hole punch, injected
Low priority: ping, destroy, peer test, relay intro/response
This commit is contained in:
zzz
2022-01-25 09:27:49 -05:00
parent b9f53069bb
commit f17cd24dc8
4 changed files with 56 additions and 7 deletions

View File

@@ -500,7 +500,7 @@ class OutboundMessageState implements CDPQEntry {
* @since 0.9.3
*/
public int getPriority() {
return _message != null ? _message.getPriority() : 1000;
return _message != null ? _message.getPriority() : PacketBuilder.PRIORITY_HIGH;
}
@Override

View File

@@ -18,6 +18,7 @@ import net.i2p.data.router.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.data.router.RouterAddress;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.TransportUtil;
import net.i2p.util.Addresses;
@@ -182,6 +183,12 @@ class PacketBuilder {
private static final byte DATA_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_DATA << 4;
private static final byte PEER_TEST_FLAG_BYTE = UDPPacket.PAYLOAD_TYPE_TEST << 4;
private static final byte SESSION_DESTROY_FLAG_BYTE = (byte) (UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4);
/* Higher than all other OutNetMessage priorities, but still droppable,
* and will be shown in the codel.UDP-Sender.drop.500 stat.
*/
static final int PRIORITY_HIGH = 550;
private static final int PRIORITY_LOW = OutNetMessage.PRIORITY_LOWEST;
/**
* No state, all methods are thread-safe.
@@ -318,9 +325,13 @@ class PacketBuilder {
// calculate data size
int numFragments = fragments.size();
int dataSize = 0;
int priority = 0;
for (int i = 0; i < numFragments; i++) {
Fragment frag = fragments.get(i);
OutboundMessageState state = frag.state;
int pri = state.getPriority();
if (pri > priority)
priority = pri;
int fragment = frag.num;
int sz = state.fragmentSize(fragment);
dataSize += sz;
@@ -569,6 +580,7 @@ class PacketBuilder {
}
}
packet.setPriority(priority);
return packet;
}
@@ -681,6 +693,7 @@ class PacketBuilder {
pkt.setLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
packet.setPriority((fullACKCount > 0 || partialACKCount > 0) ? PRIORITY_HIGH : PRIORITY_LOW);
return packet;
}
@@ -775,6 +788,7 @@ class PacketBuilder {
setTo(packet, to, state.getSentPort());
SimpleByteCache.release(iv);
packet.setMessageType(TYPE_CREAT);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -840,6 +854,7 @@ class PacketBuilder {
authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, port);
packet.setMessageType(TYPE_SREQ);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -944,6 +959,7 @@ class PacketBuilder {
authenticate(packet, state.getCipherKey(), state.getMACKey());
setTo(packet, to, state.getSentPort());
packet.setMessageType(TYPE_CONF);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -1040,6 +1056,7 @@ class PacketBuilder {
pkt.setLength(off);
authenticate(packet, cipherKey, macKey);
setTo(packet, addr, port);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1084,6 +1101,7 @@ class PacketBuilder {
authenticate(packet, toCipherKey, toMACKey);
setTo(packet, toIP, toPort);
packet.setMessageType(TYPE_TFA);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1135,6 +1153,7 @@ class PacketBuilder {
authenticate(packet, aliceCipherKey, aliceMACKey);
setTo(packet, aliceIP, alicePort);
packet.setMessageType(TYPE_TTA);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1172,6 +1191,7 @@ class PacketBuilder {
authenticate(packet, charlieCipherKey, charlieMACKey);
setTo(packet, charlieIP, charliePort);
packet.setMessageType(TYPE_TBC);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1209,6 +1229,7 @@ class PacketBuilder {
authenticate(packet, bobCipherKey, bobMACKey);
setTo(packet, bobIP, bobPort);
packet.setMessageType(TYPE_TCB);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1372,6 +1393,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey);
setTo(packet, introHost, introPort);
packet.setMessageType(TYPE_RREQ);
packet.setPriority(PRIORITY_HIGH);
return packet;
}
@@ -1405,6 +1427,7 @@ class PacketBuilder {
authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
packet.setMessageType(TYPE_INTRO);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1451,6 +1474,7 @@ class PacketBuilder {
authenticate(packet, cipherKey, macKey);
setTo(packet, aliceAddr, alice.getPort());
packet.setMessageType(TYPE_RESP);
packet.setPriority(PRIORITY_LOW);
return packet;
}
@@ -1469,6 +1493,7 @@ class PacketBuilder {
setTo(packet, to, port);
packet.setMessageType(TYPE_PUNCH);
packet.setPriority(PRIORITY_HIGH);
return packet;
}

View File

@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CDQEntry;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.TryCache;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
@@ -23,10 +23,10 @@ import net.i2p.util.SystemVersion;
* of object instances to allow rapid reuse.
*
*/
class UDPPacket implements CDQEntry {
class UDPPacket implements CDPQEntry {
private RouterContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
private int _priority;
private volatile long _initializeTime;
//private volatile long _expiration;
private final byte[] _data;
@@ -46,6 +46,7 @@ class UDPPacket implements CDQEntry {
private int _validateCount;
// private boolean _isInbound;
private FIFOBandwidthLimiter.Request _bandwidthRequest;
private long _seqNum;
private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
static RouterContext context;
@@ -168,6 +169,20 @@ class UDPPacket implements CDQEntry {
_receivedTime = 0;
_fragmentCount = 0;
}
/**
* CDPQEntry
* @since 0.9.53
*/
public void setSeqNum(long num) { _seqNum = num; }
/**
* CDPQEntry
* @since 0.9.53
*/
public long getSeqNum() { return _seqNum; }
/****
public void writeData(byte src[], int offset, int len) {
@@ -180,7 +195,13 @@ class UDPPacket implements CDQEntry {
/** */
public synchronized DatagramPacket getPacket() { verifyNotReleased(); return _packet; }
public synchronized short getPriority() { verifyNotReleased(); return _priority; }
public int getPriority() { return _priority; }
/**
* @since 0.9.53
*/
public void setPriority(int pri) { _priority = pri; }
//public long getExpiration() { verifyNotReleased(); return _expiration; }
public synchronized long getBegin() { verifyNotReleased(); return _initializeTime; }
public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; }
@@ -394,6 +415,7 @@ class UDPPacket implements CDQEntry {
buf.append(" byte pkt with ");
buf.append(Addresses.toString(_packet.getAddress().getAddress(), _packet.getPort()));
//buf.append(" id=").append(System.identityHashCode(this));
buf.append(" priority=").append(_priority);
if (_messageType >= 0)
buf.append(" msgType=").append(_messageType);
if (_markedType >= 0)

View File

@@ -7,7 +7,8 @@ import java.util.concurrent.BlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue;
//import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
@@ -46,7 +47,8 @@ class UDPSender {
_log = ctx.logManager().getLog(UDPSender.class);
long maxMemory = SystemVersion.getMaxMemory();
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
//_outboundQueue = new CoDelBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_outboundQueue = new CoDelPriorityBlockingQueue<UDPPacket>(ctx, "UDP-Sender", qsize, CODEL_TARGET, CODEL_INTERVAL);
_socket = socket;
_runner = new Runner();
_name = name;