From e0f1047d72acb3bbd64225054133417ccad69f4a Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sun, 30 Aug 2009 16:27:03 +0000
Subject: [PATCH]     * SessionKeyManager, OCMOSJ, Garlic:       - Enable
 per-client SessionKeyManagers for better anonymity       - tagsDelivered()
 now means tags are sent, not acked.       - OCMOSJ uses the new TagSetHandle
 object returned from tagsDelivered()         to call tagsAcked() or
 failTags() as appropriate.       - Assume tags delivered on an established
 session to         reduce streaming lib stalls caused by massive tag
 deliveries;         should increase throughput and window sizes on long-lived
 streams       - Unacked tagsets on a new session are stored on a separate
 list       - Don't kill an OB Session just because it's temporarily out of
 tags       - Increase min tag threshold to 30 (was 20) due to new speculative
         tags delivered scheme, and to increase effective max window       -
 More Java 5 and dead code cleanups, and more comments and javadoc,        
 debug logging cleanups

---
 .../src/net/i2p/crypto/SessionKeyManager.java |   5 +-
 .../java/src/net/i2p/crypto/TagSetHandle.java |   8 +
 .../crypto/TransientSessionKeyManager.java    | 258 ++++++++++++++----
 .../router/message/GarlicMessageBuilder.java  |  95 +++++--
 .../router/message/GarlicMessageParser.java   |   6 +-
 .../router/message/GarlicMessageReceiver.java |   9 +-
 .../OutboundClientMessageJobHelper.java       |  12 +-
 .../OutboundClientMessageOneShotJob.java      |  51 +++-
 8 files changed, 345 insertions(+), 99 deletions(-)
 create mode 100644 core/java/src/net/i2p/crypto/TagSetHandle.java

diff --git a/core/java/src/net/i2p/crypto/SessionKeyManager.java b/core/java/src/net/i2p/crypto/SessionKeyManager.java
index 2cd86ba7d9..126ab0c038 100644
--- a/core/java/src/net/i2p/crypto/SessionKeyManager.java
+++ b/core/java/src/net/i2p/crypto/SessionKeyManager.java
@@ -95,7 +95,8 @@ public class SessionKeyManager {
      * method after receiving an ack to a message delivering them)
      *
      */
-    public void tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) { // nop
+    public TagSetHandle tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) { // nop
+         return null;
     }
 
     /**
@@ -134,4 +135,6 @@ public class SessionKeyManager {
     }
 
     public void renderStatusHTML(Writer out) throws IOException {}
+    public void failTags(PublicKey target, SessionKey key, TagSetHandle ts) {}
+    public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {}
 }
diff --git a/core/java/src/net/i2p/crypto/TagSetHandle.java b/core/java/src/net/i2p/crypto/TagSetHandle.java
new file mode 100644
index 0000000000..7e06939b56
--- /dev/null
+++ b/core/java/src/net/i2p/crypto/TagSetHandle.java
@@ -0,0 +1,8 @@
+package net.i2p.crypto;
+
+/**
+ * An opaque handle to a TagSet returned by the SessionKeyManager,
+ * so that OCMOSJ can report that the tags were later acked, or not.
+ *
+ */
+public interface TagSetHandle {}
diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
index e079fce5d1..57862c3b83 100644
--- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
+++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java
@@ -19,6 +19,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -36,6 +37,41 @@ import net.i2p.util.SimpleTimer;
  * to disk).  However, this being java, we cannot guarantee that the keys aren't swapped
  * out to disk so this should not be considered secure in that sense.
  *
+ * The outbound and inbound sides are completely independent, each with
+ * their own keys and tags.
+ *
+ * For a new session, outbound tags are not considered delivered until an ack is received.
+ * Otherwise, the loss of the first message would render all subsequent messages
+ * undecryptable. True?
+ *
+ * For an existing session, outbound tags are immediately considered delivered, and are
+ * later revoked if the ack times out. This prevents massive stream slowdown caused by
+ * repeated tag delivery after the minimum tag threshold is reached. Included tags
+ * pushes messages above the ideal 1956 size by ~2KB and causes excessive fragmentation
+ * and padding. As the tags are not seen by the streaming lib, they aren't accounted
+ * for in the window size, and one or more of a series of large messages is likely to be dropped,
+ * either due to high fragmentation or drop priorites at the tunnel OBEP.
+ *
+ * For this to work, the minimum tag threshold and tag delivery quanitity defined in
+ * GarlicMessageBuilder must be chosen with streaming lib windows sizes in mind.
+ * If a single TagSet is not delivered, there will be no stall as long as the
+ * current window size is smaller than the minimum tag threshold.
+ * A second consecutive TagSet delivery failure will cause a complete stall, as
+ * all subsequent messages will fail to decrypt.
+ * See ConnectionOptions in streaming for more information.
+ *
+ * There are large inefficiencies caused by the repeated delivery of tags in a new session.
+ * With an initial streaming window size of 6 and 40 tags per delivery, a web server
+ * would deliver up to 240 tags (7680 bytes, not including bundled leaseset, etc.)
+ * in the first volley of the response.
+ *
+ * Could the two directions be linked somehow, such that the initial request could
+ * contain a key or tags for the response?
+ *
+ * Should the tag threshold and quantity be adaptive?
+ *
+ * Todo: Switch to ConcurrentHashMaps and ReadWriteLocks, only get write lock during cleanup
+ *
  */
 public class TransientSessionKeyManager extends SessionKeyManager {
     private Log _log;
@@ -126,6 +162,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
     }
 
     /* FIXME Exporting non-public type through public API */
+/****** leftover from when we had the persistent SKM
     protected void setData(Set<TagSet> inboundTagSets, Set<OutboundSession> outboundSessions) {
         if (_log.shouldLog(Log.INFO))
             _log.info("Loading " + inboundTagSets.size() + " inbound tag sets, and " 
@@ -152,6 +189,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             _outboundSessions.putAll(sessions);
         }
     }
+******/
 
     /**
      * Retrieve the session key currently associated with encryption to the target,
@@ -179,13 +217,10 @@ public class TransientSessionKeyManager extends SessionKeyManager {
      * Associate a new session key with the specified target.  Metrics to determine
      * when to expire that key begin with this call.
      *
-     * Unused except in tests?
      */
     @Override
     public void createSession(PublicKey target, SessionKey key) {
-        OutboundSession sess = new OutboundSession(target);
-        sess.setCurrentKey(key);
-        addSession(sess);
+        createAndReturnSession(target, key);
     }
 
     /**
@@ -218,7 +253,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         if (sess.getCurrentKey().equals(key)) {
             SessionTag nxt = sess.consumeNext();
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Tag consumed: " + nxt + " with key: " + key.toBase64());
+                _log.debug("OB Tag consumed: " + nxt + " with: " + key);
             return nxt;
         }
         if (_log.shouldLog(Log.DEBUG))
@@ -261,23 +296,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
 
     /**
      * Take note of the fact that the given sessionTags associated with the key for
-     * encryption to the target have definitely been received at the target (aka call this
-     * method after receiving an ack to a message delivering them)
+     * encryption to the target have been sent. Whether to use the tags immediately
+     * (i.e. assume they will be received) or to wait until an ack, is implementation dependent.
+     *
+     * Here, we wait for the ack if the session is new, otherwise we use right away.
+     * Will this work???
+     * If the tags are pipelined sufficiently, it will.
      *
+     * @return the TagSetHandle. Caller MUST subsequently call failTags() or tagsAcked()
+     * with this handle.
      */
     @Override
-    public void tagsDelivered(PublicKey target, SessionKey key, Set sessionTags) {
+    public TagSetHandle tagsDelivered(PublicKey target, SessionKey key, Set<SessionTag> sessionTags) {
         if (_log.shouldLog(Log.DEBUG)) {
             //_log.debug("Tags delivered to set " + set + " on session " + sess);
             if (sessionTags.size() > 0)
-                _log.debug("Tags delivered: " + sessionTags.size() + " for key: " + key.toBase64() + ": " + sessionTags);
+                _log.debug("Tags delivered: " + sessionTags.size() + " for key: " + key + ": " + sessionTags);
         }
         OutboundSession sess = getSession(target);
         if (sess == null)
             sess = createAndReturnSession(target, key);
-        sess.setCurrentKey(key);
+        else
+            sess.setCurrentKey(key);
         TagSet set = new TagSet(sessionTags, key, _context.clock().now());
         sess.addTags(set);
+        return set;
     }
 
     /**
@@ -285,12 +328,44 @@ public class TransientSessionKeyManager extends SessionKeyManager {
      * has failed to respond when they should have.  This call essentially lets the system recover
      * from corrupted tag sets and crashes
      *
+     * @deprecated unused and rather drastic
      */
     @Override
     public void failTags(PublicKey target) {
         removeSession(target);
     }
 
+    /**
+     * Mark these tags as invalid, since the peer
+     * has failed to ack them in time.
+     */
+    @Override
+    public void failTags(PublicKey target, SessionKey key, TagSetHandle ts) {
+        OutboundSession sess = getSession(target);
+        if (sess == null)
+            return;
+        if(!key.equals(sess.getCurrentKey()))
+            return;
+        sess.failTags((TagSet)ts);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("TagSet failed: " + ts);
+    }
+
+    /**
+     * Mark these tags as acked, start to use them (if we haven't already)
+     */
+    @Override
+    public void tagsAcked(PublicKey target, SessionKey key, TagSetHandle ts) {
+        OutboundSession sess = getSession(target);
+        if (sess == null)
+            return;
+        if(!key.equals(sess.getCurrentKey()))
+            return;
+        sess.ackTags((TagSet)ts);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("TagSet acked: " + ts);
+    }
+
     /**
      * Accept the given tags and associate them with the given key for decryption
      *
@@ -304,9 +379,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         for (Iterator<SessionTag> iter = sessionTags.iterator(); iter.hasNext();) {
             SessionTag tag = iter.next();
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Receiving tag " + tag + " for key " + key.toBase64() + " / " + key.toString() + ": tagSet: " + tagSet);
+                _log.debug("Receiving tag " + tag + " for key " + key + ": tagSet: " + tagSet);
             synchronized (_inboundTagSets) {
-                old = (TagSet)_inboundTagSets.put(tag, tagSet);
+                old = _inboundTagSets.put(tag, tagSet);
                 overage = _inboundTagSets.size() - MAX_INBOUND_SESSION_TAGS;
                 if (old != null) {
                     if (!old.getAssociatedKey().equals(tagSet.getAssociatedKey())) {
@@ -334,9 +409,9 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             }
 
             if (_log.shouldLog(Log.WARN)) {
-                _log.warn("Multiple tags matching!  tagSet: " + tagSet + " and old tagSet: " + old + " tag: " + dupTag + "/" + dupTag.toBase64());
-                _log.warn("Earlier tag set creation: " + old + ": key=" + old.getAssociatedKey().toBase64(), old.getCreatedBy());
-                _log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy());
+                _log.warn("Multiple tags matching!  tagSet: " + tagSet + " and old tagSet: " + old + " tag: " + dupTag + "/" + dupTag);
+                _log.warn("Earlier tag set creation: " + old + ": key=" + old.getAssociatedKey());
+                _log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey());
             }
         }
         
@@ -410,26 +485,26 @@ public class TransientSessionKeyManager extends SessionKeyManager {
      */
     @Override
     public SessionKey consumeTag(SessionTag tag) {
-        if (false) aggressiveExpire();
+        //if (false) aggressiveExpire();
         synchronized (_inboundTagSets) {
             TagSet tagSet = (TagSet) _inboundTagSets.remove(tag);
             if (tagSet == null) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Cannot consume tag " + tag + " as it is not known");
+                    _log.debug("Cannot consume IB " + tag + " as it is not known");
                 return null;
             }
             tagSet.consume(tag);
 
             SessionKey key = tagSet.getAssociatedKey();
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Consuming tag " + tag.toString() + " for sessionKey " + key.toBase64() + " / " + key.toString() + " on tagSet: " + tagSet);
+                _log.debug("Consuming IB " + tag + " for " + key + " on: " + tagSet);
             return key;
         }
     }
 
     private OutboundSession getSession(PublicKey target) {
         synchronized (_outboundSessions) {
-            return (OutboundSession) _outboundSessions.get(target);
+            return _outboundSessions.get(target);
         }
     }
 
@@ -443,7 +518,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         if (target == null) return;
         OutboundSession session = null;
         synchronized (_outboundSessions) {
-            session = (OutboundSession)_outboundSessions.remove(target);
+            session = _outboundSessions.remove(target);
         }
         if ( (session != null) && (_log.shouldLog(Log.WARN)) )
             _log.warn("Removing session tags with " + session.availableTags() + " available for "
@@ -461,11 +536,11 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         int remaining = 0;
         long now = _context.clock().now();
         StringBuilder buf = null;
-        StringBuilder bufSummary = null;
+        //StringBuilder bufSummary = null;
         if (_log.shouldLog(Log.DEBUG)) {
             buf = new StringBuilder(128);
             buf.append("Expiring inbound: ");
-            bufSummary = new StringBuilder(1024);
+            //bufSummary = new StringBuilder(1024);
         }
         synchronized (_inboundTagSets) {
             for (Iterator<SessionTag> iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) {
@@ -477,10 +552,10 @@ public class TransientSessionKeyManager extends SessionKeyManager {
                     iter.remove();
                     removed++;
                     if (buf != null)
-                        buf.append(tag.toString()).append(" @ age ").append(DataHelper.formatDuration(age));
-                } else if (false && (bufSummary != null) ) {
-                    bufSummary.append("\nTagSet: " + ts.toString() + ", key: " + ts.getAssociatedKey().toBase64()+"/" + ts.getAssociatedKey().toString() 
-                                      + ": tag: " + tag.toString());
+                        buf.append(tag).append(" @ age ").append(DataHelper.formatDuration(age));
+                //} else if (false && (bufSummary != null) ) {
+                //    bufSummary.append("\nTagSet: " + ts + ", key: " + ts.getAssociatedKey()
+                //                      + ": tag: " + tag);
                 }
             }
             remaining = _inboundTagSets.size();
@@ -488,8 +563,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         _context.statManager().addRateData("crypto.sessionTagsRemaining", remaining, 0);
         if ( (buf != null) && (removed > 0) )
             _log.debug(buf.toString());
-        if (bufSummary != null)
-            _log.debug("Cleaning up with remaining: " + bufSummary.toString());
+        //if (bufSummary != null)
+        //    _log.debug("Cleaning up with remaining: " + bufSummary.toString());
 
         //_log.warn("Expiring tags: [" + tagsToDrop + "]");
 
@@ -498,9 +573,11 @@ public class TransientSessionKeyManager extends SessionKeyManager {
                 PublicKey key = iter.next();
                 OutboundSession sess = _outboundSessions.get(key);
                 removed += sess.expireTags();
-                if (sess.availableTags() <= 0) {
+                // don't kill a new session or one that's temporarily out of tags
+                if (sess.getLastUsedDate() < now - (SESSION_LIFETIME_MAX_MS / 2) &&
+                    sess.availableTags() <= 0) {
                     iter.remove();
-                    removed++;
+                    removed++;   // just to have a non-zero return value?
                 }
             }
         }
@@ -563,7 +640,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
                 int size = ts.getTags().size();
                 total += size;
                 buf.append("<li><b>Sent:</b> ").append(DataHelper.formatDuration(now - ts.getDate())).append(" ago with ");
-                buf.append(size).append(" tags remaining</li>");
+                buf.append(size).append(" tags remaining; acked? ").append(ts.getAcked()).append("</li>");
             }
             buf.append("</ul></td></tr>\n");
             out.write(buf.toString());
@@ -580,18 +657,27 @@ public class TransientSessionKeyManager extends SessionKeyManager {
      *  Just for the HTML method above so we can see what's going on easier
      *  Earliest first
      */
-    private class TagSetComparator implements Comparator {
+    private static class TagSetComparator implements Comparator {
          public int compare(Object l, Object r) {
              return (int) (((TagSet)l).getDate() - ((TagSet)r).getDate());
         }
     }
 
-    class OutboundSession {
+    private class OutboundSession {
         private PublicKey _target;
         private SessionKey _currentKey;
         private long _established;
         private long _lastUsed;
+        /** before the first ack, all tagsets go here. These are never expired, we rely
+            on the callers to call failTags() or ackTags() to remove them from this list. */
+        private /* FIXME final FIXME */ List<TagSet> _unackedTagSets;
+        /**
+         *  As tagsets are acked, they go here.
+         *  After the first ack, new tagsets go here (i.e. presumed acked)
+         */
         private /* FIXME final FIXME */ List<TagSet> _tagSets;
+        /** set to true after first tagset is acked */
+        private boolean _acked;
 
         public OutboundSession(PublicKey target) {
             this(target, null, _context.clock().now(), _context.clock().now(), new ArrayList());
@@ -602,14 +688,43 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             _currentKey = curKey;
             _established = established;
             _lastUsed = lastUsed;
-            _tagSets = tagSets;
+            _unackedTagSets = tagSets;
+            _tagSets = new ArrayList();
         }
 
-        /** list of TagSet objects */
+        /**
+         *  @return list of TagSet objects
+         *  This is used only by renderStatusHTML().
+         *  It includes both acked and unacked TagSets.
+         */
         List<TagSet> getTagSets() {
+            List<TagSet> rv;
+            synchronized (_unackedTagSets) {
+                rv = new ArrayList(_unackedTagSets);
+            }
             synchronized (_tagSets) {
-                return new ArrayList(_tagSets);
+                rv.addAll(_tagSets);
             }
+            return rv;
+        }
+
+        /**
+         *  got an ack for these tags
+         *  For tagsets delivered after the session was acked, this is a nop
+         *  because the tagset was originally placed directly on the acked list.
+         */
+        void ackTags(TagSet set) {
+            if (_unackedTagSets.remove(set)) {
+                _tagSets.add(set);
+                _acked = true;
+            }
+            set.setAcked();
+        }
+
+        /** didn't get an ack for these tags */
+        void failTags(TagSet set) {
+            _unackedTagSets.remove(set);
+            _tagSets.remove(set);
         }
 
         public PublicKey getTarget() {
@@ -656,7 +771,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             int removed = 0;
             synchronized (_tagSets) {
                 for (int i = 0; i < _tagSets.size(); i++) {
-                    TagSet set = (TagSet) _tagSets.get(i);
+                    TagSet set = _tagSets.get(i);
                     if (set.getDate() + SESSION_TAG_DURATION_MS <= now) {
                         _tagSets.remove(i);
                         i--;
@@ -672,7 +787,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             _lastUsed = now;
             synchronized (_tagSets) {
                 while (_tagSets.size() > 0) {
-                    TagSet set = (TagSet) _tagSets.get(0);
+                    TagSet set = _tagSets.get(0);
                     if (set.getDate() + SESSION_TAG_DURATION_MS > now) {
                         SessionTag tag = set.consumeNext();
                         if (tag != null) return tag;
@@ -686,12 +801,13 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             return null;
         }
 
+        /** @return the total number of tags in acked TagSets */
         public int availableTags() {
             int tags = 0;
             long now = _context.clock().now();
             synchronized (_tagSets) {
                 for (int i = 0; i < _tagSets.size(); i++) {
-                    TagSet set = (TagSet) _tagSets.get(i);
+                    TagSet set = _tagSets.get(i);
                     if (set.getDate() + SESSION_TAG_DURATION_MS > now)
                         tags += set.getTags().size();
                 }
@@ -719,19 +835,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
                 return -1;
         }
 
+        /**
+         *  If the session has never been acked, put the TagSet on the unacked list.
+         *  Otherwise, consider it good right away.
+         */
         public void addTags(TagSet set) {
             _lastUsed = _context.clock().now();
-            synchronized (_tagSets) {
-                _tagSets.add(set);
+            if (_acked) {
+                synchronized (_tagSets) {
+                    _tagSets.add(set);
+                }
+            } else {
+                synchronized (_unackedTagSets) {
+                    _unackedTagSets.add(set);
+                }
             }
         }
     }
 
-    static class TagSet {
+    private static class TagSet implements TagSetHandle {
         private Set<SessionTag> _sessionTags;
         private SessionKey _key;
         private long _date;
-        private Exception _createdBy;
+        //private Exception _createdBy;
+        /** only used in renderStatusHTML() for debugging */
+        private boolean _acked;
 
         public TagSet(Set<SessionTag> tags, SessionKey key, long date) {
             if (key == null) throw new IllegalArgumentException("Missing key");
@@ -739,12 +867,12 @@ public class TransientSessionKeyManager extends SessionKeyManager {
             _sessionTags = tags;
             _key = key;
             _date = date;
-            if (true) {
-                long now = I2PAppContext.getGlobalContext().clock().now();
-                _createdBy = new Exception("Created by: key=" + _key.toBase64() + " on " 
-                                           + new Date(now) + "/" + now 
-                                           + " via " + Thread.currentThread().getName());
-            }
+            //if (true) {
+            //    long now = I2PAppContext.getGlobalContext().clock().now();
+            //    _createdBy = new Exception("Created by: key=" + _key.toBase64() + " on " 
+            //                               + new Date(now) + "/" + now 
+            //                               + " via " + Thread.currentThread().getName());
+            //}
         }
 
         /** when the tag set was created */
@@ -770,22 +898,26 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         }
 
         public void consume(SessionTag tag) {
-            if (contains(tag)) {
-                _sessionTags.remove(tag);
-            }
+            _sessionTags.remove(tag);
         }
 
+        /** let's do this without counting the elements first */
         public SessionTag consumeNext() {
-            if (_sessionTags.size() <= 0) {
+            SessionTag first;
+            try {
+                first = _sessionTags.iterator().next();
+            } catch (NoSuchElementException nsee) {
                 return null;
             }
-
-            SessionTag first = (SessionTag) _sessionTags.iterator().next();
             _sessionTags.remove(first);
             return first;
         }
         
-        public Exception getCreatedBy() { return _createdBy; }
+        //public Exception getCreatedBy() { return _createdBy; }
+
+        public void setAcked() { _acked = true; }
+        /** only used in renderStatusHTML() for debugging */
+        public boolean getAcked() { return _acked; }
         
         @Override
         public int hashCode() {
@@ -800,9 +932,19 @@ public class TransientSessionKeyManager extends SessionKeyManager {
         public boolean equals(Object o) {
             if ((o == null) || !(o instanceof TagSet)) return false;
             TagSet ts = (TagSet) o;
-            return DataHelper.eq(ts.getAssociatedKey(), getAssociatedKey()) 
+            return DataHelper.eq(ts.getAssociatedKey(), _key) 
                    //&& DataHelper.eq(ts.getTags(), getTags())
-                   && ts.getDate() == getDate();
+                   && ts.getDate() == _date;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder(256);
+            buf.append("TagSet established: ").append(new Date(_date));
+            buf.append(" Session key: ").append(_key.toBase64());
+            buf.append(" Size: ").append(_sessionTags.size());
+            buf.append(" Acked? ").append(_acked);
+            return buf.toString();
         }
     }
 }
diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
index a049b7b8cd..961c0f769f 100644
--- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
+++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
@@ -17,7 +17,7 @@ import java.util.Set;
 import net.i2p.crypto.SessionKeyManager;
 import net.i2p.data.DataFormatException;
 import net.i2p.data.DataHelper;
-import net.i2p.data.Destination;
+import net.i2p.data.Hash;
 import net.i2p.data.PublicKey;
 import net.i2p.data.SessionKey;
 import net.i2p.data.SessionTag;
@@ -59,14 +59,16 @@ public class GarlicMessageBuilder {
      *
      *  So a value somewhat higher than the low threshold
      *  seems appropriate.
+     *
+     *  Use care when adjusting these values. See ConnectionOptions in streaming,
+     *  and TransientSessionKeyManager in crypto, for more information.
      */
     private static final int DEFAULT_TAGS = 40;
-    private static final int LOW_THRESHOLD = 20;
+    private static final int LOW_THRESHOLD = 30;
 
-    public static int estimateAvailableTags(RouterContext ctx, PublicKey key, Destination local) {
-        // per-dest Unimplemented
-        //SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local);
-        SessionKeyManager skm = ctx.sessionKeyManager();
+    /** @param local non-null; do not use this method for the router's SessionKeyManager */
+    public static int estimateAvailableTags(RouterContext ctx, PublicKey key, Hash local) {
+        SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local);
         if (skm == null)
             return 0;
         SessionKey curKey = skm.getCurrentKey(key);
@@ -75,19 +77,54 @@ public class GarlicMessageBuilder {
         return skm.getAvailableTags(key, curKey);
     }
     
-    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config) {
-        return buildMessage(ctx, config, new SessionKey(), new HashSet());
+    /**
+     * Unused and probably a bad idea.
+     *
+     * Used below only on a recursive call if the garlic message contains a garlic message.
+     * We don't need the SessionKey or SesssionTags returned
+     * This uses the router's SKM, which is probably not what you want.
+     * This isn't fully implemented, because the key and tags aren't saved - maybe
+     * it should force elGamal?
+     *
+     * @param ctx scope
+     * @param config how/what to wrap
+     */
+    private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config) {
+        Log log = ctx.logManager().getLog(GarlicMessageBuilder.class);
+        log.error("buildMessage 2 args, using router SKM", new Exception("who did it"));
+        return buildMessage(ctx, config, new SessionKey(), new HashSet(), ctx.sessionKeyManager());
     }
 
-    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags) {
-        return buildMessage(ctx, config, wrappedKey, wrappedTags, DEFAULT_TAGS);
+    /**
+     * called by OCMJH
+     *
+     * @param ctx scope
+     * @param config how/what to wrap
+     * @param wrappedKey output parameter that will be filled with the sessionKey used
+     * @param wrappedTags output parameter that will be filled with the sessionTags used
+     */
+    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
+                                             SessionKeyManager skm) {
+        return buildMessage(ctx, config, wrappedKey, wrappedTags, DEFAULT_TAGS, false, skm);
     }
 
-    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, int numTagsToDeliver) {
+    /** unused */
+    /***
+    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags,
+                                             int numTagsToDeliver) {
         return buildMessage(ctx, config, wrappedKey, wrappedTags, numTagsToDeliver, false);
     }
+    ***/
 
-    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, int numTagsToDeliver, boolean forceElGamal) {
+    /**
+     * @param ctx scope
+     * @param config how/what to wrap
+     * @param wrappedKey output parameter that will be filled with the sessionKey used
+     * @param wrappedTags output parameter that will be filled with the sessionTags used
+     * @param numTagsToDeliver only if the estimated available tags are below the threshold
+     */
+    private static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
+                                             int numTagsToDeliver, boolean forceElGamal, SessionKeyManager skm) {
         Log log = ctx.logManager().getLog(GarlicMessageBuilder.class);
         PublicKey key = config.getRecipientPublicKey();
         if (key == null) {
@@ -104,14 +141,14 @@ public class GarlicMessageBuilder {
         if (log.shouldLog(Log.INFO))
             log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration()));
         
-        SessionKey curKey = ctx.sessionKeyManager().getCurrentKey(key);
+        SessionKey curKey = skm.getCurrentKey(key);
         SessionTag curTag = null;
         if (curKey == null)
-            curKey = ctx.sessionKeyManager().createSession(key);
+            curKey = skm.createSession(key);
         if (!forceElGamal) {
-            curTag = ctx.sessionKeyManager().consumeNextAvailableTag(key, curKey);
+            curTag = skm.consumeNextAvailableTag(key, curKey);
             
-            int availTags = ctx.sessionKeyManager().getAvailableTags(key, curKey);
+            int availTags = skm.getAvailableTags(key, curKey);
             if (log.shouldLog(Log.DEBUG))
                 log.debug("Available tags for encryption to " + key + ": " + availTags);
 
@@ -120,7 +157,7 @@ public class GarlicMessageBuilder {
                     wrappedTags.add(new SessionTag(true));
                 if (log.shouldLog(Log.INFO))
                     log.info("Too few are available (" + availTags + "), so we're including more");
-            } else if (ctx.sessionKeyManager().getAvailableTimeLeft(key, curKey) < 60*1000) {
+            } else if (skm.getAvailableTimeLeft(key, curKey) < 60*1000) {
                 // if we have enough tags, but they expire in under 30 seconds, we want more
                 for (int i = 0; i < numTagsToDeliver; i++)
                     wrappedTags.add(new SessionTag(true));
@@ -138,16 +175,19 @@ public class GarlicMessageBuilder {
     }
     
     /**
+     *  used by TestJob and directly above
+     *
      * @param ctx scope
      * @param config how/what to wrap
-     * @param wrappedKey output parameter that will be filled with the sessionKey used
+     * @param wrappedKey unused - why??
      * @param wrappedTags output parameter that will be filled with the sessionTags used
      * @param target public key of the location being garlic routed to (may be null if we 
      *               know the encryptKey and encryptTag)
      * @param encryptKey sessionKey used to encrypt the current message
      * @param encryptTag sessionTag used to encrypt the current message
      */
-    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set wrappedTags, PublicKey target, SessionKey encryptKey, SessionTag encryptTag) {
+    public static GarlicMessage buildMessage(RouterContext ctx, GarlicConfig config, SessionKey wrappedKey, Set<SessionTag> wrappedTags,
+                                             PublicKey target, SessionKey encryptKey, SessionTag encryptTag) {
         Log log = ctx.logManager().getLog(GarlicMessageBuilder.class);
         if (config == null)
             throw new IllegalArgumentException("Null config specified");
@@ -209,6 +249,7 @@ public class GarlicMessageBuilder {
                         cloves[i] = buildClove(ctx, (PayloadGarlicConfig)c);
                     } else {
                         log.debug("Subclove IS NOT a payload garlic clove");
+                        // See notes below
                         cloves[i] = buildClove(ctx, c);
                     }
                     if (cloves[i] == null)
@@ -242,6 +283,22 @@ public class GarlicMessageBuilder {
         return buildCommonClove(ctx, clove, config);
     }
     
+    /**
+     *  UNUSED
+     *
+     *  The Garlic Message we are building contains another garlic message,
+     *  as specified by a GarlicConfig (NOT a PayloadGarlicConfig).
+     *
+     *  So this calls back to the top, to buildMessage(ctx, config),
+     *  which uses the router's SKM, i.e. the wrong one.
+     *  Unfortunately we've lost the reference to the SessionKeyManager way down here,
+     *  so we can't call buildMessage(ctx, config, key, tags, skm).
+     *
+     *  If we do ever end up constructing a garlic message that contains a garlic message,
+     *  we'll have to fix this by passing the skm through the last buildMessage,
+     *  through buildCloveSet, to here.
+     *
+     */
     private static byte[] buildClove(RouterContext ctx, GarlicConfig config) throws DataFormatException, IOException {
         GarlicClove clove = new GarlicClove(ctx);
         GarlicMessage msg = buildMessage(ctx, config);
diff --git a/router/java/src/net/i2p/router/message/GarlicMessageParser.java b/router/java/src/net/i2p/router/message/GarlicMessageParser.java
index 84ed50b2c5..8d53fc2122 100644
--- a/router/java/src/net/i2p/router/message/GarlicMessageParser.java
+++ b/router/java/src/net/i2p/router/message/GarlicMessageParser.java
@@ -10,6 +10,7 @@ package net.i2p.router.message;
 
 import java.util.Date;
 
+import net.i2p.crypto.SessionKeyManager;
 import net.i2p.data.Certificate;
 import net.i2p.data.DataFormatException;
 import net.i2p.data.DataHelper;
@@ -32,13 +33,14 @@ public class GarlicMessageParser {
         _log = _context.logManager().getLog(GarlicMessageParser.class);
     }
     
-    public CloveSet getGarlicCloves(GarlicMessage message, PrivateKey encryptionKey) {
+    /** @param skm use tags from this session key manager */
+    public CloveSet getGarlicCloves(GarlicMessage message, PrivateKey encryptionKey, SessionKeyManager skm) {
         byte encData[] = message.getData();
         byte decrData[] = null;
         try {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Decrypting with private key " + encryptionKey);
-            decrData = _context.elGamalAESEngine().decrypt(encData, encryptionKey);
+            decrData = _context.elGamalAESEngine().decrypt(encData, encryptionKey, skm);
         } catch (DataFormatException dfe) {
             if (_log.shouldLog(Log.WARN))
                 _log.warn("Error decrypting", dfe);
diff --git a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java
index fcc5bbddf8..a12d55452e 100644
--- a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java
+++ b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java
@@ -8,6 +8,7 @@ package net.i2p.router.message;
  *
  */
 
+import net.i2p.crypto.SessionKeyManager;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
 import net.i2p.data.PrivateKey;
@@ -47,13 +48,16 @@ public class GarlicMessageReceiver {
         _clientDestination = clientDestination;
         _parser = new GarlicMessageParser(context);
         _receiver = receiver;
+        //_log.error("New GMR dest = " + clientDestination);
     }
     
     public void receive(GarlicMessage message) {
         PrivateKey decryptionKey = null;
+        SessionKeyManager skm = null;
         if (_clientDestination != null) {
             LeaseSetKeys keys = _context.keyManager().getKeys(_clientDestination);
-            if (keys != null) {
+            skm = _context.clientManager().getClientSessionKeyManager(_clientDestination);
+            if (keys != null && skm != null) {
                 decryptionKey = keys.getDecryptionKey();
             } else {
                 if (_log.shouldLog(Log.WARN))
@@ -62,9 +66,10 @@ public class GarlicMessageReceiver {
             }
         } else {
             decryptionKey = _context.keyManager().getPrivateKey();
+            skm = _context.sessionKeyManager();
         }
         
-        CloveSet set = _parser.getGarlicCloves(message, decryptionKey);
+        CloveSet set = _parser.getGarlicCloves(message, decryptionKey, skm);
         if (set != null) {
             for (int i = 0; i < set.getCloveCount(); i++) {
                 GarlicClove clove = set.getClove(i);
diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
index efbdd90f05..75b5c8dc98 100644
--- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
+++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
@@ -17,6 +17,7 @@ import net.i2p.data.LeaseSet;
 import net.i2p.data.Payload;
 import net.i2p.data.PublicKey;
 import net.i2p.data.SessionKey;
+import net.i2p.data.SessionTag;
 import net.i2p.data.TunnelId;
 import net.i2p.data.i2np.DataMessage;
 import net.i2p.data.i2np.DatabaseStoreMessage;
@@ -46,13 +47,15 @@ class OutboundClientMessageJobHelper {
      *
      * For now, its just a tunneled DeliveryStatusMessage
      *
+     * Unused?
+     *
      * @param bundledReplyLeaseSet if specified, the given LeaseSet will be packaged with the message (allowing
      *                             much faster replies, since their netDb search will return almost instantly)
      * @return garlic, or null if no tunnels were found (or other errors)
      */
     static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, 
                                              Payload data, Hash from, Destination dest, TunnelInfo replyTunnel,
-                                             SessionKey wrappedKey, Set wrappedTags, 
+                                             SessionKey wrappedKey, Set<SessionTag> wrappedTags, 
                                              boolean requireAck, LeaseSet bundledReplyLeaseSet) {
         PayloadGarlicConfig dataClove = buildDataClove(ctx, data, dest, expiration);
         return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, wrappedKey, 
@@ -62,15 +65,18 @@ class OutboundClientMessageJobHelper {
      * Allow the app to specify the data clove directly, which enables OutboundClientMessage to resend the
      * same payload (including expiration and unique id) in different garlics (down different tunnels)
      *
+     * This is called from OCMOSJ
+     *
      * @return garlic, or null if no tunnels were found (or other errors)
      */
     static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, 
                                              PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, SessionKey wrappedKey, 
-                                             Set wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
+                                             Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
         GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, requireAck, bundledReplyLeaseSet);
         if (config == null)
             return null;
-        GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags);
+        GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags,
+                                                              ctx.clientManager().getClientSessionKeyManager(from));
         return msg;
     }
     
diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
index 5263a614ec..9e1ad88a46 100644
--- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
+++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java
@@ -10,6 +10,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import net.i2p.crypto.SessionKeyManager;
+import net.i2p.crypto.TagSetHandle;
 import net.i2p.data.Base64;
 import net.i2p.data.Certificate;
 import net.i2p.data.Destination;
@@ -20,6 +22,7 @@ import net.i2p.data.Payload;
 import net.i2p.data.PublicKey;
 import net.i2p.data.RouterInfo;
 import net.i2p.data.SessionKey;
+import net.i2p.data.SessionTag;
 import net.i2p.data.i2cp.MessageId;
 import net.i2p.data.i2np.DataMessage;
 import net.i2p.data.i2np.DeliveryInstructions;
@@ -471,7 +474,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
             return;
         }
 
-        int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey(), _from);
+        int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey(),
+                                                                      _from.calculateHash());
         _outTunnel = selectOutboundTunnel(_to);
         // boolean wantACK = _wantACK || existingTags <= 30 || getContext().random().nextInt(100) < 5;
         // what's the point of 5% random? possible improvements or replacements:
@@ -489,7 +493,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         
         PublicKey key = _leaseSet.getEncryptionKey();
         SessionKey sessKey = new SessionKey();
-        Set tags = new HashSet();
+        Set<SessionTag> tags = new HashSet();
         // If we want an ack, bundle a leaseSet... (so he can get back to us)
         LeaseSet replyLeaseSet = getReplyLeaseSet(wantACK);
         // ... and vice versa  (so we know he got it)
@@ -531,8 +535,16 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
         SendTimeoutJob onFail = null;
         ReplySelector selector = null;
         if (wantACK) {
-            onReply = new SendSuccessJob(getContext(), sessKey, tags);
-            onFail = new SendTimeoutJob(getContext());
+            TagSetHandle tsh = null;
+            if ( (sessKey != null) && (tags != null) && (tags.size() > 0) ) {
+                if (_leaseSet != null) {
+                    SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
+                    if (skm != null)
+                        tsh = skm.tagsDelivered(_leaseSet.getEncryptionKey(), sessKey, tags);
+                }
+            }
+            onReply = new SendSuccessJob(getContext(), sessKey, tsh);
+            onFail = new SendTimeoutJob(getContext(), sessKey, tsh);
             selector = new ReplySelector(token);
         }
         
@@ -550,9 +562,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
                            + _lease.getGateway().toBase64());
 
             DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now()));
-            if (false) // dispatch may take 100+ms, so toss it in its own job
-                getContext().jobQueue().addJob(dispatchJob);
-            else
+            //if (false) // dispatch may take 100+ms, so toss it in its own job
+            //    getContext().jobQueue().addJob(dispatchJob);
+            //else
                 dispatchJob.runJob();
         } else {
             if (_log.shouldLog(Log.WARN))
@@ -848,6 +860,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
     
     /** build the payload clove that will be used for all of the messages, placing the clove in the status structure */
     private boolean buildClove() {
+// FIXME set SKM
         PayloadGarlicConfig clove = new PayloadGarlicConfig();
         
         DeliveryInstructions instructions = new DeliveryInstructions();
@@ -932,14 +945,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
      */
     private class SendSuccessJob extends JobImpl implements ReplyJob {
         private SessionKey _key;
-        private Set _tags;
+        private TagSetHandle _tags;
         
         /**
          * Create a new success job that will be fired when the message encrypted with
          * the given session key and bearing the specified tags are confirmed delivered.
          *
          */
-        public SendSuccessJob(RouterContext enclosingContext, SessionKey key, Set tags) {
+        public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
             super(enclosingContext);
             _key = key;
             _tags = tags;
@@ -955,10 +968,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
                            + ": SUCCESS!  msg " + _clientMessageId
                            + " sent after " + sendTime + "ms");
             
-            if ( (_key != null) && (_tags != null) && (_tags.size() > 0) ) {
-                if (_leaseSet != null)
-                    getContext().sessionKeyManager().tagsDelivered(_leaseSet.getEncryptionKey(),
-                                                                   _key, _tags);
+            if (_key != null && _tags != null && _leaseSet != null) {
+                SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
+                if (skm != null)
+                    skm.tagsAcked(_leaseSet.getEncryptionKey(), _key, _tags);
             }
             
             long dataMsgId = _cloveId;
@@ -994,8 +1007,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
      *
      */
     private class SendTimeoutJob extends JobImpl {
-        public SendTimeoutJob(RouterContext enclosingContext) {
+        private SessionKey _key;
+        private TagSetHandle _tags;
+
+        public SendTimeoutJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) {
             super(enclosingContext);
+            _key = key;
+            _tags = tags;
         }
         
         public String getName() { return "Send client message timed out"; }
@@ -1005,6 +1023,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
                            + ": Soft timeout through the lease " + _lease);
             
             _lease.setNumFailure(_lease.getNumFailure()+1);
+            if (_key != null && _tags != null && _leaseSet != null) {
+                SessionKeyManager skm = getContext().clientManager().getClientSessionKeyManager(_from.calculateHash());
+                if (skm != null)
+                    skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags);
+            }
             dieFatal();
         }
     }
-- 
GitLab