diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 3e83562489bafeb3eb74b39b9688f7335440b325..3bae2fd964aafc03a077d322bfd5bbe834fc72b4 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -115,6 +115,10 @@ class RouterThrottleImpl implements RouterThrottle { } /** + * If we should send a reject, return a nonzero reject code. + * Anything that causes us to drop a request instead of rejecting it + * must go in BuildHandler.handleInboundRequest(), not here. + * * @return 0 for accept or nonzero reject code */ public int acceptTunnelRequest() { @@ -132,6 +136,7 @@ class RouterThrottleImpl implements RouterThrottle { return TunnelHistory.TUNNEL_REJECT_BANDWIDTH; } + /**** Moved to BuildHandler long lag = _context.jobQueue().getMaxLag(); if (lag > JOB_LAG_LIMIT_TUNNEL) { if (_log.shouldLog(Log.WARN)) @@ -140,6 +145,7 @@ class RouterThrottleImpl implements RouterThrottle { setTunnelStatus(_x("Rejecting tunnels: High job lag")); return TunnelHistory.TUNNEL_REJECT_BANDWIDTH; } + ****/ RateAverages ra = RateAverages.getTemp(); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java index c386d10b8de992a649509acf359bb06840ef88f0..bb75b682535f1b699584aba6f99a29d5076cfb0e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/ExploreJob.java @@ -73,8 +73,8 @@ class ExploreJob extends SearchJob { * and PeerSelector doesn't include the floodfill peers, * so we add the ff peers ourselves and then use the regular PeerSelector. * - * @param replyTunnelId tunnel to receive replies through - * @param replyGateway gateway for the reply tunnel + * @param replyTunnelId tunnel to receive replies through, or our router hash if replyGateway is null + * @param replyGateway gateway for the reply tunnel, if null, we are sending direct, do not encrypt * @param expiration when the search should stop * @param peer the peer to send it to * @@ -89,7 +89,8 @@ class ExploreJob extends SearchJob { //msg.setDontIncludePeers(getState().getClosestAttempted(MAX_CLOSEST)); Set<Hash> dontIncludePeers = getState().getClosestAttempted(MAX_CLOSEST); msg.setMessageExpiration(expiration); - msg.setReplyTunnel(replyTunnelId); + if (replyTunnelId != null) + msg.setReplyTunnel(replyTunnelId); int available = MAX_CLOSEST - dontIncludePeers.size(); if (available > 0) { @@ -134,7 +135,8 @@ class ExploreJob extends SearchJob { // Now encrypt if we can I2NPMessage outMsg; - if (getContext().getProperty(IterativeSearchJob.PROP_ENCRYPT_RI, IterativeSearchJob.DEFAULT_ENCRYPT_RI)) { + if (replyTunnelId != null && + getContext().getProperty(IterativeSearchJob.PROP_ENCRYPT_RI, IterativeSearchJob.DEFAULT_ENCRYPT_RI)) { // request encrypted reply? if (DatabaseLookupMessage.supportsEncryptedReplies(peer)) { MessageWrapper.OneTimeSession sess; diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/IterativeSearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/IterativeSearchJob.java index 1ad9eb543b800add6ba14e0e8438e1b966ab5611..66e0389cea2a7effd0f384e6c5019aac99546b74 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/IterativeSearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/IterativeSearchJob.java @@ -347,7 +347,9 @@ class IterativeSearchJob extends FloodSearchJob { I2NPMessage outMsg = null; if (isDirect) { // never wrap - } else if (_isLease || getContext().getProperty(PROP_ENCRYPT_RI, DEFAULT_ENCRYPT_RI)) { + } else if (_isLease || + (getContext().getProperty(PROP_ENCRYPT_RI, DEFAULT_ENCRYPT_RI) && + getContext().jobQueue().getMaxLag() < 300)) { // Full ElG is fairly expensive so only do it for LS lookups // and for RI lookups on fast boxes. // if we have the ff RI, garlic encrypt it diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index f0edded8b0d63888bed92992069459cd4b032fbd..450aca44338eabda1da66a2b14232eaf425c3787 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -28,6 +28,7 @@ import net.i2p.router.JobImpl; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; +import net.i2p.router.message.SendMessageDirectJob; import net.i2p.util.Log; /** @@ -47,6 +48,7 @@ class SearchJob extends JobImpl { private final long _expiration; private final long _timeoutMs; private final boolean _keepStats; + private final boolean _isLease; private Job _pendingRequeueJob; private final PeerSelector _peerSelector; private final List<Search> _deferredSearches; @@ -88,7 +90,8 @@ class SearchJob extends JobImpl { * Create a new search for the routingKey specified * */ - public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) { + public SearchJob(RouterContext context, KademliaNetworkDatabaseFacade facade, Hash key, + Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) { super(context); if ( (key == null) || (key.getData() == null) ) throw new IllegalArgumentException("Search for null key? wtf"); @@ -99,13 +102,14 @@ class SearchJob extends JobImpl { _onFailure = onFailure; _timeoutMs = timeoutMs; _keepStats = keepStats; + _isLease = isLease; _deferredSearches = new ArrayList<Search>(0); _peerSelector = facade.getPeerSelector(); _startedOn = -1; _expiration = getContext().clock().now() + timeoutMs; - getContext().statManager().addRateData("netDb.searchCount", 1, 0); + getContext().statManager().addRateData("netDb.searchCount", 1); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by")); + _log.debug("Search (" + getClass().getName() + " for " + key, new Exception("Search enqueued by")); } public void runJob() { @@ -348,6 +352,7 @@ class SearchJob extends JobImpl { else requeuePending(REQUEUE_DELAY); } + private void requeuePending(long ms) { if (_pendingRequeueJob == null) _pendingRequeueJob = new RequeuePending(getContext()); @@ -390,17 +395,24 @@ class SearchJob extends JobImpl { return; } else { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Send search to " + router.getIdentity().getHash().toBase64() - + " for " + _state.getTarget().toBase64() + _log.info(getJobId() + ": Send search to " + router.getIdentity().getHash() + + " for " + _state.getTarget() + " w/ timeout " + getPerPeerTimeoutMs(router.getIdentity().calculateHash())); } - getContext().statManager().addRateData("netDb.searchMessageCount", 1, 0); + getContext().statManager().addRateData("netDb.searchMessageCount", 1); + + // To minimize connection congestion, send RI lokups through exploratory tunnels if not connected. + // To minimize crypto overhead and response latency, send RI lookups directly if connected. + // But not too likely since we don't explore when we're floodfill. + // Always send LS lookups thru expl tunnels. + // But this is never used for LSes... - //if (_isLease || true) // always send searches out tunnels + if (_isLease || + !getContext().commSystem().isEstablished(router.getIdentity().calculateHash())) sendLeaseSearch(router); - //else - // sendRouterSearch(router); + else + sendRouterSearch(router); } @@ -461,27 +473,27 @@ class SearchJob extends JobImpl { } /** we're searching for a router, so we can just send direct */ -/******* always send through the lease protected void sendRouterSearch(RouterInfo router) { int timeout = _facade.getPeerTimeout(router.getIdentity().getHash()); long expiration = getContext().clock().now() + timeout; - DatabaseLookupMessage msg = buildMessage(expiration); + // use the 4-arg one so we pick up the override in ExploreJob + //I2NPMessage msg = buildMessage(expiration); + I2NPMessage msg = buildMessage(null, router.getIdentity().getHash(), expiration, router); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() - + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" - + msg.getFrom().toBase64() + "]"); + _log.debug(getJobId() + ": Sending router search directly to " + router.getIdentity().getHash() + + " for " + _state.getTarget()); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(), - reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY); + reply, new FailedJob(getContext(), router), sel, timeout, + OutNetMessage.PRIORITY_EXPLORATORY); if (FloodfillNetworkDatabaseFacade.isFloodfill(router)) _floodfillSearchesOutstanding++; j.runJob(); //getContext().jobQueue().addJob(j); } -**********/ /** @@ -495,6 +507,8 @@ class SearchJob extends JobImpl { * @return a DatabaseLookupMessage */ protected I2NPMessage buildMessage(TunnelId replyTunnelId, Hash replyGateway, long expiration, RouterInfo peer) { + throw new UnsupportedOperationException("see ExploreJob"); +/******* DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); msg.setSearchKey(_state.getTarget()); //msg.setFrom(replyGateway.getIdentity().getHash()); @@ -503,6 +517,7 @@ class SearchJob extends JobImpl { msg.setMessageExpiration(expiration); msg.setReplyTunnel(replyTunnelId); return msg; +*********/ } /** @@ -522,6 +537,7 @@ class SearchJob extends JobImpl { } *********/ + /** found a reply */ void replyFound(DatabaseSearchReplyMessage message, Hash peer) { long duration = _state.replyFound(peer); // this processing can take a while, so split 'er up @@ -569,13 +585,13 @@ class SearchJob extends JobImpl { _state.replyTimeout(_peer); if (_penalizePeer) { if (_log.shouldLog(Log.INFO)) - _log.info("Penalizing peer for timeout on search: " + _peer.toBase64() + " after " + (getContext().clock().now() - _sentOn)); + _log.info("Penalizing peer for timeout on search: " + _peer + " after " + (getContext().clock().now() - _sentOn)); getContext().profileManager().dbLookupFailed(_peer); } else { if (_log.shouldLog(Log.ERROR)) - _log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64()); + _log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer); } - getContext().statManager().addRateData("netDb.failedPeers", 1, 0); + getContext().statManager().addRateData("netDb.failedPeers", 1); searchNext(); } public String getName() { return "Kademlia Search Failed"; } @@ -593,7 +609,7 @@ class SearchJob extends JobImpl { if (_keepStats) { long time = getContext().clock().now() - _state.getWhenStarted(); - getContext().statManager().addRateData("netDb.successTime", time, 0); + getContext().statManager().addRateData("netDb.successTime", time); getContext().statManager().addRateData("netDb.successPeers", _state.getAttempted().size(), time); } if (_onSuccess != null) @@ -682,7 +698,7 @@ class SearchJob extends JobImpl { protected void fail() { if (isLocal()) { if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": why did we fail if the target is local?: " + _state.getTarget().toBase64(), new Exception("failure cause")); + _log.error(getJobId() + ": why did we fail if the target is local?: " + _state.getTarget(), new Exception("failure cause")); succeed(); return; } @@ -697,7 +713,7 @@ class SearchJob extends JobImpl { getContext().statManager().addRateData("netDb.failedAttemptedPeers", attempted, time); if (_keepStats) { - getContext().statManager().addRateData("netDb.failedTime", time, 0); + getContext().statManager().addRateData("netDb.failedTime", time); //_facade.fail(_state.getTarget()); } if (_onFailure != null) @@ -782,6 +798,7 @@ class SearchJob extends JobImpl { } boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); } + long timeoutMs() { return _timeoutMs; } /** @return true if peer was new */ @@ -795,5 +812,6 @@ class SearchJob extends JobImpl { } return rv; } + void decrementOutstandingFloodfillSearches() { _floodfillSearchesOutstanding--; } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java index 7938ab072c200b7f957ec8f4d444a6695bd11063..f9023a00d3824d3c81f514e5dd8a1b0713447c02 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StartExplorersJob.java @@ -42,6 +42,8 @@ class StartExplorersJob extends JobImpl { private static final int MIN_ROUTERS = 250; /** explore slowly if we have more than this many routers */ private static final int MAX_ROUTERS = 800; + private static final long MAX_LAG = 100; + private static final long MAX_MSG_DELAY = 1500; public StartExplorersJob(RouterContext context, KademliaNetworkDatabaseFacade facade) { super(context); @@ -50,8 +52,12 @@ class StartExplorersJob extends JobImpl { } public String getName() { return "Start Explorers Job"; } + public void runJob() { if (! (_facade.floodfillEnabled() || + getContext().jobQueue().getMaxLag() > MAX_LAG || + getContext().throttle().getMessageDelay() > MAX_MSG_DELAY || + // message delay limit also? getContext().router().gracefulShutdownInProgress())) { int num = MAX_PER_RUN; if (_facade.getDataStore().size() < LOW_ROUTERS) diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index 6849f3d3e6d577c30bd87d8bfe006ae558107fef..7fdd248020a4ac86d56c7df2a4d9750cb20d1098 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -93,6 +93,8 @@ class BuildHandler implements Runnable { /** must be > 1 hour due to rouding down */ private static final long MAX_REQUEST_AGE = 65*60*1000; + private static final long JOB_LAG_LIMIT_TUNNEL = 350; + public BuildHandler(RouterContext ctx, TunnelPoolManager manager, BuildExecutor exec) { _context = ctx; @@ -248,6 +250,17 @@ class BuildHandler implements Runnable { _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow")); return; } + + long lag = _context.jobQueue().getMaxLag(); + // TODO reject instead of drop also for a lower limit? see throttle + if (lag > JOB_LAG_LIMIT_TUNNEL) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping tunnel request, as the job lag is " + lag); + _context.statManager().addRateData("router.throttleTunnelCause", lag); + _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High job lag")); + return; + } + handleRequest(state); //int remaining = _inboundBuildMessages.size();