diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 7790cf552..725a0c36b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -51,7 +51,20 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _maxTotalConnsPerDay; private int _maxConns; private boolean _disableRejectLog; - + private double _alpha; + private double _beta; + private double _kappa; + + /** state of a connection */ + enum AckInit { + INIT, // just created + FIRST, // first received ack + STEADY + } + + /** LOCKING: this */ + private AckInit _initState = AckInit.INIT; + // NOTE - almost all the options are below, but see // I2PSocketOptions in ministreaming for a few more @@ -65,11 +78,24 @@ class ConnectionOptions extends I2PSocketOptionsImpl { /** on inactivity timeout, send a payload message */ public static final int INACTIVITY_ACTION_SEND = 2; + /* + * These values are specified in RFC 6298 + * Do not change unless you know what you're doing + */ + private static final double TCP_ALPHA = 1.0/8; + private static final double TCP_BETA = 1.0/4; + private static final double TCP_KAPPA = 4; + private static final String PROP_TCP_ALPHA= "i2p.streaming.alpha"; + private static final String PROP_TCP_BETA= "i2p.streaming.beta"; + private static final String PROP_TCP_KAPPA = "i2p.streaming.kappa"; + + private static final String PROP_INITIAL_RTO = "i2p.streaming.initialRTO"; + private static final int INITIAL_RTO = 12000; + public static final String PROP_CONNECT_DELAY = "i2p.streaming.connectDelay"; public static final String PROP_PROFILE = "i2p.streaming.profile"; public static final String PROP_MAX_MESSAGE_SIZE = "i2p.streaming.maxMessageSize"; public static final String PROP_MAX_RESENDS = "i2p.streaming.maxResends"; - public static final String PROP_INITIAL_RTT = "i2p.streaming.initialRTT"; public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay"; public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay"; public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize"; @@ -295,6 +321,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setMaxWindowSize(opts.getMaxWindowSize()); setConnectDelay(opts.getConnectDelay()); setProfile(opts.getProfile()); + setRTTDev(opts.getRTTDev()); setRTT(opts.getRTT()); setRequireFullySigned(opts.getRequireFullySigned()); setWindowSize(opts.getWindowSize()); @@ -332,7 +359,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE)); - setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); @@ -360,6 +386,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0); _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); _maxConns = getInt(opts, PROP_MAX_STREAMS, 0); + + _alpha = getDouble(opts, PROP_TCP_ALPHA, TCP_ALPHA); + _beta = getDouble(opts, PROP_TCP_BETA, TCP_BETA); + _kappa = getDouble(opts, PROP_TCP_KAPPA, TCP_KAPPA); + + _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); } /** @@ -377,8 +409,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); if (opts.containsKey(PROP_MAX_MESSAGE_SIZE)) setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE)); - if (opts.containsKey(PROP_INITIAL_RTT)) - setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT)); if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW)) setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); if (opts.containsKey(PROP_INITIAL_RESEND_DELAY)) @@ -427,6 +457,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); if (opts.containsKey(PROP_MAX_STREAMS)) _maxConns = getInt(opts, PROP_MAX_STREAMS, 0); + + _alpha = getDouble(opts, PROP_TCP_ALPHA, TCP_ALPHA); + _beta = getDouble(opts, PROP_TCP_BETA, TCP_BETA); + _kappa = getDouble(opts, PROP_TCP_KAPPA, TCP_KAPPA); + _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); } /** @@ -517,10 +552,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public int getRTT() { return _rtt; } public void setRTT(int ms) { - if (_rto == 0) { // TODO: move this out - _rttDev = ms / 2; - _rto = ms + ms / 2; - } synchronized (_trend) { _trend[0] = _trend[1]; _trend[1] = _trend[2]; @@ -544,6 +575,36 @@ class ConnectionOptions extends I2PSocketOptionsImpl { /** used in TCB @since 0.9.8 */ void setRTTDev(int rttDev) { _rttDev = rttDev; } + /** + * mark these options as loaded from cache. + * affects the calculation of RTO + */ + synchronized void loadedFromCache() { + _initState = AckInit.STEADY; + } + + /** + * computes RTO based on formula in RFC + */ + synchronized void computeRTO() { + switch(_initState) { + case INIT : + throw new IllegalStateException(); + case FIRST : + _rttDev = _rtt / 2; + _rto = _rtt + _rtt / 2; + break; + case STEADY : + _rto = _rtt + (int) (_rttDev * _kappa); + break; + } + + if (_rto < Connection.MIN_RESEND_DELAY) + _rto = (int)Connection.MIN_RESEND_DELAY; + else if (_rto > Connection.MAX_RESEND_DELAY) + _rto = (int)Connection.MAX_RESEND_DELAY; + } + /** * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have * 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable. @@ -560,22 +621,21 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } } - /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ - /** This is the value specified in RFC 2988, let's try it */ - private static final double RTT_DAMPENING = 0.875; - - public void updateRTT(int measuredValue) { - // the rttDev calculation matches that recommended in RFC 2988 (beta = 1/4) - _rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev)); - int smoothed = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue); - // K = 4 - _rto = smoothed + (_rttDev<<2); - if (_rto < Connection.MIN_RESEND_DELAY) - _rto = (int)Connection.MIN_RESEND_DELAY; - else if (_rto > Connection.MAX_RESEND_DELAY) - _rto = (int)Connection.MAX_RESEND_DELAY; - - setRTT(smoothed); + public synchronized void updateRTT(int measuredValue) { + switch(_initState) { + case INIT: + _initState = AckInit.FIRST; + setRTT(measuredValue); // no smoothing first sample + break; + case FIRST: + _initState = AckInit.STEADY; + case STEADY: + // calculation matches that recommended in RFC 6298 + _rttDev = (int) ((1-_beta) *_rttDev + _beta * Math.abs(measuredValue-_rtt)); + int smoothed = (int)((1-_alpha)*_rtt + _alpha*measuredValue); + setRTT(smoothed); + } + computeRTO(); } /** How long after sending a packet will we wait before resending? diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index 8fdd114bd..74f38b10f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -99,9 +99,11 @@ class TCBShare { " RTTDev: "+ rttDev + " wdw: " + wdw ); } + opts.loadedFromCache(); opts.setRTT(rtt); opts.setRTTDev(rttDev); opts.setWindowSize(wdw); + opts.computeRTO(); } /** store to cache */ diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index a411b0099..3668b214a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -21,7 +21,7 @@ public class RouterVersion { public final static long BUILD = 0; /** for example "-test" */ - public final static String EXTRA = ""; + public final static String EXTRA = "-979"; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA; public static void main(String args[]) { System.out.println("I2P Router version: " + FULL_VERSION);