Implement RTT update on first ack

This commit is contained in:
zab2
2013-07-17 21:13:19 +00:00
parent 8937c4bf2a
commit 171f0d2671
3 changed files with 88 additions and 26 deletions

View File

@@ -51,7 +51,20 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private int _maxTotalConnsPerDay;
private int _maxConns;
private boolean _disableRejectLog;
private double _alpha;
private double _beta;
private double _kappa;
/** state of a connection */
enum AckInit {
INIT, // just created
FIRST, // first received ack
STEADY
}
/** LOCKING: this */
private AckInit _initState = AckInit.INIT;
// NOTE - almost all the options are below, but see
// I2PSocketOptions in ministreaming for a few more
@@ -65,11 +78,24 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
/** on inactivity timeout, send a payload message */
public static final int INACTIVITY_ACTION_SEND = 2;
/*
* These values are specified in RFC 6298
* Do not change unless you know what you're doing
*/
private static final double TCP_ALPHA = 1.0/8;
private static final double TCP_BETA = 1.0/4;
private static final double TCP_KAPPA = 4;
private static final String PROP_TCP_ALPHA= "i2p.streaming.alpha";
private static final String PROP_TCP_BETA= "i2p.streaming.beta";
private static final String PROP_TCP_KAPPA = "i2p.streaming.kappa";
private static final String PROP_INITIAL_RTO = "i2p.streaming.initialRTO";
private static final int INITIAL_RTO = 12000;
public static final String PROP_CONNECT_DELAY = "i2p.streaming.connectDelay";
public static final String PROP_PROFILE = "i2p.streaming.profile";
public static final String PROP_MAX_MESSAGE_SIZE = "i2p.streaming.maxMessageSize";
public static final String PROP_MAX_RESENDS = "i2p.streaming.maxResends";
public static final String PROP_INITIAL_RTT = "i2p.streaming.initialRTT";
public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay";
public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay";
public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize";
@@ -295,6 +321,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setMaxWindowSize(opts.getMaxWindowSize());
setConnectDelay(opts.getConnectDelay());
setProfile(opts.getProfile());
setRTTDev(opts.getRTTDev());
setRTT(opts.getRTT());
setRequireFullySigned(opts.getRequireFullySigned());
setWindowSize(opts.getWindowSize());
@@ -332,7 +359,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1));
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, DEFAULT_MAX_MESSAGE_SIZE));
setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT));
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
@@ -360,6 +386,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
_alpha = getDouble(opts, PROP_TCP_ALPHA, TCP_ALPHA);
_beta = getDouble(opts, PROP_TCP_BETA, TCP_BETA);
_kappa = getDouble(opts, PROP_TCP_KAPPA, TCP_KAPPA);
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
}
/**
@@ -377,8 +409,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK));
if (opts.containsKey(PROP_MAX_MESSAGE_SIZE))
setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE));
if (opts.containsKey(PROP_INITIAL_RTT))
setRTT(getInt(opts, PROP_INITIAL_RTT, DEFAULT_INITIAL_RTT));
if (opts.containsKey(PROP_INITIAL_RECEIVE_WINDOW))
setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
if (opts.containsKey(PROP_INITIAL_RESEND_DELAY))
@@ -427,6 +457,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
_maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
if (opts.containsKey(PROP_MAX_STREAMS))
_maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
_alpha = getDouble(opts, PROP_TCP_ALPHA, TCP_ALPHA);
_beta = getDouble(opts, PROP_TCP_BETA, TCP_BETA);
_kappa = getDouble(opts, PROP_TCP_KAPPA, TCP_KAPPA);
_rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO);
}
/**
@@ -517,10 +552,6 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
*/
public int getRTT() { return _rtt; }
public void setRTT(int ms) {
if (_rto == 0) { // TODO: move this out
_rttDev = ms / 2;
_rto = ms + ms / 2;
}
synchronized (_trend) {
_trend[0] = _trend[1];
_trend[1] = _trend[2];
@@ -544,6 +575,36 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
/** used in TCB @since 0.9.8 */
void setRTTDev(int rttDev) { _rttDev = rttDev; }
/**
* mark these options as loaded from cache.
* affects the calculation of RTO
*/
synchronized void loadedFromCache() {
_initState = AckInit.STEADY;
}
/**
* computes RTO based on formula in RFC
*/
synchronized void computeRTO() {
switch(_initState) {
case INIT :
throw new IllegalStateException();
case FIRST :
_rttDev = _rtt / 2;
_rto = _rtt + _rtt / 2;
break;
case STEADY :
_rto = _rtt + (int) (_rttDev * _kappa);
break;
}
if (_rto < Connection.MIN_RESEND_DELAY)
_rto = (int)Connection.MIN_RESEND_DELAY;
else if (_rto > Connection.MAX_RESEND_DELAY)
_rto = (int)Connection.MAX_RESEND_DELAY;
}
/**
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
* 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable.
@@ -560,22 +621,21 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
}
}
/** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */
/** This is the value specified in RFC 2988, let's try it */
private static final double RTT_DAMPENING = 0.875;
public void updateRTT(int measuredValue) {
// the rttDev calculation matches that recommended in RFC 2988 (beta = 1/4)
_rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev));
int smoothed = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*measuredValue);
// K = 4
_rto = smoothed + (_rttDev<<2);
if (_rto < Connection.MIN_RESEND_DELAY)
_rto = (int)Connection.MIN_RESEND_DELAY;
else if (_rto > Connection.MAX_RESEND_DELAY)
_rto = (int)Connection.MAX_RESEND_DELAY;
setRTT(smoothed);
public synchronized void updateRTT(int measuredValue) {
switch(_initState) {
case INIT:
_initState = AckInit.FIRST;
setRTT(measuredValue); // no smoothing first sample
break;
case FIRST:
_initState = AckInit.STEADY;
case STEADY:
// calculation matches that recommended in RFC 6298
_rttDev = (int) ((1-_beta) *_rttDev + _beta * Math.abs(measuredValue-_rtt));
int smoothed = (int)((1-_alpha)*_rtt + _alpha*measuredValue);
setRTT(smoothed);
}
computeRTO();
}
/** How long after sending a packet will we wait before resending?

View File

@@ -99,9 +99,11 @@ class TCBShare {
" RTTDev: "+ rttDev +
" wdw: " + wdw );
}
opts.loadedFromCache();
opts.setRTT(rtt);
opts.setRTTDev(rttDev);
opts.setWindowSize(wdw);
opts.computeRTO();
}
/** store to cache */

View File

@@ -21,7 +21,7 @@ public class RouterVersion {
public final static long BUILD = 0;
/** for example "-test" */
public final static String EXTRA = "";
public final static String EXTRA = "-979";
public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA;
public static void main(String args[]) {
System.out.println("I2P Router version: " + FULL_VERSION);