From 3b2aa946afabd17e1d86529e32ebd40ecd746179 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sat, 2 Oct 2010 16:56:02 +0000
Subject: [PATCH]     * I2CP:       - Add new option
 i2cp.messageReliability=none, which prevents the         router from sending
 MessageStatusMessages back in reply to an         outbound
 SendMessageMessage. Since the streaming lib always ignored         the MSMs
 anyway, make it the default for streaming.         This will reduce the I2CP
 traffic significantly.         MSM handling now avoided, but it is still
 fairly broken, see         comments in I2PSessionImpl2.       - Cleanups,
 javadoc, rate reduction

---
 .../streaming/I2PSocketManagerFactory.java    | 15 ++--
 core/java/src/net/i2p/client/I2PClient.java   |  4 +-
 .../src/net/i2p/client/I2PSessionImpl2.java   | 79 ++++++++++++++-----
 .../router/client/ClientConnectionRunner.java | 32 ++++++--
 4 files changed, 99 insertions(+), 31 deletions(-)

diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
index 393cf1ae21..34dc8ac593 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
@@ -130,16 +130,19 @@ public class I2PSocketManagerFactory {
             if (!opts.containsKey(name))
                 opts.setProperty(name, System.getProperty(name));
         }
-        boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER));
-        if (oldLib && false) {
+        //boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER));
+        //if (oldLib && false) {
             // for the old streaming lib
-            opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
+        //    opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
             //opts.setProperty("tunnels.depthInbound", "0");
-        } else {
+        //} else {
             // for new streaming lib:
-            opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
+            //opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
+            // as of 0.8.1 (I2CP default is BestEffort)
+            if (!opts.containsKey(I2PClient.PROP_RELIABILITY))
+                opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE);
             //p.setProperty("tunnels.depthInbound", "0");
-        }
+        //}
 
         if (i2cpHost != null)
             opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
diff --git a/core/java/src/net/i2p/client/I2PClient.java b/core/java/src/net/i2p/client/I2PClient.java
index 9a732c3dba..941549d537 100644
--- a/core/java/src/net/i2p/client/I2PClient.java
+++ b/core/java/src/net/i2p/client/I2PClient.java
@@ -34,6 +34,8 @@ public interface I2PClient {
     public final static String PROP_RELIABILITY_BEST_EFFORT = "BestEffort";
     /** Reliability value: guaranteed */
     public final static String PROP_RELIABILITY_GUARANTEED = "Guaranteed";
+    /** @since 0.8.1 */
+    public final static String PROP_RELIABILITY_NONE = "none";
 
     /** protocol flag that must be sent when opening the i2cp connection to the router */
     public final static int PROTOCOL_BYTE = 0x2A;
@@ -64,4 +66,4 @@ public interface I2PClient {
      * @return newly created destination
      */
     public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException;
-}
\ No newline at end of file
+}
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java
index d8d8c8dfce..62b0d3d08b 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl2.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java
@@ -33,12 +33,14 @@ import net.i2p.util.Log;
 class I2PSessionImpl2 extends I2PSessionImpl {
 
     /** set of MessageState objects, representing all of the messages in the process of being sent */
-    private /* FIXME final FIXME */ Set _sendingStates;
+    private /* FIXME final FIXME */ Set<MessageState> _sendingStates;
     /** max # seconds to wait for confirmation of the message send */
     private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send 
     /** should we gzip each payload prior to sending it? */
     private final static boolean SHOULD_COMPRESS = true;
     private final static boolean SHOULD_DECOMPRESS = true;
+    /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
+    private boolean _noEffort;
 
     /** for extension */
     public I2PSessionImpl2() {}
@@ -53,6 +55,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         super(ctx, destKeyStream, options);
         _log = ctx.logManager().getLog(I2PSessionImpl2.class);
         _sendingStates = new HashSet(32);
+        // default is BestEffort
+        _noEffort = "none".equalsIgnoreCase(options.getProperty(I2PClient.PROP_RELIABILITY));
 
         ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
@@ -60,15 +64,16 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage3", "fourth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage4", "fifth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 60*1000, 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 60*1000, 30*60*1000 });
-        _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 60*1000, 30*60*1000 });
+        //_context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
+        _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 });
+        // best effort codes unused
+        //_context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
+        //_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
+        _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
+        _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
+        _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
+        _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
+        _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
     }
 
     protected long getTimeout() {
@@ -186,7 +191,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         }
         _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
         _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
-        return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
+        if (_noEffort)
+            return sendNoEffort(dest, payload, expires);
+        else
+            return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
     }
 
     /**
@@ -213,6 +221,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
     private static final int NUM_TAGS = 50;
 
     /**
+     * TODO - Don't need to save MessageState since actuallyWait is false...
+     * But for now just use sendNoEffort() instead.
+     *
      * @param keyUsed unused - no end-to-end crypto
      * @param tagsSent unused - no end-to-end crypto
      */
@@ -257,7 +268,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
                                + "ms left, " + oldTags + " tags known and " 
                                + (tag == null ? "no tag" : " a valid tag"));
             }
-        
+
             if (false) // rekey
                 newKey = _context.keyGenerator().generateSessionKey();
         
@@ -371,6 +382,37 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         return found;
     }
     
+    /**
+     * Same as sendBestEffort(), except we do not expect any MessageStatusMessage responses -
+     * not for accepted, or success, or failure.
+     * So we don't create a MessageState and save it on the _sendingStates HashSet
+     *
+     * @return true always
+     * @since 0.8.1
+     */
+    protected boolean sendNoEffort(Destination dest, byte payload[], long expires)
+                    throws I2PSessionException {
+        // nonce always 0
+        _producer.sendMessage(this, dest, 0, payload, null, null, null, null, expires);
+        return true;
+    }
+    
+    /**
+     *  Only call this with nonzero status, i.e. for outbound messages
+     *  whose MessageState may be queued on _sendingStates.
+     *
+     *  Even when using sendBestEffort(), this is a waste, because the
+     *  MessageState is removed from _sendingStates immediately and
+     *  so the lookup here fails.
+     *  And iterating through the HashSet instead of having a map
+     *  is bad too.
+     *
+     *  This is now pretty much avoided since streaming now sets
+     *  i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(),
+     *  so the router won't send us any MSM's for outbound traffic.
+     *
+     *  @param status != 0
+     */
     @Override
     public void receiveStatus(int msgId, long nonce, int status) {
         if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
@@ -413,12 +455,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
                 case 1:
                     _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0);
                     break;
-                case 2:
-                    _context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0);
-                    break;
-                case 3:
-                    _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0);
-                    break;
+                // best effort codes unused
+                //case 2:
+                //    _context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0);
+                //    break;
+                //case 3:
+                //    _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0);
+                //    break;
                 case 4:
                     _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0);
                     break;
diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
index e5aa1b5abb..f363bd2781 100644
--- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
@@ -17,6 +17,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import net.i2p.client.I2PClient;
 import net.i2p.crypto.SessionKeyManager;
 import net.i2p.crypto.TransientSessionKeyManager;
 import net.i2p.data.Destination;
@@ -76,11 +77,13 @@ public class ClientConnectionRunner {
      * This contains the last 10 MessageIds that have had their (non-ack) status 
      * delivered to the client (so that we can be sure only to update when necessary)
      */
-    private final List _alreadyProcessed;
+    private final List<MessageId> _alreadyProcessed;
     private ClientWriterRunner _writer;
     private Hash _destHashCache;
     /** are we, uh, dead */
     private boolean _dead;
+    /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */
+    private boolean _dontSendMSM;
     
     /**
      * Create a new runner against the given socket
@@ -91,11 +94,9 @@ public class ClientConnectionRunner {
         _log = _context.logManager().getLog(ClientConnectionRunner.class);
         _manager = manager;
         _socket = socket;
-        _config = null;
         _messages = new ConcurrentHashMap();
         _alreadyProcessed = new ArrayList();
         _acceptedPending = new ConcurrentHashSet();
-        _dead = false;
     }
     
     private static volatile int __id = 0;
@@ -189,6 +190,9 @@ public class ClientConnectionRunner {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64());
         _config = config;
+        // This is the only option that is interpreted here, not at the tunnel manager
+        if (config.getOptions() != null)
+            _dontSendMSM = "none".equalsIgnoreCase(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY));
         // per-destination session key manager to prevent rather easy correlation
         if (_sessionKeyManager == null)
             _sessionKeyManager = new TransientSessionKeyManager(_context);
@@ -197,10 +201,18 @@ public class ClientConnectionRunner {
         _manager.destinationEstablished(this);
     }
     
+    /** 
+     * Send a notification to the client that their message (id specified) was
+     * delivered (or failed delivery)
+     * Note that this sends the Guaranteed status codes, even though we only support best effort.
+     * Doesn't do anything if i2cp.messageReliability = "none"
+     */
     void updateMessageDeliveryStatus(MessageId id, boolean delivered) {
-        if (_dead) return;
+        if (_dead || _dontSendMSM)
+            return;
         _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, delivered));
     }
+
     /** 
      * called after a new leaseSet is granted by the client, the NetworkDb has been
      * updated.  This takes care of all the LeaseRequestState stuff (including firing any jobs)
@@ -254,7 +266,8 @@ public class ClientConnectionRunner {
         long expiration = 0;
         if (message instanceof SendMessageExpiresMessage)
             expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime();
-        _acceptedPending.add(id);
+        if (!_dontSendMSM)
+            _acceptedPending.add(id);
 
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" 
@@ -276,9 +289,11 @@ public class ClientConnectionRunner {
     /** 
      * Send a notification to the client that their message (id specified) was accepted 
      * for delivery (but not necessarily delivered)
-     *
+     * Doesn't do anything if i2cp.messageReliability = "none"
      */
     void ackSendMessage(MessageId id, long nonce) {
+        if (_dontSendMSM)
+            return;
         SessionId sid = _sessionId;
         if (sid == null) return;
         if (_log.shouldLog(Log.DEBUG))
@@ -517,12 +532,17 @@ public class ClientConnectionRunner {
         }
 
         public String getName() { return "Update Delivery Status"; }
+
+        /**
+         * Note that this sends the Guaranteed status codes, even though we only support best effort.
+         */
         public void runJob() {
             if (_dead) return;
 
             MessageStatusMessage msg = new MessageStatusMessage();
             msg.setMessageId(_messageId.getMessageId());
             msg.setSessionId(_sessionId.getSessionId());
+            // has to be >= 0, it is initialized to -1
             msg.setNonce(2);
             msg.setSize(0);
             if (_success) 
-- 
GitLab