From 3a568096f2a18e408adedf45094592e8d262f2ad Mon Sep 17 00:00:00 2001 From: jrandom Date: Fri, 9 Jul 2004 03:56:22 +0000 Subject: [PATCH] new throttling code which rejects tunnel create requests, networkDb lookup requests, and even tells the I2NP components to stop reading from the network (it doesnt affect writing to the network) the simple RouterThrottleImpl bases its decision entirely on how congested the jobQueue is - if there are jobs that have been waiting 5+ seconds, reject everything and stop reading from the network (each i2npMessageReader randomly waits .5-1s when throttled before rechecking it) minor adjustments in the stats published - removing a few useless ones and adding the router.throttleNetworkCause (which is the average ms lag in the jobQueue when an I2NP reader is throttled) --- .../net/i2p/data/i2np/I2NPMessageReader.java | 3 + .../src/net/i2p/router/RouterContext.java | 7 ++ .../src/net/i2p/router/RouterThrottle.java | 34 ++++++++ .../net/i2p/router/RouterThrottleImpl.java | 82 +++++++++++++++++++ .../src/net/i2p/router/StatisticsManager.java | 22 ++--- .../DatabaseLookupMessageHandler.java | 15 +++- .../HandleTunnelCreateMessageJob.java | 10 ++- 7 files changed, 159 insertions(+), 14 deletions(-) create mode 100644 router/java/src/net/i2p/router/RouterThrottle.java create mode 100644 router/java/src/net/i2p/router/RouterThrottleImpl.java diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java index f55dffbc7..bb9fa3bab 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java @@ -114,6 +114,9 @@ public class I2NPMessageReader { public void run() { while (_stayAlive) { while (_doRun) { + while (!_context.throttle().acceptNetworkMessage()) { + try { Thread.sleep(500 + _context.random().nextInt(512)); } catch (InterruptedException ie) {} + } // do read try { I2NPMessage msg = _handler.readMessage(_stream); diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java index 5c56f2706..9b69834a7 100644 --- a/router/java/src/net/i2p/router/RouterContext.java +++ b/router/java/src/net/i2p/router/RouterContext.java @@ -49,6 +49,7 @@ public class RouterContext extends I2PAppContext { private Shitlist _shitlist; private MessageValidator _messageValidator; private MessageStateMonitor _messageStateMonitor; + private RouterThrottle _throttle; private Calculator _isFailingCalc; private Calculator _integrationCalc; private Calculator _speedCalc; @@ -83,6 +84,7 @@ public class RouterContext extends I2PAppContext { _statPublisher = new StatisticsManager(this); _shitlist = new Shitlist(this); _messageValidator = new MessageValidator(this); + _throttle = new RouterThrottleImpl(this); _isFailingCalc = new IsFailingCalculator(this); _integrationCalc = new IntegrationCalculator(this); _speedCalc = new SpeedCalculator(this); @@ -188,6 +190,11 @@ public class RouterContext extends I2PAppContext { * well as other criteria for "validity". */ public MessageValidator messageValidator() { return _messageValidator; } + /** + * Component to coordinate our accepting/rejecting of requests under load + * + */ + public RouterThrottle throttle() { return _throttle; } /** how do we rank the failure of profiles? */ public Calculator isFailingCalculator() { return _isFailingCalc; } diff --git a/router/java/src/net/i2p/router/RouterThrottle.java b/router/java/src/net/i2p/router/RouterThrottle.java new file mode 100644 index 000000000..45d8bf9f5 --- /dev/null +++ b/router/java/src/net/i2p/router/RouterThrottle.java @@ -0,0 +1,34 @@ +package net.i2p.router; + +import net.i2p.data.Hash; +import net.i2p.data.i2np.TunnelCreateMessage; + +/** + * Gatekeeper for deciding whether to throttle the further processing + * of messages through the router. This is seperate from the bandwidth + * limiting which simply makes sure the bytes transferred dont exceed the + * bytes allowed (though the router throttle should take into account the + * current bandwidth usage and limits when determining whether to accept or + * reject certain activities, such as tunnels) + * + */ +public interface RouterThrottle { + /** + * Should we accept any more data from the network for any sort of message, + * taking into account our current load, or should we simply slow down? + * + */ + public boolean acceptNetworkMessage(); + /** + * Should we accept the request to participate in the given tunnel, + * taking into account our current load and bandwidth usage commitments? + * + */ + public boolean acceptTunnelRequest(TunnelCreateMessage msg); + /** + * Should we accept the netDb lookup message, replying either with the + * value or some closer peers, or should we simply drop it due to overload? + * + */ + public boolean acceptNetDbLookupRequest(Hash key); +} diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java new file mode 100644 index 000000000..6cbc28468 --- /dev/null +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -0,0 +1,82 @@ +package net.i2p.router; + +import net.i2p.data.Hash; +import net.i2p.data.i2np.TunnelCreateMessage; +import net.i2p.util.Log; + +/** + * Simple throttle that basically stops accepting messages or nontrivial + * requests if the jobQueue lag is too large. + * + */ +class RouterThrottleImpl implements RouterThrottle { + private RouterContext _context; + private Log _log; + + /** + * arbitrary hard limit of 5 seconds - if its taking this long to get + * to a job, we're congested. + * + */ + private static int JOB_LAG_LIMIT = 5000; + + public RouterThrottleImpl(RouterContext context) { + _context = context; + _log = context.logManager().getLog(RouterThrottleImpl.class); + _context.statManager().createRateStat("router.throttleNetworkCause", "How lagged the jobQueue was when an I2NP was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("router.throttleNetDbCause", "How lagged the jobQueue was when a networkDb request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("router.throttleTunnelCause", "How lagged the jobQueue was when a tunnel request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); + } + + public boolean acceptNetworkMessage() { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Throttling network reader, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleNetworkCause", lag, lag); + return false; + } else { + return true; + } + } + + public boolean acceptNetDbLookupRequest(Hash key) { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Refusing netDb request, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleNetDbCause", lag, lag); + return false; + } else { + return true; + } + } + public boolean acceptTunnelRequest(TunnelCreateMessage msg) { + long lag = _context.jobQueue().getMaxLag(); + if (lag > JOB_LAG_LIMIT) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Refusing tunnel request, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleTunnelCause", lag, lag); + return false; + } else { + // ok, we're not hosed, but can we handle the bandwidth requirements + // of another tunnel? + double msgsPerTunnel = _context.statManager().getRate("tunnel.participatingMessagesProcessed").getRate(10*60*1000).getAverageValue(); + double bytesPerMsg = _context.statManager().getRate("tunnel.relayMessageSize").getRate(10*60*1000).getAverageValue(); + double bytesPerTunnel = msgsPerTunnel * bytesPerMsg; + + + int numTunnels = _context.tunnelManager().getParticipatingCount(); + double bytesAllocated = (numTunnels + 1) * bytesPerTunnel; + + _context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000); + // todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients, + // and check to see that they are less than the bandwidth limits + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels + " tunnels"); + return true; + } + } +} diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 709473694..7ef180850 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -106,7 +106,7 @@ public class StatisticsManager implements Service { includeRate("crypto.garlic.decryptFail", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.unknownTunnelTimeLeft", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 }); - includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); + //includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 }); includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("tunnel.testSuccessTime", stats, new long[] { 60*60*1000l, 24*60*60*1000l }); @@ -114,6 +114,7 @@ public class StatisticsManager implements Service { includeRate("tunnel.inboundMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("tunnel.participatingMessagesProcessed", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("tunnel.expiredAfterAcceptTime", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + includeRate("tunnel.bytesAllocatedAtAccept", stats, new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 }); @@ -121,16 +122,17 @@ public class StatisticsManager implements Service { includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 }); includeRate("netDb.failedPeers", stats, new long[] { 60*60*1000 }); includeRate("netDb.searchCount", stats, new long[] { 3*60*60*1000}); - includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); - includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + //includeRate("inNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + //includeRate("outNetMessage.timeToDiscard", stats, new long[] { 5*60*1000, 10*60*1000, 60*60*1000 }); + includeRate("router.throttleNetworkCause", stats, new long[] { 10*60*1000, 60*60*1000 }); includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); - includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.sendMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageSmall", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageMedium", stats, new long[] { 5*60*1000, 60*60*1000 }); + //includeRate("transport.receiveMessageLarge", stats, new long[] { 5*60*1000, 60*60*1000 }); includeRate("client.sendAckTime", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true); stats.setProperty("stat_uptime", DataHelper.formatDuration(_context.router().getUptime())); stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]"); diff --git a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java index ab42e2d37..52dee9a6e 100644 --- a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java +++ b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java @@ -16,6 +16,7 @@ import net.i2p.data.i2np.SourceRouteBlock; import net.i2p.router.HandlerJobBuilder; import net.i2p.router.Job; import net.i2p.router.RouterContext; +import net.i2p.util.Log; /** * Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives @@ -23,14 +24,24 @@ import net.i2p.router.RouterContext; */ public class DatabaseLookupMessageHandler implements HandlerJobBuilder { private RouterContext _context; + private Log _log; public DatabaseLookupMessageHandler(RouterContext context) { _context = context; + _log = context.logManager().getLog(DatabaseLookupMessageHandler.class); _context.statManager().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _context.statManager().createRateStat("netDb.lookupsDropped", "How many netDb lookups did we drop due to throttling?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); } public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { _context.statManager().addRateData("netDb.lookupsReceived", 1, 0); - // ignore the reply block for the moment - return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash); + + if (_context.throttle().acceptNetDbLookupRequest(((DatabaseLookupMessage)receivedMessage).getSearchKey())) { + return new HandleDatabaseLookupMessageJob(_context, (DatabaseLookupMessage)receivedMessage, from, fromHash); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Dropping lookup request as throttled"); + _context.statManager().addRateData("netDb.lookupsDropped", 1, 1); + return null; + } } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java index 7ef558e13..85ccc5fed 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java @@ -39,6 +39,7 @@ public class HandleTunnelCreateMessageJob extends JobImpl { RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { super(ctx); _log = ctx.logManager().getLog(HandleTunnelCreateMessageJob.class); + ctx.statManager().createRateStat("tunnel.rejectOverloaded", "How many tunnels did we deny due to throttling?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); _message = receivedMessage; _from = from; _fromHash = fromHash; @@ -83,8 +84,13 @@ public class HandleTunnelCreateMessageJob extends JobImpl { } private boolean isOverloaded() { - // hmmm.... - return false; + boolean shouldAccept = _context.throttle().acceptTunnelRequest(_message); + if (!shouldAccept) { + _context.statManager().addRateData("tunnel.rejectOverloaded", 1, 1); + if (_log.shouldLog(Log.INFO)) + _log.info("Refusing tunnel request due to overload"); + } + return !shouldAccept; } private class TestJob extends JobImpl {