From c455fa6309fb7c66e9159d99d9f0e162010cddfa Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 1 Mar 2009 20:45:16 +0000 Subject: [PATCH] * OCMOSJ: - Change from 5% reply requests to at least once per minute, in hopes of reducing IRC drops - More clean up of the cache cleaning --- .../OutboundClientMessageOneShotJob.java | 95 +++++++++++-------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 0e858ef779..20d69ea733 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -103,6 +103,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private static final Object _initializeLock = new Object(); private static boolean _initialized = false; private static final int CLEAN_INTERVAL = 5*60*1000; + private static final int REPLY_REQUEST_INTERVAL = 60*1000; /** * Send the sucker @@ -212,7 +213,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * * Key the cache on the source+dest pair. */ - private static HashMap _leaseSetCache = new HashMap(); + private static HashMap<String, LeaseSet> _leaseSetCache = new HashMap(); private LeaseSet getReplyLeaseSet(boolean force) { LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash()); if (newLS == null) @@ -247,7 +248,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { long now = getContext().clock().now(); synchronized (_leaseSetCache) { if (!force) { - LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair()); + LeaseSet ls = _leaseSetCache.get(hashPair()); if (ls != null) { if (ls.equals(newLS)) { // still good, send it 10% of the time @@ -312,7 +313,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * lease). * */ - private static HashMap _leaseCache = new HashMap(); + private static HashMap<String, Lease> _leaseCache = new HashMap(); private boolean getNextLease() { _leaseSet = getContext().netDb().lookupLeaseSetLocally(_to.calculateHash()); if (_leaseSet == null) { @@ -325,7 +326,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // Use the same lease if it's still good // Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't... synchronized (_leaseCache) { - _lease = (Lease) _leaseCache.get(hashPair()); + _lease = _leaseCache.get(hashPair()); if (_lease != null) { // if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ?? if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) { @@ -446,6 +447,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } + /** + * This cache is used to ensure that we request a reply every so often. + * Hopefully this allows the router to recognize a failed tunnel and switch, + * before upper layers like streaming lib fail, even for low-bandwidth + * connections like IRC. + */ + private static HashMap<String, Long> _lastReplyRequestCache = new HashMap(); + /** * Send the message to the specified tunnel by creating a new garlic message containing * the (already created) payload clove as well as a new delivery status message. This garlic @@ -456,18 +465,27 @@ public class OutboundClientMessageOneShotJob extends JobImpl { */ private void send() { if (_finished) return; - if (getContext().clock().now() >= _overallExpiration) { + long now = getContext().clock().now(); + if (now >= _overallExpiration) { dieFatal(); return; } int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey()); _outTunnel = selectOutboundTunnel(_to); + // boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5; // what's the point of 5% random? possible improvements or replacements: - // - wantACK if we changed their inbound lease (getNextLease() sets _wantACK) - // - wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK) - // - wantACK if we haven't in last 1m (requires a new static cache probably) - boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5; + // DONE (getNextLease() is called before this): wantACK if we changed their inbound lease (getNextLease() sets _wantACK) + // DONE (selectOutboundTunnel() moved above here): wantACK if we changed our outbound tunnel (selectOutboundTunnel() sets _wantACK) + // DONE (added new cache): wantACK if we haven't in last 1m (requires a new static cache probably) + boolean wantACK; + synchronized (_lastReplyRequestCache) { + Long lastSent = _lastReplyRequestCache.get(hashPair()); + wantACK = _wantACK || existingTags <= 30 || + lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL; + if (wantACK) + _lastReplyRequestCache.put(hashPair(), Long.valueOf(now)); + } PublicKey key = _leaseSet.getEncryptionKey(); SessionKey sessKey = new SessionKey(); @@ -501,7 +519,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // we dont receive the reply? hmm...) if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString); - getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0); + getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); dieFatal(); return; } @@ -539,12 +557,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } else { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while"); - getContext().statManager().addRateData("client.dispatchNoTunnels", getContext().clock().now() - _start, 0); + getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); dieFatal(); } _clientMessage = null; _clove = null; - getContext().statManager().addRateData("client.dispatchPrepareTime", getContext().clock().now() - _start, 0); + getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0); if (!wantACK) getContext().statManager().addRateData("client.dispatchNoACK", 1, 0); } @@ -582,7 +600,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { /** * This is the place where we make I2P go fast. * - * We have four static caches. + * We have five static caches. * - The LeaseSet cache is used to decide whether to bundle our own leaseset, * which minimizes overhead. * - The Lease cache is used to persistently send to the same lease for the destination, @@ -590,6 +608,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel * for the same destination, * which keeps the streaming lib happy by minimizing out-of-order delivery. + * - The last reply requested cache ensures that a reply is requested every so often, + * so that failed tunnels are recognized. * */ @@ -629,17 +649,17 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } if (_lease != null) { synchronized(_leaseCache) { - Lease l = (Lease) _leaseCache.get(key); + Lease l = _leaseCache.get(key); if (l != null && l.equals(_lease)) _leaseCache.remove(key); } } if (_outTunnel != null) { synchronized(_tunnelCache) { - TunnelInfo t =(TunnelInfo) _backloggedTunnelCache.get(key); + TunnelInfo t = _backloggedTunnelCache.get(key); if (t != null && t.equals(_outTunnel)) _backloggedTunnelCache.remove(key); - t = (TunnelInfo) _tunnelCache.get(key); + t = _tunnelCache.get(key); if (t != null && t.equals(_outTunnel)) _tunnelCache.remove(key); } @@ -652,17 +672,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { */ private static void cleanLeaseSetCache(RouterContext ctx, HashMap tc) { long now = ctx.clock().now(); - List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); String k = (String) entry.getKey(); LeaseSet l = (LeaseSet) entry.getValue(); if (l.getEarliestLeaseDate() < now) - deleteList.add(k); - } - for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) { - String k = (String) iter.next(); - tc.remove(k); + iter.remove(); } } @@ -671,17 +686,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Caller must synchronize on tc. */ private static void cleanLeaseCache(HashMap tc) { - List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); String k = (String) entry.getKey(); Lease l = (Lease) entry.getValue(); if (l.isExpired(Router.CLOCK_FUDGE_FACTOR)) - deleteList.add(k); - } - for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) { - String k = (String) iter.next(); - tc.remove(k); + iter.remove(); } } @@ -690,17 +700,25 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Caller must synchronize on tc. */ private static void cleanTunnelCache(RouterContext ctx, HashMap tc) { - List deleteList = new ArrayList(); for (Iterator iter = tc.entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = (Map.Entry)iter.next(); String k = (String) entry.getKey(); TunnelInfo tunnel = (TunnelInfo) entry.getValue(); if (!ctx.tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel)) - deleteList.add(k); + iter.remove(); } - for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) { - String k = (String) iter.next(); - tc.remove(k); + } + + /** + * Clean out old reply times + * Caller must synchronize on tc. + */ + private static void cleanReplyCache(RouterContext ctx, HashMap tc) { + long now = ctx.clock().now(); + for (Iterator iter = tc.values().iterator(); iter.hasNext(); ) { + Long l = (Long) iter.next(); + if (l.longValue() < now - CLEAN_INTERVAL) + iter.remove(); } } @@ -720,6 +738,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { cleanTunnelCache(_ctx, _tunnelCache); cleanTunnelCache(_ctx, _backloggedTunnelCache); } + synchronized(_lastReplyRequestCache) { + cleanReplyCache(_ctx, _lastReplyRequestCache); + } } } @@ -731,8 +752,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * Key the caches on the source+dest pair. * */ - private static HashMap _tunnelCache = new HashMap(); - private static HashMap _backloggedTunnelCache = new HashMap(); + private static HashMap<String, TunnelInfo> _tunnelCache = new HashMap(); + private static HashMap<String, TunnelInfo> _backloggedTunnelCache = new HashMap(); private TunnelInfo selectOutboundTunnel(Destination to) { TunnelInfo tunnel; long now = getContext().clock().now(); @@ -743,7 +764,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * if you were the originator by backlogging the tunnel, then removing the * backlog and seeing if traffic came back or not. */ - tunnel = (TunnelInfo) _backloggedTunnelCache.get(hashPair()); + tunnel = _backloggedTunnelCache.get(hashPair()); if (tunnel != null) { if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) { if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) { @@ -758,7 +779,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _backloggedTunnelCache.remove(hashPair()); } // Use the same tunnel unless backlogged - tunnel = (TunnelInfo) _tunnelCache.get(hashPair()); + tunnel = _tunnelCache.get(hashPair()); if (tunnel != null) { if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) { if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1))) -- GitLab