diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index 3a1722dfb31942c1ffa174b71be64894139d843c..02a4d9dd41284e4ada99ff066ac1fce27385c207 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -93,6 +93,20 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { } } + protected static double getDouble(Properties opts, String name, double defaultVal) { + if (opts == null) return defaultVal; + String val = opts.getProperty(name); + if (val == null) { + return defaultVal; + } else { + try { + return Double.parseDouble(val); + } catch (NumberFormatException nfe) { + return defaultVal; + } + } + } + /** * How long we will wait for the ACK from a SYN, in milliseconds. * diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 324c56d11ba930c04ebda0ba08935442f71d3883..7790cf552d20dc54ad4c93544d3b81eae72b4d56 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -517,7 +517,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public int getRTT() { return _rtt; } public void setRTT(int ms) { - if (_rto == 0) { + if (_rto == 0) { // TODO: move this out _rttDev = ms / 2; _rto = ms + ms / 2; } @@ -539,8 +539,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public int getRTO() { return _rto; } - /** for debugging @since 0.7.13 */ + /** used in TCB @since 0.9.8 */ int getRTTDev() { return _rttDev; } + /** used in TCB @since 0.9.8 */ + void setRTTDev(int rttDev) { _rttDev = rttDev; } /** * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index 7772330f4f9d5dc1a7b04de33ee65596bcd02db3..8fdd114bdca8f5db8da42df410443ea67189648e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -9,6 +9,8 @@ import net.i2p.data.Destination; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; +import static net.i2p.client.streaming.I2PSocketOptionsImpl.getDouble; + /** * Share important TCP Control Block parameters across Connections * to the same remote peer. @@ -25,20 +27,42 @@ class TCBShare { private final Log _log; private final Map<Destination, Entry> _cache; private final CleanEvent _cleaner; + private final double _rttDampening, _wdwDampening, _rttDevDampening; private static final long EXPIRE_TIME = 30*60*1000; private static final long CLEAN_TIME = 10*60*1000; + ///// constants defined in rfc 2140 + ///// do not change unless you know what you're doing private static final double RTT_DAMPENING = 0.75; + private static final double RTTDEV_DAMPENING = 0.75; private static final double WDW_DAMPENING = 0.75; + private static final String RTT_DAMP_PROP="i2p.streaming.tcbcache.rttDampening"; + private static final String WDW_DAMP_PROP="i2p.streaming.tcbcache.wdwDampening"; + private static final String RTTDEV_DAMP_PROP="i2p.streaming.tcbcache.rttDampening"; + ///// private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2; + private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5); private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW; public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) { _context = ctx; _log = ctx.logManager().getLog(TCBShare.class); + + _rttDampening = getDouble(ctx.getProperties(), RTT_DAMP_PROP, RTT_DAMPENING); + _wdwDampening = getDouble(ctx.getProperties(), WDW_DAMP_PROP, WDW_DAMPENING); + _rttDevDampening = getDouble(ctx.getProperties(), RTTDEV_DAMP_PROP, RTTDEV_DAMPENING); + _cache = new ConcurrentHashMap<Destination,Entry>(4); _cleaner = new CleanEvent(timer); _cleaner.schedule(CLEAN_TIME); + + if (_log.shouldLog(Log.DEBUG)) { + String log = "Creating TCBCache with rttDamp=%s, rttDevDamp=%s, wdwDamp=%s, "+ + "expire=%d, clean=%d"; + log = String.format(log,_rttDampening,_rttDevDampening,_wdwDampening, + EXPIRE_TIME,CLEAN_TIME); + _log.debug(log); + } } /** @@ -60,14 +84,24 @@ class TCBShare { Entry e = _cache.get(dest); if (e == null || e.isExpired()) return; - if (_log.shouldLog(Log.DEBUG)) + final int rtt, rttDev, wdw; + synchronized(e) { + rtt = e.getRTT(); + rttDev = e.getRTTDev(); + wdw = e.getWindowSize(); + } + if (_log.shouldLog(Log.DEBUG)) { _log.debug("From cache: " + con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) + '-' + dest.calculateHash().toBase64().substring(0, 4) + - " RTT: " + e.getRTT() + " wdw: " + e.getWindowSize()); - opts.setRTT(e.getRTT()); - opts.setWindowSize(e.getWindowSize()); + " RTT: " + rtt + + " RTTDev: "+ rttDev + + " wdw: " + wdw ); + } + opts.setRTT(rtt); + opts.setRTTDev(rttDev); + opts.setWindowSize(wdw); } /** store to cache */ @@ -82,47 +116,61 @@ class TCBShare { return; int old = -1; int oldw = -1; + int oldDev = -1; Entry e = _cache.get(dest); if (e == null || e.isExpired()) { - e = new Entry(opts.getRTT(), opts.getWindowSize()); + e = new Entry(opts.getRTT(), opts.getWindowSize(), opts.getRTTDev()); _cache.put(dest, e); } else { synchronized(e) { old = e.getRTT(); oldw = e.getWindowSize(); + oldDev = e.getRTTDev(); e.setRTT(opts.getRTT()); e.setWindowSize(opts.getWindowSize()); + e.setRTTDev(opts.getRTTDev()); } } - if (_log.shouldLog(Log.DEBUG)) + if (_log.shouldLog(Log.DEBUG)) { _log.debug("To cache: " + con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) + '-' + dest.calculateHash().toBase64().substring(0, 4) + " old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() + + " oldDev: " + oldDev + " conDev: " + opts.getRTTDev() + " newDev: " + e.getRTTDev() + " oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize()); + } } private class Entry { int _rtt; int _wdw; + int _rttDev; long _updated; - public Entry(int ms, int wdw) { + public Entry(int ms, int wdw, int rttDev) { _rtt = ms; _wdw = wdw; + _rttDev = rttDev; _updated = _context.clock().now(); } public synchronized int getRTT() { return _rtt; } public synchronized void setRTT(int ms) { - _rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms); + _rtt = (int)(_rttDampening*_rtt + (1-_rttDampening)*ms); if (_rtt > MAX_RTT) _rtt = MAX_RTT; _updated = _context.clock().now(); } + public synchronized int getRTTDev() { return _rttDev; } + public synchronized void setRTTDev(int count) { + _rttDev = (int)(_rttDevDampening*_rttDev + (1-_rttDevDampening)*count); + if (_rttDev > MAX_RTT_DEV) + _rttDev = MAX_RTT_DEV; + _updated = _context.clock().now(); + } public synchronized int getWindowSize() { return _wdw; } public synchronized void setWindowSize(int wdw) { - _wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw); + _wdw = (int)(0.5 + _wdwDampening*_wdw + (1-_wdwDampening)*wdw); if (_wdw > MAX_WINDOW_SIZE) _wdw = MAX_WINDOW_SIZE; _updated = _context.clock().now();