diff --git a/router/java/src/net/i2p/router/tunnel/OutboundGatewayMessage.java b/router/java/src/net/i2p/router/tunnel/OutboundGatewayMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..b3fe4e7a51d71b1c0fa7d614d6330cd784db3568 --- /dev/null +++ b/router/java/src/net/i2p/router/tunnel/OutboundGatewayMessage.java @@ -0,0 +1,91 @@ +package net.i2p.router.tunnel; + +import net.i2p.data.Hash; +import net.i2p.data.TunnelId; +import net.i2p.data.i2np.*; +import net.i2p.router.util.CDPQEntry; + +/** + * Stores all the state for an unsent or partially-sent message + * + * @since 0.9.3 + */ +class OutboundGatewayMessage extends PendingGatewayMessage implements CDPQEntry { + private long _seqNum; + private final int _priority; + + public OutboundGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { + super(message, toRouter, toTunnel); + _priority = getPriority(message); + } + + /** + * For CDPQ + */ + public void setSeqNum(long num) { + _seqNum = num; + } + + /** + * For CDPQ + */ + public long getSeqNum() { + return _seqNum; + } + + /** + * For CDPQ + */ + public int getPriority() { + return _priority; + } + + /** + * This is just for priority in the queue waiting for the fragmenter. + * After the fragmenter, they will be OutNetMessages with priority 400. + * We use the same 100-500 priority as OutNetMessage so the stats + * in CoDelPriorityBlockingQueue work. + * + * We could - perhaps - have BatchedPreprocessor pass the max priority of + * any message fragment in a TunnelDataMessage to the OutboundReceiver, to + * set the OutNetMessage priority - but that may just make more of an + * out-of-order mess and failed reconstruction of fragments. + */ + private static int getPriority(I2NPMessage message) { + switch (message.getType()) { + + // tagset/LS reply + case DeliveryStatusMessage.MESSAGE_TYPE: + return 1000; + + // building new IB tunnel + case TunnelBuildMessage.MESSAGE_TYPE: + case VariableTunnelBuildMessage.MESSAGE_TYPE: + return 500; + + // LS store + case DatabaseStoreMessage.MESSAGE_TYPE: + return 400; + + // LS verify + case DatabaseLookupMessage.MESSAGE_TYPE: + return 300; + + // regular data + case GarlicMessage.MESSAGE_TYPE: + return 200; + + // these shouldn't go into a OBGW + case DatabaseSearchReplyMessage.MESSAGE_TYPE: + case DataMessage.MESSAGE_TYPE: + case TunnelBuildReplyMessage.MESSAGE_TYPE: + case TunnelDataMessage.MESSAGE_TYPE: + case TunnelGatewayMessage.MESSAGE_TYPE: + case VariableTunnelBuildReplyMessage.MESSAGE_TYPE: + default: + return 100; + + } + } +} + diff --git a/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java b/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java index 302c4f94cd961e9020c7208d6261ec70ede245da..6e50e4e92f9b336d398a2b039e5f31a965c66020 100644 --- a/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java +++ b/router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java @@ -7,13 +7,14 @@ import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.RouterContext; +import net.i2p.router.util.CDQEntry; /** * Stores all the state for an unsent or partially-sent message * * @since 0.9.3 refactored from TunnelGateway.Pending */ -class PendingGatewayMessage { +class PendingGatewayMessage implements CDQEntry { protected final Hash _toRouter; protected final TunnelId _toTunnel; protected final long _messageId; @@ -23,6 +24,7 @@ class PendingGatewayMessage { protected int _fragmentNumber; protected final long _created; private List<Long> _messageIds; + private long _enqueueTime; public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { _toRouter = toRouter; @@ -84,6 +86,29 @@ class PendingGatewayMessage { return new ArrayList(); } } + + /** + * For CDQ + * @since 0.9.3 + */ + public void setEnqueueTime(long now) { + _enqueueTime = now; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public long getEnqueueTime() { + return _enqueueTime; + } + + /** + * For CDQ + * @since 0.9.3 + */ + public void drop() { + } @Override public String toString() { diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java index 2e9bad9c89aa8eec1f8528bae3674bf3f5b5842d..488af9e7f694cb308ea7c1d4892e2f25f0aa958d 100644 --- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java @@ -2,13 +2,14 @@ package net.i2p.router.tunnel; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.Router; import net.i2p.router.RouterContext; +import net.i2p.router.util.CoDelBlockingQueue; +import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.util.Log; /** @@ -37,9 +38,11 @@ import net.i2p.util.Log; class PumpedTunnelGateway extends TunnelGateway { private final BlockingQueue<PendingGatewayMessage> _prequeue; private final TunnelGatewayPumper _pumper; + private final boolean _isInbound; - private static final int MAX_MSGS_PER_PUMP = 16; - private static final int MAX_OB_QUEUE = 2048; + private static final int MAX_OB_MSGS_PER_PUMP = 16; + private static final int MAX_IB_MSGS_PER_PUMP = 8; + private static final int INITIAL_OB_QUEUE = 64; private static final int MAX_IB_QUEUE = 1024; /** @@ -52,10 +55,15 @@ class PumpedTunnelGateway extends TunnelGateway { */ public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { super(context, preprocessor, sender, receiver); - if (getClass() == PumpedTunnelGateway.class) - _prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE); - else // extended by ThrottledPTG for IB - _prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE); + if (getClass() == PumpedTunnelGateway.class) { + // Unbounded priority queue for outbound + _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE); + _isInbound = false; + } else { // extended by ThrottledPTG for IB + // Bounded non-priority queue for inbound + _prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE); + _isInbound = true; + } _pumper = pumper; } @@ -64,14 +72,22 @@ class PumpedTunnelGateway extends TunnelGateway { * coallesced with other pending messages) or after a brief pause (_flushFrequency). * If it is queued up past its expiration, it is silently dropped * + * This is only for OBGWs. See TPTG override for IBGWs. + * * @param msg message to be sent through the tunnel * @param toRouter router to send to after the endpoint (or null for endpoint processing) * @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing) */ @Override public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { + OutboundGatewayMessage cur = new OutboundGatewayMessage(msg, toRouter, toTunnel); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("OB PTG add type " + msg.getType() + " pri " + cur.getPriority()); + add(cur); + } + + protected void add(PendingGatewayMessage cur) { _messagesSent++; - PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel); if (_prequeue.offer(cur)) _pumper.wantsPumping(this); else @@ -89,7 +105,11 @@ class PumpedTunnelGateway extends TunnelGateway { * Must be empty when called; will always be emptied before return. */ void pump(List<PendingGatewayMessage> queueBuf) { - _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP); + // TODO if an IBGW, and the next hop is backlogged, + // drain less or none... better to let things back up here. + // Don't do this for OBGWs? + int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP; + _prequeue.drainTo(queueBuf, max); if (queueBuf.isEmpty()) return; diff --git a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java index efb65aa476dc4600266ca66d3f44c85b61fc4abf..d6da1494490c97b51c0431d9febf13b79b0455d3 100644 --- a/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java @@ -42,6 +42,6 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway { _config.incrementProcessedMessages(); return; } - super.add(msg, toRouter, toTunnel); + add(new PendingGatewayMessage(msg, toRouter, toTunnel)); } }