TunnelGateway:

- Limit queue sizes
  - Add stat for overflow
  - Remove some stats
  - Change pumper to LinkedHashSet for efficiency
    (like NTCP Reader/Writer)
  - Limit messages pumped per cycle to increase
    round-robin fairness
  - Comment out some unused code
  - Javadoc
This commit is contained in:
zzz
2012-09-01 17:20:52 +00:00
parent 10d9eb70c8
commit 4e78517651
4 changed files with 98 additions and 21 deletions

View File

@@ -38,6 +38,10 @@ class PumpedTunnelGateway extends TunnelGateway {
private final BlockingQueue<Pending> _prequeue;
private final TunnelGatewayPumper _pumper;
private static final int MAX_MSGS_PER_PUMP = 16;
private static final int MAX_OB_QUEUE = 2048;
private static final int MAX_IB_QUEUE = 1024;
/**
* @param preprocessor this pulls Pending messages off a list, builds some
* full preprocessed messages, and pumps those into the sender
@@ -48,7 +52,10 @@ class PumpedTunnelGateway extends TunnelGateway {
*/
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver);
_prequeue = new LinkedBlockingQueue();
if (getClass() == PumpedTunnelGateway.class)
_prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE);
else // extended by ThrottledPTG for IB
_prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE);
_pumper = pumper;
}
@@ -65,8 +72,10 @@ class PumpedTunnelGateway extends TunnelGateway {
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
_messagesSent++;
Pending cur = new PendingImpl(msg, toRouter, toTunnel);
_prequeue.offer(cur);
_pumper.wantsPumping(this);
if (_prequeue.offer(cur))
_pumper.wantsPumping(this);
else
_context.statManager().addRateData("tunnel.dropGatewayOverflow", 1);
}
/**
@@ -80,7 +89,7 @@ class PumpedTunnelGateway extends TunnelGateway {
* Must be empty when called; will always be emptied before return.
*/
void pump(List<Pending> queueBuf) {
_prequeue.drainTo(queueBuf);
_prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP);
if (queueBuf.isEmpty())
return;
@@ -122,16 +131,19 @@ class PumpedTunnelGateway extends TunnelGateway {
if (delayedFlush) {
_delayedFlush.reschedule(delayAmount);
}
_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
long complete = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
//_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
if (_log.shouldLog(Log.DEBUG)) {
long complete = System.currentTimeMillis();
_log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
+ " delayed? " + delayedFlush + " remaining: " + remaining
+ " add: " + (afterAdded-beforeLock)
+ " preprocess: " + (afterPreprocess-afterAdded)
+ " expire: " + (afterExpire-afterPreprocess)
+ " queue flush: " + (complete-afterExpire));
}
queueBuf.clear();
if (!_prequeue.isEmpty())
_pumper.wantsPumping(this);
}
}

View File

@@ -29,6 +29,43 @@ import net.i2p.util.Log;
* Handle the actual processing and forwarding of messages through the
* various tunnels.
*
*<pre>
* For each type of tunnel, it creates a chain of handlers, as follows:
*
* Following tunnels are created by us:
*
* Outbound Gateway > 0 hops:
* PumpedTunnelGateway
* BatchedRouterPreprocessor -> OutboundSender -> OutboundReceiver -> OutNetMessagePool
*
* Outbound zero-hop Gateway+Endpoint:
* TunnelGatewayZeroHop
* OutboundMessageDistributor -> OutNetMessagePool
*
* Inbound Endpoint > 0 hops:
* TunnelParticipant
* RouterFragmentHandler -> InboundEndpointProcessor -> InboundMessageDistributor -> InNetMessagePool
*
* Inbound zero-hop Gateway+Endpoint:
* TunnelGatewayZeroHop
* InboundMessageDistributor -> InNetMessagePool
*
*
* Following tunnels are NOT created by us:
*
* Participant (not gateway or endpoint)
* TunnelParticipant
* HopProcessor -> OutNetMessagePool
*
* Outbound Endpoint > 0 hops:
* OutboundTunnelEndpoint
* RouterFragmentHandler -> HopProcessor -> OutboundMessageDistributor -> OutNetMessagePool
*
* Inbound Gateway > 0 hops:
* ThrottledPumpedTunnelGateway
* BatchedRouterPreprocessor -> InboundSender -> InboundGatewayReceiver -> OutNetMessagePool
*
*</pre>
*/
public class TunnelDispatcher implements Service {
private final RouterContext _context;
@@ -174,6 +211,8 @@ public class TunnelDispatcher implements Service {
// following are for InboundMessageDistributor
ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 });
// following is for PumpedTunnelGateway
ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 });
}
/** for IBGW */

View File

@@ -32,6 +32,7 @@ import net.i2p.util.SimpleTimer2;
* or if debugging, verify that it can be decrypted properly)</li>
* </ol>
*
* Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides.
*/
class TunnelGateway {
protected final RouterContext _context;
@@ -63,8 +64,8 @@ class TunnelGateway {
//_flushFrequency = 500;
_delayedFlush = new DelayedFlush();
_lastFlush = _context.clock().now();
_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
}
/**
@@ -81,11 +82,15 @@ class TunnelGateway {
* coallesced with other pending messages) or after a brief pause (_flushFrequency).
* If it is queued up past its expiration, it is silently dropped
*
* UNUSED - see overrides
*
* @param msg message to be sent through the tunnel
* @param toRouter router to send to after the endpoint (or null for endpoint processing)
* @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
*/
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
throw new UnsupportedOperationException("unused, right?");
/****
_messagesSent++;
long startAdd = System.currentTimeMillis();
boolean delayedFlush = false;
@@ -137,6 +142,7 @@ class TunnelGateway {
+ " expire: " + (afterExpire-afterPreprocess)
+ " queue flush: " + (complete-afterExpire));
}
****/
}
public int getMessagesSent() { return _messagesSent; }
@@ -312,7 +318,7 @@ class TunnelGateway {
else
_lastFlush = _context.clock().now();
_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
//_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
}
}
}

View File

@@ -1,7 +1,10 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -9,13 +12,17 @@ import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
/**
* run through the tunnel gateways that have had messages added to them and push
* those messages through the preprocessing and sending process
* Run through the tunnel gateways that have had messages added to them and push
* those messages through the preprocessing and sending process.
*
* TODO do we need this many threads?
* TODO this combines IBGWs and OBGWs, do we wish to separate the two
* and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
*/
class TunnelGatewayPumper implements Runnable {
private final RouterContext _context;
private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
private boolean _stop;
private final Set<PumpedTunnelGateway> _wantsPumping;
private volatile boolean _stop;
private static final int MIN_PUMPERS = 1;
private static final int MAX_PUMPERS = 4;
private final int _pumpers;
@@ -23,7 +30,7 @@ class TunnelGatewayPumper implements Runnable {
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
_wantsPumping = new LinkedBlockingQueue();
_wantsPumping = new LinkedHashSet(16);
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
@@ -35,9 +42,10 @@ class TunnelGatewayPumper implements Runnable {
public void stopPumping() {
_stop=true;
_wantsPumping.clear();
PumpedTunnelGateway poison = new PoisonPTG(_context);
for (int i = 0; i < _pumpers; i++)
_wantsPumping.offer(poison);
for (int i = 0; i < _pumpers; i++) {
PumpedTunnelGateway poison = new PoisonPTG(_context);
wantsPumping(poison);
}
for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
@@ -47,8 +55,12 @@ class TunnelGatewayPumper implements Runnable {
}
public void wantsPumping(PumpedTunnelGateway gw) {
if (!_stop)
_wantsPumping.offer(gw);
if (!_stop) {
synchronized (_wantsPumping) {
if (_wantsPumping.add(gw))
_wantsPumping.notify();
}
}
}
public void run() {
@@ -56,7 +68,15 @@ class TunnelGatewayPumper implements Runnable {
List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
while (!_stop) {
try {
gw = _wantsPumping.take();
synchronized (_wantsPumping) {
if (_wantsPumping.isEmpty()) {
_wantsPumping.wait();
} else {
Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
gw = iter.next();
iter.remove();
}
}
} catch (InterruptedException ie) {}
if (gw != null) {
if (gw.getMessagesSent() == POISON_PTG)