diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 0515d5c34459e84384b6fdd17885880e0201fd43..0e858ef779e630b9718523bb7ffc6944cbbbfd5e 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -34,6 +34,8 @@ import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; import net.i2p.util.Log; +import net.i2p.util.SimpleScheduler; +import net.i2p.util.SimpleTimer; /** * Send a client message out a random outbound tunnel and into a random inbound @@ -98,6 +100,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { */ private static final int BUNDLE_PROBABILITY_DEFAULT = 100; + private static final Object _initializeLock = new Object(); + private static boolean _initialized = false; + private static final int CLEAN_INTERVAL = 5*60*1000; + /** * Send the sucker */ @@ -105,20 +111,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl { super(ctx); _log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class); - ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); - ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l }); + synchronized (_initializeLock) { + if (!_initialized) { + SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(ctx), CLEAN_INTERVAL, CLEAN_INTERVAL); + ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.sendAckTime", "Message round trip time", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.timeoutCongestionTunnel", "How lagged our tunnels are when a send times out?", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.timeoutCongestionMessage", "How fast we process messages locally when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.timeoutCongestionInbound", "How much faster we are receiving data than our average bps when a send times out?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look for a remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchNoTunnels", "How long after start do we run out of tunnels to send/receive with?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchNoACK", "Repeated message sends to a peer (no ack required)", "ClientMessages", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l }); + _initialized = true; + } + } long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; _clientMessage = msg; _clientMessageId = msg.getMessageId(); @@ -201,7 +213,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Key the cache on the source+dest pair. */ private static HashMap _leaseSetCache = new HashMap(); - private static long _lscleanTime = 0; private LeaseSet getReplyLeaseSet(boolean force) { LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash()); if (newLS == null) @@ -235,10 +246,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // If the last leaseSet we sent him is still good, don't bother sending again long now = getContext().clock().now(); synchronized (_leaseSetCache) { - if (now - _lscleanTime > 5*60*1000) { // clean out periodically - cleanLeaseSetCache(_leaseSetCache); - _lscleanTime = now; - } if (!force) { LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair()); if (ls != null) { @@ -306,7 +313,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * */ private static HashMap _leaseCache = new HashMap(); - private static long _lcleanTime = 0; private boolean getNextLease() { _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); if (_leaseSet == null) { @@ -319,10 +325,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // Use the same lease if it's still good // Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't... synchronized (_leaseCache) { - if (now - _lcleanTime > 5*60*1000) { // clean out periodically - cleanLeaseCache(_leaseCache); - _lcleanTime = now; - } _lease = (Lease) _leaseCache.get(hashPair()); if (_lease != null) { // if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ?? @@ -607,7 +609,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * (needed for cleanTunnelCache) * 44 = 32 * 4 / 3 */ - private Hash sourceFromHashPair(String s) { + private static Hash sourceFromHashPair(String s) { return new Hash(Base64.decode(s.substring(44, 88))); } @@ -648,8 +650,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Clean out old leaseSets from a set. * Caller must synchronize on tc. */ - private void cleanLeaseSetCache(HashMap tc) { - long now = getContext().clock().now(); + private static void cleanLeaseSetCache(RouterContext ctx, HashMap tc) { + long now = ctx.clock().now(); List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); @@ -668,7 +670,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Clean out old leases from a set. * Caller must synchronize on tc. */ - private void cleanLeaseCache(HashMap tc) { + private static void cleanLeaseCache(HashMap tc) { List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); @@ -687,13 +689,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Clean out old tunnels from a set. * Caller must synchronize on tc. */ - private void cleanTunnelCache(HashMap tc) { + private static void cleanTunnelCache(RouterContext ctx, HashMap tc) { List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); String k = (String) entry.getKey(); TunnelInfo tunnel = (TunnelInfo) entry.getValue(); - if (!getContext().tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel)) + if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel)) deleteList.add(k); } for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) { @@ -702,6 +704,25 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } + private static class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent { + private RouterContext _ctx; + private OCMOSJCacheCleaner(RouterContext ctx) { + _ctx = ctx; + } + public void timeReached() { + synchronized(_leaseSetCache) { + cleanLeaseSetCache(_ctx, _leaseSetCache); + } + synchronized(_leaseCache) { + cleanLeaseCache(_leaseCache); + } + synchronized(_tunnelCache) { + cleanTunnelCache(_ctx, _tunnelCache); + cleanTunnelCache(_ctx, _backloggedTunnelCache); + } + } + } + /** * Use the same outbound tunnel as we did for the same destination previously, * if possible, to keep the streaming lib happy @@ -712,16 +733,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { */ private static HashMap _tunnelCache = new HashMap(); private static HashMap _backloggedTunnelCache = new HashMap(); - private static long _cleanTime = 0; private TunnelInfo selectOutboundTunnel(Destination to) { TunnelInfo tunnel; long now = getContext().clock().now(); synchronized (_tunnelCache) { - if (now - _cleanTime > 5*60*1000) { // clean out periodically - cleanTunnelCache(_tunnelCache); - cleanTunnelCache(_backloggedTunnelCache); - _cleanTime = now; - } /** * If old tunnel is valid and no longer backlogged, use it. * This prevents an active anonymity attack, where a peer could tell