From 798bdf32c112a7059f05206aff684b60747b43a4 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Mon, 11 Oct 2010 15:17:35 +0000
Subject: [PATCH]     * Streaming:       - Make flush() block less, by waiting
 only for "accept" into the         streaming queue rather than "completion"
 (i.e. ACK from the far end).         This prevents complete window stalls
 when flushing, and should help performance         of apps that use flush(),
 like i2psnark (and SAM?).         close() still does a flush that waits for
 completion, as i2ptunnel         doesn't like a fast return from close().    
   - flush/close javadocs and comments     * i2ptunnel:       - Now that
 streaming flush() is fixed, use it in IRCClient, and         for initial data
 in I2PTunnelRunner, to avoid the 250 ms         passive flush delay

---
 .../net/i2p/i2ptunnel/I2PTunnelIRCClient.java |  7 +-
 .../net/i2p/i2ptunnel/I2PTunnelRunner.java    |  9 +-
 .../net/i2p/client/streaming/Connection.java  | 11 ++-
 .../client/streaming/MessageOutputStream.java | 89 ++++++++++++++++---
 .../net/i2p/client/streaming/PacketLocal.java |  4 +-
 5 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java
index 4d8b85a817..d0dc227ec1 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java
@@ -9,7 +9,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.StringTokenizer;
 
-import net.i2p.I2PAppContext;
 import net.i2p.client.streaming.I2PSocket;
 import net.i2p.data.DataFormatException;
 import net.i2p.data.Destination;
@@ -124,7 +123,7 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable
         }
         if (size == 1) // skip the rand in the most common case
             return dests.get(0);
-        int index = I2PAppContext.getGlobalContext().random().nextInt(size);
+        int index = _context.random().nextInt(size);
         return dests.get(index);
     }
 
@@ -182,6 +181,8 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable
                             }
                             outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
                             output.write(outmsg.getBytes("ISO-8859-1"));
+                            // probably doesn't do much but can't hurt
+                            output.flush();
                         } else {
                             if (_log.shouldLog(Log.WARN))
                                 _log.warn("inbound BLOCKED: "+inmsg);
@@ -257,6 +258,8 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable
                                 }
                                 outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
                                 output.write(outmsg.getBytes("ISO-8859-1"));
+                                // save 250 ms in streaming
+                                output.flush();
                             } else {
                                 if (_log.shouldLog(Log.WARN))
                                     _log.warn("outbound BLOCKED: "+"\""+inmsg+"\"");
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
index c2a014020d..0e1c9049f8 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
@@ -129,7 +129,14 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
                     // do NOT flush here, it will block and then onTimeout.run() won't happen on fail.
                     // But if we don't flush, then we have to wait for the connectDelay timer to fire
                     // in i2p socket? To be researched and/or fixed.
-                    //i2pout.flush();
+                    //
+                    // AS OF 0.8.1, MessageOutputStream.flush() is fixed to only wait for accept,
+                    // not for "completion" (i.e. an ACK from the far end).
+                    // So we now get a fast return from flush(), and can do it here to save 250 ms.
+                    // To make sure we are under the initial window size and don't hang waiting for accept,
+                    // only flush if it fits in one message.
+                    if (initialI2PData.length <= 1730)   // ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE
+                        i2pout.flush();
                 }
             }
             if (initialSocketData != null) {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index bc8c923699..e662ecc60a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -148,16 +148,21 @@ public class Connection {
     }
     
     /**
+     * This doesn't "send a choke". Rather, it blocks if the outbound window is full,
+     * thus choking the sender that calls this.
+     *
      * Block until there is an open outbound packet slot or the write timeout 
      * expires.  
+     * PacketLocal is the only caller, generally with -1.
      *
-     * @param timeoutMs PacketLocal is the only caller, often with -1??????
-     * @return true if the packet should be sent
+     * @param timeoutMs 0 or negative means wait forever, 5 minutes max
+     * @return true if the packet should be sent, false for a fatal error
+     *         will return false after 5 minutes even if timeoutMs is <= 0.
      */
     boolean packetSendChoke(long timeoutMs) {
         // if (false) return true; // <--- what the fuck??
         long start = _context.clock().now();
-        long writeExpire = start + timeoutMs;
+        long writeExpire = start + timeoutMs;  // only used if timeoutMs > 0
         boolean started = false;
         while (true) {
             long timeLeft = writeExpire - _context.clock().now();
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
index 1a7f0afbe8..fd10d679ce 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
@@ -43,6 +43,10 @@ public class MessageOutputStream extends OutputStream {
     private long _sendPeriodBytes;
     private int _sendBps;
     
+    /**
+     *  Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000,
+     *  we only wait 250 at the start. Guess that's ok, 1000 is too long anyway.
+     */
     private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250;
 
     public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
@@ -273,8 +277,18 @@ public class MessageOutputStream extends OutputStream {
     }
     
     /** 
-     * Flush the data already queued up, blocking until it has been
-     * delivered.
+     * Flush the data already queued up, blocking only if the outbound
+     * window is full.
+     *
+     * Prior to 0.8.1, this blocked until "delivered".
+     * "Delivered" meant "received an ACK from the far end",
+     * which is not the commom implementation of flush(), and really hurt the
+     * performance of i2psnark, which flush()ed frequently.
+     * Calling flush() would cause a complete window stall.
+     *
+     * As of 0.8.1, only wait for accept into the streaming output queue.
+     * This will speed up snark significantly, and allow us to flush()
+     * the initial data in I2PTunnelRunner, saving 250 ms.
      *
      * @throws IOException if the write fails
      */
@@ -283,6 +297,14 @@ public class MessageOutputStream extends OutputStream {
      /* @throws InterruptedIOException if the write times out
       * Documented here, but doesn't belong in the javadoc. 
       */
+        flush(true);
+    }
+
+    /**
+     *  @param wait_for_accept_only see discussion in close() code
+     *  @@since 0.8.1
+     */
+    private void flush(boolean wait_for_accept_only) throws IOException {
         long begin = _context.clock().now();
         WriteStatus ws = null;
         if (_log.shouldLog(Log.INFO) && _valid > 0)
@@ -297,14 +319,28 @@ public class MessageOutputStream extends OutputStream {
                 throwAnyError();
                 return;
             }
-            ws = _dataReceiver.writeData(_buf, 0, _valid);
-            _written += _valid;
-            _valid = 0;
-            locked_updateBufferSize();
-            _lastFlushed = _context.clock().now();
-            _dataLock.notifyAll();
+            // if valid == 0 return ??? - no, this could flush a CLOSE packet too.
+
+            // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below
+            // (disabled)
+            if (!wait_for_accept_only) {
+                ws = _dataReceiver.writeData(_buf, 0, _valid);
+                _written += _valid;
+                _valid = 0;
+                locked_updateBufferSize();
+                _lastFlushed = _context.clock().now();
+                _dataLock.notifyAll();
+            }
         }
         
+        // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1
+        // must do this outside the data lock
+        if (wait_for_accept_only) {
+            flushAvailable(_dataReceiver, true);
+            return;
+        }
+
+        // Wait a loooooong time, until we have the ACK
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
         if (_closed && 
@@ -328,14 +364,28 @@ public class MessageOutputStream extends OutputStream {
         throwAnyError();
     }
     
+    /**
+     *  This does a flush, and BLOCKS until
+     *  the CLOSE packet is acked.
+     */
     @Override
     public void close() throws IOException {
         if (_closed) {
             synchronized (_dataLock) { _dataLock.notifyAll(); }
             return;
         }
+        // setting _closed before flush() will force flush() to send a CLOSE packet
         _closed = true;
-        flush();
+
+        // In 0.8.1 we rewrote flush() to only wait for accept into the window,
+        // not "completion" (i.e. ack from the far end).
+        // Unfortunately, that broke close(), at least in i2ptunnel HTTPClient.
+        // Symptom was premature close, i.e. incomplete pages and images.
+        // Possible cause - I2PTunnelRunner code? or the code here that follows flush()?
+        // It seems like we shouldn't have to wait for the far-end ACK for a close packet,
+        // should we? To be researched further.
+        // false -> wait for completion, not just accept.
+        flush(false);
         _log.debug("Output stream closed after writing " + _written);
         ByteArray ba = null;
         synchronized (_dataLock) {
@@ -351,7 +401,11 @@ public class MessageOutputStream extends OutputStream {
             _dataCache.release(ba);
         }
     }
-    /** nonblocking close */
+
+    /**
+     *  nonblocking close -
+     *  Use outside of this package is deprecated, should be made package local
+     */
     public void closeInternal() {
         _closed = true;
         if (_streamError == null)
@@ -412,6 +466,8 @@ public class MessageOutputStream extends OutputStream {
         if (_log.shouldLog(Log.INFO) && _valid > 0)
             _log.info("flushAvailable() valid = " + _valid);
         synchronized (_dataLock) {
+            // if valid == 0 return ??? - no, this could flush a CLOSE packet too.
+
             // _buf may be null, but the data receiver can handle that just fine,
             // deciding whether or not to send a packet
             ws = target.writeData(_buf, 0, _valid);
@@ -457,14 +513,21 @@ public class MessageOutputStream extends OutputStream {
     
     /** Define a way to detect the status of a write */
     public interface WriteStatus {
-        /** wait until the data written either fails or succeeds */
+        /**
+         * Wait until the data written either fails or succeeds.
+         * Success means an ACK FROM THE FAR END.
+         * @param maxWaitMs -1 = forever
+         */
         public void waitForCompletion(int maxWaitMs);
+
         /** 
-         * wait until the data written is accepted into the outbound pool,
+         * Wait until the data written is accepted into the outbound pool,
+         * (i.e. the outbound window is not full)
          * which we throttle rather than accept arbitrary data and queue 
-         * @param maxWaitMs -1 = forever ?
+         * @param maxWaitMs -1 = forever
          */
         public void waitForAccept(int maxWaitMs);
+
         /** the write was accepted.  aka did the socket not close? */
         public boolean writeAccepted();
         /** did the write fail?  */
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
index 674ff6179c..cbe913e050 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java
@@ -194,7 +194,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
     }
     
     /**
-     * @param maxWaitMs MessageOutputStream is the only caller, often with -1 ??????
+     * Blocks until outbound window is not full. See Connection.packetSendChoke().
+     * @param maxWaitMs MessageOutputStream is the only caller, generally with -1
      */
     public void waitForAccept(int maxWaitMs) {
         if (_connection == null) 
@@ -220,6 +221,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
                        + toString());
     }
     
+    /** block until the packet is acked from the far end */
     public void waitForCompletion(int maxWaitMs) {
         long expiration = _context.clock().now()+maxWaitMs;
         while (true) {
-- 
GitLab