I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit a62b7a43 authored by zzz's avatar zzz
Browse files

* Tunnels: Rate-limit connections at the OBEP (ticket #1134)

parent 9d7a9c98
No related branches found
No related tags found
No related merge requests found
package net.i2p.router.tunnel;
import java.util.HashSet;
import java.util.Set;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.TunnelId;
......@@ -18,13 +21,31 @@ class OutboundMessageDistributor {
private final RouterContext _context;
private final int _priority;
private final Log _log;
// following only for somebody else's OBEP, not for zero-hop
private final Set<Hash> _toRouters;
private int _newRouterCount;
private long _newRouterTime;
private static final long MAX_DISTRIBUTE_TIME = 15*1000;
// 40 routers per minute per OBEP
// This is probably too high, to be reduced later
private static final int MAX_ROUTERS_PER_PERIOD = 20;
private static final long NEW_ROUTER_PERIOD = 30*1000;
/**
* @param priority OutNetMessage.PRIORITY_PARTICIPATING for somebody else's OBEP, or
* OutNetMessage.PRIORITY_MY_DATA for our own zero-hop OBGW/EP
*/
public OutboundMessageDistributor(RouterContext ctx, int priority) {
_context = ctx;
_priority = priority;
_log = ctx.logManager().getLog(OutboundMessageDistributor.class);
if (priority <= OutNetMessage.PRIORITY_PARTICIPATING) {
_toRouters = new HashSet<Hash>(4);
_toRouters.add(ctx.routerHash());
} else {
_toRouters = null;
}
// all createRateStat() in TunnelDispatcher
}
......@@ -33,6 +54,12 @@ class OutboundMessageDistributor {
}
public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) {
if (shouldDrop(target)) {
_context.statManager().addRateData("tunnel.dropAtOBEP", 1);
if (_log.shouldLog(Log.WARN))
_log.warn("Drop msg at OBEP (new conn throttle) to " + target + ' ' + msg);
return;
}
RouterInfo info = _context.netDb().lookupRouterInfoLocally(target);
if (info == null) {
if (_log.shouldLog(Log.INFO))
......@@ -48,7 +75,33 @@ class OutboundMessageDistributor {
}
}
public void distribute(I2NPMessage msg, RouterInfo target, TunnelId tunnel) {
/**
* Throttle msgs to unconnected routers after we hit
* the limit of new routers in a given time period.
* @since 0.9.12
*/
private boolean shouldDrop(Hash target) {
if (_toRouters == null)
return false;
synchronized(this) {
if (_toRouters.contains(target))
return false;
// haven't sent to this router before
long now = _context.clock().now();
if (_newRouterTime < now - NEW_ROUTER_PERIOD) {
_newRouterCount = 0;
_newRouterTime = now;
} else if (_newRouterCount >= MAX_ROUTERS_PER_PERIOD) {
if (!_context.commSystem().isEstablished(target))
return true; //drop
}
_newRouterCount++;
_toRouters.add(target);
}
return false;
}
private void distribute(I2NPMessage msg, RouterInfo target, TunnelId tunnel) {
I2NPMessage m = msg;
if (tunnel != null) {
TunnelGatewayMessage t = new TunnelGatewayMessage(_context);
......@@ -105,7 +158,7 @@ class OutboundMessageDistributor {
+ ": NOT found on search");
stat = 0;
}
_context.statManager().addRateData("tunnel.distributeLookupSuccess", stat, 0);
_context.statManager().addRateData("tunnel.distributeLookupSuccess", stat);
}
}
}
......@@ -190,6 +190,7 @@ public class TunnelDispatcher implements Service {
ctx.statManager().createRateStat("tunnel.batchFragmentation", "Avg. number of fragments per msg", "Tunnels", new long[] { 10*60*1000, 60*60*1000 });
// following is for OutboundMessageDistributor
ctx.statManager().createRateStat("tunnel.distributeLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.dropAtOBEP", "New conn throttle", "Tunnels", new long[] { 60*60*1000 });
// following is for OutboundReceiver
ctx.statManager().createRateStat("tunnel.outboundLookupSuccess", "Was a deferred lookup successful?", "Tunnels", new long[] { 60*60*1000 });
// following is for InboundGatewayReceiver
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment