diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 4c0edcac4bd47d9d0a6e955759321c7c32e73b27..1f437e48aeff7c501b46d02bd15646c743339053 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -17,11 +17,11 @@ class RouterThrottleImpl implements RouterThrottle { private Log _log; /** - * arbitrary hard limit of 2 seconds - if its taking this long to get + * arbitrary hard limit of 10 seconds - if its taking this long to get * to a job, we're congested. * */ - private static int JOB_LAG_LIMIT = 2000; + private static int JOB_LAG_LIMIT = 10*1000; /** * Arbitrary hard limit - if we throttle our network connection this many * times in the previous 2 minute period, don't accept requests to @@ -56,6 +56,7 @@ class RouterThrottleImpl implements RouterThrottle { } public boolean acceptNetworkMessage() { + //if (true) return true; long lag = _context.jobQueue().getMaxLag(); if ( (lag > JOB_LAG_LIMIT) && (_context.router().getUptime() > 60*1000) ) { if (_log.shouldLog(Log.DEBUG)) @@ -87,6 +88,7 @@ class RouterThrottleImpl implements RouterThrottle { } long lag = _context.jobQueue().getMaxLag(); + /* RateStat rs = _context.statManager().getRate("router.throttleNetworkCause"); Rate r = null; if (rs != null) @@ -100,11 +102,13 @@ class RouterThrottleImpl implements RouterThrottle { _context.statManager().addRateData("router.throttleTunnelCause", lag, lag); return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; } + */ - rs = _context.statManager().getRate("transport.sendProcessingTime"); - r = null; + RateStat rs = _context.statManager().getRate("transport.sendProcessingTime"); + Rate r = null; + /* if (rs != null) - r = rs.getRate(10*60*1000); + r = rs.getRate(1*60*1000); double processTime = (r != null ? r.getAverageValue() : 0); if (processTime > 2000) { if (_log.shouldLog(Log.DEBUG)) @@ -113,9 +117,10 @@ class RouterThrottleImpl implements RouterThrottle { _context.statManager().addRateData("router.throttleTunnelProcessingTime10m", (long)processTime, (long)processTime); return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; } + */ if (rs != null) r = rs.getRate(60*1000); - processTime = (r != null ? r.getAverageValue() : 0); + double processTime = (r != null ? r.getAverageValue() : 0); if (processTime > 2000) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Refusing tunnel request with the job lag of " + lag @@ -124,6 +129,7 @@ class RouterThrottleImpl implements RouterThrottle { return TunnelHistory.TUNNEL_REJECT_TRANSIENT_OVERLOAD; } + /* rs = _context.statManager().getRate("transport.sendMessageFailureLifetime"); r = null; if (rs != null) @@ -142,6 +148,7 @@ class RouterThrottleImpl implements RouterThrottle { } } } + */ int numTunnels = _context.tunnelManager().getParticipatingCount(); @@ -251,7 +258,7 @@ class RouterThrottleImpl implements RouterThrottle { if (_log.shouldLog(Log.DEBUG)) _log.debug("Accepting a new tunnel request (now allocating " + bytesAllocated + " bytes across " + numTunnels - + " tunnels with lag of " + lag + " and " + throttleEvents + " throttle events)"); + + " tunnels with lag of " + lag + ")"); return TUNNEL_ACCEPT; } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 6b5f076787464a6efc469d7a2852f2583ede8551..51bfac4eb6a41f1e4c4e4211186977b4685cf846 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.338 $ $Date: 2006/01/22 19:51:56 $"; + public final static String ID = "$Revision: 1.339 $ $Date: 2006/01/25 10:34:31 $"; public final static String VERSION = "0.6.1.9"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 83b80137a77f29701d3cbbf817b6ef5c33552451..91ca2877a652c7f2343cab88fdc1c1aba3ef999b 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -322,7 +322,8 @@ public class ClientConnectionRunner { */ void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { if (_dead) return; - _context.jobQueue().addJob(new MessageReceivedJob(_context, this, toDest, fromDest, payload)); + MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload); + j.runJob(); } /** diff --git a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java index fe881a54991282004cbff80aa5a93a6cf897a8eb..6d982f24c5b43697cb2aa66de6a137aff8fd97a0 100644 --- a/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java @@ -82,7 +82,9 @@ public class HandleGarlicMessageJob extends JobImpl implements GarlicMessageRece SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data, instructions.getRouter(), 10*1000, 100); - getContext().jobQueue().addJob(j); + // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup) + j.runJob(); + //getContext().jobQueue().addJob(j); } return; case DeliveryInstructions.DELIVERY_MODE_TUNNEL: @@ -90,9 +92,12 @@ public class HandleGarlicMessageJob extends JobImpl implements GarlicMessageRece gw.setMessage(data); gw.setTunnelId(instructions.getTunnelId()); gw.setMessageExpiration(data.getMessageExpiration()); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), gw, - instructions.getRouter(), - 10*1000, 100)); + SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw, + instructions.getRouter(), + 10*1000, 100); + // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup) + job.runJob(); + // getContext().jobQueue().addJob(job); return; default: _log.error("Unknown instruction " + instructions.getDeliveryMode() + ": " + instructions); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 97e1dcd113be2e9deb1ee0bbeaa0ac7350f95756..e94c606339d2753eb1fc7e12a0a71d9dd55abf6d 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -117,6 +117,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; _clientMessage = msg; _clientMessageId = msg.getMessageId(); @@ -335,8 +336,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // set to null if there are no tunnels to ack the reply back through // (should we always fail for this? or should we send it anyway, even if // we dont receive the reply? hmm...) - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString); + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left) to " + _toString); + getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0); dieFatal(); return; } @@ -368,8 +370,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { else dispatchJob.runJob(); } else { - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?"); + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while"); + getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0); dieFatal(); } _clientMessage = null; diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 0bcb00aa2c543cdec044c72b709d2a1c5d806a5f..c2dd0bc56c77bbb81bbc27fa154bdc1eab9818eb 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -236,7 +236,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending reply directly to " + toPeer); Job send = new SendMessageDirectJob(getContext(), message, toPeer, REPLY_TIMEOUT, MESSAGE_PRIORITY); - getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); + send.runJob(); + //getContext().netDb().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); } } @@ -255,7 +256,8 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { m.setMessageExpiration(message.getMessageExpiration()); m.setTunnelId(replyTunnel); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, 100); - getContext().jobQueue().addJob(j); + j.runJob(); + //getContext().jobQueue().addJob(j); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java index 22f2ffe990604ffdcc9d8de06e7ac932aa5f70db..e8d6e6ac323604d10bfbe736e6c964b4607d3dd1 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java @@ -34,6 +34,9 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { ctx.statManager().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("netDb.storeLeaseSetHandled", "How many leaseSet store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("netDb.storeRouterInfoHandled", "How many routerInfo store messages have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("netDb.storeRecvTime", "How long it takes to handle the local store part of a dbStore?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l }); + ctx.statManager().createRateStat("netDb.storeFloodNew", "How long it takes to flood out a newly received entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l }); + ctx.statManager().createRateStat("netDb.storeFloodOld", "How often we receive an old entry?", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l }); _message = receivedMessage; _from = from; _fromHash = fromHash; @@ -44,6 +47,8 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Handling database store message"); + long recvBegin = System.currentTimeMillis(); + String invalidMessage = null; boolean wasNew = false; if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) { @@ -56,7 +61,7 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { // receive in response to our own lookups. ls.setReceivedAsPublished(true); LeaseSet match = getContext().netDb().store(_message.getKey(), _message.getLeaseSet()); - if (match == null) { + if ( (match == null) || (match.getEarliestLeaseDate() < _message.getLeaseSet().getEarliestLeaseDate()) ) { wasNew = true; } else { wasNew = false; @@ -71,8 +76,8 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { _log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of " + new Date(_message.getRouterInfo().getPublished())); try { - Object match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo()); - wasNew = (null == match); + RouterInfo match = getContext().netDb().store(_message.getKey(), _message.getRouterInfo()); + wasNew = ((null == match) || (match.getPublished() < _message.getRouterInfo().getPublished())); getContext().profileManager().heardAbout(_message.getKey()); } catch (IllegalArgumentException iae) { invalidMessage = iae.getMessage(); @@ -83,22 +88,34 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { + ": " + _message); } + long recvEnd = System.currentTimeMillis(); + getContext().statManager().addRateData("netDb.storeRecvTime", recvEnd-recvBegin, 0); + if (_message.getReplyToken() > 0) sendAck(); + long ackEnd = System.currentTimeMillis(); if (_from != null) _fromHash = _from.getHash(); if (_fromHash != null) { if (invalidMessage == null) { getContext().profileManager().dbStoreReceived(_fromHash, wasNew); - getContext().statManager().addRateData("netDb.storeHandled", 1, 0); + getContext().statManager().addRateData("netDb.storeHandled", ackEnd-recvEnd, 0); if (FloodfillNetworkDatabaseFacade.floodfillEnabled(getContext()) && (_message.getReplyToken() > 0) ) { - if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) - _facade.flood(_message.getLeaseSet()); - // ERR: see comment in HandleDatabaseLookupMessageJob regarding hidden mode - //else if (!_message.getRouterInfo().isHidden()) - else - _facade.flood(_message.getRouterInfo()); + if (wasNew) { + long floodBegin = System.currentTimeMillis(); + if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) + _facade.flood(_message.getLeaseSet()); + // ERR: see comment in HandleDatabaseLookupMessageJob regarding hidden mode + //else if (!_message.getRouterInfo().isHidden()) + else + _facade.flood(_message.getRouterInfo()); + long floodEnd = System.currentTimeMillis(); + getContext().statManager().addRateData("netDb.storeFloodNew", floodEnd-floodBegin, 0); + } else { + // don't flood it *again* + getContext().statManager().addRateData("netDb.storeFloodOld", 1, 0); + } } } else { if (_log.shouldLog(Log.WARN)) @@ -111,14 +128,26 @@ public class HandleFloodfillDatabaseStoreMessageJob extends JobImpl { DeliveryStatusMessage msg = new DeliveryStatusMessage(getContext()); msg.setMessageId(_message.getReplyToken()); msg.setArrival(getContext().clock().now()); - TunnelInfo outTunnel = selectOutboundTunnel(); - if (outTunnel == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("No outbound tunnel could be found"); - return; + /* + if (FloodfillNetworkDatabaseFacade.floodfillEnabled(getContext())) { + // no need to do anything but send it where they ask + TunnelGatewayMessage tgm = new TunnelGatewayMessage(getContext()); + tgm.setMessage(msg); + tgm.setTunnelId(_message.getReplyTunnel()); + tgm.setMessageExpiration(msg.getMessageExpiration()); + + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), tgm, _message.getReplyGateway(), 10*1000, 200)); } else { - getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), _message.getReplyTunnel(), _message.getReplyGateway()); - } + */ + TunnelInfo outTunnel = selectOutboundTunnel(); + if (outTunnel == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No outbound tunnel could be found"); + return; + } else { + getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), _message.getReplyTunnel(), _message.getReplyGateway()); + } + //} } private TunnelInfo selectOutboundTunnel() { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java index 164192a5e0cf4db79b20b2f25fd6393f2921c53b..f635501e724ed4673591c4652dba7d63da5ca828 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java @@ -113,6 +113,7 @@ class HarvesterJob extends JobImpl { msg.setSearchKey(peer); msg.setReplyTunnel(null); SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY); - getContext().jobQueue().addJob(job); + job.runJob(); + //getContext().jobQueue().addJob(job); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index 363c3aec226a42145468f96abbb7c10aa00bd0d0..5c8fc08afa7f46c87d9df0053f70593847c93467 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -847,7 +847,9 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { public int getPeerTimeout(Hash peer) { PeerProfile prof = _context.profileOrganizer().getProfile(peer); - double responseTime = prof.getDbResponseTime().getLifetimeAverageValue(); + double responseTime = MAX_PER_PEER_TIMEOUT; + if (prof != null) + responseTime = prof.getDbResponseTime().getLifetimeAverageValue(); if (responseTime < MIN_PER_PEER_TIMEOUT) responseTime = MIN_PER_PEER_TIMEOUT; else if (responseTime > MAX_PER_PEER_TIMEOUT) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java index eaa4d7cdb9303e0f8803d801a892687e92054382..b5e1c46e5fb681f3006ebcd4efaafef016e4b272 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java @@ -225,12 +225,14 @@ class PersistentDataStore extends TransientDataStore { int routerCount = 0; try { File dbDir = getDbDir(); - File leaseSetFiles[] = dbDir.listFiles(LeaseSetFilter.getInstance()); - if (leaseSetFiles != null) { - for (int i = 0; i < leaseSetFiles.length; i++) { - Hash key = getLeaseSetHash(leaseSetFiles[i].getName()); - if ( (key != null) && (!isKnown(key)) ) - PersistentDataStore.this._context.jobQueue().addJob(new ReadLeaseJob(leaseSetFiles[i], key)); + if (getContext().router().getUptime() < 10*60*1000) { + File leaseSetFiles[] = dbDir.listFiles(LeaseSetFilter.getInstance()); + if (leaseSetFiles != null) { + for (int i = 0; i < leaseSetFiles.length; i++) { + Hash key = getLeaseSetHash(leaseSetFiles[i].getName()); + if ( (key != null) && (!isKnown(key)) ) + PersistentDataStore.this._context.jobQueue().addJob(new ReadLeaseJob(leaseSetFiles[i], key)); + } } } File routerInfoFiles[] = dbDir.listFiles(RouterInfoFilter.getInstance()); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index e2466b56010fce0a07ca64dcb242ea1a66f52aea..de665484d33b9cd8504587fe6eb3ca51e1779b3d 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -339,7 +339,7 @@ class SearchJob extends JobImpl { protected void sendLeaseSearch(RouterInfo router) { TunnelInfo inTunnel = getInboundTunnelId(); if (inTunnel == null) { - _log.error("No tunnels to get search replies through! wtf!"); + _log.warn("No tunnels to get search replies through! wtf!"); getContext().jobQueue().addJob(new FailedJob(getContext(), router)); return; } @@ -362,7 +362,7 @@ class SearchJob extends JobImpl { TunnelInfo outTunnel = getOutboundTunnelId(); if (outTunnel == null) { - _log.error("No tunnels to send search out through! wtf!"); + _log.warn("No tunnels to send search out through! wtf!"); getContext().jobQueue().addJob(new FailedJob(getContext(), router)); return; } @@ -398,7 +398,8 @@ class SearchJob extends JobImpl { SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(), reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY); - getContext().jobQueue().addJob(j); + j.runJob(); + //getContext().jobQueue().addJob(j); } /** diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index b204884817aeb8e946948b98f35cceb42c717eb2..6b0c19a309bee4347cdbbe9dcbf013eacdc34c12 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -278,12 +278,12 @@ class StoreJob extends JobImpl { TunnelInfo replyTunnel = selectInboundTunnel(); if (replyTunnel == null) { - _log.error("No reply inbound tunnels available!"); + _log.warn("No reply inbound tunnels available!"); return; } TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0); if (replyTunnel == null) { - _log.error("No reply inbound tunnels available!"); + _log.warn("No reply inbound tunnels available!"); return; } msg.setReplyToken(token); @@ -312,8 +312,8 @@ class StoreJob extends JobImpl { getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now())); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, peer.getIdentity().getHash()); } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("No outbound tunnels to send a dbStore out!"); + if (_log.shouldLog(Log.WARN)) + _log.warn("No outbound tunnels to send a dbStore out!"); fail(); } } diff --git a/router/java/src/net/i2p/router/peermanager/DBHistory.java b/router/java/src/net/i2p/router/peermanager/DBHistory.java index 836896b49aa8d2cb6a9580d108aecdacfd6cb144..2c4448be890d74dd380533eaf30cd09b750fd948 100644 --- a/router/java/src/net/i2p/router/peermanager/DBHistory.java +++ b/router/java/src/net/i2p/router/peermanager/DBHistory.java @@ -156,7 +156,8 @@ public class DBHistory { public void setUnpromptedDbStoreOld(long num) { _unpromptedDbStoreOld = num; } public void coalesceStats() { - _log.debug("Coallescing stats"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Coallescing stats"); _failedLookupRate.coalesceStats(); _invalidReplyRate.coalesceStats(); } diff --git a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java b/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java deleted file mode 100644 index 1a5efc1d4c998534bd394b6c6f6f9fc0de606926..0000000000000000000000000000000000000000 --- a/router/java/src/net/i2p/router/peermanager/EvaluateProfilesJob.java +++ /dev/null @@ -1,39 +0,0 @@ -package net.i2p.router.peermanager; - -import java.util.Date; -import java.util.Iterator; -import java.util.Set; - -import net.i2p.data.Hash; -import net.i2p.router.JobImpl; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -/** - * Run across all of the profiles, coallescing the stats and reorganizing them - * into appropriate groups. The stat coalesce must be run at least once a minute, - * so if the group reorg wants to get changed, this may want to be split into two - * jobs. - * - */ -class EvaluateProfilesJob extends JobImpl { - private Log _log; - - public EvaluateProfilesJob(RouterContext ctx) { - super(ctx); - _log = ctx.logManager().getLog(EvaluateProfilesJob.class); - } - - public String getName() { return "Evaluate peer profiles"; } - public void runJob() { - try { - getContext().profileOrganizer().reorganize(true); - } catch (Throwable t) { - _log.log(Log.CRIT, "Error evaluating profiles", t); - } finally { - requeue(30*1000); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Requeued for " + new Date(getTiming().getStartAfter())); - } - } -} diff --git a/router/java/src/net/i2p/router/peermanager/PeerManager.java b/router/java/src/net/i2p/router/peermanager/PeerManager.java index 97fd03a9f709553f6fe45a7d04935e2df12f8d9f..b3b418daa1893cd1d948fca35031c3fb1d7894a2 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerManager.java +++ b/router/java/src/net/i2p/router/peermanager/PeerManager.java @@ -15,6 +15,7 @@ import java.util.*; import net.i2p.data.Hash; import net.i2p.router.PeerSelectionCriteria; import net.i2p.router.RouterContext; +import net.i2p.util.SimpleTimer; import net.i2p.util.Log; /** @@ -40,10 +41,23 @@ class PeerManager { for (int i = 0; i < _peersByCapability.length; i++) _peersByCapability[i] = new ArrayList(64); loadProfiles(); - _context.jobQueue().addJob(new EvaluateProfilesJob(_context)); + ////_context.jobQueue().addJob(new EvaluateProfilesJob(_context)); + SimpleTimer.getInstance().addEvent(new Reorg(), 0); //_context.jobQueue().addJob(new PersistProfilesJob(_context, this)); } + private class Reorg implements SimpleTimer.TimedEvent { + public void timeReached() { + try { + _organizer.reorganize(true); + } catch (Throwable t) { + _log.log(Log.CRIT, "Error evaluating profiles", t); + } finally { + SimpleTimer.getInstance().addEvent(Reorg.this, 30*1000); + } + } + } + void storeProfiles() { Set peers = selectPeers(); for (Iterator iter = peers.iterator(); iter.hasNext(); ) { diff --git a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java index 4fe9555a95bbecd30729005611eb95c01bfd54da..6c7aef92721b024bdde34485753d04eabaf5ccbc 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java +++ b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java @@ -115,7 +115,7 @@ public class PeerTestJob extends JobImpl { private void testPeer(RouterInfo peer) { TunnelInfo inTunnel = getInboundTunnelId(); if (inTunnel == null) { - _log.error("No tunnels to get peer test replies through! wtf!"); + _log.warn("No tunnels to get peer test replies through! wtf!"); return; } TunnelId inTunnelId = inTunnel.getReceiveTunnelId(0); @@ -135,7 +135,7 @@ public class PeerTestJob extends JobImpl { TunnelInfo outTunnel = getOutboundTunnelId(); if (outTunnel == null) { - _log.error("No tunnels to send search out through! wtf!"); + _log.warn("No tunnels to send search out through! wtf!"); return; } diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 0e00b892c1240e7a4119a76e9b8292da80bc7520..76779b2dced085953ec33fa91b6a29e990d21bfa 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -98,6 +98,12 @@ public class ProfileOrganizer { _thresholdCapacityValue = 0.0d; _thresholdIntegrationValue = 0.0d; _persistenceHelper = new ProfilePersistenceHelper(_context); + + _context.statManager().createRateStat("peer.profileSortTime", "How long the reorg takes sorting peers", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("peer.profileCoalesceTime", "How long the reorg takes coalescing peer stats", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("peer.profileThresholdTime", "How long the reorg takes determining the tier thresholds", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("peer.profilePlaceTime", "How long the reorg takes placing peers in the tiers", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } public void setUs(Hash us) { _us = us; } @@ -411,6 +417,20 @@ public class ProfileOrganizer { */ public void reorganize() { reorganize(false); } public void reorganize(boolean shouldCoalesce) { + long sortTime = 0; + int coalesceTime = 0; + long thresholdTime = 0; + long placeTime = 0; + int profileCount = 0; + + long uptime = _context.router().getUptime(); + long expireOlderThan = -1; + if (uptime > 60*60*1000) { + // drop profiles that we haven't spoken with in 6 hours + expireOlderThan = _context.clock().now() - 6*60*60*1000; + } + + long start = System.currentTimeMillis(); synchronized (_reorganizeLock) { Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); //allPeers.addAll(_failingPeers.values()); @@ -419,15 +439,26 @@ public class ProfileOrganizer { //allPeers.addAll(_fastPeers.values()); Set reordered = new TreeSet(_comp); + long sortStart = System.currentTimeMillis(); for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) { PeerProfile prof = (PeerProfile)iter.next(); - if (shouldCoalesce) + if ( (expireOlderThan > 0) && (prof.getLastSendSuccessful() <= expireOlderThan) ) + continue; // drop, but no need to delete, since we don't periodically reread + + if (shouldCoalesce) { + long coalesceStart = System.currentTimeMillis(); prof.coalesceStats(); + coalesceTime += (int)(System.currentTimeMillis()-coalesceStart); + } reordered.add(prof); + profileCount++; } + sortTime = System.currentTimeMillis() - sortStart; _strictCapacityOrder = reordered; + long thresholdStart = System.currentTimeMillis(); locked_calculateThresholds(allPeers); + thresholdTime = System.currentTimeMillis()-thresholdStart; _failingPeers.clear(); _fastPeers.clear(); @@ -436,6 +467,8 @@ public class ProfileOrganizer { _notFailingPeersList.clear(); _wellIntegratedPeers.clear(); + long placeStart = System.currentTimeMillis(); + for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) { PeerProfile profile = (PeerProfile)iter.next(); locked_placeProfile(profile); @@ -445,6 +478,8 @@ public class ProfileOrganizer { locked_promoteFastAsNecessary(); Collections.shuffle(_notFailingPeersList, _context.random()); + + placeTime = System.currentTimeMillis()-placeStart; if (_log.shouldLog(Log.DEBUG)) { _log.debug("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue @@ -458,6 +493,13 @@ public class ProfileOrganizer { _log.debug("fast: " + _fastPeers.values()); } } + + long total = System.currentTimeMillis()-start; + _context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount); + _context.statManager().addRateData("peer.profileCoalesceTime", coalesceTime, profileCount); + _context.statManager().addRateData("peer.profileThresholdTime", thresholdTime, profileCount); + _context.statManager().addRateData("peer.profilePlaceTime", placeTime, profileCount); + _context.statManager().addRateData("peer.profileReorgTime", total, profileCount); } /** diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index df24a3a057e2a593a69fb3e9d0f0f8d868777308..8bfa60a11bd282431d692b976b11e928d763acb4 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -92,8 +92,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec // so we send it out a tunnel first TunnelInfo out = _context.tunnelManager().selectOutboundTunnel(_client); if (out == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("no outbound tunnel to send the client message for " + _client + ": " + msg); + if (_log.shouldLog(Log.WARN)) + _log.warn("no outbound tunnel to send the client message for " + _client + ": " + msg); return; } if (_log.shouldLog(Log.INFO)) diff --git a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java index f7f93a7c4028b87c6ca7930fe6f3c80a37d89dff..e123776ff6b33cf2e017fcf40829f87d9c6d57d0 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java @@ -532,6 +532,7 @@ public class TunnelDispatcher implements Service { public LeaveTunnel(RouterContext ctx) { super(ctx); + getTiming().setStartAfter(ctx.clock().now()); _configs = new ArrayList(128); _times = new ArrayList(128); } @@ -543,12 +544,15 @@ public class TunnelDispatcher implements Service { _times.add(dropTime); } + if (_log.shouldLog(Log.INFO)) { + long now = getContext().clock().now(); + _log.info("Scheduling leave in " + DataHelper.formatDuration(dropTime.longValue()-now) +": " + cfg); + } + long oldAfter = getTiming().getStartAfter(); - if (oldAfter < getContext().clock().now()) { + if ( (oldAfter <= 0) || (oldAfter < getContext().clock().now()) || (oldAfter >= dropTime.longValue()) ) { getTiming().setStartAfter(dropTime.longValue()); getContext().jobQueue().addJob(LeaveTunnel.this); - } else if (oldAfter >= dropTime.longValue()) { - getTiming().setStartAfter(dropTime.longValue()); } else { // already scheduled for the future, and before this expiration } @@ -559,23 +563,29 @@ public class TunnelDispatcher implements Service { HopConfig cur = null; Long nextTime = null; long now = getContext().clock().now(); - synchronized (LeaveTunnel.this) { - if (_configs.size() <= 0) - return; - nextTime = (Long)_times.get(0); - if (nextTime.longValue() <= now) { - cur = (HopConfig)_configs.remove(0); - _times.remove(0); - if (_times.size() > 0) - nextTime = (Long)_times.get(0); - else - nextTime = null; + while (true) { + synchronized (LeaveTunnel.this) { + if (_configs.size() <= 0) + return; + nextTime = (Long)_times.get(0); + if (nextTime.longValue() <= now) { + cur = (HopConfig)_configs.remove(0); + _times.remove(0); + if (_times.size() > 0) + nextTime = (Long)_times.get(0); + else + nextTime = null; + } else { + cur = null; + } } + + if (cur != null) + remove(cur); + else + break; } - if (cur != null) - remove(cur); - if (nextTime != null) { getTiming().setStartAfter(nextTime.longValue()); getContext().jobQueue().addJob(LeaveTunnel.this); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java index d6781830e68a43b00ef1a8fb66ab01b86cce0640..876d2160ed9573a7fa1cbfd9ce57ab160152785e 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -4,6 +4,7 @@ import java.util.*; import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; +import net.i2p.router.TunnelManagerFacade; import net.i2p.router.tunnel.TunnelCreatorConfig; import net.i2p.util.Log; @@ -73,7 +74,7 @@ class BuildExecutor implements Runnable { List wanted = new ArrayList(8); List pools = new ArrayList(8); - while (!_manager.isShutdown()) { + while (!_manager.isShutdown()){ try { _manager.listPools(pools); for (int i = 0; i < pools.size(); i++) { @@ -91,38 +92,51 @@ class BuildExecutor implements Runnable { // zero hop ones can run inline allowed = buildZeroHopTunnels(wanted, allowed); - if ( (allowed > 0) && (wanted.size() > 0) ) { - Collections.shuffle(wanted, _context.random()); - for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) { - TunnelPool pool = (TunnelPool)wanted.remove(0); - //if (pool.countWantedTunnels() <= 0) - // continue; - PooledTunnelCreatorConfig cfg = pool.configureNewTunnel(); - if (cfg != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg); - synchronized (_currentlyBuilding) { - _currentlyBuilding.add(cfg); - } - buildTunnel(pool, cfg); - if (cfg.getLength() <= 1) - i--; //0hop, we can keep going, as there's no worry about throttling - } else { - i--; - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Zero hops built, Allowed: " + allowed + " wanted: " + wanted); + + TunnelManagerFacade mgr = _context.tunnelManager(); + if ( (mgr == null) || (mgr.selectInboundTunnel() == null) || (mgr.selectOutboundTunnel() == null) ) { + // we don't have either inbound or outbound tunnels, so don't bother trying to build + // non-zero-hop tunnels + synchronized (_currentlyBuilding) { + _currentlyBuilding.wait(5*1000+_context.random().nextInt(5*1000)); } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Nothin' doin, wait for a while"); - try { - synchronized (_currentlyBuilding) { - if (allowed <= 0) - _currentlyBuilding.wait(_context.random().nextInt(5*1000)); - else // wanted <= 0 - _currentlyBuilding.wait(_context.random().nextInt(30*1000)); + if ( (allowed > 0) && (wanted.size() > 0) ) { + Collections.shuffle(wanted, _context.random()); + for (int i = 0; (i < allowed) && (wanted.size() > 0); i++) { + TunnelPool pool = (TunnelPool)wanted.remove(0); + //if (pool.countWantedTunnels() <= 0) + // continue; + PooledTunnelCreatorConfig cfg = pool.configureNewTunnel(); + if (cfg != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg); + synchronized (_currentlyBuilding) { + _currentlyBuilding.add(cfg); + } + buildTunnel(pool, cfg); + // 0hops are taken care of above, these are nonstandard 0hops + //if (cfg.getLength() <= 1) + // i--; //0hop, we can keep going, as there's no worry about throttling + } else { + i--; + } + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Nothin' doin, wait for a while"); + try { + synchronized (_currentlyBuilding) { + if (allowed <= 0) + _currentlyBuilding.wait(_context.random().nextInt(5*1000)); + else // wanted <= 0 + _currentlyBuilding.wait(_context.random().nextInt(30*1000)); + } + } catch (InterruptedException ie) { + // someone wanted to build something } - } catch (InterruptedException ie) { - // someone wanted to build something } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java index 6c90c0d962e44a83e20ff915f55e6e16a2feb830..67561cb7c33cde6da40924f14fe285bf13d09c9c 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java @@ -193,8 +193,11 @@ public class HandleTunnelCreateMessageJob extends JobImpl { _log.debug("sending (" + status + ") to the tunnel " + _request.getReplyGateway().toBase64().substring(0,4) + ":" + _request.getReplyTunnel() + " wrt " + _request); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), gw, _request.getReplyGateway(), - REPLY_TIMEOUT, REPLY_PRIORITY)); + SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw, _request.getReplyGateway(), + REPLY_TIMEOUT, REPLY_PRIORITY); + // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup) + job.runJob(); + //getContext().jobQueue().addJob(job); } private GarlicMessage createReply(TunnelCreateStatusMessage reply) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index 698807143032b35b6343e5e6dbc88783ca45ed17..b7032babc4949564fd1479e4d09412f3bd649733 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -14,6 +14,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { private boolean _failed; private TestJob _testJob; private Job _expireJob; + private int _failures; /** Creates a new instance of PooledTunnelCreatorConfig */ @@ -24,6 +25,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { super(ctx, length, isInbound, destination); _failed = false; _pool = null; + _failures = 0; } @@ -31,6 +33,11 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { if (_testJob != null) { _testJob.testSuccessful(ms); } + int failures = _failures - 1; + if (failures < 0) + _failures = 0; + else + _failures = failures; } public Properties getOptions() { @@ -38,17 +45,25 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { return _pool.getSettings().getUnknownOptions(); } + private static final int MAX_CONSECUTIVE_TEST_FAILURES = 2; + /** * The tunnel failed, so stop using it */ - public void tunnelFailed() { - _failed = true; - // remove us from the pool (but not the dispatcher) so that we aren't - // selected again. _expireJob is left to do its thing, in case there - // are any straggling messages coming down the tunnel - _pool.tunnelFailed(this); - if (_testJob != null) // just in case... - _context.jobQueue().removeJob(_testJob); + public boolean tunnelFailed() { + _failures++; + if (_failures > MAX_CONSECUTIVE_TEST_FAILURES) { + _failed = true; + // remove us from the pool (but not the dispatcher) so that we aren't + // selected again. _expireJob is left to do its thing, in case there + // are any straggling messages coming down the tunnel + _pool.tunnelFailed(this); + if (_testJob != null) // just in case... + _context.jobQueue().removeJob(_testJob); + return false; + } else { + return true; + } } public boolean getTunnelFailed() { return _failed; } public void setTunnelPool(TunnelPool pool) { _pool = pool; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java b/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java index 8d5c6bec59c37f56165382016362431f843dac55..07df695e256769f57cb142f39a6c13bddd535f57 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/RequestTunnelJob.java @@ -78,8 +78,8 @@ public class RequestTunnelJob extends JobImpl { ctx.statManager().createRateStat("tunnel.buildExploratorySuccess3Hop", "How often we succeed building a 3 hop exploratory tunnel?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.buildPartialTime", "How long a non-exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.buildExploratoryPartialTime", "How long an exploratory request took to be accepted?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for an exploratory peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.buildExploratoryTimeout", "How often a request for an exploratory tunnel's peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.buildClientTimeout", "How often a request for a client tunnel's peer times out?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting hop " + hop + " in " + cfg); @@ -192,9 +192,10 @@ public class RequestTunnelJob extends JobImpl { TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel(); if (replyTunnel == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("No inbound tunnels to build tunnels with!"); + if (_log.shouldLog(Log.WARN)) + _log.warn("No inbound tunnels to build tunnels with!"); tunnelFail(); + return; } Hash replyGateway = replyTunnel.getPeer(0); @@ -214,6 +215,11 @@ public class RequestTunnelJob extends JobImpl { msg.setReplyTag(replyTag); int duration = 10*60; // (int)((_config.getExpiration() - getContext().clock().now())/1000); msg.setDurationSeconds(duration); + long now = getContext().clock().now(); + if (_isExploratory) + msg.setMessageExpiration(now + HOP_REQUEST_TIMEOUT_EXPLORATORY); + else + msg.setMessageExpiration(now + HOP_REQUEST_TIMEOUT_CLIENT); if (_currentHop == 0) msg.setIsGateway(true); else diff --git a/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java index 2348f241fd54491bfbb28fe31ddc747f20c547c3..dc37aff364fdc254b4f467990ad3facc36cfb22f 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/SendGarlicMessageJob.java @@ -75,6 +75,7 @@ class SendGarlicMessageJob extends JobImpl { if (_onTimeout != null) getContext().jobQueue().addJob(_onTimeout); getContext().messageRegistry().unregisterPending(dummyMessage); + return; } TunnelId outId = out.getSendTunnelId(0); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java index 1be33003a1d5e42bc11fc227a641141ec6b6558f..631a514718626cc6eb91aff14a78301c3fc7a5e3 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java @@ -41,6 +41,10 @@ class TestJob extends JobImpl { new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.testExploratoryFailedTime", "How long did the failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.testFailedCompletelyTime", "How long did the complete failure take (max of 60s for full timeout)?", "Tunnels", + new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("tunnel.testExploratoryFailedCompletelyTime", "How long did the complete failure of an exploratory tunnel take (max of 60s for full timeout)?", "Tunnels", + new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.testSuccessLength", "How long were the tunnels that passed the test?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("tunnel.testSuccessTime", "How long did tunnel testing take?", "Tunnels", @@ -72,8 +76,8 @@ class TestJob extends JobImpl { } if ( (_replyTunnel == null) || (_outTunnel == null) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel); + if (_log.shouldLog(Log.WARN)) + _log.warn("Insufficient tunnels to test " + _cfg + " with: " + _replyTunnel + " / " + _outTunnel); getContext().statManager().addRateData("tunnel.testAborted", _cfg.getLength(), 0); scheduleRetest(); } else { @@ -161,9 +165,17 @@ class TestJob extends JobImpl { getContext().statManager().addRateData("tunnel.testExploratoryFailedTime", timeToFail, timeToFail); else getContext().statManager().addRateData("tunnel.testFailedTime", timeToFail, timeToFail); - _cfg.tunnelFailed(); if (_log.shouldLog(Log.WARN)) _log.warn("Tunnel test failed in " + timeToFail + "ms: " + _cfg); + boolean keepGoing = _cfg.tunnelFailed(); + if (keepGoing) { + scheduleRetest(); + } else { + if (_pool.getSettings().isExploratory()) + getContext().statManager().addRateData("tunnel.testExploratoryFailedCompletelyTime", timeToFail, timeToFail); + else + getContext().statManager().addRateData("tunnel.testFailedCompletelyTime", timeToFail, timeToFail); + } } /** randomized time we should wait before testing */ diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index 1373099d34c3c6cb6094912056b0b12eb86fe57f..721e885022c7e42f0ec3d846b07dba10d257788c 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -45,6 +45,7 @@ abstract class TunnelPeerSelector { if (length < 0) length = 0; } + /* if ( (ctx.tunnelManager().getOutboundTunnelCount() <= 0) || (ctx.tunnelManager().getFreeTunnelCount() <= 0) ) { Log log = ctx.logManager().getLog(TunnelPeerSelector.class); @@ -59,6 +60,7 @@ abstract class TunnelPeerSelector { return -1; } } + */ return length; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index c7ba3edf033104410a317e659fdb086dea1df77f..225f7c78a0ab6fd9360e3ec905ded65bbfedb889 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -240,7 +240,9 @@ public class TunnelPool { _lastSelectionPeriod = 0; } } - + + _manager.getExecutor().repoll(); + _lifetimeProcessed += info.getProcessedMessagesCount(); long lifetimeConfirmed = info.getVerifiedBytesTransferred(); @@ -259,9 +261,7 @@ public class TunnelPool { buildFallback(); } } - - _manager.getExecutor().repoll(); - + boolean connected = true; if ( (_settings.getDestination() != null) && (!_context.clientManager().isLocal(_settings.getDestination())) ) connected = false; @@ -410,35 +410,42 @@ public class TunnelPool { boolean allowZeroHop = ((getSettings().getLength() + getSettings().getLengthVariance()) <= 0); - long expireAfter = _context.clock().now() + (2 * _settings.getRebuildPeriod()); - expireAfter += _expireSkew; + long expireAfter = _context.clock().now() + _expireSkew; // + _settings.getRebuildPeriod() + _expireSkew; + int expire30s = 0; + int expire90s = 0; + int expire150s = 0; + int expire210s = 0; + int expire270s = 0; + int expireLater = 0; - long earliestExpire = -1; - int live = 0; int fallback = 0; - int usable = 0; synchronized (_tunnels) { boolean enough = _tunnels.size() > wanted; for (int i = 0; i < _tunnels.size(); i++) { TunnelInfo info = (TunnelInfo)_tunnels.get(i); - if (info.getExpiration() > expireAfter) { - if (allowZeroHop || (info.getLength() > 1)) { - usable++; - if ( (info.getExpiration() < earliestExpire) || (earliestExpire < 0) ) - earliestExpire = info.getExpiration(); + if (allowZeroHop || (info.getLength() > 1)) { + long timeToExpire = info.getExpiration() - expireAfter; + if (timeToExpire <= 0) { + // consider it unusable + } else if (timeToExpire <= 30*1000) { + expire30s++; + } else if (timeToExpire <= 90*1000) { + expire90s++; + } else if (timeToExpire <= 150*1000) { + expire150s++; + } else if (timeToExpire <= 210*1000) { + expire210s++; + } else if (timeToExpire <= 270*1000) { + expire270s++; + } else { + expireLater++; } - } - live++; - if ( (info.getLength() <= 1) && (info.getExpiration() > expireAfter) ) + } else if (info.getExpiration() > expireAfter) { fallback++; + } } } - if (usable < wanted) { - // if we are short on tunnels, build fast - earliestExpire = 0; - } - int inProgress = 0; synchronized (_inProgress) { inProgress = _inProgress.size(); @@ -448,8 +455,9 @@ public class TunnelPool { fallback++; } } - - return countHowManyToBuild(allowZeroHop, earliestExpire, usable, wanted, inProgress, fallback); + + return countHowManyToBuild(allowZeroHop, expire30s, expire90s, expire150s, expire210s, expire270s, + expireLater, wanted, inProgress, fallback); } /** @@ -459,45 +467,94 @@ public class TunnelPool { * @param allowZeroHop do we normally allow zero hop tunnels? If true, treat fallback tunnels like normal ones * @param earliestExpire how soon do some of our usable tunnels expire, or, if we are missing tunnels, -1 * @param usable how many tunnels will be around for a while (may include fallback tunnels) + * @param wantToReplace how many tunnels are still usable, but approaching unusability * @param standardAmount how many tunnels we want to have, in general * @param inProgress how many tunnels are being built for this pool right now (may include fallback tunnels) * @param fallback how many zero hop tunnels do we have, or are being built */ - private int countHowManyToBuild(boolean allowZeroHop, long earliestExpire, int usable, int standardAmount, - int inProgress, int fallback) { - int howMany = 0; + private int countHowManyToBuild(boolean allowZeroHop, int expire30s, int expire90s, int expire150s, int expire210s, + int expire270s, int expireLater, int standardAmount, int inProgress, int fallback) { + int rv = 0; + int remainingWanted = standardAmount - expireLater; if (allowZeroHop) - howMany = standardAmount - usable; - else - howMany = standardAmount - (usable - fallback); - - int concurrentBuildWeight = 1; - if (howMany > 0) { - long now = _context.clock().now(); - if (earliestExpire - now < 60*1000) - concurrentBuildWeight = 4; // right before expiration, allow up to 4x quantity tunnels to be pending - else if (earliestExpire - now < 120*1000) - concurrentBuildWeight = 3; // allow up to 3x quantity tunnels to be pending from 1-2m - else if (earliestExpire - now < 180*1000) - concurrentBuildWeight = 2; // allow up to 2x quantity tunnels to be pending from 2-3m + remainingWanted -= fallback; - // e.g. we want 3 tunnels, but only have 1 usable, we'd want 2 more. however, if the tunnels - // expire in 90 seconds, we'd act like we wanted 6 (and assume 4 would fail building). - howMany = (howMany * concurrentBuildWeight) - inProgress; + for (int i = 0; i < expire270s && remainingWanted > 0; i++) + remainingWanted--; + if (remainingWanted > 0) { + // 1x the tunnels expiring between 3.5 and 2.5 minutes from now + for (int i = 0; i < expire210s && remainingWanted > 0; i++) { + remainingWanted--; + } + if (remainingWanted > 0) { + // 2x the tunnels expiring between 2.5 and 1.5 minutes from now + for (int i = 0; i < expire150s && remainingWanted > 0; i++) { + remainingWanted--; + } + if (remainingWanted > 0) { + for (int i = 0; i < expire90s && remainingWanted > 0; i++) { + remainingWanted--; + } + if (remainingWanted > 0) { + for (int i = 0; i < expire30s && remainingWanted > 0; i++) { + remainingWanted--; + } + if (remainingWanted > 0) { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv += expire210s; + rv += 2*expire150s; + rv += 4*expire90s; + rv += 6*expire30s; + rv += 6*remainingWanted; + rv -= inProgress; + rv -= expireLater; + } else { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv += expire210s; + rv += 2*expire150s; + rv += 4*expire90s; + rv += 6*expire30s; + rv -= inProgress; + rv -= expireLater; + } + } else { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv += expire210s; + rv += 2*expire150s; + rv += 4*expire90s; + rv -= inProgress; + rv -= expireLater; + } + } else { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv += expire210s; + rv += 2*expire150s; + rv -= inProgress; + rv -= expireLater; + } + } else { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv += expire210s; + rv -= inProgress; + rv -= expireLater; + } + } else { + rv = (((expire270s > 0) && _context.random().nextBoolean()) ? 1 : 0); + rv -= inProgress; + rv -= expireLater; } - - int rv = howMany; - // ok, we're actually swamped with tunnels, so lets hold off on replacing the - // fallback ones for a bit - if ( (usable + inProgress + fallback > 2*standardAmount) && (howMany > 0) ) - rv = 0; + // yes, the above numbers and periods are completely arbitrary. suggestions welcome if (allowZeroHop && (rv > standardAmount)) rv = standardAmount; + if (rv + inProgress + expireLater + fallback > 4*standardAmount) + rv = 4*standardAmount - inProgress - expireLater - fallback; + if (_log.shouldLog(Log.DEBUG)) - _log.debug("Count: rv: " + rv + " howMany " + howMany + " concurrentWeight " + concurrentBuildWeight - + " allow? " + allowZeroHop + " usable " + usable + _log.debug("Count: rv: " + rv + " allow? " + allowZeroHop + + " 30s " + expire30s + " 90s " + expire90s + " 150s " + expire150s + " 210s " + expire210s + + " 270s " + expire270s + " later " + expireLater + " std " + standardAmount + " inProgress " + inProgress + " fallback " + fallback + " for " + toString()); @@ -518,11 +575,11 @@ public class TunnelPool { // no inbound or outbound tunnels to send the request through, and // the pool is refusing 0 hop tunnels if (peers == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!"); + if (_log.shouldLog(Log.WARN)) + _log.warn("No peers to put in the new tunnel! selectPeers returned null! boo, hiss!"); } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("No peers to put in the new tunnel! selectPeers returned an empty list?!"); + if (_log.shouldLog(Log.WARN)) + _log.warn("No peers to put in the new tunnel! selectPeers returned an empty list?!"); } return null; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 664d00feb7bc4713b7b385eab3a3bc76e2cc727b..9c5ab345e508e79a216a13da7cccc37236a8b5d3 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -73,7 +73,9 @@ public class TunnelPoolManager implements TunnelManagerFacade { /** pick an inbound tunnel not bound to a particular destination */ public TunnelInfo selectInboundTunnel() { - TunnelInfo info = _inboundExploratory.selectTunnel(); + TunnelPool pool = _inboundExploratory; + if (pool == null) return null; + TunnelInfo info = pool.selectTunnel(); if (info == null) { _inboundExploratory.buildFallback(); // still can be null, but probably not @@ -100,11 +102,13 @@ public class TunnelPoolManager implements TunnelManagerFacade { /** pick an outbound tunnel not bound to a particular destination */ public TunnelInfo selectOutboundTunnel() { - TunnelInfo info = _outboundExploratory.selectTunnel(); + TunnelPool pool = _outboundExploratory; + if (pool == null) return null; + TunnelInfo info = pool.selectTunnel(); if (info == null) { - _outboundExploratory.buildFallback(); + pool.buildFallback(); // still can be null, but probably not - info = _outboundExploratory.selectTunnel(); + info = pool.selectTunnel(); } return info; }