diff --git a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java index 087d4d5e59208b8f618431808393c08f630eddc7..1d3718a00a0135fe2f371de303c77b1050808e70 100644 --- a/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java @@ -38,6 +38,10 @@ class PumpedTunnelGateway extends TunnelGateway { private final BlockingQueue<Pending> _prequeue; private final TunnelGatewayPumper _pumper; + private static final int MAX_MSGS_PER_PUMP = 16; + private static final int MAX_OB_QUEUE = 2048; + private static final int MAX_IB_QUEUE = 1024; + /** * @param preprocessor this pulls Pending messages off a list, builds some * full preprocessed messages, and pumps those into the sender @@ -48,7 +52,10 @@ class PumpedTunnelGateway extends TunnelGateway { */ public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) { super(context, preprocessor, sender, receiver); - _prequeue = new LinkedBlockingQueue(); + if (getClass() == PumpedTunnelGateway.class) + _prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE); + else // extended by ThrottledPTG for IB + _prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE); _pumper = pumper; } @@ -65,8 +72,10 @@ class PumpedTunnelGateway extends TunnelGateway { public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { _messagesSent++; Pending cur = new PendingImpl(msg, toRouter, toTunnel); - _prequeue.offer(cur); - _pumper.wantsPumping(this); + if (_prequeue.offer(cur)) + _pumper.wantsPumping(this); + else + _context.statManager().addRateData("tunnel.dropGatewayOverflow", 1); } /** @@ -80,7 +89,7 @@ class PumpedTunnelGateway extends TunnelGateway { * Must be empty when called; will always be emptied before return. */ void pump(List<Pending> queueBuf) { - _prequeue.drainTo(queueBuf); + _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP); if (queueBuf.isEmpty()) return; @@ -122,16 +131,19 @@ class PumpedTunnelGateway extends TunnelGateway { if (delayedFlush) { _delayedFlush.reschedule(delayAmount); } - _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining); - long complete = System.currentTimeMillis(); - if (_log.shouldLog(Log.DEBUG)) + //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining); + if (_log.shouldLog(Log.DEBUG)) { + long complete = System.currentTimeMillis(); _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd) + " delayed? " + delayedFlush + " remaining: " + remaining + " add: " + (afterAdded-beforeLock) + " preprocess: " + (afterPreprocess-afterAdded) + " expire: " + (afterExpire-afterPreprocess) + " queue flush: " + (complete-afterExpire)); + } queueBuf.clear(); + if (!_prequeue.isEmpty()) + _pumper.wantsPumping(this); } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index 2503aa9ea207b8c17786498370a26e8ff667b08c..9d3588c2d1f9ad9de663990b7f6d91ff4dbc321a 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -29,6 +29,43 @@ import net.i2p.util.Log; * Handle the actual processing and forwarding of messages through the * various tunnels. * + *<pre> + * For each type of tunnel, it creates a chain of handlers, as follows: + * + * Following tunnels are created by us: + * + * Outbound Gateway > 0 hops: + * PumpedTunnelGateway + * BatchedRouterPreprocessor -> OutboundSender -> OutboundReceiver -> OutNetMessagePool + * + * Outbound zero-hop Gateway+Endpoint: + * TunnelGatewayZeroHop + * OutboundMessageDistributor -> OutNetMessagePool + * + * Inbound Endpoint > 0 hops: + * TunnelParticipant + * RouterFragmentHandler -> InboundEndpointProcessor -> InboundMessageDistributor -> InNetMessagePool + * + * Inbound zero-hop Gateway+Endpoint: + * TunnelGatewayZeroHop + * InboundMessageDistributor -> InNetMessagePool + * + * + * Following tunnels are NOT created by us: + * + * Participant (not gateway or endpoint) + * TunnelParticipant + * HopProcessor -> OutNetMessagePool + * + * Outbound Endpoint > 0 hops: + * OutboundTunnelEndpoint + * RouterFragmentHandler -> HopProcessor -> OutboundMessageDistributor -> OutNetMessagePool + * + * Inbound Gateway > 0 hops: + * ThrottledPumpedTunnelGateway + * BatchedRouterPreprocessor -> InboundSender -> InboundGatewayReceiver -> OutNetMessagePool + * + *</pre> */ public class TunnelDispatcher implements Service { private final RouterContext _context; @@ -174,6 +211,8 @@ public class TunnelDispatcher implements Service { // following are for InboundMessageDistributor ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 }); ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 }); + // following is for PumpedTunnelGateway + ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 }); } /** for IBGW */ diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java index b784cf017dab7c9b05df19c54bf065b4526f5e94..ae050784c1859bd952874945f202171989eecee2 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGateway.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGateway.java @@ -32,6 +32,7 @@ import net.i2p.util.SimpleTimer2; * or if debugging, verify that it can be decrypted properly)</li> * </ol> * + * Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides. */ class TunnelGateway { protected final RouterContext _context; @@ -63,8 +64,8 @@ class TunnelGateway { //_flushFrequency = 500; _delayedFlush = new DelayedFlush(); _lastFlush = _context.clock().now(); - _context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + //_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + //_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 }); } /** @@ -81,11 +82,15 @@ class TunnelGateway { * coallesced with other pending messages) or after a brief pause (_flushFrequency). * If it is queued up past its expiration, it is silently dropped * + * UNUSED - see overrides + * * @param msg message to be sent through the tunnel * @param toRouter router to send to after the endpoint (or null for endpoint processing) * @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing) */ public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) { + throw new UnsupportedOperationException("unused, right?"); +/**** _messagesSent++; long startAdd = System.currentTimeMillis(); boolean delayedFlush = false; @@ -137,6 +142,7 @@ class TunnelGateway { + " expire: " + (afterExpire-afterPreprocess) + " queue flush: " + (complete-afterExpire)); } +****/ } public int getMessagesSent() { return _messagesSent; } @@ -312,7 +318,7 @@ class TunnelGateway { else _lastFlush = _context.clock().now(); - _context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining); + //_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining); } } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java index ec1e98e6aa8a975727ed46ba362f7a2f9b400648..1483f2e6ad147ce0ac80198a01296a45f86b8454 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java @@ -1,7 +1,10 @@ package net.i2p.router.tunnel; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -9,13 +12,17 @@ import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; /** - * run through the tunnel gateways that have had messages added to them and push - * those messages through the preprocessing and sending process + * Run through the tunnel gateways that have had messages added to them and push + * those messages through the preprocessing and sending process. + * + * TODO do we need this many threads? + * TODO this combines IBGWs and OBGWs, do we wish to separate the two + * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)? */ class TunnelGatewayPumper implements Runnable { private final RouterContext _context; - private final BlockingQueue<PumpedTunnelGateway> _wantsPumping; - private boolean _stop; + private final Set<PumpedTunnelGateway> _wantsPumping; + private volatile boolean _stop; private static final int MIN_PUMPERS = 1; private static final int MAX_PUMPERS = 4; private final int _pumpers; @@ -23,7 +30,7 @@ class TunnelGatewayPumper implements Runnable { /** Creates a new instance of TunnelGatewayPumper */ public TunnelGatewayPumper(RouterContext ctx) { _context = ctx; - _wantsPumping = new LinkedBlockingQueue(); + _wantsPumping = new LinkedHashSet(16); long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; @@ -35,9 +42,10 @@ class TunnelGatewayPumper implements Runnable { public void stopPumping() { _stop=true; _wantsPumping.clear(); - PumpedTunnelGateway poison = new PoisonPTG(_context); - for (int i = 0; i < _pumpers; i++) - _wantsPumping.offer(poison); + for (int i = 0; i < _pumpers; i++) { + PumpedTunnelGateway poison = new PoisonPTG(_context); + wantsPumping(poison); + } for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) { try { Thread.sleep(i * 50); @@ -47,8 +55,12 @@ class TunnelGatewayPumper implements Runnable { } public void wantsPumping(PumpedTunnelGateway gw) { - if (!_stop) - _wantsPumping.offer(gw); + if (!_stop) { + synchronized (_wantsPumping) { + if (_wantsPumping.add(gw)) + _wantsPumping.notify(); + } + } } public void run() { @@ -56,7 +68,15 @@ class TunnelGatewayPumper implements Runnable { List<TunnelGateway.Pending> queueBuf = new ArrayList(32); while (!_stop) { try { - gw = _wantsPumping.take(); + synchronized (_wantsPumping) { + if (_wantsPumping.isEmpty()) { + _wantsPumping.wait(); + } else { + Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator(); + gw = iter.next(); + iter.remove(); + } + } } catch (InterruptedException ie) {} if (gw != null) { if (gw.getMessagesSent() == POISON_PTG)