I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 8937c4bf authored by zab2's avatar zab2
Browse files

Store stdev of rtt in the TCB cache as per RFC 2140

parent 2902a708
No related branches found
No related tags found
No related merge requests found
...@@ -93,6 +93,20 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { ...@@ -93,6 +93,20 @@ class I2PSocketOptionsImpl implements I2PSocketOptions {
} }
} }
protected static double getDouble(Properties opts, String name, double defaultVal) {
if (opts == null) return defaultVal;
String val = opts.getProperty(name);
if (val == null) {
return defaultVal;
} else {
try {
return Double.parseDouble(val);
} catch (NumberFormatException nfe) {
return defaultVal;
}
}
}
/** /**
* How long we will wait for the ACK from a SYN, in milliseconds. * How long we will wait for the ACK from a SYN, in milliseconds.
* *
......
...@@ -517,7 +517,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { ...@@ -517,7 +517,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
*/ */
public int getRTT() { return _rtt; } public int getRTT() { return _rtt; }
public void setRTT(int ms) { public void setRTT(int ms) {
if (_rto == 0) { if (_rto == 0) { // TODO: move this out
_rttDev = ms / 2; _rttDev = ms / 2;
_rto = ms + ms / 2; _rto = ms + ms / 2;
} }
...@@ -539,8 +539,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { ...@@ -539,8 +539,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
public int getRTO() { return _rto; } public int getRTO() { return _rto; }
/** for debugging @since 0.7.13 */ /** used in TCB @since 0.9.8 */
int getRTTDev() { return _rttDev; } int getRTTDev() { return _rttDev; }
/** used in TCB @since 0.9.8 */
void setRTTDev(int rttDev) { _rttDev = rttDev; }
/** /**
* If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
......
...@@ -9,6 +9,8 @@ import net.i2p.data.Destination; ...@@ -9,6 +9,8 @@ import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2; import net.i2p.util.SimpleTimer2;
import static net.i2p.client.streaming.I2PSocketOptionsImpl.getDouble;
/** /**
* Share important TCP Control Block parameters across Connections * Share important TCP Control Block parameters across Connections
* to the same remote peer. * to the same remote peer.
...@@ -25,20 +27,42 @@ class TCBShare { ...@@ -25,20 +27,42 @@ class TCBShare {
private final Log _log; private final Log _log;
private final Map<Destination, Entry> _cache; private final Map<Destination, Entry> _cache;
private final CleanEvent _cleaner; private final CleanEvent _cleaner;
private final double _rttDampening, _wdwDampening, _rttDevDampening;
private static final long EXPIRE_TIME = 30*60*1000; private static final long EXPIRE_TIME = 30*60*1000;
private static final long CLEAN_TIME = 10*60*1000; private static final long CLEAN_TIME = 10*60*1000;
///// constants defined in rfc 2140
///// do not change unless you know what you're doing
private static final double RTT_DAMPENING = 0.75; private static final double RTT_DAMPENING = 0.75;
private static final double RTTDEV_DAMPENING = 0.75;
private static final double WDW_DAMPENING = 0.75; private static final double WDW_DAMPENING = 0.75;
private static final String RTT_DAMP_PROP="i2p.streaming.tcbcache.rttDampening";
private static final String WDW_DAMP_PROP="i2p.streaming.tcbcache.wdwDampening";
private static final String RTTDEV_DAMP_PROP="i2p.streaming.tcbcache.rttDampening";
/////
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2; private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW; private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) { public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(TCBShare.class); _log = ctx.logManager().getLog(TCBShare.class);
_rttDampening = getDouble(ctx.getProperties(), RTT_DAMP_PROP, RTT_DAMPENING);
_wdwDampening = getDouble(ctx.getProperties(), WDW_DAMP_PROP, WDW_DAMPENING);
_rttDevDampening = getDouble(ctx.getProperties(), RTTDEV_DAMP_PROP, RTTDEV_DAMPENING);
_cache = new ConcurrentHashMap<Destination,Entry>(4); _cache = new ConcurrentHashMap<Destination,Entry>(4);
_cleaner = new CleanEvent(timer); _cleaner = new CleanEvent(timer);
_cleaner.schedule(CLEAN_TIME); _cleaner.schedule(CLEAN_TIME);
if (_log.shouldLog(Log.DEBUG)) {
String log = "Creating TCBCache with rttDamp=%s, rttDevDamp=%s, wdwDamp=%s, "+
"expire=%d, clean=%d";
log = String.format(log,_rttDampening,_rttDevDampening,_wdwDampening,
EXPIRE_TIME,CLEAN_TIME);
_log.debug(log);
}
} }
/** /**
...@@ -60,14 +84,24 @@ class TCBShare { ...@@ -60,14 +84,24 @@ class TCBShare {
Entry e = _cache.get(dest); Entry e = _cache.get(dest);
if (e == null || e.isExpired()) if (e == null || e.isExpired())
return; return;
if (_log.shouldLog(Log.DEBUG)) final int rtt, rttDev, wdw;
synchronized(e) {
rtt = e.getRTT();
rttDev = e.getRTTDev();
wdw = e.getWindowSize();
}
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("From cache: " + _log.debug("From cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) + con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' + '-' +
dest.calculateHash().toBase64().substring(0, 4) + dest.calculateHash().toBase64().substring(0, 4) +
" RTT: " + e.getRTT() + " wdw: " + e.getWindowSize()); " RTT: " + rtt +
opts.setRTT(e.getRTT()); " RTTDev: "+ rttDev +
opts.setWindowSize(e.getWindowSize()); " wdw: " + wdw );
}
opts.setRTT(rtt);
opts.setRTTDev(rttDev);
opts.setWindowSize(wdw);
} }
/** store to cache */ /** store to cache */
...@@ -82,47 +116,61 @@ class TCBShare { ...@@ -82,47 +116,61 @@ class TCBShare {
return; return;
int old = -1; int old = -1;
int oldw = -1; int oldw = -1;
int oldDev = -1;
Entry e = _cache.get(dest); Entry e = _cache.get(dest);
if (e == null || e.isExpired()) { if (e == null || e.isExpired()) {
e = new Entry(opts.getRTT(), opts.getWindowSize()); e = new Entry(opts.getRTT(), opts.getWindowSize(), opts.getRTTDev());
_cache.put(dest, e); _cache.put(dest, e);
} else { } else {
synchronized(e) { synchronized(e) {
old = e.getRTT(); old = e.getRTT();
oldw = e.getWindowSize(); oldw = e.getWindowSize();
oldDev = e.getRTTDev();
e.setRTT(opts.getRTT()); e.setRTT(opts.getRTT());
e.setWindowSize(opts.getWindowSize()); e.setWindowSize(opts.getWindowSize());
e.setRTTDev(opts.getRTTDev());
} }
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG)) {
_log.debug("To cache: " + _log.debug("To cache: " +
con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) + con.getSession().getMyDestination().calculateHash().toBase64().substring(0, 4) +
'-' + '-' +
dest.calculateHash().toBase64().substring(0, 4) + dest.calculateHash().toBase64().substring(0, 4) +
" old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() + " old: " + old + " con: " + opts.getRTT() + " new: " + e.getRTT() +
" oldDev: " + oldDev + " conDev: " + opts.getRTTDev() + " newDev: " + e.getRTTDev() +
" oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize()); " oldw: " + oldw + " conw: " + opts.getWindowSize() + " neww: " + e.getWindowSize());
}
} }
private class Entry { private class Entry {
int _rtt; int _rtt;
int _wdw; int _wdw;
int _rttDev;
long _updated; long _updated;
public Entry(int ms, int wdw) { public Entry(int ms, int wdw, int rttDev) {
_rtt = ms; _rtt = ms;
_wdw = wdw; _wdw = wdw;
_rttDev = rttDev;
_updated = _context.clock().now(); _updated = _context.clock().now();
} }
public synchronized int getRTT() { return _rtt; } public synchronized int getRTT() { return _rtt; }
public synchronized void setRTT(int ms) { public synchronized void setRTT(int ms) {
_rtt = (int)(RTT_DAMPENING*_rtt + (1-RTT_DAMPENING)*ms); _rtt = (int)(_rttDampening*_rtt + (1-_rttDampening)*ms);
if (_rtt > MAX_RTT) if (_rtt > MAX_RTT)
_rtt = MAX_RTT; _rtt = MAX_RTT;
_updated = _context.clock().now(); _updated = _context.clock().now();
} }
public synchronized int getRTTDev() { return _rttDev; }
public synchronized void setRTTDev(int count) {
_rttDev = (int)(_rttDevDampening*_rttDev + (1-_rttDevDampening)*count);
if (_rttDev > MAX_RTT_DEV)
_rttDev = MAX_RTT_DEV;
_updated = _context.clock().now();
}
public synchronized int getWindowSize() { return _wdw; } public synchronized int getWindowSize() { return _wdw; }
public synchronized void setWindowSize(int wdw) { public synchronized void setWindowSize(int wdw) {
_wdw = (int)(0.5 + WDW_DAMPENING*_wdw + (1-WDW_DAMPENING)*wdw); _wdw = (int)(0.5 + _wdwDampening*_wdw + (1-_wdwDampening)*wdw);
if (_wdw > MAX_WINDOW_SIZE) if (_wdw > MAX_WINDOW_SIZE)
_wdw = MAX_WINDOW_SIZE; _wdw = MAX_WINDOW_SIZE;
_updated = _context.clock().now(); _updated = _context.clock().now();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment