From a6f3478db3e119a6ec4959bf0dd332635a36b8a0 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Mon, 5 May 2008 14:01:22 +0000
Subject: [PATCH]     * Outbound message:       - Fix a couple of tunnel cache
 cleaning bugs       - Cache based on source+dest pairs rather than just dest 
      - Send the reply leaseSet only when necessary,         rather than all
 the time (big savings in overhead)       - Enable persistent lease selection
 again       - Logging tweaks

---
 .../OutboundClientMessageOneShotJob.java      | 273 +++++++++++++-----
 1 file changed, 205 insertions(+), 68 deletions(-)

diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
index 692fc575e7..03b31bc368 100644
--- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
+++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
@@ -10,6 +10,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 
+import net.i2p.data.Base64;
 import net.i2p.data.Certificate;
 import net.i2p.data.Destination;
 import net.i2p.data.Hash;
@@ -48,7 +49,6 @@ import net.i2p.util.Log;
 public class OutboundClientMessageOneShotJob extends JobImpl {
     private Log _log;
     private long _overallExpiration;
-    private boolean _shouldBundle;
     private ClientMessage _clientMessage;
     private MessageId _clientMessageId;
     private int _clientMessageSize;
@@ -147,7 +147,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         
         _start = getContext().clock().now();
         _overallExpiration = timeoutMs + _start;
-        _shouldBundle = getShouldBundle();
         _finished = false;
     }
     
@@ -177,30 +176,89 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         }
     }
     
-    private boolean getShouldBundle() {
-        Properties opts = _clientMessage.getSenderConfig().getOptions();
-        String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "true");
-        if ("true".equals(wantBundle)) {
-            int probability = BUNDLE_PROBABILITY_DEFAULT;
-            String str = opts.getProperty(BUNDLE_PROBABILITY);
-            try { 
-                if (str != null) 
-                    probability = Integer.parseInt(str);
-            } catch (NumberFormatException nfe) {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly [" 
-                              + str + "]", nfe);
+    /**
+      * Returns the reply lease set if forced to do so,
+      * or if configured to do so,
+      * or if a certain percentage of the time if configured to do so,
+      * or if our lease set has changed since we last talked to them,
+      * or 10% of the time anyway so they don't forget us (disabled for now),
+      * or null otherwise.
+      *
+      * Note that wantACK randomly forces us another 5% of the time.
+      *
+      * We don't want to do this too often as a typical 2-lease leaseset
+      * in a DatabaseStoreMessage is 861+37=898 bytes -
+      * when added to garlic it's a 1056-byte addition total, which is huge.
+      *
+      * Key the cache on the source+dest pair.
+      */
+    private static HashMap _leaseSetCache = new HashMap();
+    private static long _lscleanTime = 0;
+    private LeaseSet getReplyLeaseSet(boolean force) {
+        LeaseSet newLS = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
+        if (newLS == null)
+            return null;   // punt
+
+        if (!force) {
+            // Don't send it every time unless configured to; default=false
+            Properties opts = _clientMessage.getSenderConfig().getOptions();
+            String wantBundle = opts.getProperty(BUNDLE_REPLY_LEASESET, "false");
+            if ("true".equals(wantBundle)) {
+                int probability = BUNDLE_PROBABILITY_DEFAULT;
+                String str = opts.getProperty(BUNDLE_PROBABILITY);
+                try { 
+                    if (str != null) 
+                        probability = Integer.parseInt(str);
+                } catch (NumberFormatException nfe) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(getJobId() + ": Bundle leaseSet probability overridden incorrectly [" 
+                                  + str + "]", nfe);
+                }
+                if (probability >= 100)
+                    return newLS;  // do this every time so don't worry about cache
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(getJobId() + ": Bundle leaseSet probability is " + probability);
+                if (probability >= getContext().random().nextInt(100))
+                    force = true;  // just add newLS to cache below and return
+                // fall through to cache check and add
             }
-            if (probability >= 100)
-                return true;
-            _log.error(getJobId() + ": Bundle leaseSet probability is " + probability);
-            if (probability >= getContext().random().nextInt(100))
-                return true;
-            else
-                return false;
-        } else {
-            return false;
         }
+
+        // If the last leaseSet we sent him is still good, don't bother sending again
+        long now = getContext().clock().now();
+        synchronized (_leaseSetCache) {
+            if (now - _lscleanTime > 5*60*1000) {  // clean out periodically
+                cleanLeaseSetCache(_leaseSetCache);
+                _lscleanTime = now;
+            }
+            if (!force) {
+                LeaseSet ls = (LeaseSet) _leaseSetCache.get(hashPair());
+                if (ls != null) {
+                    if (ls.equals(newLS)) {
+                        // still good, send it 10% of the time
+                        // sendACK does 5% random which forces us, good enough
+                        //if (10 >= getContext().random().nextInt(100)) {
+                        //    if (_log.shouldLog(Log.INFO))
+                        //        _log.info("Found in cache - including reply leaseset for " + _toString); 
+                        //    return ls;
+                        //} else {
+                            if (_log.shouldLog(Log.INFO))
+                                _log.info("Found in cache - NOT including reply leaseset for " + _toString); 
+                            return null;
+                        //}
+                    } else {
+                        if (_log.shouldLog(Log.WARN))
+                            _log.warn("Expired from cache - reply leaseset for " + _toString); 
+                        // will get overwritten below
+                        // _leaseSetCache.remove(hashPair());
+                    }
+                }
+            }
+            _leaseSetCache.put(hashPair(), newLS);
+        }
+        if (_log.shouldLog(Log.WARN))
+            _log.warn("Added to cache - reply leaseset for " + _toString); 
+        return newLS;
     }
     
     /** send a message to a lease */
@@ -228,9 +286,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
     /**
      * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
      * if possible, to keep the streaming lib happy
-     * Key the cache just on the dest, not on source+dest, as different sources
-     * simultaneously talking to the same dest is probably rare enough
-     * to not bother separating out.
+     * Key the caches on the source+dest pair.
      *
      * We're going to use the lease until it expires, as long as it remains in the current leaseSet.
      *
@@ -251,15 +307,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         } 
         long now = getContext().clock().now();
 
-/*** removed until we fix SSU reachability
         // Use the same lease if it's still good
         // Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
         synchronized (_leaseCache) {
-            if (now - _cleanTime > 5*60*1000) {  // clean out periodically
+            if (now - _lcleanTime > 5*60*1000) {  // clean out periodically
                 cleanLeaseCache(_leaseCache);
-                _cleanTime = now;
+                _lcleanTime = now;
             }
-            _lease = (Lease) _leaseCache.get(_to);
+            _lease = (Lease) _leaseCache.get(hashPair());
             if (_lease != null) {
                 // if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
                 if (!_lease.isExpired(Router.CLOCK_FUDGE_FACTOR)) {
@@ -279,7 +334,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
                 _leaseCache.remove(_to);
             }
         }
-***/
+
         // get the possible leases
         List leases = new ArrayList(_leaseSet.getLeaseCount());
         for (int i = 0; i < _leaseSet.getLeaseCount(); i++) {
@@ -345,13 +400,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             if (_log.shouldLog(Log.WARN))
                 _log.warn(getJobId() + ": All leases are unreachable for " + _toString); 
         }
-/*** removed until we fix SSU reachability
         synchronized (_leaseCache) {
-            _leaseCache.put(_to, _lease);
+            _leaseCache.put(hashPair(), _lease);
         }
         if (_log.shouldLog(Log.WARN))
             _log.warn("Added to cache - lease for " + _toString); 
-***/
         return true;
     }
 
@@ -400,15 +453,15 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
             wantACK = false;
         
-        long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
         PublicKey key = _leaseSet.getEncryptionKey();
         SessionKey sessKey = new SessionKey();
         Set tags = new HashSet();
-        LeaseSet replyLeaseSet = null;
-        if (_shouldBundle) {
-            replyLeaseSet = getContext().netDb().lookupLeaseSetLocally(_from.calculateHash());
-        }
-        
+        // If we want an ack, bundle a leaseSet... (so he can get back to us)
+        LeaseSet replyLeaseSet = getReplyLeaseSet(wantACK);
+        // ... and vice versa  (so we know he got it)
+        if (replyLeaseSet != null)
+            wantACK = true;
+        long token = (wantACK ? getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE) : -1);
         if (wantACK)
             _inTunnel = selectInboundTunnel();
 
@@ -511,6 +564,91 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         }
     }
     
+    /**
+     * This is the place where we make I2P go fast.
+     *
+     * We have four static caches.
+     * - The LeaseSet cache is used to decide whether to bundle our own leaseset,
+     *   which minimizes overhead.
+     * - The Lease cache is used to persistently send to the same lease for the destination,
+     *   which keeps the streaming lib happy by minimizing out-of-order delivery.
+     * - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
+     *   for the same destination,
+     *   which keeps the streaming lib happy by minimizing out-of-order delivery.
+     *
+     */
+
+    /**
+     * String used to cache things with based on source + dest
+     */
+    private String _hashPair;
+    private String hashPair() {
+        if (_hashPair == null)
+            _hashPair = _from.calculateHash().toBase64() + _to.calculateHash().toBase64();
+        return _hashPair;
+    }
+
+    /**
+     * This is a little sneaky, but get the _from back out of the "opaque" hash key
+     * (needed for cleanTunnelCache)
+     * 44 = 32 * 4 / 3
+     */
+    private Hash sourceFromHashPair(String s) {
+        return new Hash(Base64.decode(s.substring(0, 44)));
+    }
+
+    /**
+     * Called on failure to give us a better chance of success next time.
+     * Of course this is probably 60s too late.
+     * And we could pick the bad ones at random again.
+     * Or remove entries that were sent and succeeded after this was sent but before this failed.
+     * But it's a start.
+     */
+    private void clearCaches() {
+        String key = hashPair();
+        if (_inTunnel != null) {   // if we wanted an ack, we sent our lease too
+            synchronized(_leaseSetCache) {
+                _leaseSetCache.remove(key);
+            }
+        }
+        if (_lease != null) {
+            synchronized(_leaseCache) {
+                Lease l = (Lease) _leaseCache.get(key);
+                if (l != null && l.equals(_lease))
+                    _leaseCache.remove(key);
+            }
+        }
+        if (_outTunnel != null) {
+            synchronized(_tunnelCache) {
+                TunnelInfo t =(TunnelInfo) _backloggedTunnelCache.get(key);
+                if (t != null && t.equals(_outTunnel))
+                    _backloggedTunnelCache.remove(key);
+                t = (TunnelInfo) _tunnelCache.get(key);
+                if (t != null && t.equals(_outTunnel))
+                    _tunnelCache.remove(key);
+            }
+        }
+    }
+
+    /**
+     * Clean out old leaseSets from a set.
+     * Caller must synchronize on tc.
+     */
+    private void cleanLeaseSetCache(HashMap tc) {
+        long now = getContext().clock().now();
+        List deleteList = new ArrayList();
+        for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
+            String k = (String) iter.next();
+            LeaseSet l = (LeaseSet) tc.get(k);
+            if (l.getEarliestLeaseDate() < now)
+                deleteList.add(k);
+        }
+        for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
+            String k = (String) iter.next();
+            tc.remove(k);
+        }
+    }
+
     /**
      * Clean out old leases from a set.
      * Caller must synchronize on tc.
@@ -518,14 +656,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
     private void cleanLeaseCache(HashMap tc) {
         List deleteList = new ArrayList();
         for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
-            Destination dest = (Destination) iter.next();
-            Lease l = (Lease) tc.get(dest);
+            String k = (String) iter.next();
+            Lease l = (Lease) tc.get(k);
             if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
-                deleteList.add(dest);
+                deleteList.add(k);
         }
         for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
-            Destination dest = (Destination) iter.next();
-            tc.remove(dest);
+            String k = (String) iter.next();
+            tc.remove(k);
         }
     }
 
@@ -536,14 +674,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
     private void cleanTunnelCache(HashMap tc) {
         List deleteList = new ArrayList();
         for (Iterator iter = tc.keySet().iterator(); iter.hasNext(); ) {
-            Destination dest = (Destination) iter.next();
-            TunnelInfo tunnel = (TunnelInfo) tc.get(dest);
-            if (!getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel))
-                deleteList.add(dest);
+            String k = (String) iter.next();
+            TunnelInfo tunnel = (TunnelInfo) tc.get(k);
+            if (!getContext().tunnelManager().isValidTunnel(sourceFromHashPair(k), tunnel))
+                deleteList.add(k);
         }
         for (Iterator iter = deleteList.iterator(); iter.hasNext(); ) {
-            Destination dest = (Destination) iter.next();
-            tc.remove(dest);
+            String k = (String) iter.next();
+            tc.remove(k);
         }
     }
 
@@ -552,9 +690,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
      * if possible, to keep the streaming lib happy
      * Use two caches - although a cache of a list of tunnels per dest might be
      * more elegant.
-     * Key the caches just on the dest, not on source+dest, as different sources
-     * simultaneously talking to the same dest is probably rare enough
-     * to not bother separating out.
+     * Key the caches on the source+dest pair.
      *
      */
     private static HashMap _tunnelCache = new HashMap();
@@ -575,21 +711,21 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
              * if you were the originator by backlogging the tunnel, then removing the
              * backlog and seeing if traffic came back or not.
              */
-            tunnel = (TunnelInfo) _backloggedTunnelCache.get(to);
+            tunnel = (TunnelInfo) _backloggedTunnelCache.get(hashPair());
             if (tunnel != null) {
                 if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
                     if (!getContext().commSystem().isBacklogged(tunnel.getPeer(1))) {
                         if (_log.shouldLog(Log.WARN))
                             _log.warn("Switching back to tunnel " + tunnel + " for " + _toString); 
-                        _backloggedTunnelCache.remove(to);
-                        _tunnelCache.put(to, tunnel);
+                        _backloggedTunnelCache.remove(hashPair());
+                        _tunnelCache.put(hashPair(), tunnel);
                         return tunnel;
                     }  // else still backlogged
                 } else // no longer valid
-                    _backloggedTunnelCache.remove(to);
+                    _backloggedTunnelCache.remove(hashPair());
             }
             // Use the same tunnel unless backlogged
-            tunnel = (TunnelInfo) _tunnelCache.get(to);
+            tunnel = (TunnelInfo) _tunnelCache.get(hashPair());
             if (tunnel != null) {
                 if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
                     if (tunnel.getLength() <= 1 || !getContext().commSystem().isBacklogged(tunnel.getPeer(1)))
@@ -597,14 +733,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
                     // backlogged
                     if (_log.shouldLog(Log.WARN))
                         _log.warn("Switching from backlogged " + tunnel + " for " + _toString); 
-                    _backloggedTunnelCache.put(to, tunnel);
+                    _backloggedTunnelCache.put(hashPair(), tunnel);
                 } // else no longer valid
-                _tunnelCache.remove(to);
+                _tunnelCache.remove(hashPair());
             }
             // Pick a new tunnel
             tunnel = selectOutboundTunnel();
             if (tunnel != null)
-                _tunnelCache.put(to, tunnel);
+                _tunnelCache.put(hashPair(), tunnel);
         }
         return tunnel;
     }
@@ -647,6 +783,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         getContext().statManager().addRateData("client.timeoutCongestionMessage", messageDelay, 1);
         getContext().statManager().addRateData("client.timeoutCongestionInbound", inboundDelta, 1);
     
+        clearCaches();
         getContext().messageHistory().sendPayloadMessage(_clientMessageId.getMessageId(), false, sendTime);
         getContext().clientManager().messageDeliveryStatusUpdate(_from, _clientMessageId, false);
         getContext().statManager().updateFrequency("client.sendMessageFailFrequency");
@@ -704,22 +841,22 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             _pendingToken = token;
             if (_log.shouldLog(Log.INFO))
                 _log.info(OutboundClientMessageOneShotJob.this.getJobId() 
-                           + "Reply selector for client message: token=" + token);
+                           + ": Reply selector for client message: token=" + token);
         }
         
         public boolean continueMatching() { 
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug(OutboundClientMessageOneShotJob.this.getJobId() 
-                           + "dont continue matching for token=" + _pendingToken);
+                           + ": dont continue matching for token=" + _pendingToken);
             return false; 
         }
         public long getExpiration() { return _overallExpiration; }
         
         public boolean isMatch(I2NPMessage inMsg) {
             if (inMsg.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info(OutboundClientMessageOneShotJob.this.getJobId() 
-                               + "delivery status message received: " + inMsg + " our token: " + _pendingToken);
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(OutboundClientMessageOneShotJob.this.getJobId() 
+                               + ": delivery status message received: " + inMsg + " our token: " + _pendingToken);
                 return _pendingToken == ((DeliveryStatusMessage)inMsg).getMessageId();
             } else {
                 return false;
-- 
GitLab