I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 4cf10472 authored by zzz's avatar zzz
Browse files

* PumpedTunnelGateway:

    - Move OBGW queue to CoDelPriority
    - Move IBGW queue to CoDel
    - Reduce max pumped per cycle for IBGW
parent 2c866e20
No related branches found
No related tags found
No related merge requests found
package net.i2p.router.tunnel;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.*;
import net.i2p.router.util.CDPQEntry;
/**
* Stores all the state for an unsent or partially-sent message
*
* @since 0.9.3
*/
class OutboundGatewayMessage extends PendingGatewayMessage implements CDPQEntry {
private long _seqNum;
private final int _priority;
public OutboundGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel);
_priority = getPriority(message);
}
/**
* For CDPQ
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
*/
public long getSeqNum() {
return _seqNum;
}
/**
* For CDPQ
*/
public int getPriority() {
return _priority;
}
/**
* This is just for priority in the queue waiting for the fragmenter.
* After the fragmenter, they will be OutNetMessages with priority 400.
* We use the same 100-500 priority as OutNetMessage so the stats
* in CoDelPriorityBlockingQueue work.
*
* We could - perhaps - have BatchedPreprocessor pass the max priority of
* any message fragment in a TunnelDataMessage to the OutboundReceiver, to
* set the OutNetMessage priority - but that may just make more of an
* out-of-order mess and failed reconstruction of fragments.
*/
private static int getPriority(I2NPMessage message) {
switch (message.getType()) {
// tagset/LS reply
case DeliveryStatusMessage.MESSAGE_TYPE:
return 1000;
// building new IB tunnel
case TunnelBuildMessage.MESSAGE_TYPE:
case VariableTunnelBuildMessage.MESSAGE_TYPE:
return 500;
// LS store
case DatabaseStoreMessage.MESSAGE_TYPE:
return 400;
// LS verify
case DatabaseLookupMessage.MESSAGE_TYPE:
return 300;
// regular data
case GarlicMessage.MESSAGE_TYPE:
return 200;
// these shouldn't go into a OBGW
case DatabaseSearchReplyMessage.MESSAGE_TYPE:
case DataMessage.MESSAGE_TYPE:
case TunnelBuildReplyMessage.MESSAGE_TYPE:
case TunnelDataMessage.MESSAGE_TYPE:
case TunnelGatewayMessage.MESSAGE_TYPE:
case VariableTunnelBuildReplyMessage.MESSAGE_TYPE:
default:
return 100;
}
}
}
...@@ -7,13 +7,14 @@ import net.i2p.data.Hash; ...@@ -7,13 +7,14 @@ import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.util.CDQEntry;
/** /**
* Stores all the state for an unsent or partially-sent message * Stores all the state for an unsent or partially-sent message
* *
* @since 0.9.3 refactored from TunnelGateway.Pending * @since 0.9.3 refactored from TunnelGateway.Pending
*/ */
class PendingGatewayMessage { class PendingGatewayMessage implements CDQEntry {
protected final Hash _toRouter; protected final Hash _toRouter;
protected final TunnelId _toTunnel; protected final TunnelId _toTunnel;
protected final long _messageId; protected final long _messageId;
...@@ -23,6 +24,7 @@ class PendingGatewayMessage { ...@@ -23,6 +24,7 @@ class PendingGatewayMessage {
protected int _fragmentNumber; protected int _fragmentNumber;
protected final long _created; protected final long _created;
private List<Long> _messageIds; private List<Long> _messageIds;
private long _enqueueTime;
public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) { public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
_toRouter = toRouter; _toRouter = toRouter;
...@@ -84,6 +86,29 @@ class PendingGatewayMessage { ...@@ -84,6 +86,29 @@ class PendingGatewayMessage {
return new ArrayList(); return new ArrayList();
} }
} }
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
}
@Override @Override
public String toString() { public String toString() {
......
...@@ -2,13 +2,14 @@ package net.i2p.router.tunnel; ...@@ -2,13 +2,14 @@ package net.i2p.router.tunnel;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
...@@ -37,9 +38,11 @@ import net.i2p.util.Log; ...@@ -37,9 +38,11 @@ import net.i2p.util.Log;
class PumpedTunnelGateway extends TunnelGateway { class PumpedTunnelGateway extends TunnelGateway {
private final BlockingQueue<PendingGatewayMessage> _prequeue; private final BlockingQueue<PendingGatewayMessage> _prequeue;
private final TunnelGatewayPumper _pumper; private final TunnelGatewayPumper _pumper;
private final boolean _isInbound;
private static final int MAX_MSGS_PER_PUMP = 16; private static final int MAX_OB_MSGS_PER_PUMP = 16;
private static final int MAX_OB_QUEUE = 2048; private static final int MAX_IB_MSGS_PER_PUMP = 8;
private static final int INITIAL_OB_QUEUE = 64;
private static final int MAX_IB_QUEUE = 1024; private static final int MAX_IB_QUEUE = 1024;
/** /**
...@@ -52,10 +55,15 @@ class PumpedTunnelGateway extends TunnelGateway { ...@@ -52,10 +55,15 @@ class PumpedTunnelGateway extends TunnelGateway {
*/ */
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver); super(context, preprocessor, sender, receiver);
if (getClass() == PumpedTunnelGateway.class) if (getClass() == PumpedTunnelGateway.class) {
_prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE); // Unbounded priority queue for outbound
else // extended by ThrottledPTG for IB _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
_prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE); _isInbound = false;
} else { // extended by ThrottledPTG for IB
// Bounded non-priority queue for inbound
_prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
_isInbound = true;
}
_pumper = pumper; _pumper = pumper;
} }
...@@ -64,14 +72,22 @@ class PumpedTunnelGateway extends TunnelGateway { ...@@ -64,14 +72,22 @@ class PumpedTunnelGateway extends TunnelGateway {
* coallesced with other pending messages) or after a brief pause (_flushFrequency). * coallesced with other pending messages) or after a brief pause (_flushFrequency).
* If it is queued up past its expiration, it is silently dropped * If it is queued up past its expiration, it is silently dropped
* *
* This is only for OBGWs. See TPTG override for IBGWs.
*
* @param msg message to be sent through the tunnel * @param msg message to be sent through the tunnel
* @param toRouter router to send to after the endpoint (or null for endpoint processing) * @param toRouter router to send to after the endpoint (or null for endpoint processing)
* @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing) * @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
*/ */
@Override @Override
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
OutboundGatewayMessage cur = new OutboundGatewayMessage(msg, toRouter, toTunnel);
if (_log.shouldLog(Log.DEBUG))
_log.debug("OB PTG add type " + msg.getType() + " pri " + cur.getPriority());
add(cur);
}
protected void add(PendingGatewayMessage cur) {
_messagesSent++; _messagesSent++;
PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel);
if (_prequeue.offer(cur)) if (_prequeue.offer(cur))
_pumper.wantsPumping(this); _pumper.wantsPumping(this);
else else
...@@ -89,7 +105,11 @@ class PumpedTunnelGateway extends TunnelGateway { ...@@ -89,7 +105,11 @@ class PumpedTunnelGateway extends TunnelGateway {
* Must be empty when called; will always be emptied before return. * Must be empty when called; will always be emptied before return.
*/ */
void pump(List<PendingGatewayMessage> queueBuf) { void pump(List<PendingGatewayMessage> queueBuf) {
_prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP); // TODO if an IBGW, and the next hop is backlogged,
// drain less or none... better to let things back up here.
// Don't do this for OBGWs?
int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
_prequeue.drainTo(queueBuf, max);
if (queueBuf.isEmpty()) if (queueBuf.isEmpty())
return; return;
......
...@@ -42,6 +42,6 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway { ...@@ -42,6 +42,6 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
_config.incrementProcessedMessages(); _config.incrementProcessedMessages();
return; return;
} }
super.add(msg, toRouter, toTunnel); add(new PendingGatewayMessage(msg, toRouter, toTunnel));
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment