From 73a12d47de3a435ffce5148e8053d9dd77c1f07b Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Tue, 9 Nov 2004 13:26:10 +0000
Subject: [PATCH] * you mean we should implement congestion *avoidance* too? *
 ack properly on duplicates * set the message input stream buffer large enough
 to fit the max window (duh)

---
 .../net/i2p/client/streaming/Connection.java  | 22 ++++++++++++++
 .../client/streaming/ConnectionOptions.java   |  7 ++---
 .../streaming/ConnectionPacketHandler.java    | 29 ++++++++++++++++---
 3 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index ac6e7fe65e..ee3b1627ab 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -56,6 +56,9 @@ public class Connection {
     private boolean _disconnectScheduled;
     private long _lastReceivedOn;
     private ActivityTimer _activityTimer;
+    /** window size when we last saw congestion */
+    private int _lastCongestionSeenAt;
+    private boolean _ackSinceCongestion;
     
     public static final long MAX_RESEND_DELAY = 60*1000;
     public static final long MIN_RESEND_DELAY = 20*1000;
@@ -63,6 +66,9 @@ public class Connection {
     /** wait up to 5 minutes after disconnection so we can ack/close packets */
     public static long DISCONNECT_TIMEOUT = 5*60*1000;
     
+    /** lets be sane.. no more than 32 packets in the air in each dir */
+    public static final int MAX_WINDOW_SIZE = 32;
+    
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
         this(ctx, manager, chooser, queue, handler, null);
     }
@@ -86,12 +92,14 @@ public class Connection {
         _unackedPacketsReceived = 0;
         _congestionWindowEnd = 0;
         _highestAckedThrough = -1;
+        _lastCongestionSeenAt = MAX_WINDOW_SIZE;
         _connectionManager = manager;
         _resetReceived = false;
         _connected = true;
         _disconnectScheduled = false;
         _lastReceivedOn = -1;
         _activityTimer = new ActivityTimer();
+        _ackSinceCongestion = true;
     }
     
     public long getNextOutboundPacketNum() { 
@@ -275,6 +283,8 @@ public class Connection {
             }
             _outboundPackets.notifyAll();
         }
+        if ((acked != null) && (acked.size() > 0) )
+            _ackSinceCongestion = true;
         return acked;
     }
 
@@ -438,6 +448,17 @@ public class Connection {
         return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
     }
     
+    public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
+    
+    void congestionOccurred() {
+        // if we hit congestion and e.g. 5 packets are resent,
+        // dont set the size to (winSize >> 4).  only set the
+        if (_ackSinceCongestion) {
+            _lastCongestionSeenAt = _options.getWindowSize();
+            _ackSinceCongestion = false;
+        }
+    }
+    
     void packetReceived() {
         _lastReceivedOn = _context.clock().now();
         resetActivityTimer();
@@ -563,6 +584,7 @@ public class Connection {
                 
                 // shrink the window
                 int newWindowSize = getOptions().getWindowSize();
+                _lastCongestionSeenAt = newWindowSize;
                 newWindowSize /= 2;
                 if (newWindowSize <= 0)
                     newWindowSize = 1;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index 22470e22b9..03fa3b5d47 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -74,7 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions {
             setWriteTimeout(-1);
             setInactivityTimeout(5*60*1000);
             setInactivityAction(INACTIVITY_ACTION_SEND);
-            setInboundBufferSize(256*1024);
+            setInboundBufferSize((Packet.MAX_PAYLOAD_SIZE + 2) * Connection.MAX_WINDOW_SIZE);
         }
     }
     
@@ -103,15 +103,14 @@ public class ConnectionOptions extends I2PSocketOptions {
     public boolean getRequireFullySigned() { return _fullySigned; }
     public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
     
-    private static final int MAX_WINDOW_SIZE = 32;
     /** 
      * How many messages will we send before waiting for an ACK?
      *
      */
     public int getWindowSize() { return _windowSize; }
     public void setWindowSize(int numMsgs) { 
-        if (numMsgs > MAX_WINDOW_SIZE)
-            numMsgs = MAX_WINDOW_SIZE;
+        if (numMsgs > Connection.MAX_WINDOW_SIZE)
+            numMsgs = Connection.MAX_WINDOW_SIZE;
         _windowSize = numMsgs; 
     }
     
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 552195fda8..c0e5647f4e 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -124,10 +124,17 @@ public class ConnectionPacketHandler {
         if ( (!isNew) && (sequenceNum > 0) ) {
             // dup real packet
             int oldSize = con.getOptions().getWindowSize();
+            con.congestionOccurred();
             oldSize >>>= 1;
             if (oldSize <= 0)
                 oldSize = 1;
             con.getOptions().setWindowSize(oldSize);
+            
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Congestion occurred - new windowSize " + oldSize + " congestionSeenAt: "
+                           + con.getLastCongestionSeenAt() + " (#resends: " + numResends 
+                           + ") for " + con);
+
             return true;
         } else if (numResends > 0) {
             // window sizes are shrunk on resend, not on ack
@@ -137,9 +144,23 @@ public class ConnectionPacketHandler {
                 if (lowest >= con.getCongestionWindowEnd()) {
                     // new packet that ack'ed uncongested data, or an empty ack
                     int newWindowSize = con.getOptions().getWindowSize();
-                    newWindowSize += 1; // acked; // 1
+                    
+                    if (newWindowSize > con.getLastCongestionSeenAt() / 2) {
+                        // congestion avoidance
+                        
+                        // we can't use newWindowSize += 1/newWindowSize, since we're
+                        // integers, so lets use a random distribution instead
+                        int shouldIncrement = _context.random().nextInt(newWindowSize);
+                        if (shouldIncrement <= 0)
+                            newWindowSize += 1;
+                    } else {
+                        // slow start
+                        newWindowSize += 1;
+                    }
+                    
                     if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("New window size " + newWindowSize + " (#resends: " + numResends 
+                        _log.debug("New window size " + newWindowSize + " congestionSeenAt: "
+                                   + con.getLastCongestionSeenAt() + " (#resends: " + numResends 
                                    + ") for " + con);
                     con.getOptions().setWindowSize(newWindowSize);
                     con.setCongestionWindowEnd(newWindowSize + lowest);
@@ -267,9 +288,9 @@ public class ConnectionPacketHandler {
             _con = con;
         }
         public void timeReached() {
-            if (_con.getLastActivityOn() <= _created) {
+            if (_con.getLastSendTime() <= _created) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Last activity was a while ago, and we want to ack a dup");
+                    _log.debug("Last sent was a while ago, and we want to ack a dup");
                 // we haven't done anything since receiving the dup, send an
                 // ack now
                 _con.ackImmediately();
-- 
GitLab