From 83165df7e5e68a8fe475bc54ae75303028a1cccc Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Tue, 9 Nov 2004 11:00:04 +0000
Subject: [PATCH] * delay the ack of a syn * make sure we ack duplicate
 messages received (if we aren't already doing so) * implement a choke on the
 local buffer, in case we receive data faster than its   removed from the
 i2psocket's MessageInputStream (handle via packet drop and   explicit
 congestion notification)

---
 .../client/streaming/ConnectionOptions.java   | 10 +++++++
 .../streaming/ConnectionPacketHandler.java    | 29 ++++++++++++++++++-
 .../client/streaming/SchedulerReceived.java   | 13 ++++++---
 3 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index a35de82975..22470e22b9 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -20,6 +20,7 @@ public class ConnectionOptions extends I2PSocketOptions {
     private int _maxResends;
     private int _inactivityTimeout;
     private int _inactivityAction;
+    private int _inboundBufferSize;
 
     public static final int PROFILE_BULK = 1;
     public static final int PROFILE_INTERACTIVE = 2;
@@ -59,6 +60,7 @@ public class ConnectionOptions extends I2PSocketOptions {
             setMaxResends(opts.getMaxResends());
             setInactivityTimeout(opts.getInactivityTimeout());
             setInactivityAction(opts.getInactivityAction());
+            setInboundBufferSize(opts.getInboundBufferSize());
         } else {
             setConnectDelay(2*1000);
             setProfile(PROFILE_BULK);
@@ -72,6 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions {
             setWriteTimeout(-1);
             setInactivityTimeout(5*60*1000);
             setInactivityAction(INACTIVITY_ACTION_SEND);
+            setInboundBufferSize(256*1024);
         }
     }
     
@@ -186,4 +189,11 @@ public class ConnectionOptions extends I2PSocketOptions {
     
     public int getInactivityAction() { return _inactivityAction; }
     public void setInactivityAction(int action) { _inactivityAction = action; }
+    
+    /** 
+     * how much data are we willing to accept in our buffer?
+     *
+     */
+    public int getInboundBufferSize() { return _inboundBufferSize; }
+    public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index c8038fc0ad..552195fda8 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -7,6 +7,7 @@ import net.i2p.I2PException;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
 import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer;
 
 /**
  * Receive a packet for a particular connection - placing the data onto the
@@ -27,6 +28,13 @@ public class ConnectionPacketHandler {
         boolean ok = verifyPacket(packet, con);
         if (!ok) return;
         con.packetReceived();
+        if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) {
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet);
+            con.getOptions().setChoke(5*1000);
+            return;
+        }
+        con.getOptions().setChoke(0);
         boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
 
         // close *after* receiving the data, as well as after verifying the signatures / etc
@@ -53,7 +61,8 @@ public class ConnectionPacketHandler {
                 con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
                 //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
                 if (_log.shouldLog(Log.WARN))
-                    _log.warn("congestion.. dup " + packet);   
+                    _log.warn("congestion.. dup " + packet);
+                SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
                 //con.incrementUnackedPacketsReceived();
                 con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
             } else {
@@ -249,4 +258,22 @@ public class ConnectionPacketHandler {
             }
         }
     }    
+    
+    private class AckDup implements SimpleTimer.TimedEvent {
+        private long _created;
+        private Connection _con;
+        public AckDup(Connection con) {
+            _created = _context.clock().now();
+            _con = con;
+        }
+        public void timeReached() {
+            if (_con.getLastActivityOn() <= _created) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Last activity was a while ago, and we want to ack a dup");
+                // we haven't done anything since receiving the dup, send an
+                // ack now
+                _con.ackImmediately();
+            }
+        }
+    }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
index 09836477e5..89d28b3546 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
@@ -31,10 +31,15 @@ class SchedulerReceived extends SchedulerImpl {
         
         long timeTillSend = con.getNextSendTime() - _context.clock().now();
         if (timeTillSend <= 0) {
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("received con... send a packet");
-            con.sendAvailable();
-            con.setNextSendTime(-1);
+            if (con.getNextSendTime() > 0) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("received con... send a packet");
+                con.sendAvailable();
+                con.setNextSendTime(-1);
+            } else {
+                con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
+                reschedule(con.getOptions().getSendAckDelay(), con);
+            }
         } else {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("received con... time till next send: " + timeTillSend);
-- 
GitLab