From 516d0b4db84db4026f0f9deef98caedde71d8795 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Tue, 30 Nov 2004 23:41:51 +0000
Subject: [PATCH] 2004-11-30  jrandom     * Render the burst rate fields on
 /config.jsp properly (thanks ugha!)     * Build in a simple timeout to flush
 data queued into the I2PSocket but       not yet flushed.     * Don't
 explicitly flush after each SAM stream write, but leave it up to       the
 [nonblocking] passive flush.     * Don't whine about 10-99 connection events
 occurring in a second     * Don't wait for completion of packets that will
 not be ACKed (duh)     * Adjust the congestion window, even if the packet was
 resent (duh)     * Make sure to wake up any blocking read()'s when the
 MessageInputStream       is close()ed (duh)     * Never wait more than the
 disconnect timeout for a write to complete

---
 .../net/i2p/i2ptunnel/I2PTunnelRunner.java    |  5 ++
 .../net/i2p/router/web/ConfigNetHelper.java   |  7 +-
 .../src/net/i2p/sam/SAMStreamSession.java     |  3 +
 .../net/i2p/client/streaming/Connection.java  |  6 +-
 .../streaming/ConnectionDataReceiver.java     |  6 +-
 .../client/streaming/ConnectionHandler.java   |  2 +
 .../streaming/ConnectionPacketHandler.java    | 24 +++----
 .../client/streaming/MessageInputStream.java  |  4 ++
 .../client/streaming/MessageOutputStream.java | 66 ++++++++++++++++++-
 .../src/net/i2p/client/I2PSessionImpl2.java   |  6 ++
 history.txt                                   | 15 ++++-
 .../src/net/i2p/router/RouterVersion.java     |  4 +-
 12 files changed, 127 insertions(+), 21 deletions(-)

diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
index 6c052ce2be..1d2ef075a2 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
@@ -110,6 +110,9 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
                     //i2pout.flush();
                 }
             }
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Initial data " + (initialData != null ? initialData.length : 0) 
+                           + " written, starting forwarders");
             Thread t1 = new StreamForwarder(in, i2pout, "toI2P");
             Thread t2 = new StreamForwarder(i2pin, out, "fromI2P");
             synchronized (finishLock) {
@@ -117,6 +120,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
                     finishLock.wait();
                 }
             }
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("At least one forwarder completed, closing and joining");
             // now one connection is dead - kill the other as well.
             s.close();
             i2ps.close();
diff --git a/apps/routerconsole/java/src/net/i2p/router/web/ConfigNetHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/ConfigNetHelper.java
index 14f67614c5..a082a6df1c 100644
--- a/apps/routerconsole/java/src/net/i2p/router/web/ConfigNetHelper.java
+++ b/apps/routerconsole/java/src/net/i2p/router/web/ConfigNetHelper.java
@@ -120,10 +120,15 @@ public class ConfigNetHelper {
     private static String getBurstFactor(int numSeconds, String name) {
         StringBuffer buf = new StringBuffer(256);
         buf.append("<select name=\"").append(name).append("\">\n");
+        boolean found = false;
         for (int i = 10; i <= 60; i += 10) {
             buf.append("<option value=\"").append(i).append("\" ");
-            if ( (i == numSeconds) || (i == 60) )
+            if (i == numSeconds) {
                 buf.append("selected ");
+                found = true;
+            } else if ( (i == 60) && (!found) ) {
+                buf.append("selected ");
+            }
             buf.append(">");
             buf.append(i).append(" seconds</option>\n");
         }
diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
index edafc53cbb..b98f5391df 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
@@ -391,6 +391,8 @@ public class SAMStreamSession {
             while (stillRunning) {
                 try {
                     i2ps = serverSocket.accept();
+                    if (i2ps == null)
+                        break;
 
                     _log.debug("New incoming connection");
 
@@ -467,6 +469,7 @@ public class SAMStreamSession {
             }
             try {
                 i2pSocketOS.write(data);
+                //i2pSocketOS.flush();
             } catch (IOException e) {
                 _log.error("Error sending data through I2P socket", e);
                 return false;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index ac9c28821c..d17465a101 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -70,7 +70,7 @@ public class Connection {
     private long _lifetimeDupMessageReceived;
     
     public static final long MAX_RESEND_DELAY = 60*1000;
-    public static final long MIN_RESEND_DELAY = 20*1000;
+    public static final long MIN_RESEND_DELAY = 40*1000;
 
     /** wait up to 5 minutes after disconnection so we can ack/close packets */
     public static int DISCONNECT_TIMEOUT = 5*60*1000;
@@ -325,8 +325,8 @@ public class Connection {
             _occurredEventCount++;
         } else {
             _occurredTime = now;
-            if (_occurredEventCount > 10) {
-                _log.log(Log.CRIT, "More than 10 events (" + _occurredEventCount + ") in a second on " 
+            if (_occurredEventCount > 100) {
+                _log.log(Log.CRIT, "More than 100 events (" + _occurredEventCount + ") in a second on " 
                                    + toString() + ": scheduler = " + sched);
             }
             _occurredEventCount = 0;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index ba48fde77d..d8a0f76815 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -63,7 +63,11 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
 
         if (doSend) {
             PacketLocal packet = send(buf, off, size);
-            return packet;
+            //dont wait for non-acks
+            if ( (packet.getPayloadSize() > 0) || (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) )
+                return packet;
+            else
+                return _dummyStatus;
         } else {
             return _dummyStatus;
         }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
index f3ed773540..2deb203d3b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
@@ -32,6 +32,8 @@ class ConnectionHandler {
     }
     
     public void setActive(boolean active) { 
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("setActive(" + active + ") called");
         synchronized (_synQueue) {
             _active = active; 
             _synQueue.notifyAll(); // so we break from the accept()
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 84f7d8e682..fdf5cc5791 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -157,7 +157,7 @@ public class ConnectionPacketHandler {
                            + ") for " + con);
 
             return true;
-        } else if (numResends > 0) {
+        //} else if (numResends > 0) {
             // window sizes are shrunk on resend, not on ack
         } else {
             if (acked > 0) { 
@@ -166,17 +166,19 @@ public class ConnectionPacketHandler {
                     // new packet that ack'ed uncongested data, or an empty ack
                     int newWindowSize = con.getOptions().getWindowSize();
                     
-                    if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
-                        // congestion avoidance
-                        
-                        // we can't use newWindowSize += 1/newWindowSize, since we're
-                        // integers, so lets use a random distribution instead
-                        int shouldIncrement = _context.random().nextInt(newWindowSize);
-                        if (shouldIncrement <= 0)
+                    if (numResends <= 0) {
+                        if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
+                            // congestion avoidance
+
+                            // we can't use newWindowSize += 1/newWindowSize, since we're
+                            // integers, so lets use a random distribution instead
+                            int shouldIncrement = _context.random().nextInt(newWindowSize);
+                            if (shouldIncrement <= 0)
+                                newWindowSize += 1;
+                        } else {
+                            // slow start
                             newWindowSize += 1;
-                    } else {
-                        // slow start
-                        newWindowSize += 1;
+                        }
                     }
                     
                     if (_log.shouldLog(Log.DEBUG))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
index b87c5c0d2d..0ff68c5a58 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
@@ -272,6 +272,9 @@ public class MessageInputStream extends InputStream {
                     // at least one byte
                     
                     while (_readyDataBlocks.size() <= 0) {
+                        if (_locallyClosed)
+                            throw new IOException("Already closed, you wanker");
+                        
                         if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
                             if (_log.shouldLog(Log.INFO))
                                 _log.info("read(...," + offset + ", " + length + ")[" + i 
@@ -402,6 +405,7 @@ public class MessageInputStream extends InputStream {
                 ba.setData(null);
             }
             _locallyClosed = true;
+            _dataLock.notifyAll();
         }
     }
     
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
index 32fd5c1cb0..694c7b5bbf 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
@@ -8,6 +8,7 @@ import net.i2p.I2PAppContext;
 import net.i2p.data.ByteArray;
 import net.i2p.util.ByteCache;
 import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer;
 
 /**
  * A stream that we can shove data into that fires off those bytes
@@ -26,6 +27,11 @@ public class MessageOutputStream extends OutputStream {
     private long _written;
     private int _writeTimeout;
     private ByteCache _dataCache;
+    private Flusher _flusher;
+    private long _lastFlushed;
+    private long _lastBuffered;
+    /** if we enqueue data but don't flush it in this period, flush it passively */
+    private int _passiveFlushDelay;
     
     public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
         this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@@ -41,6 +47,10 @@ public class MessageOutputStream extends OutputStream {
         _written = 0;
         _closed = false;
         _writeTimeout = -1;
+        _passiveFlushDelay = 5*1000;
+        _flusher = new Flusher();
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("MessageOutputStream created");
     }
     
     public void setWriteTimeout(int ms) { _writeTimeout = ms; }
@@ -51,10 +61,11 @@ public class MessageOutputStream extends OutputStream {
     }
     
     public void write(byte b[], int off, int len) throws IOException {
-        //if (_log.shouldLog(Log.DEBUG))
-        //    _log.debug("write(b[], " + off + ", " + len + ")");
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("write(b[], " + off + ", " + len + ") ");
         int cur = off;
         int remaining = len;
+        long begin = _context.clock().now();
         while (remaining > 0) {
             WriteStatus ws = null;
             // we do any waiting outside the synchronized() block because we
@@ -70,6 +81,11 @@ public class MessageOutputStream extends OutputStream {
                     cur += remaining;
                     _written += remaining;
                     remaining = 0;
+                    _lastBuffered = _context.clock().now();
+                    if (_passiveFlushDelay > 0) {
+                        // if it is already enqueued, this just pushes it further out
+                        SimpleTimer.getInstance().addEvent(_flusher, _passiveFlushDelay);
+                    }
                 } else {
                     // buffer whatever we can fit then flush,
                     // repeating until we've pushed all of the
@@ -87,9 +103,12 @@ public class MessageOutputStream extends OutputStream {
                     _written += _valid;
                     _valid = 0;                       
                     throwAnyError();
+                    _lastFlushed = _context.clock().now();
                 }
             }
             if (ws != null) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws);
                 // ok, we've actually added a new packet - lets wait until
                 // its accepted into the queue before moving on (so that we 
                 // dont fill our buffer instantly)
@@ -100,8 +119,14 @@ public class MessageOutputStream extends OutputStream {
                     else
                         throw new IOException("Write not accepted into the queue");
                 }
+            } else {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Queued " + len + " without sending to the receiver");
             }
         }
+        long elapsed = _context.clock().now() - begin;
+        if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
+            _log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
         throwAnyError();
     }
     
@@ -110,6 +135,33 @@ public class MessageOutputStream extends OutputStream {
         throwAnyError();
     }
     
+    /**
+     * Flush data that has been enqued but not flushed after a certain 
+     * period of inactivity
+     */
+    private class Flusher implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            boolean sent = false;
+            WriteStatus ws = null;
+            synchronized (_dataLock) {
+                if ( (_valid > 0) && (_lastBuffered + _passiveFlushDelay > _context.clock().now()) ) {
+                    if ( (_buf != null) && (_dataReceiver != null) ) {
+                        ws = _dataReceiver.writeData(_buf, 0, _valid);
+                        _written += _valid;
+                        _valid = 0;
+                        _lastFlushed = _context.clock().now();
+                        _dataLock.notifyAll();
+                        sent = true;
+                    }
+                }
+            }
+            // ignore the ws
+            if (sent && _log.shouldLog(Log.DEBUG)) 
+                _log.debug("Passive flush of " + ws);
+        }
+        
+    }
+    
     /** 
      * Flush the data already queued up, blocking until it has been
      * delivered.
@@ -118,6 +170,7 @@ public class MessageOutputStream extends OutputStream {
      * @throws InterruptedIOException if the write times out
      */
     public void flush() throws IOException {
+        long begin = _context.clock().now();
         WriteStatus ws = null;
         synchronized (_dataLock) {
             if (_buf == null) throw new IOException("closed (buffer went away)");
@@ -128,6 +181,7 @@ public class MessageOutputStream extends OutputStream {
             ws = _dataReceiver.writeData(_buf, 0, _valid);
             _written += _valid;
             _valid = 0;
+            _lastFlushed = _context.clock().now();
             _dataLock.notifyAll();
         }
         
@@ -137,6 +191,8 @@ public class MessageOutputStream extends OutputStream {
             ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
               (_writeTimeout <= 0) ) )
             ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
+        else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) )
+            ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
         else
             ws.waitForCompletion(_writeTimeout);
         if (_log.shouldLog(Log.DEBUG))
@@ -145,6 +201,10 @@ public class MessageOutputStream extends OutputStream {
             throw new InterruptedIOException("Timed out during write");
         else if (ws.writeFailed())
             throw new IOException("Write failed");
+        
+        long elapsed = _context.clock().now() - begin;
+        if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) )
+            _log.debug("wtf, took " + elapsed + "ms to flush the stream?\n" + ws, new Exception("bar"));
         throwAnyError();
     }
     
@@ -182,6 +242,7 @@ public class MessageOutputStream extends OutputStream {
                 _buf = null;
                 _valid = 0;
             }
+            _lastFlushed = _context.clock().now();
             _dataLock.notifyAll();
         }
         if (ba != null) {
@@ -221,6 +282,7 @@ public class MessageOutputStream extends OutputStream {
             _written += _valid;
             _valid = 0;
             _dataLock.notifyAll();
+            _lastFlushed = _context.clock().now();
         }
         if (blocking && ws != null) {
             ws.waitForAccept(_writeTimeout);
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java
index 6bfe73fdca..4c7466352c 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl2.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java
@@ -194,6 +194,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
                        + " / " + state.getNonce() + " found = " + found);
+        
+        long timeToSend = afterRemovingSync - beforeSendingSync;
+        if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
+            _log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
+        }
+        
         if (found) {
             if (_log.shouldLog(Log.INFO))
                 _log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "
diff --git a/history.txt b/history.txt
index 38d089d9ce..293a69a552 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,17 @@
-$Id: history.txt,v 1.88 2004/11/29 16:57:14 jrandom Exp $
+$Id: history.txt,v 1.89 2004/11/29 18:24:50 jrandom Exp $
+
+2004-11-30  jrandom
+    * Render the burst rate fields on /config.jsp properly (thanks ugha!)
+    * Build in a simple timeout to flush data queued into the I2PSocket but
+      not yet flushed.
+    * Don't explicitly flush after each SAM stream write, but leave it up to 
+      the [nonblocking] passive flush.
+    * Don't whine about 10-99 connection events occurring in a second
+    * Don't wait for completion of packets that will not be ACKed (duh)
+    * Adjust the congestion window, even if the packet was resent (duh)
+    * Make sure to wake up any blocking read()'s when the MessageInputStream
+      is close()ed (duh)
+    * Never wait more than the disconnect timeout for a write to complete
 
 2004-11-29  jrandom
     * Minor fixes to avoid unnecessary errors on shutdown (thanks susi!)
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 3c76d59174..6cfff7f0b0 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.93 $ $Date: 2004/11/29 16:57:14 $";
+    public final static String ID = "$Revision: 1.94 $ $Date: 2004/11/29 18:24:49 $";
     public final static String VERSION = "0.4.2";
-    public final static long BUILD = 6;
+    public final static long BUILD = 7;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION);
         System.out.println("Router ID: " + RouterVersion.ID);
-- 
GitLab