diff --git a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java index c2a61ffd45df6b25081b0e01f5db6243dd95b843..36c32e90165bd9671a27b372ede6719c7dd0c5c5 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java @@ -1,5 +1,8 @@ package net.i2p.router.tunnel; +import java.util.HashSet; +import java.util.Set; + import net.i2p.data.Hash; import net.i2p.data.RouterInfo; import net.i2p.data.TunnelId; @@ -18,13 +21,31 @@ class OutboundMessageDistributor { private final RouterContext _context; private final int _priority; private final Log _log; + // following only for somebody else's OBEP, not for zero-hop + private final Set<Hash> _toRouters; + private int _newRouterCount; + private long _newRouterTime; private static final long MAX_DISTRIBUTE_TIME = 15*1000; + // 40 routers per minute per OBEP + // This is probably too high, to be reduced later + private static final int MAX_ROUTERS_PER_PERIOD = 20; + private static final long NEW_ROUTER_PERIOD = 30*1000; + /** + * @param priority OutNetMessage.PRIORITY_PARTICIPATING for somebody else's OBEP, or + * OutNetMessage.PRIORITY_MY_DATA for our own zero-hop OBGW/EP + */ public OutboundMessageDistributor(RouterContext ctx, int priority) { _context = ctx; _priority = priority; _log = ctx.logManager().getLog(OutboundMessageDistributor.class); + if (priority <= OutNetMessage.PRIORITY_PARTICIPATING) { + _toRouters = new HashSet<Hash>(4); + _toRouters.add(ctx.routerHash()); + } else { + _toRouters = null; + } // all createRateStat() in TunnelDispatcher } @@ -33,6 +54,12 @@ class OutboundMessageDistributor { } public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) { + if (shouldDrop(target)) { + _context.statManager().addRateData("tunnel.dropAtOBEP", 1); + if (_log.shouldLog(Log.WARN)) + _log.warn("Drop msg at OBEP (new conn throttle) to " + target + ' ' + msg); + return; + } RouterInfo info = _context.netDb().lookupRouterInfoLocally(target); if (info == null) { if (_log.shouldLog(Log.INFO)) @@ -48,7 +75,33 @@ class OutboundMessageDistributor { } } - public void distribute(I2NPMessage msg, RouterInfo target, TunnelId tunnel) { + /** + * Throttle msgs to unconnected routers after we hit + * the limit of new routers in a given time period. + * @since 0.9.12 + */ + private boolean shouldDrop(Hash target) { + if (_toRouters == null) + return false; + synchronized(this) { + if (_toRouters.contains(target)) + return false; + // haven't sent to this router before + long now = _context.clock().now(); + if (_newRouterTime < now - NEW_ROUTER_PERIOD) { + _newRouterCount = 0; + _newRouterTime = now; + } else if (_newRouterCount >= MAX_ROUTERS_PER_PERIOD) { + if (!_context.commSystem().isEstablished(target)) + return true; //drop + } + _newRouterCount++; + _toRouters.add(target); + } + return false; + } + + private void distribute(I2NPMessage msg, RouterInfo target, TunnelId tunnel) { I2NPMessage m = msg; if (tunnel != null) { TunnelGatewayMessage t = new TunnelGatewayMessage(_context); @@ -105,7 +158,7 @@ class OutboundMessageDistributor { + ": NOT found on search"); stat = 0; } - _context.statManager().addRateData("tunnel.distributeLookupSuccess", stat, 0); + _context.statManager().addRateData("tunnel.distributeLookupSuccess", stat); } } } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index b5797474919a4aaaecc4c1a072b10719943c6f0b..7e786016ceb69bed33052d5abdbf10918f5b9482 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -190,6 +190,7 @@ public class TunnelDispatcher implements Service { ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); // following is for OutboundMessageDistributor ctx.statManager().createRateStat("tunnel.distributeLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 }); + ctx.statManager().createRateStat("tunnel.dropAtOBEP", "New conn throttle", "Tunnels", new long[] { 60*60*1000 }); // following is for OutboundReceiver ctx.statManager().createRateStat("tunnel.outboundLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 }); // following is for InboundGatewayReceiver