Compare commits

...

1 Commits

Author SHA1 Message Date
zzz
46985269aa WIP: Per-tunnel bw limiters 2024-05-23 08:20:26 -04:00
8 changed files with 79 additions and 10 deletions

View File

@@ -211,6 +211,14 @@ public class FIFOBandwidthLimiter {
return _refiller.getCurrentParticipatingBandwidth(); return _refiller.getCurrentParticipatingBandwidth();
} }
/**
* In Bytes per second
* @since 0.9.63
*/
public int getMaxShareBandwidth() {
return _refiller.getMaxShareBandwidth();
}
/** /**
* Request some bytes. Does not block. * Request some bytes. Does not block.
*/ */

View File

@@ -351,6 +351,14 @@ public class FIFOBandwidthRefiller implements Runnable {
return (int) (_partBWE.getBandwidthEstimate() * 1000f); return (int) (_partBWE.getBandwidthEstimate() * 1000f);
} }
/**
* In Bytes per second
* @since 0.9.63
*/
int getMaxShareBandwidth() {
return _partBWE != null ? _partBWE.getMaxBandwidth() : DEFAULT_OUTBOUND_BANDWIDTH;
}
/** /**
* Call a few times a minute to update the stats * Call a few times a minute to update the stats
* *

View File

@@ -15,6 +15,7 @@ import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SyntheticREDQueue;
/** /**
* When a message arrives at the outbound tunnel endpoint, this distributor * When a message arrives at the outbound tunnel endpoint, this distributor
@@ -26,6 +27,7 @@ class OutboundMessageDistributor {
private final Log _log; private final Log _log;
// following only for somebody else's OBEP, not for zero-hop // following only for somebody else's OBEP, not for zero-hop
private final Set<Hash> _toRouters; private final Set<Hash> _toRouters;
private final SyntheticREDQueue _partBWE;
private int _newRouterCount; private int _newRouterCount;
private long _newRouterTime; private long _newRouterTime;
@@ -37,8 +39,9 @@ class OutboundMessageDistributor {
/** /**
* @param priority OutNetMessage.PRIORITY_PARTICIPATING for somebody else's OBEP, or * @param priority OutNetMessage.PRIORITY_PARTICIPATING for somebody else's OBEP, or
* OutNetMessage.PRIORITY_MY_DATA for our own zero-hop OBGW/EP * OutNetMessage.PRIORITY_MY_DATA for our own zero-hop OBGW/EP
* @param bwe null for our zero-hop
*/ */
public OutboundMessageDistributor(RouterContext ctx, int priority) { public OutboundMessageDistributor(RouterContext ctx, int priority, SyntheticREDQueue bwe) {
_context = ctx; _context = ctx;
_priority = priority; _priority = priority;
_log = ctx.logManager().getLog(OutboundMessageDistributor.class); _log = ctx.logManager().getLog(OutboundMessageDistributor.class);
@@ -48,6 +51,7 @@ class OutboundMessageDistributor {
} else { } else {
_toRouters = null; _toRouters = null;
} }
_partBWE = bwe;
// all createRateStat() in TunnelDispatcher // all createRateStat() in TunnelDispatcher
} }
@@ -75,7 +79,7 @@ class OutboundMessageDistributor {
if (_toRouters != null) { if (_toRouters != null) {
// only if not zero-hop // only if not zero-hop
// credit our lookup message as part. traffic // credit our lookup message as part. traffic
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, DatabaseLookupMessage.MESSAGE_TYPE, 1024)) { if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, DatabaseLookupMessage.MESSAGE_TYPE, 1024, _partBWE)) {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("Drop msg at OBEP (lookup bandwidth) to " + target.toBase64() + " type " + msg.getType()); _log.warn("Drop msg at OBEP (lookup bandwidth) to " + target.toBase64() + " type " + msg.getType());
return; return;

View File

@@ -11,6 +11,7 @@ import net.i2p.data.i2np.UnknownI2NPMessage;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SyntheticREDQueue;
/** /**
* We are the end of an outbound tunnel that we did not create. Gather fragments * We are the end of an outbound tunnel that we did not create. Gather fragments
@@ -24,6 +25,7 @@ class OutboundTunnelEndpoint {
private final HopProcessor _processor; private final HopProcessor _processor;
private final FragmentHandler _handler; private final FragmentHandler _handler;
private final OutboundMessageDistributor _outDistributor; private final OutboundMessageDistributor _outDistributor;
private final SyntheticREDQueue _partBWE;
public OutboundTunnelEndpoint(RouterContext ctx, HopConfig config, HopProcessor processor) { public OutboundTunnelEndpoint(RouterContext ctx, HopConfig config, HopProcessor processor) {
_context = ctx; _context = ctx;
@@ -31,7 +33,9 @@ class OutboundTunnelEndpoint {
_config = config; _config = config;
_processor = processor; _processor = processor;
_handler = new FragmentHandler(ctx, new DefragmentedHandler(), false); _handler = new FragmentHandler(ctx, new DefragmentedHandler(), false);
_outDistributor = new OutboundMessageDistributor(ctx, OutNetMessage.PRIORITY_PARTICIPATING); int max = _context.tunnelDispatcher().getMaxPerTunnelBandwidth(TunnelDispatcher.Location.OBEP);
_partBWE = new SyntheticREDQueue(_context, max);
_outDistributor = new OutboundMessageDistributor(ctx, OutNetMessage.PRIORITY_PARTICIPATING, _partBWE);
} }
public void dispatch(TunnelDataMessage msg, Hash recvFrom) { public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
@@ -113,9 +117,10 @@ class OutboundTunnelEndpoint {
int size = msg.getMessageSize(); int size = msg.getMessageSize();
// don't drop it if we are the target // don't drop it if we are the target
boolean toUs = _context.routerHash().equals(toRouter); boolean toUs = _context.routerHash().equals(toRouter);
if ((!toUs) && if (!toUs) {
_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, type, size)) if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, type, size, _partBWE))
return; return;
}
// this overstates the stat somewhat, but ok for now // this overstates the stat somewhat, but ok for now
//int kb = (size + 1023) / 1024; //int kb = (size + 1023) / 1024;
//for (int i = 0; i < kb; i++) //for (int i = 0; i < kb; i++)

View File

@@ -4,6 +4,7 @@ import net.i2p.data.Hash;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.SyntheticREDQueue;
/** /**
* Same as PTG, but check to see if a message should be dropped before queueing it. * Same as PTG, but check to see if a message should be dropped before queueing it.
@@ -14,11 +15,14 @@ import net.i2p.router.RouterContext;
class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway { class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
/** saved so we can note messages that get dropped */ /** saved so we can note messages that get dropped */
private final HopConfig _config; private final HopConfig _config;
private final SyntheticREDQueue _partBWE;
public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, public ThrottledPumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender,
Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) { Receiver receiver, TunnelGatewayPumper pumper, HopConfig config) {
super(context, preprocessor, sender, receiver, pumper); super(context, preprocessor, sender, receiver, pumper);
_config = config; _config = config;
int max = _context.tunnelDispatcher().getMaxPerTunnelBandwidth(TunnelDispatcher.Location.IBGW);
_partBWE = new SyntheticREDQueue(_context, max);
} }
/** /**
@@ -43,7 +47,7 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
// 2:1 batching of small messages // 2:1 batching of small messages
size = 512; size = 512;
} }
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) { if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size, _partBWE)) {
// this overstates the stat somewhat, but ok for now // this overstates the stat somewhat, but ok for now
int kb = (size + 1023) / 1024; int kb = (size + 1023) / 1024;
for (int i = 0; i < kb; i++) for (int i = 0; i < kb; i++)

View File

@@ -28,6 +28,7 @@ import net.i2p.router.Service;
import net.i2p.router.peermanager.PeerProfile; import net.i2p.router.peermanager.PeerProfile;
import net.i2p.router.tunnel.pool.PooledTunnelCreatorConfig; import net.i2p.router.tunnel.pool.PooledTunnelCreatorConfig;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SyntheticREDQueue;
/** /**
* Handle the actual processing and forwarding of messages through the * Handle the actual processing and forwarding of messages through the
@@ -779,8 +780,9 @@ public class TunnelDispatcher implements Service {
* @param loc message hop location * @param loc message hop location
* @param type I2NP message type * @param type I2NP message type
* @param length the length of the message * @param length the length of the message
* @param bwe a per-tunnel bandwidth estimator to be checked first, or null
*/ */
public boolean shouldDropParticipatingMessage(Location loc, int type, int length) { boolean shouldDropParticipatingMessage(Location loc, int type, int length, SyntheticREDQueue bwe) {
if (length <= 0) if (length <= 0)
return false; return false;
@@ -807,6 +809,16 @@ public class TunnelDispatcher implements Service {
} else { } else {
factor = 1.0f; factor = 1.0f;
} }
if (bwe != null) {
if (!bwe.offer(length, factor)) {
if (_log.shouldWarn())
_log.warn("Drop (per-tunnel) part. msg. factor=" + factor +
' ' + loc + ' ' + type + ' ' + length + ' ' + bwe);
return true;
}
}
boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor); boolean reject = ! _context.bandwidthLimiter().sentParticipatingMessage(length, factor);
if (reject) { if (reject) {
if (_log.shouldLog(Log.WARN)) { if (_log.shouldLog(Log.WARN)) {
@@ -818,6 +830,26 @@ public class TunnelDispatcher implements Service {
return reject; return reject;
} }
/**
* The maximum bandwidth for a single tunnel
*
* @param loc unused for now
* @since 0.9.63
*/
int getMaxPerTunnelBandwidth(Location loc) {
int max = _context.bandwidthLimiter().getMaxShareBandwidth();
int maxTunnels = _context.getProperty(RouterThrottleImpl.PROP_MAX_TUNNELS, RouterThrottleImpl.DEFAULT_MAX_TUNNELS);
if (maxTunnels > 25) {
if (max >= 128*1024) // N/O/P/X
max /= 4;
else if (max <= 48*1024) // K/L
max /= 2;
else
max = (24*1024) + ((max - (48*1024)) / 10); // M
}
return max;
}
//private static final int DROP_BASE_INTERVAL = 40 * 1000; //private static final int DROP_BASE_INTERVAL = 40 * 1000;
//private static final int DROP_RANDOM_BOOST = 10 * 1000; //private static final int DROP_RANDOM_BOOST = 10 * 1000;

View File

@@ -28,7 +28,7 @@ class TunnelGatewayZeroHop extends TunnelGateway {
if (config.isInbound()) if (config.isInbound())
_inDistributor = new InboundMessageDistributor(context, config.getDestination()); _inDistributor = new InboundMessageDistributor(context, config.getDestination());
else else
_outDistributor = new OutboundMessageDistributor(context, OutNetMessage.PRIORITY_MY_DATA); _outDistributor = new OutboundMessageDistributor(context, OutNetMessage.PRIORITY_MY_DATA, null);
} }
/** /**

View File

@@ -9,6 +9,7 @@ import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage; import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SyntheticREDQueue;
/** /**
* Participate in a tunnel at a location other than the gateway or outbound * Participate in a tunnel at a location other than the gateway or outbound
@@ -25,6 +26,7 @@ class TunnelParticipant {
private final InboundEndpointProcessor _inboundEndpointProcessor; private final InboundEndpointProcessor _inboundEndpointProcessor;
private final InboundMessageDistributor _inboundDistributor; private final InboundMessageDistributor _inboundDistributor;
private final FragmentHandler _handler; private final FragmentHandler _handler;
private final SyntheticREDQueue _partBWE;
private RouterInfo _nextHopCache; private RouterInfo _nextHopCache;
private static final long MAX_LOOKUP_TIME = 15*1000; private static final long MAX_LOOKUP_TIME = 15*1000;
@@ -68,6 +70,12 @@ class TunnelParticipant {
if (_nextHopCache == null) if (_nextHopCache == null)
_context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, LONG_MAX_LOOKUP_TIME); _context.netDb().lookupRouterInfo(_config.getSendTo(), new Found(_context), null, LONG_MAX_LOOKUP_TIME);
} }
if (inEndProc == null) {
int max = _context.tunnelDispatcher().getMaxPerTunnelBandwidth(TunnelDispatcher.Location.PARTICIPANT);
_partBWE = new SyntheticREDQueue(_context, max);
} else {
_partBWE = null;
}
// all createRateStat() in TunnelDispatcher // all createRateStat() in TunnelDispatcher
} }
@@ -200,7 +208,7 @@ class TunnelParticipant {
private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) { private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) {
if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.PARTICIPANT, if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.PARTICIPANT,
TunnelDataMessage.MESSAGE_TYPE, 1024)) TunnelDataMessage.MESSAGE_TYPE, 1024, _partBWE))
return; return;
//_config.incrementSentMessages(); //_config.incrementSentMessages();
long oldId = msg.getUniqueId(); long oldId = msg.getUniqueId();