From 15c227f5684cfd862e5e05edbb7fda8fe85abc50 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Mon, 25 Oct 2004 03:22:29 +0000
Subject: [PATCH] * sliding windows w/ additive increase / multiplicitive
 decrease * immediately send an ack on receiving a duplicate payload message  
 (unless we've sent one within the last RTT) * only adjust the RTT when there
 have been no resends * added some (disabled) throttles - randomly injecting
 delays on   received packets, as well as randomly dropping them * logging

---
 .../net/i2p/client/streaming/Connection.java  | 27 +++++---
 .../streaming/ConnectionDataReceiver.java     | 17 ++++--
 .../client/streaming/ConnectionManager.java   |  2 +-
 .../streaming/ConnectionPacketHandler.java    | 61 ++++++++++++++++---
 .../i2p/client/streaming/PacketHandler.java   | 29 +++++++--
 .../net/i2p/client/streaming/PacketQueue.java |  2 +-
 6 files changed, 111 insertions(+), 27 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 6513f33311..b5280ecf7a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -93,16 +93,18 @@ public class Connection {
      * @return true if the packet should be sent
      */
     boolean packetSendChoke() {
-        if (true) return true;
+        if (false) return true;
         long writeExpire = _options.getWriteTimeout();
-        if (writeExpire > 0)
-            writeExpire += _context.clock().now();
         while (true) {
             long timeLeft = writeExpire - _context.clock().now();
             synchronized (_outboundPackets) {
                 if (_outboundPackets.size() >= _options.getWindowSize()) {
                     if (writeExpire > 0) {
-                        if (timeLeft <= 0) return false;
+                        if (timeLeft <= 0) {
+                            _log.error("Outbound window is full of " + _outboundPackets.size() 
+                                       + " and we've waited too long (" + writeExpire + "ms)");
+                            return false;
+                        }
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft);
                         try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {}
@@ -118,6 +120,10 @@ public class Connection {
         }
     }
     
+    void ackImmediately() {
+        _receiver.send(null, 0, 0);
+    }
+    
     /**
      * Flush any data that we can
      */
@@ -147,7 +153,7 @@ public class Connection {
             synchronized (_outboundPackets) {
                 _outboundPackets.put(new Long(packet.getSequenceNum()), packet);
             }
-            SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay());
+            SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2);
         }
 
         _lastSendTime = _context.clock().now();
@@ -159,7 +165,7 @@ public class Connection {
             // something that will get a reply so that we can deliver some new tags -
             // ACKs don't get ACKed, but pings do.
             if (packet.getTagsSent().size() > 0) {
-                _log.error("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
+                _log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
                 _connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent());
             }
         }
@@ -327,6 +333,8 @@ public class Connection {
             buf.append(Base64.encode(_sendStreamId));
         else
             buf.append("unknown");
+        buf.append(" wsize: ").append(_options.getWindowSize());
+        buf.append(" rtt: ").append(_options.getRTT());
         buf.append(" unacked outbound: ");
         synchronized (_outboundPackets) {
             buf.append(_outboundPackets.size()).append(" [");
@@ -367,8 +375,8 @@ public class Connection {
 
                 int numSends = _packet.getNumSends() + 1;
                 
-                if (_log.shouldLog(Log.ERROR))
-                    _log.error("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
                 _outboundQueue.enqueue(_packet);
                 
                 if (numSends > _options.getMaxResends()) {
@@ -376,7 +384,8 @@ public class Connection {
                         _log.debug("Too many resends");
                     disconnect(false);
                 } else {
-                    long timeout = _options.getResendDelay() << numSends;
+                    //long timeout = _options.getResendDelay() << numSends;
+                    long timeout = _options.getRTT() << numSends;
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Scheduling resend in " + timeout + "ms");
                     SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index 4f4b76cb29..d98ff0d957 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -39,17 +39,23 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
         if (_connection.getUnackedPacketsReceived() > 0)
             doSend = true;
         
-        //if (_log.shouldLog(Log.DEBUG))
-        //    _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by"));
+        if (_log.shouldLog(Log.ERROR) && !doSend)
+            _log.error("writeData called: size="+size + " doSend=" + doSend 
+                       + " unackedReceived: " + _connection.getUnackedPacketsReceived()
+                       + " con: " + _connection, new Exception("write called by"));
 
         if (doSend) {
-            PacketLocal packet = buildPacket(buf, off, size);
-            _connection.sendPacket(packet);
+            send(buf, off, size);
         } else {
             //_connection.flushPackets();
         }
     }
     
+    public void send(byte buf[], int off, int size) {
+        PacketLocal packet = buildPacket(buf, off, size);
+        _connection.sendPacket(packet);
+    }
+    
     private boolean isAckOnly(int size) {
         boolean ackOnly = ( (size <= 0) && // no data
                             (_connection.getLastSendId() >= 0) && // not a SYN
@@ -63,7 +69,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
         boolean ackOnly = isAckOnly(size);
         PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer());
         byte data[] = new byte[size];
-        System.arraycopy(buf, off, data, 0, size);
+        if (size > 0)
+            System.arraycopy(buf, off, data, 0, size);
         packet.setPayload(data);
 		if (ackOnly)
 			packet.setSequenceNum(0);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index e1a7a88520..1348bf44e1 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -218,7 +218,7 @@ public class ConnectionManager {
             _packet = packet;
         }
         public void pong() { 
-            _log.error("Ping successful");
+            _log.debug("Ping successful");
             _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
             synchronized (ConnectionManager.PingRequest.this) {
                 _ponged = true; 
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index fa9e08402b..b5178b4653 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -47,12 +47,15 @@ public class ConnectionPacketHandler {
                 con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
                 //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
                 if (_log.shouldLog(Log.WARN))
-                    _log.warn("congestion.. dup " + packet);
+                    _log.warn("congestion.. dup " + packet);   
+                con.incrementUnackedPacketsReceived();
             } else {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("ACK only packet received: " + packet);
             }
         }
+        
+        int numResends = 0;
         List acked = con.ackPackets(packet.getAckThrough(), packet.getNacks());
         if ( (acked != null) && (acked.size() > 0) ) {
             if (_log.shouldLog(Log.DEBUG))
@@ -62,8 +65,13 @@ public class ConnectionPacketHandler {
             int lowestRtt = -1;
             for (int i = 0; i < acked.size(); i++) {
                 PacketLocal p = (PacketLocal)acked.get(i);
-                if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) )
-                    lowestRtt = p.getAckTime();
+                if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) {
+                    if (p.getNumSends() <= 1)
+                        lowestRtt = p.getAckTime();
+                }
+                
+                if (p.getNumSends() > 1)
+                    numResends++;
                 
                 // ACK the tags we delivered so we can use them
                 if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null) 
@@ -75,12 +83,51 @@ public class ConnectionPacketHandler {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Packet acked: " + p);
             }
-            int oldRTT = con.getOptions().getRTT();
-            int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt);
-            con.getOptions().setRTT(newRTT);
+            if (lowestRtt > 0) {
+                int oldRTT = con.getOptions().getRTT();
+                int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt);
+                con.getOptions().setRTT(newRTT);
+            }
         }
-        
+
+        boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends);
         con.eventOccurred();
+        if (fastAck) {
+            if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
+                _log.error("Fast ack for dup " + packet);
+                con.ackImmediately();
+            }
+        }
+    }
+    
+    private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends) {
+        if ( (!isNew) && (sequenceNum > 0) ) {
+            // dup real packet
+            int oldSize = con.getOptions().getWindowSize();
+            oldSize >>>= 1;
+            if (oldSize <= 0)
+                oldSize = 1;
+            con.getOptions().setWindowSize(oldSize);
+            return true;
+        } else if (numResends > 0) {
+            int newWindowSize = con.getOptions().getWindowSize();
+            newWindowSize /= 2; // >>>= numResends;
+            if (newWindowSize <= 0)
+                newWindowSize = 1;
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("Shrink the window to " + newWindowSize + " (#resends: " + numResends 
+                           + ") for " + con);
+            con.getOptions().setWindowSize(newWindowSize);
+        } else {
+            // new packet that ack'ed uncongested data, or an empty ack
+            int newWindowSize = con.getOptions().getWindowSize();
+            newWindowSize += 1; //acked.size();
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug("New window size " + newWindowSize + " (#resends: " + numResends 
+                           + ") for " + con);
+            con.getOptions().setWindowSize(newWindowSize);
+        }
+        return false;
     }
     
     /**
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
index aae8fee22d..9b50539904 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
@@ -26,7 +26,28 @@ public class PacketHandler {
         _log = ctx.logManager().getLog(PacketHandler.class);
     }
     
+    private boolean choke(Packet packet) {
+        if (false) {
+            // artificial choke: 2% random drop and a 1s
+            // random delay
+            if (_context.random().nextInt(100) >= 98) {
+                _log.error("DROP: " + packet);
+                return false;
+            } else {
+                int delay = _context.random().nextInt(1000);
+                try { Thread.sleep(delay); } catch (InterruptedException ie) {}
+                _log.debug("OK  : " + packet + " delay = " + delay);
+                return true;
+            }
+        } else {
+            return true;
+        }
+    }
+    
     void receivePacket(Packet packet) {
+        boolean ok = choke(packet);
+        if (!ok) return;
+        
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("packet received: " + packet);
         
@@ -46,11 +67,11 @@ public class PacketHandler {
     
     private void displayPacket(Packet packet, Connection con) {
         if (_log.shouldLog(Log.DEBUG)) {
-            //SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
-            //String now = fmt.format(new Date());
+            SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
+            String now = fmt.format(new Date());
             String msg = packet + (con != null ? " on " + con : " on unknown con");
-            _log.debug(msg);
-            // System.out.println(now + ": " + msg);
+            //_log.debug(msg);
+            System.out.println(now + ": " + msg);
         }
     }
     
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
index 37b49297af..7489811c9f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java
@@ -52,7 +52,7 @@ class PacketQueue {
                 packet.setTagsSent(tagsSent);
                 packet.incrementSends();
                 if (_log.shouldLog(Log.DEBUG)) {
-                    String msg = packet + " sent" + (tagsSent.size() > 0 
+                    String msg = "SEND " + packet + (tagsSent.size() > 0 
                                                      ? " with " + tagsSent.size() + " tags"
                                                      : "")
                                                      + " send # " + packet.getNumSends();
-- 
GitLab