From 370f96acfbbdc46dd95b55a25656a38141eb3e60 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sat, 30 May 2020 16:24:25 +0000
Subject: [PATCH] Streaming: Increase MTU for ratchet (proposal 155) Set MTU in
 receiveConnection() for inbound connections Cleanup CDR.isAckOnly() Only call
 now() in MOS for debugging Set receive.streamActive stat to 1, not 0

---
 .../i2p/client/streaming/impl/Connection.java |  3 +-
 .../impl/ConnectionDataReceiver.java          | 13 +++--
 .../streaming/impl/ConnectionManager.java     | 54 +++++++++++++------
 .../streaming/impl/ConnectionOptions.java     | 27 +++++++++-
 .../impl/ConnectionPacketHandler.java         | 36 ++++++++++---
 .../streaming/impl/I2PSocketManagerFull.java  | 22 ++++++++
 .../streaming/impl/MessageOutputStream.java   | 42 ++++++++++-----
 history.txt                                   |  6 +++
 .../src/net/i2p/router/RouterVersion.java     |  2 +-
 9 files changed, 160 insertions(+), 45 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index 86c3c96c72..898e2bbbdb 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -144,7 +144,8 @@ class Connection {
         _inputStream = new MessageInputStream(_context, _options.getMaxMessageSize(),
                                               _options.getMaxWindowSize(), _options.getInboundBufferSize());
         // FIXME pass through a passive flush delay setting as the 4th arg
-        _outputStream = new MessageOutputStream(_context, timer, _receiver, _options.getMaxMessageSize());
+        _outputStream = new MessageOutputStream(_context, timer, _receiver,
+                                                _options.getMaxMessageSize(), _options.getMaxInitialMessageSize());
         _timer = timer;
         _outboundPackets = new TreeMap<Long, PacketLocal>();
         if (opts != null) {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java
index 4450bb8c1f..7e1d8a2132 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java
@@ -146,12 +146,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
         return packet;
     }
     
-    private static boolean isAckOnly(Connection con, int size) {
-        boolean ackOnly = ( (size <= 0) && // no data
-                            (con.getLastSendId() >= 0) && // not a SYN
-                            ( (!con.getOutputStream().getClosed()) || // not a CLOSE
-                            (con.getOutputStream().getClosed() && 
-                             con.getCloseSentOn() > 0) )); // or it is a dup CLOSE
+    private boolean isAckOnly(int size) {
+        boolean ackOnly = size <= 0 && // no data
+                          _connection.getLastSendId() >= 0 && // not a SYN
+                          (!_connection.getOutputStream().getClosed() || // not a CLOSE
+                           _connection.getCloseSentOn() > 0); // or it is a dup CLOSE
         return ackOnly;
     }
     
@@ -170,7 +169,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
      */
     private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
         if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
-        boolean ackOnly = isAckOnly(_connection, size);
+        boolean ackOnly = isAckOnly(size);
         boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0);
         
         PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
index 57fcbd43f1..11d63f5463 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
@@ -230,23 +230,13 @@ class ConnectionManager {
      * Create a new connection based on the SYN packet we received.
      *
      * @param synPacket SYN packet to process
-     * @return created Connection with the packet's data already delivered to
-     *         it, or null if the syn's streamId was already taken
+     * @return created Connection with the packet's data already delivered to it,
+     *         or null if the syn's streamId was already taken,
+     *         or if the connection was rejected
      */
     public Connection receiveConnection(Packet synPacket) {
-        ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
-        opts.setPort(synPacket.getRemotePort());
-        opts.setLocalPort(synPacket.getLocalPort());
         boolean reject = false;
-        int active = 0;
-        int total = 0;
-
-            // just for the stat
-            //total = _connectionByInboundId.size();
-            //for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
-            //    if ( ((Connection)iter.next()).getIsConnected() )
-            //        active++;
-            //}
+
             if (locked_tooManyStreams()) {
                 if ((!_defaultOptions.getDisableRejectLogging()) || _log.shouldLog(Log.WARN))
                     _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of " 
@@ -263,7 +253,7 @@ class ConnectionManager {
                 }
             }
         
-        _context.statManager().addRateData("stream.receiveActive", active, total);
+        _context.statManager().addRateData("stream.receiveActive", 1);
         
         if (reject) {
             Destination from = synPacket.getOptionalFrom();
@@ -331,6 +321,40 @@ class ConnectionManager {
             return null;
         }
         
+        ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
+        opts.setPort(synPacket.getRemotePort());
+        opts.setLocalPort(synPacket.getLocalPort());
+
+        // set up the MTU for the connection
+        int size;
+        if (synPacket.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
+            size = synPacket.getOptionalMaxSize();
+            if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
+                // log.error? connection reset?
+                size = ConnectionOptions.MIN_MESSAGE_SIZE;
+            }
+        } else {
+            // specs not clear if MTU may be omitted from SYN
+            size = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
+        }
+        int mtu = opts.getMaxMessageSize();
+        if (size < mtu) {
+            if (_log.shouldInfo())
+                _log.info("Reducing MTU for IB conn to " + size 
+                          + " from " + mtu);
+            opts.setMaxMessageSize(size);
+            opts.setMaxInitialMessageSize(size);
+        } else if (size > opts.getMaxInitialMessageSize()) {
+            if (size > mtu)
+                size = mtu;
+            if (_log.shouldInfo())
+                _log.info("Increasing MTU for IB conn to " + size 
+                          + " from " + mtu);
+            if (size != mtu)
+                opts.setMaxMessageSize(size);
+            opts.setMaxInitialMessageSize(size);
+        }
+
         Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
                                         _timer, _outboundQueue, _conPacketHandler, opts, true);
         _tcbShare.updateOptsFromShare(con);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 69127d8e5b..671429e01f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -34,6 +34,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _resendDelay;
     private int _sendAckDelay;
     private int _maxMessageSize;
+    private int _maxInitialMessageSize;
     private int _maxResends;
     private int _inactivityTimeout;
     private int _inactivityAction;
@@ -304,6 +305,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730;
     public static final int MIN_MESSAGE_SIZE = 512;
+    /**
+     *
+     *  See analysis in proposal 144
+     *  @since 0.9.48
+     */
+    public static final int DEFAULT_MAX_MESSAGE_SIZE_RATCHET = 1812;
 
     /**
      *  Sets max buffer size, connect timeout, read timeout, and write timeout
@@ -774,7 +781,25 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      * @return Maximum message size (MTU/MRU)
      */
     public int getMaxMessageSize() { return _maxMessageSize; }
-    public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); }
+    public void setMaxMessageSize(int bytes) {
+        _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE);
+        _maxInitialMessageSize = Math.min(_maxMessageSize, DEFAULT_MAX_MESSAGE_SIZE);
+    }
+    
+    /**
+     *  What is the largest message to send in the SYN from Alice to Bob?
+     *  @return the max
+     *  @since 0.9.47
+     */
+    public int getMaxInitialMessageSize() { return _maxInitialMessageSize; }
+
+    /**
+     *  What is the largest message to send in the SYN from Alice to Bob?
+     *  @since 0.9.47
+     */
+    public void setMaxInitialMessageSize(int bytes) {
+        _maxInitialMessageSize = bytes;
+    }
     
     /**
      * What profile do we want to use for this connection?
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 4cf909d6a2..539543de2b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -87,14 +87,38 @@ class ConnectionPacketHandler {
         }
 
         if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
-            int size = packet.getOptionalMaxSize();
-            if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
-                // log.error? connection reset?
-                size = ConnectionOptions.MIN_MESSAGE_SIZE;
+            // inbound SYN handled in ConnectionManager.receiveConnection()
+            if (!(con.isInbound() && packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))) {
+                int size = packet.getOptionalMaxSize();
+                if (size < ConnectionOptions.MIN_MESSAGE_SIZE) {
+                    // log.error? connection reset?
+                    size = ConnectionOptions.MIN_MESSAGE_SIZE;
+                }
+                int mtu = con.getOptions().getMaxMessageSize();
+                if (size < mtu) {
+                    if (_log.shouldInfo())
+                        _log.info("Reducing MTU to " + size 
+                                  + " from " + mtu);
+                    con.getOptions().setMaxMessageSize(size);
+                    con.getOutputStream().setBufferSize(size);
+                } else if (size > con.getOptions().getMaxInitialMessageSize()) {
+                    if (size > mtu)
+                        size = mtu;
+                    if (_log.shouldInfo())
+                        _log.info("Increasing MTU to " + size 
+                                  + " from " + con.getOptions().getMaxInitialMessageSize());
+                    if (size != mtu)
+                        con.getOptions().setMaxMessageSize(size);
+                    con.getOutputStream().setBufferSize(size);
+                }
             }
+        } else if (!con.isInbound() && packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
+            // SYN ACK w/o MAX_PACKET_SIZE?
+            // specs not clear if this is allowed
+            final int size = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
             if (size < con.getOptions().getMaxMessageSize()) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Reducing our max message size to " + size 
+                if (_log.shouldInfo())
+                    _log.info("SYN ACK w/o MTU, Reducing MTU to " + size 
                               + " from " + con.getOptions().getMaxMessageSize());
                 con.getOptions().setMaxMessageSize(size);
                 con.getOutputStream().setBufferSize(size);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java
index ae46c4320e..8ee0652fa3 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java
@@ -32,6 +32,7 @@ import net.i2p.client.streaming.IncomingConnectionFilter;
 import net.i2p.crypto.SigAlgo;
 import net.i2p.crypto.SigType;
 import net.i2p.data.Certificate;
+import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
 import net.i2p.data.Hash;
 import net.i2p.data.PrivateKey;
@@ -202,6 +203,27 @@ public class I2PSocketManagerFull implements I2PSocketManager {
         _name = name + " " + (__managerId.incrementAndGet());
         _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
         _defaultOptions = new ConnectionOptions(opts);
+        if (opts != null && opts.getProperty(ConnectionOptions.PROP_MAX_MESSAGE_SIZE) == null) {
+            // set higher MTU for ECIES
+            String senc = opts.getProperty("i2cp.leaseSetEncType");
+            if (senc != null && !senc.equals("0")) {
+                String[] senca = DataHelper.split(senc, ",");
+                boolean has0 = false;
+                boolean has4 = false;
+                for (int i = 0; i < senca.length; i++) {
+                    if (senca[i].equals("0")) {
+                        has0 = true;
+                    } else if (senca[i].equals("4")) {
+                        has4 = true;
+                    }
+                }
+                if (has4) {
+                    _defaultOptions.setMaxMessageSize(ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET);
+                    if (!has0)
+                        _defaultOptions.setMaxInitialMessageSize(ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET);
+                }
+            }
+        }
         _connectionManager = new ConnectionManager(_context, _session, _defaultOptions, connectionFilter);
         _serverSocket = new I2PServerSocketFull(this);
         
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java
index 21183fdf32..0cd02b6e05 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java
@@ -62,16 +62,24 @@ class MessageOutputStream extends OutputStream {
 
     /** */
     public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
-                               DataReceiver receiver, int bufSize) {
-        this(ctx, timer, receiver, bufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
+                               DataReceiver receiver, int bufSize, int initBufSize) {
+        this(ctx, timer, receiver, bufSize, initBufSize, DEFAULT_PASSIVE_FLUSH_DELAY);
     }
 
     public MessageOutputStream(I2PAppContext ctx, SimpleTimer2 timer,
-                               DataReceiver receiver, int bufSize, int passiveFlushDelay) {
+                               DataReceiver receiver, int bufSize, int initBufSize, int passiveFlushDelay) {
         super();
+        // we only use two buffer sizes to prevent an attack
+        // where we end up with a thousand caches
+        if (bufSize < ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE) {
+            bufSize = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE;
+        } else if (bufSize > ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE &&
+                   bufSize < ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET) {
+            bufSize = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE_RATCHET;
+        }
         _dataCache = ByteCache.getInstance(128, bufSize);
         _originalBufferSize = bufSize;
-        _currentBufferSize = bufSize;
+        _currentBufferSize = initBufSize;
         _context = ctx;
         _log = ctx.logManager().getLog(MessageOutputStream.class);
         _buf = _dataCache.acquire().getData(); // new byte[bufSize];
@@ -79,7 +87,6 @@ class MessageOutputStream extends OutputStream {
         _dataLock = new Object();
         _writeTimeout = -1;
         _passiveFlushDelay = passiveFlushDelay;
-        _nextBufferSize = 0;
         //_sendPeriodBeginTime = ctx.clock().now();
         //_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
         _flusher = new Flusher(timer);
@@ -119,7 +126,7 @@ class MessageOutputStream extends OutputStream {
             _log.debug("write(b[], " + off + ", " + len + ") ");
         int cur = off;
         int remaining = len;
-        long begin = _context.clock().now();
+        long begin = _log.shouldDebug() ? _context.clock().now() : 0;
         while (remaining > 0) {
             WriteStatus ws = null;
             if (_closed.get()) throw new IOException("Output stream closed");
@@ -189,9 +196,11 @@ class MessageOutputStream extends OutputStream {
                     _log.debug("Queued " + len + " without sending to the receiver");
             }
         }
-        long elapsed = _context.clock().now() - begin;
-        if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) )
-            _log.info("took " + elapsed + "ms to write to the stream?", new Exception("foo"));
+        if (_log.shouldDebug()) {
+            long elapsed = _context.clock().now() - begin;
+            if (elapsed > 10*1000)
+                _log.info("took " + elapsed + "ms to write to the stream?", new Exception("foo"));
+        }
         throwAnyError();
         //updateBps(len);
     }
@@ -244,9 +253,11 @@ class MessageOutputStream extends OutputStream {
      */
     private class Flusher extends SimpleTimer2.TimedEvent {
         private boolean _enqueued;
+
         public Flusher(SimpleTimer2 timer) { 
             super(timer);
         }
+
         public void enqueue() {
             // no need to be overly worried about duplicates - it would just 
             // push it further out
@@ -258,12 +269,13 @@ class MessageOutputStream extends OutputStream {
                 forceReschedule(_passiveFlushDelay);
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
+                _enqueued = true;
             } else {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("NOT enqueing the flusher");
             }
-            _enqueued = true;
         }
+
         public void timeReached() {
             if (_closed.get())
                 return;
@@ -334,7 +346,7 @@ class MessageOutputStream extends OutputStream {
      *  @@since 0.8.1
      */
     private void flush(boolean wait_for_accept_only) throws IOException {
-        long begin = _context.clock().now();
+        long begin = _log.shouldDebug() ? _context.clock().now() : 0;
         WriteStatus ws = null;
         if (_log.shouldLog(Log.INFO) && _valid > 0)
             _log.info("flush() valid = " + _valid);
@@ -388,9 +400,11 @@ class MessageOutputStream extends OutputStream {
         else if (ws.writeFailed())
             throw new IOException("Write failed");
         
-        long elapsed = _context.clock().now() - begin;
-        if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
-            _log.debug("took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
+        if (_log.shouldDebug()) {
+            long elapsed = _context.clock().now() - begin;
+            if (elapsed > 10*1000)
+                _log.debug("took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
+        }
         throwAnyError();
     }
     
diff --git a/history.txt b/history.txt
index 226c50f434..decb47d5d9 100644
--- a/history.txt
+++ b/history.txt
@@ -1,3 +1,9 @@
+2020-05-30 zzz
+ * Streaming: Increase MTU for ratchet (proposal 155)
+
+2020-05-28 zzz
+ * Console: RRD4J 3.6 (ticket #2716)
+
 2020-05-27 zzz
  * Installer:
    - Require Java 8 (ticket #2511)
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 282c18b422..0725033fa6 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
     /** deprecated */
     public final static String ID = "Monotone";
     public final static String VERSION = CoreVersion.VERSION;
-    public final static long BUILD = 2;
+    public final static long BUILD = 3;
 
     /** for example "-test" */
     public final static String EXTRA = "";
-- 
GitLab