From ab92206b773351f9d6b278d4fa2fee368a5f3e3e Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Tue, 20 Jan 2009 17:20:37 +0000
Subject: [PATCH]      * Streaming: TCB control block sharing        also tweak
 ResendPacketEvent to prepare for PacketQueue sending timeout to I2CP

---
 .../net/i2p/client/streaming/Connection.java  |  55 ++++---
 .../client/streaming/ConnectionManager.java   |   6 +
 .../streaming/ConnectionPacketHandler.java    |   4 +
 .../net/i2p/client/streaming/TCBShare.java    | 137 ++++++++++++++++++
 4 files changed, 180 insertions(+), 22 deletions(-)
 create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 73e7253ac0..85872e9c58 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -45,6 +45,7 @@ public class Connection {
     private long _congestionWindowEnd;
     private long _highestAckedThrough;
     private boolean _isInbound;
+    private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
     private Map _outboundPackets;
     private PacketQueue _outboundQueue;
@@ -120,6 +121,7 @@ public class Connection {
         _activeResends = 0;
         _resetSentOn = -1;
         _isInbound = false;
+        _updatedShareOpts = false;
         _connectionEvent = new ConEvent();
         _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
         _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
@@ -586,6 +588,8 @@ public class Connection {
         if (_remotePeerSet) throw new RuntimeException("Remote peer already set [" + _remotePeer + ", " + peer + "]");
         _remotePeerSet = true;
         _remotePeer = peer; 
+        // now that we know who the other end is, get the rtt etc. from the cache
+        _connectionManager.updateOptsFromShare(this);
     }
     
     private boolean _sendStreamIdSet = false;
@@ -709,7 +713,13 @@ public class Connection {
     }
     public long getCloseReceivedOn() { return _closeReceivedOn; }
     public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
-    
+
+    public void updateShareOpts() {
+        if (_closeSentOn > 0 && !_updatedShareOpts) {
+            _connectionManager.updateShareOpts(this);
+            _updatedShareOpts = true;
+        }
+    }
     public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
     public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
     /** how many packets have we sent but not yet received an ACK for?
@@ -998,7 +1008,7 @@ public class Connection {
     /**
      * Coordinate the resends of a given packet
      */
-    private class ResendPacketEvent implements SimpleTimer.TimedEvent {
+    public class ResendPacketEvent implements SimpleTimer.TimedEvent {
         private PacketLocal _packet;
         private long _nextSendTime;
         public ResendPacketEvent(PacketLocal packet, long sendTime) {
@@ -1104,7 +1114,22 @@ public class Connection {
                     _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
                 }
                 
-                if (numSends - 1 <= _options.getMaxResends()) {
+                if (numSends - 1 > _options.getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("Too many resends");
+                    _packet.cancelled();
+                    disconnect(false);
+                } else {
+                    //long timeout = _options.getResendDelay() << numSends;
+                    long rto = _options.getRTO();
+                    if (rto < MIN_RESEND_DELAY)
+                        rto = MIN_RESEND_DELAY;
+                    long timeout = rto << (numSends-1);
+                    if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
+                        timeout = MAX_RESEND_DELAY;
+                    // set this before enqueue() as it passes it on to the router
+                    _nextSendTime = timeout + _context.clock().now();
+
                     if (_log.shouldLog(Log.INFO))
                         _log.info("Resend packet " + _packet + " time " + numSends + 
                                   " activeResends: " + _activeResends + 
@@ -1113,6 +1138,10 @@ public class Connection {
                                   + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
                     _outboundQueue.enqueue(_packet);
                     _lastSendTime = _context.clock().now();
+
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
+                    RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
                 }
                 
                 // acked during resending (... or somethin')
@@ -1123,25 +1152,7 @@ public class Connection {
                     }
                     return true;
                 }
-                
-                if (numSends - 1 > _options.getMaxResends()) {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Too many resends");
-                    _packet.cancelled();
-                    disconnect(false);
-                } else {
-                    //long timeout = _options.getResendDelay() << numSends;
-                    long rto = _options.getRTO();
-                    if (rto < MIN_RESEND_DELAY)
-                        rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
-                    if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
-                        timeout = MAX_RESEND_DELAY;
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
-                    RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
-                    _nextSendTime = timeout + _context.clock().now();
-                }
+
                 return true;
             } else {
                 //if (_log.shouldLog(Log.DEBUG))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index da2b1ab127..7826ba2a81 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -30,6 +30,7 @@ public class ConnectionManager {
     private PacketQueue _outboundQueue;
     private SchedulerChooser _schedulerChooser;
     private ConnectionPacketHandler _conPacketHandler;
+    private TCBShare _tcbShare;
     /** Inbound stream ID (Long) to Connection map */
     private Map _connectionByInboundId;
     /** Ping ID (Long) to PingRequest */
@@ -52,6 +53,7 @@ public class ConnectionManager {
         _connectionHandler = new ConnectionHandler(context, this);
         _schedulerChooser = new SchedulerChooser(context);
         _conPacketHandler = new ConnectionPacketHandler(context);
+        _tcbShare = new TCBShare(context);
         _session = session;
         session.setSessionListener(_messageHandler);
         _outboundQueue = new PacketQueue(context, session, this);
@@ -127,6 +129,7 @@ public class ConnectionManager {
      */
     public Connection receiveConnection(Packet synPacket) {
         Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
+        _tcbShare.updateOptsFromShare(con);
         con.setInbound();
         long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
         boolean reject = false;
@@ -277,6 +280,8 @@ public class ConnectionManager {
     public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
     public I2PSession getSession() { return _session; }
     public PacketQueue getPacketQueue() { return _outboundQueue; }
+    public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
+    public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
     
     /**
      * Something b0rked hard, so kill all of our connections without mercy.
@@ -292,6 +297,7 @@ public class ConnectionManager {
             _connectionByInboundId.clear();
             _connectionLock.notifyAll();
         }
+        _tcbShare.stop();
     }
     
     /**
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 6a062d4a6c..7c445f0380 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -213,6 +213,10 @@ public class ConnectionPacketHandler {
             packet.releasePayload();
         }
         
+        // update the TCB Cache now that we've processed the acks and updated our rtt etc.
+        if (isNew && packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
+            con.updateShareOpts();
+
         //if (choke)
         //    con.fastRetransmit();
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java
new file mode 100644
index 0000000000..1562f948e5
--- /dev/null
+++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java
@@ -0,0 +1,137 @@
+package net.i2p.client.streaming;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.Destination;
+import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer;
+
+/**
+ *  Share important TCP Control Block parameters across Connections
+ *  to the same remote peer.
+ *  This is intended for "temporal" sharing at connection open/close time,
+ *  not "ensemble" sharing during a connection. Ref. RFC 2140.
+ *  
+ *  There is a TCB share per ConnectionManager (i.e. per local Destination)
+ *  so that there is no information leakage to other Destinations on the
+ *  same router.
+ *
+ */
+public class TCBShare {
+    private I2PAppContext _context;
+    private Log _log;
+    private Map<Destination, Entry> _cache;
+    private CleanEvent _cleaner;
+
+    private static final long EXPIRE_TIME = 30*60*1000;
+    private static final long CLEAN_TIME = 10*60*1000;
+    private static final double RTT_DAMPENING = 0.75;
+    private static final double WDW_DAMPENING = 0.75;
+    private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4;
+    
+    public TCBShare(I2PAppContext ctx) {
+        _context = ctx;
+        _log = ctx.logManager().getLog(TCBShare.class);
+        _cache = new ConcurrentHashMap(4);
+        _cleaner = new CleanEvent();
+        SimpleTimer.getInstance().addEvent(_cleaner, CLEAN_TIME);
+    }
+
+    public void stop() {
+        SimpleTimer.getInstance().removeEvent(_cleaner);
+    }
+
+    public void updateOptsFromShare(Connection con) {
+        Destination dest = con.getRemotePeer();
+        if (dest == null)
+            return;
+        ConnectionOptions opts = con.getOptions();
+        if (opts == null)
+            return;
+        Entry e = _cache.get(dest);
+        if (e == null || e.isExpired())
+            return;
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("From cache: " +
+                       con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
+                       '-' +
+                       dest.calculateHash().toBase64().substring(0, 4) +
+                       " RTT: " + e.getRTT() + " wdw: " + e.getWindowSize());
+        opts.setRTT(e.getRTT());
+        opts.setWindowSize(e.getWindowSize());
+    }
+
+    public void updateShareOpts(Connection con) {
+        Destination dest = con.getRemotePeer();
+        if (dest == null)
+            return;
+        if (con.getAckedPackets() <= 0)
+            return;
+        ConnectionOptions opts = con.getOptions();
+        if (opts == null)
+            return;
+        int old = -1;
+        int oldw = -1;
+        Entry e = _cache.get(dest);
+        if (e == null || e.isExpired()) {
+            e = new Entry(opts.getRTT(), opts.getWindowSize());
+            _cache.put(dest, e);
+        } else {
+            old = e.getRTT();
+            oldw = e.getWindowSize();
+            e.setRTT(opts.getRTT());
+            e.setWindowSize(opts.getWindowSize());
+        }
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("To cache: " +
+                       con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
+                       '-' +
+                       dest.calculateHash().toBase64().substring(0, 4) +
+                       " old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() +
+                       " oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize());
+    }
+
+    private class Entry {
+        int _rtt;
+        int _wdw;
+        long _updated;
+
+        public Entry(int ms, int wdw) {
+            _rtt = ms;
+            _wdw = wdw;
+            _updated = _context.clock().now();
+        }
+        public int getRTT() { return _rtt; }
+        public void setRTT(int ms) {
+            _rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms);        
+            if (_rtt > MAX_RTT)
+                _rtt = MAX_RTT;
+            _updated = _context.clock().now();
+        }
+        public int getWindowSize() { return _wdw; }
+        public void setWindowSize(int wdw) {
+            _wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw);       
+            if (_wdw > MAX_WINDOW_SIZE)
+                _wdw = MAX_WINDOW_SIZE;
+            _updated = _context.clock().now();
+        }
+        public boolean isExpired() {
+            return _updated < _context.clock().now() - EXPIRE_TIME;
+        }
+    }
+
+    private class CleanEvent implements SimpleTimer.TimedEvent {
+        public CleanEvent() {}
+        public void timeReached() {
+            for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) {
+                if (_cache.get(iter.next()).isExpired())
+                    iter.remove();
+            }
+            SimpleTimer.getInstance().addEvent(CleanEvent.this, CLEAN_TIME);
+        }
+    }
+}
-- 
GitLab