From 8937c4bf2ac46b6cc7c5979ff46ee32db4f5aad9 Mon Sep 17 00:00:00 2001
From: zab2 <zab2@mail.i2p>
Date: Wed, 17 Jul 2013 18:13:42 +0000
Subject: [PATCH] Store stdev of rtt in the TCB cache as per RFC 2140

---
 .../streaming/I2PSocketOptionsImpl.java       | 14 ++++
 .../client/streaming/ConnectionOptions.java   |  6 +-
 .../net/i2p/client/streaming/TCBShare.java    | 66 ++++++++++++++++---
 3 files changed, 75 insertions(+), 11 deletions(-)

diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
index 3a1722dfb3..02a4d9dd41 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java
@@ -93,6 +93,20 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
         }
     }
     
+    protected static double getDouble(Properties opts, String name, double defaultVal) {
+        if (opts == null) return defaultVal;
+        String val = opts.getProperty(name);
+        if (val == null) {
+            return defaultVal;
+        } else {
+            try {
+                return Double.parseDouble(val);
+            } catch (NumberFormatException nfe) {
+                return defaultVal;
+            }
+        }
+    }
+    
     /**
      * How long we will wait for the ACK from a SYN, in milliseconds.
      *
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index 324c56d11b..7790cf552d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -517,7 +517,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public int getRTT() { return _rtt; }
     public void setRTT(int ms) { 
-        if (_rto == 0) {
+        if (_rto == 0) { // TODO: move this out
             _rttDev = ms / 2;
             _rto = ms + ms / 2;
         }
@@ -539,8 +539,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
 
     public int getRTO() { return _rto; }
 
-    /** for debugging @since 0.7.13 */
+    /** used in TCB @since 0.9.8 */
     int getRTTDev() { return _rttDev; }
+    /** used in TCB @since 0.9.8 */
+    void setRTTDev(int rttDev) { _rttDev = rttDev; }
     
     /**
      * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java
index 7772330f4f..8fdd114bdc 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java
@@ -9,6 +9,8 @@ import net.i2p.data.Destination;
 import net.i2p.util.Log;
 import net.i2p.util.SimpleTimer2;
 
+import static net.i2p.client.streaming.I2PSocketOptionsImpl.getDouble;
+
 /**
  *  Share important TCP Control Block parameters across Connections
  *  to the same remote peer.
@@ -25,20 +27,42 @@ class TCBShare {
     private final Log _log;
     private final Map<Destination, Entry> _cache;
     private final CleanEvent _cleaner;
+    private final double _rttDampening, _wdwDampening, _rttDevDampening;
 
     private static final long EXPIRE_TIME = 30*60*1000;
     private static final long CLEAN_TIME = 10*60*1000;
+    ///// constants defined in rfc 2140
+    ///// do not change unless you know what you're doing
     private static final double RTT_DAMPENING = 0.75;
+    private static final double RTTDEV_DAMPENING = 0.75;
     private static final double WDW_DAMPENING = 0.75;
+    private static final String RTT_DAMP_PROP="i2p.streaming.tcbcache.rttDampening";
+    private static final String WDW_DAMP_PROP="i2p.streaming.tcbcache.wdwDampening";
+    private static final String RTTDEV_DAMP_PROP="i2p.streaming.tcbcache.rttDampening";
+    /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
+    private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
     private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;
         _log = ctx.logManager().getLog(TCBShare.class);
+        
+        _rttDampening = getDouble(ctx.getProperties(), RTT_DAMP_PROP, RTT_DAMPENING);
+        _wdwDampening = getDouble(ctx.getProperties(), WDW_DAMP_PROP, WDW_DAMPENING);
+        _rttDevDampening = getDouble(ctx.getProperties(), RTTDEV_DAMP_PROP, RTTDEV_DAMPENING);
+        
         _cache = new ConcurrentHashMap<Destination,Entry>(4);
         _cleaner = new CleanEvent(timer);
         _cleaner.schedule(CLEAN_TIME);
+        
+        if (_log.shouldLog(Log.DEBUG)) {
+            String log = "Creating TCBCache with rttDamp=%s, rttDevDamp=%s, wdwDamp=%s, "+
+                    "expire=%d, clean=%d";
+            log = String.format(log,_rttDampening,_rttDevDampening,_wdwDampening,
+                    EXPIRE_TIME,CLEAN_TIME);
+             _log.debug(log);
+        }
     }
 
     /**
@@ -60,14 +84,24 @@ class TCBShare {
         Entry e = _cache.get(dest);
         if (e == null || e.isExpired())
             return;
-        if (_log.shouldLog(Log.DEBUG))
+        final int rtt, rttDev, wdw;
+        synchronized(e) {
+            rtt = e.getRTT();
+            rttDev = e.getRTTDev();
+            wdw = e.getWindowSize();
+        }
+        if (_log.shouldLog(Log.DEBUG)) {
             _log.debug("From cache: " +
                        con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
                        '-' +
                        dest.calculateHash().toBase64().substring(0, 4) +
-                       " RTT: " + e.getRTT() + " wdw: " + e.getWindowSize());
-        opts.setRTT(e.getRTT());
-        opts.setWindowSize(e.getWindowSize());
+                       " RTT: " + rtt + 
+                       " RTTDev: "+ rttDev +
+                       " wdw: " + wdw );
+        }
+        opts.setRTT(rtt);
+        opts.setRTTDev(rttDev);
+        opts.setWindowSize(wdw);
     }
 
     /** store to cache */
@@ -82,47 +116,61 @@ class TCBShare {
             return;
         int old = -1;
         int oldw = -1;
+        int oldDev = -1;
         Entry e = _cache.get(dest);
         if (e == null || e.isExpired()) {
-            e = new Entry(opts.getRTT(), opts.getWindowSize());
+            e = new Entry(opts.getRTT(), opts.getWindowSize(), opts.getRTTDev());
             _cache.put(dest, e);
         } else {
             synchronized(e) {
                 old = e.getRTT();
                 oldw = e.getWindowSize();
+                oldDev = e.getRTTDev();
                 e.setRTT(opts.getRTT());
                 e.setWindowSize(opts.getWindowSize());
+                e.setRTTDev(opts.getRTTDev());
             }
         }
-        if (_log.shouldLog(Log.DEBUG))
+        if (_log.shouldLog(Log.DEBUG)) {
             _log.debug("To cache: " +
                        con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
                        '-' +
                        dest.calculateHash().toBase64().substring(0, 4) +
                        " old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() +
+                       " oldDev: " + oldDev + " conDev: " + opts.getRTTDev() + " newDev: " + e.getRTTDev() +
                        " oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize());
+        }
     }
 
     private class Entry {
         int _rtt;
         int _wdw;
+        int _rttDev;
         long _updated;
 
-        public Entry(int ms, int wdw) {
+        public Entry(int ms, int wdw, int rttDev) {
             _rtt = ms;
             _wdw = wdw;
+            _rttDev = rttDev;
             _updated = _context.clock().now();
         }
         public synchronized int getRTT() { return _rtt; }
         public synchronized void setRTT(int ms) {
-            _rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms);        
+            _rtt = (int)(_rttDampening*_rtt + (1-_rttDampening)*ms);        
             if (_rtt > MAX_RTT)
                 _rtt = MAX_RTT;
             _updated = _context.clock().now();
         }
+        public synchronized int getRTTDev() { return _rttDev; }
+        public synchronized void setRTTDev(int count) {
+            _rttDev = (int)(_rttDevDampening*_rttDev + (1-_rttDevDampening)*count);        
+            if (_rttDev > MAX_RTT_DEV)
+                _rttDev = MAX_RTT_DEV;
+            _updated = _context.clock().now();
+        }
         public synchronized int getWindowSize() { return _wdw; }
         public synchronized void setWindowSize(int wdw) {
-            _wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw);       
+            _wdw = (int)(0.5 + _wdwDampening*_wdw + (1-_wdwDampening)*wdw);       
             if (_wdw > MAX_WINDOW_SIZE)
                 _wdw = MAX_WINDOW_SIZE;
             _updated = _context.clock().now();
-- 
GitLab