diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index abeddc56571456bd96cb299e673c7ea0ef25c13a..b3c84442ea923ba7d4954c4db8ea1953b5bce56a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -295,7 +295,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { buf.setLength(0); ok = DataHelper.readLine(in, buf); if (!ok) throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); - if ( (buf.length() <= 1) && ( (buf.charAt(0) == '\n') || (buf.charAt(0) == '\r') ) ) { + if ( (buf.length() == 0) || + ((buf.charAt(0) == '\n') || (buf.charAt(0) == '\r')) ) { // end of headers reached return headers; } else { diff --git a/core/java/src/net/i2p/util/NativeBigInteger.java b/core/java/src/net/i2p/util/NativeBigInteger.java index 20b5244b2318518a77e36fc6d635c036c680de45..8e5a45845bc9edcd956c07a3175cb20d7e2aefce 100644 --- a/core/java/src/net/i2p/util/NativeBigInteger.java +++ b/core/java/src/net/i2p/util/NativeBigInteger.java @@ -105,13 +105,14 @@ public class NativeBigInteger extends BigInteger { private final static String JBIGI_OPTIMIZATION_PENTIUM4 = "pentium4"; private static final boolean _isWin = System.getProperty("os.name").startsWith("Win"); + private static final boolean _isOS2 = System.getProperty("os.name").startsWith("OS/2"); private static final boolean _isMac = System.getProperty("os.name").startsWith("Mac"); private static final boolean _isLinux = System.getProperty("os.name").toLowerCase().indexOf("linux") != -1; private static final boolean _isFreebsd = System.getProperty("os.name").toLowerCase().indexOf("freebsd") != -1; - private static final boolean _isNix = !(_isWin || _isMac); + private static final boolean _isNix = !(_isWin || _isMac || _isOS2); /* libjbigi.so vs jbigi.dll */ - private static final String _libPrefix = (_isWin ? "" : "lib"); - private static final String _libSuffix = (_isWin ? ".dll" : _isMac ? ".jnilib" : ".so"); + private static final String _libPrefix = (_isWin || _isOS2 ? "" : "lib"); + private static final String _libSuffix = (_isWin || _isOS2 ? ".dll" : _isMac ? ".jnilib" : ".so"); private final static String sCPUType; //The CPU Type to optimize for (one of the above strings) @@ -254,7 +255,8 @@ public class NativeBigInteger extends BigInteger { */ public static void main(String args[]) { runModPowTest(100); - runDoubleValueTest(100); + // i2p doesn't care about the double values + //runDoubleValueTest(100); } /* the sample numbers are elG generator/prime so we can test with reasonable numbers */ @@ -440,8 +442,10 @@ public class NativeBigInteger extends BigInteger { if (_doLog && !_nativeOk) System.err.println("INFO: Native BigInteger library jbigi not loaded - using pure java"); }catch(Exception e){ - if (_doLog) + if (_doLog) { System.err.println("INFO: Native BigInteger library jbigi not loaded, reason: '"+e.getMessage()+"' - using pure java"); + e.printStackTrace(); + } } } @@ -489,7 +493,8 @@ public class NativeBigInteger extends BigInteger { } private static final boolean loadFromResource(String resourceName) { if (resourceName == null) return false; - URL resource = NativeBigInteger.class.getClassLoader().getResource(resourceName); + //URL resource = NativeBigInteger.class.getClassLoader().getResource(resourceName); + URL resource = ClassLoader.getSystemResource(resourceName); if (resource == null) { if (_doLog) System.err.println("NOTICE: Resource name [" + resourceName + "] was not found"); @@ -561,6 +566,8 @@ public class NativeBigInteger extends BigInteger { return "jbigi-freebsd"+sAppend; // The convention on freebsd... if(_isMac) return "jbigi-osx"+sAppend; + if(_isOS2) + return "jbigi-os2"+sAppend; throw new RuntimeException("Dont know jbigi library name for os type '"+System.getProperty("os.name")+"'"); } } diff --git a/history.txt b/history.txt index 8a456113b841e1f16c3aa4e8a4c41a26a7acd767..4b43e3ceb068b3d7f12080f7e6ff04bc42f6909b 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,28 @@ -$Id: history.txt,v 1.376 2005/12/31 18:40:23 jrandom Exp $ +$Id: history.txt,v 1.377 2006/01/01 12:23:29 jrandom Exp $ + +2006-01-04 jrandom + * Rather than profile individual tunnels for throughput over their + lifetime, do so at 1 minute intervals (allowing less frequently active + tunnels to be more fairly measured). + * Run the live tunnel load test across two tunnels at a time, by default. + The load test runs for a random period from 90s to the tunnel lifetime, + self paced. This should help gathering data for profiling peers that + are in exploratory tunnels. + +2006-01-03 jrandom + * Calculate the overall peer throughput across the 3 fastest one minute + tunnel throughput values, rather than the single fastest throughput. + * Degrade the profiled throughput data over time (cutting the profiled + peaks in half once a day, on average) + * Enable yet another new speed calculation for profiling peers, using the + peak throughput from individual tunnels that a peer is participating in, + rather than across all tunnels they are participating in. This helps + gather a fairer peer throughput measurement, since it won't allow a slow + high capacity peer seem to have a higher throughput (pushing a little + data across many tunnels at once, as opposed to lots of data across a + single tunnel). This degrades over time like the other. + * Add basic OS/2 support to the jbigi code (though we do not bundle a + precompiled OS/2 library) 2006-01-01 jrandom * Disable multifile torrent creation in I2PSnark's web UI for the moment diff --git a/router/java/src/net/i2p/router/LoadTestManager.java b/router/java/src/net/i2p/router/LoadTestManager.java index 75316e4202c89e607cde662929ead3f6f9560fc0..fa858bfa65469c95ba2fd9ad82ac6558dd347d3a 100644 --- a/router/java/src/net/i2p/router/LoadTestManager.java +++ b/router/java/src/net/i2p/router/LoadTestManager.java @@ -5,6 +5,7 @@ import java.util.*; import net.i2p.util.*; import net.i2p.data.*; import net.i2p.data.i2np.*; +import net.i2p.router.TunnelInfo; import net.i2p.router.message.*; import net.i2p.router.tunnel.*; import net.i2p.router.tunnel.pool.*; @@ -80,8 +81,8 @@ public class LoadTestManager { } } - /** 10 peers at a time */ - private static final int CONCURRENT_PEERS = 0; + /** 1 peer at a time */ + private static final int CONCURRENT_PEERS = 1; /** 4 messages per peer at a time */ private static final int CONCURRENT_MESSAGES = 4; @@ -360,6 +361,11 @@ public class LoadTestManager { _context.statManager().addRateData("test.rtt", period, count); if (period > 2000) _context.statManager().addRateData("test.rttHigh", period, count); + TunnelInfo info = _cfg.getOutbound(); + if (info != null) + info.incrementVerifiedBytesTransferred(5*1024); + // the inbound tunnel is incremented by the tunnel management system itself, + // so don't double count it here return true; } return false; @@ -390,7 +396,7 @@ public class LoadTestManager { } else { int hop = 0; TunnelInfo info = tunnel.getOutbound(); - for (int i = 0; (info != null) && (i < info.getLength()-1); i++) { + for (int i = 0; (info != null) && (i < info.getLength()); i++) { Hash peer = info.getPeer(i); if ( (peer != null) && (peer.equals(_context.routerHash())) ) continue; @@ -409,7 +415,7 @@ public class LoadTestManager { hop++; } info = tunnel.getInbound(); - for (int i = 0; (info != null) && (i < info.getLength()-1); i++) { + for (int i = 0; (info != null) && (i < info.getLength()); i++) { Hash peer = info.getPeer(i); if ( (peer != null) && (peer.equals(_context.routerHash())) ) continue; @@ -585,14 +591,16 @@ public class LoadTestManager { private boolean wantToTest(LoadTestTunnelConfig cfg) { // wait 10 minutes before testing anything if (_context.router().getUptime() <= 10*60*1000) return false; + if (bandwidthOverloaded()) return false; if (TEST_LIVE_TUNNELS && _active.size() < getConcurrency()) { // length == #hops+1 (as it includes the creator) if (cfg.getLength() < 2) return false; // only load test the client tunnels - if (cfg.getTunnel().getDestination() == null) - return false; + // XXX why? + ////if (cfg.getTunnel().getDestination() == null) + //// return false; _active.add(cfg); return true; } else { @@ -600,6 +608,20 @@ public class LoadTestManager { } } + private boolean bandwidthOverloaded() { + int msgLoadBps = CONCURRENT_MESSAGES + * 5 // message size + / 10; // 10 seconds before timeout & retransmission + msgLoadBps *= 2; // buffer + if (_context.bandwidthLimiter().getSendBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getOutboundKBytesPerSecond()) + return true; + if (_context.bandwidthLimiter().getReceiveBps()/1024d + (double)msgLoadBps >= _context.bandwidthLimiter().getInboundKBytesPerSecond()) + return true; + if (_context.throttle().getMessageDelay() > 1000) + return true; + return false; + } + private class CreatedJob extends JobImpl { private LoadTestTunnelConfig _cfg; public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) { @@ -619,6 +641,9 @@ public class LoadTestManager { runTest(_cfg); } } + private long TEST_PERIOD_MAX = 10*60*1000; + private long TEST_PERIOD_MIN = 90*1000; + private class Expire extends JobImpl { private LoadTestTunnelConfig _cfg; private boolean _removeFromDispatcher; @@ -629,13 +654,22 @@ public class LoadTestManager { super(ctx); _cfg = cfg; _removeFromDispatcher = removeFromDispatcher; - getTiming().setStartAfter(cfg.getExpiration()+60*1000); + long duration = ctx.random().nextLong(TEST_PERIOD_MAX); + if (duration < TEST_PERIOD_MIN) + duration += TEST_PERIOD_MIN; + long expiration = duration + ctx.clock().now(); + if (expiration > cfg.getExpiration()+60*1000) + expiration = cfg.getExpiration()+60*1000; + getTiming().setStartAfter(expiration); } public String getName() { return "expire test tunnel"; } public void runJob() { if (_removeFromDispatcher) getContext().tunnelDispatcher().remove(_cfg.getTunnel()); _cfg.logComplete(); + TunnelInfo info = _cfg.getOutbound(); + if (info != null) + info.incrementVerifiedBytesTransferred(0); // just to wrap up the test data _active.remove(_cfg); } } diff --git a/router/java/src/net/i2p/router/ProfileManager.java b/router/java/src/net/i2p/router/ProfileManager.java index 0c8348d602480ceb09eb948f121a0b8daf46d41a..f4914218406109abd4b3b424b8eafbda5aef9415 100644 --- a/router/java/src/net/i2p/router/ProfileManager.java +++ b/router/java/src/net/i2p/router/ProfileManager.java @@ -68,6 +68,18 @@ public interface ProfileManager { */ void tunnelDataPushed(Hash peer, long rtt, int size); + /** + * Note that the peer is participating in a tunnel that pushed the given amount of data + * over the last minute. + */ + void tunnelDataPushed1m(Hash peer, int size); + + /** + * Note that we were able to push the given amount of data through a tunnel + * that the peer is participating in + */ + void tunnelLifetimePushed(Hash peer, long lifetime, long size); + /** * Note that the peer participated in a tunnel that failed. Its failure may not have * been the peer's fault however. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 835342fe26c1530fab6baf588d12af150ae6374b..78f51c017c7a5c58f02f75fb32a34f2afd5bd58e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.323 $ $Date: 2005/12/31 18:40:22 $"; + public final static String ID = "$Revision: 1.324 $ $Date: 2006/01/01 12:23:29 $"; public final static String VERSION = "0.6.1.8"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/TunnelInfo.java b/router/java/src/net/i2p/router/TunnelInfo.java index bd427b46e090fb6c596056cfb7c0394a33fe973c..8303c3fbbfc9ebb69f47e8cade8cc62d6fae536f 100644 --- a/router/java/src/net/i2p/router/TunnelInfo.java +++ b/router/java/src/net/i2p/router/TunnelInfo.java @@ -48,4 +48,9 @@ public interface TunnelInfo { public void testSuccessful(int responseTime); public long getProcessedMessagesCount(); + + /** we know for sure that this many bytes travelled through the tunnel in its lifetime */ + public long getVerifiedBytesTransferred(); + /** we know for sure that the given number of bytes were sent down the tunnel fully */ + public void incrementVerifiedBytesTransferred(int numBytes); } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 7a700656a8aff892a317c639faae5593c3ec6cfa..97e1dcd113be2e9deb1ee0bbeaa0ac7350f95756 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -575,6 +575,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().profileManager().tunnelTestSucceeded(_outTunnel.getPeer(i), sendTime); getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), sendTime, size); } + _outTunnel.incrementVerifiedBytesTransferred(size); } if (_inTunnel != null) for (int i = 0; i < _inTunnel.getLength(); i++) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 36c27ee0dea8d22c7484523ef39d1349e71a279f..e2466b56010fce0a07ca64dcb242ea1a66f52aea 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -376,7 +376,8 @@ class SearchJob extends JobImpl { + msg.getReplyTunnel() + "]"); SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); - SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); + SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, + this, outTunnel, inTunnel); getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash()); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java index af5027326d2fba9efe4dbd3552afa233a333403a..fb128ecfa1acea776e9bd4993a5f01904ee918b6 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchUpdateReplyFoundJob.java @@ -10,6 +10,7 @@ import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.JobImpl; import net.i2p.router.ReplyJob; import net.i2p.router.RouterContext; +import net.i2p.router.TunnelInfo; import net.i2p.util.Log; /** @@ -23,16 +24,27 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob { private SearchState _state; private KademliaNetworkDatabaseFacade _facade; private SearchJob _job; + private TunnelInfo _outTunnel; + private TunnelInfo _replyTunnel; + private long _sentOn; public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, SearchState state, KademliaNetworkDatabaseFacade facade, SearchJob job) { + this(context, peer, state, facade, job, null, null); + } + public SearchUpdateReplyFoundJob(RouterContext context, RouterInfo peer, + SearchState state, KademliaNetworkDatabaseFacade facade, + SearchJob job, TunnelInfo outTunnel, TunnelInfo replyTunnel) { super(context); _log = context.logManager().getLog(SearchUpdateReplyFoundJob.class); _peer = peer.getIdentity().getHash(); _state = state; _facade = facade; _job = job; + _outTunnel = outTunnel; + _replyTunnel = replyTunnel; + _sentOn = System.currentTimeMillis(); } public String getName() { return "Update Reply Found for Kademlia Search"; } @@ -42,6 +54,21 @@ class SearchUpdateReplyFoundJob extends JobImpl implements ReplyJob { _log.info(getJobId() + ": Reply from " + _peer.toBase64() + " with message " + message.getClass().getName()); + long howLong = System.currentTimeMillis() - _sentOn; + // assume requests are 1KB (they're almost always much smaller, but tunnels have a fixed size) + int msgSize = 1024; + + if (_replyTunnel != null) { + for (int i = 0; i < _replyTunnel.getLength(); i++) + getContext().profileManager().tunnelDataPushed(_replyTunnel.getPeer(i), howLong, msgSize); + _replyTunnel.incrementVerifiedBytesTransferred(msgSize); + } + if (_outTunnel != null) { + for (int i = 0; i < _outTunnel.getLength(); i++) + getContext().profileManager().tunnelDataPushed(_outTunnel.getPeer(i), howLong, msgSize); + _outTunnel.incrementVerifiedBytesTransferred(msgSize); + } + if (message instanceof DatabaseStoreMessage) { long timeToReply = _state.dataFound(_peer); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 6fc9f7d5cfbe071a86bd30cec7f7c98aaadbca88..b204884817aeb8e946948b98f35cceb42c717eb2 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -363,6 +363,7 @@ class StoreJob extends JobImpl { _log.warn("sent a " + _msgSize + "byte netDb message through tunnel " + _sendThrough + " after " + howLong); for (int i = 0; i < _sendThrough.getLength(); i++) getContext().profileManager().tunnelDataPushed(_sendThrough.getPeer(i), howLong, _msgSize); + _sendThrough.incrementVerifiedBytesTransferred(_msgSize); } if (_state.getCompleteCount() >= getRedundancy()) { diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java index 8a743a27b9416793678a22cee3f4dd0c023abf9b..7c2705becfa6c5749ad58f3d85ecc84aa84f21cf 100644 --- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java +++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java @@ -234,13 +234,95 @@ public class PeerProfile { + " to " + _tunnelTestResponseTimeAvg + " via " + ms); } - /** bytes per minute */ - private volatile double _peakThroughput; + /** keep track of the fastest 3 throughputs */ + private static final int THROUGHPUT_COUNT = 3; + /** + * fastest 1 minute throughput, in bytes per minute, ordered with fastest + * first. this is not synchronized, as we don't *need* perfection, and we only + * reorder/insert values on coallesce + */ + private final double _peakThroughput[] = new double[THROUGHPUT_COUNT]; private volatile long _peakThroughputCurrentTotal; - public double getPeakThroughputKBps() { return _peakThroughput / (60d*1024d); } - public void setPeakThroughputKBps(double kBps) { _peakThroughput = kBps*60; } + public double getPeakThroughputKBps() { + double rv = 0; + for (int i = 0; i < THROUGHPUT_COUNT; i++) + rv += _peakThroughput[i]; + rv /= (60d*1024d*(double)THROUGHPUT_COUNT); + return rv; + } + public void setPeakThroughputKBps(double kBps) { + _peakThroughput[0] = kBps*60*1024; + //for (int i = 0; i < THROUGHPUT_COUNT; i++) + // _peakThroughput[i] = kBps*60; + } void dataPushed(int size) { _peakThroughputCurrentTotal += size; } + private final double _peakTunnelThroughput[] = new double[THROUGHPUT_COUNT]; + /** the tunnel pushed that much data in its lifetime */ + void tunnelDataTransferred(long tunnelByteLifetime) { + double lowPeak = _peakTunnelThroughput[THROUGHPUT_COUNT-1]; + if (tunnelByteLifetime > lowPeak) { + synchronized (_peakTunnelThroughput) { + for (int i = 0; i < THROUGHPUT_COUNT; i++) { + if (tunnelByteLifetime > _peakTunnelThroughput[i]) { + for (int j = THROUGHPUT_COUNT-1; j > i; j--) + _peakTunnelThroughput[j] = _peakTunnelThroughput[j-1]; + _peakTunnelThroughput[i] = tunnelByteLifetime; + break; + } + } + } + } + } + public double getPeakTunnelThroughputKBps() { + double rv = 0; + for (int i = 0; i < THROUGHPUT_COUNT; i++) + rv += _peakTunnelThroughput[i]; + rv /= (10d*60d*1024d*(double)THROUGHPUT_COUNT); + return rv; + } + public void setPeakTunnelThroughputKBps(double kBps) { + _peakTunnelThroughput[0] = kBps*60d*10d*1024d; + } + + /** total number of bytes pushed through a single tunnel in a 1 minute period */ + private final double _peakTunnel1mThroughput[] = new double[THROUGHPUT_COUNT]; + /** the tunnel pushed that much data in a 1 minute period */ + void dataPushed1m(int size) { + double lowPeak = _peakTunnel1mThroughput[THROUGHPUT_COUNT-1]; + if (size > lowPeak) { + synchronized (_peakTunnel1mThroughput) { + for (int i = 0; i < THROUGHPUT_COUNT; i++) { + if (size > _peakTunnel1mThroughput[i]) { + for (int j = THROUGHPUT_COUNT-1; j > i; j--) + _peakTunnel1mThroughput[j] = _peakTunnel1mThroughput[j-1]; + _peakTunnel1mThroughput[i] = size; + break; + } + } + } + + if (_log.shouldLog(Log.WARN) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Updating 1m throughput after ").append(size).append(" to "); + for (int i = 0; i < THROUGHPUT_COUNT; i++) + buf.append(_peakTunnel1mThroughput[i]).append(','); + buf.append(" for ").append(_peer.toBase64()); + _log.warn(buf.toString()); + } + } + } + public double getPeakTunnel1mThroughputKBps() { + double rv = 0; + for (int i = 0; i < THROUGHPUT_COUNT; i++) + rv += _peakTunnel1mThroughput[i]; + rv /= (60d*1024d*(double)THROUGHPUT_COUNT); + return rv; + } + public void setPeakTunnel1mThroughputKBps(double kBps) { + _peakTunnel1mThroughput[0] = kBps*60*1024; + } + /** * when the given peer is performing so poorly that we don't want to bother keeping * extensive stats on them, call this to discard excess data points. Specifically, @@ -307,19 +389,67 @@ public class PeerProfile { _dbIntroduction.setStatLog(_context.statManager().getStatLog()); _expanded = true; } - + /** once a day, on average, cut the measured throughtput values in half */ + private static final long DROP_PERIOD_MINUTES = 24*60; private long _lastCoalesceDate = System.currentTimeMillis(); private void coalesceThroughput() { long now = System.currentTimeMillis(); long measuredPeriod = now - _lastCoalesceDate; if (measuredPeriod >= 60*1000) { long tot = _peakThroughputCurrentTotal; - double peak = _peakThroughput; - if (tot >= peak) - _peakThroughput = tot; + double lowPeak = _peakThroughput[THROUGHPUT_COUNT-1]; + if (tot > lowPeak) { + for (int i = 0; i < THROUGHPUT_COUNT; i++) { + if (tot > _peakThroughput[i]) { + for (int j = THROUGHPUT_COUNT-1; j > i; j--) + _peakThroughput[j] = _peakThroughput[j-1]; + _peakThroughput[i] = tot; + break; + } + } + + if (false && _log.shouldLog(Log.WARN) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Updating throughput after ").append(tot).append(" to "); + for (int i = 0; i < THROUGHPUT_COUNT; i++) + buf.append(_peakThroughput[i]).append(','); + buf.append(" for ").append(_peer.toBase64()); + _log.warn(buf.toString()); + } + } else { + if (_context.random().nextLong(DROP_PERIOD_MINUTES*2) <= 0) { + for (int i = 0; i < THROUGHPUT_COUNT; i++) + _peakThroughput[i] /= 2; + + if (false && _log.shouldLog(Log.WARN) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Degrading the throughput measurements to "); + for (int i = 0; i < THROUGHPUT_COUNT; i++) + buf.append(_peakThroughput[i]).append(','); + buf.append(" for ").append(_peer.toBase64()); + _log.warn(buf.toString()); + } + } + } + + // we degrade the tunnel throughput here too, regardless of the current + // activity + if (_context.random().nextLong(DROP_PERIOD_MINUTES*2) <= 0) { + for (int i = 0; i < THROUGHPUT_COUNT; i++) { + _peakTunnelThroughput[i] /= 2; + _peakTunnel1mThroughput[i] /= 2; + } + + if (_log.shouldLog(Log.WARN) ) { + StringBuffer buf = new StringBuffer(128); + buf.append("Degrading the tunnel throughput measurements to "); + for (int i = 0; i < THROUGHPUT_COUNT; i++) + buf.append(_peakTunnel1mThroughput[i]).append(','); + buf.append(" for ").append(_peer.toBase64()); + _log.warn(buf.toString()); + } + } _peakThroughputCurrentTotal = 0; - if ( (tot > 0) && _log.shouldLog(Log.WARN) ) - _log.warn("updating throughput after " + tot + " to " + (_peakThroughput/60d) + " for " + _peer.toBase64()); _lastCoalesceDate = now; } } diff --git a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java index 58952b6c2955979ffdfa74e4f480faf605e94a0a..e82c965905ab645c3c7318f761a5019ccd338c33 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java @@ -124,6 +124,23 @@ public class ProfileManagerImpl implements ProfileManager { if (data != null) data.dataPushed(size); // ignore rtt, as we are averaging over a minute } + public void tunnelDataPushed1m(Hash peer, int size) { + if (_context.routerHash().equals(peer)) + return; + PeerProfile data = getProfile(peer); + if (data != null) + data.dataPushed1m(size); + } + + + public void tunnelLifetimePushed(Hash peer, long lifetime, long size) { + if (_context.routerHash().equals(peer)) + return; + PeerProfile data = getProfile(peer); + if (data != null) + data.tunnelDataTransferred(size); + } + private int getSlowThreshold() { // perhaps we should have this compare vs. tunnel.testSuccessTime? diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java index a3565f770d9e4b73cbf5f3759d9707d1911d0332..09563bd3d1d12839715003814dda9a76ef7fac44 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizerRenderer.java @@ -124,7 +124,7 @@ class ProfileOrganizerRenderer { } buf.append("</table>"); buf.append("<i>Definitions:<ul>"); - buf.append("<li><b>speed</b>: peak throughput (bytes per second) over a 1 minute period that the peer has sustained</li>"); + buf.append("<li><b>speed</b>: peak throughput (bytes per second) over a 1 minute period that the peer has sustained in a single tunnel</li>"); buf.append("<li><b>capacity</b>: how many tunnels can we ask them to join in an hour?</li>"); buf.append("<li><b>integration</b>: how many new peers have they told us about lately?</li>"); buf.append("<li><b>failing?</b>: is the peer currently swamped (and if possible we should avoid nagging them)?</li>"); diff --git a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java index d5e4b3b16bd124ae72b3e9caab1d3b6e8bda1893..b5305d6401f7853ba4890ecdff5dd3201861c429 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java +++ b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java @@ -123,6 +123,8 @@ class ProfilePersistenceHelper { buf.append("# moving average as to how fast the peer replies").append(NL); buf.append("tunnelTestTimeAverage=").append(profile.getTunnelTestTimeAverage()).append(NL); buf.append("tunnelPeakThroughput=").append(profile.getPeakThroughputKBps()).append(NL); + buf.append("tunnelPeakTunnelThroughput=").append(profile.getPeakTunnelThroughputKBps()).append(NL); + buf.append("tunnelPeakTunnel1mThroughput=").append(profile.getPeakTunnel1mThroughputKBps()).append(NL); buf.append(NL); out.write(buf.toString().getBytes()); @@ -209,6 +211,8 @@ class ProfilePersistenceHelper { profile.setLastHeardFrom(getLong(props, "lastHeardFrom")); profile.setTunnelTestTimeAverage(getDouble(props, "tunnelTestTimeAverage")); profile.setPeakThroughputKBps(getDouble(props, "tunnelPeakThroughput")); + profile.setPeakTunnelThroughputKBps(getDouble(props, "tunnelPeakTunnelThroughput")); + profile.setPeakTunnel1mThroughputKBps(getDouble(props, "tunnelPeakTunnel1mThroughput")); profile.getTunnelHistory().load(props); profile.getDBHistory().load(props); diff --git a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java index 926fc40d6d2ff9d5ae3324a4bf141c209f6e8668..c2d139dc4bb23e72b156402e200425fb3419bf94 100644 --- a/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java +++ b/router/java/src/net/i2p/router/peermanager/SpeedCalculator.java @@ -38,8 +38,14 @@ public class SpeedCalculator extends Calculator { } public double calc(PeerProfile profile) { - if (true) return profile.getPeakThroughputKBps()*1024d; - if (true) return calcAverage(profile); + if (true) // measures 1 minute throughput of individual tunnels + return profile.getPeakTunnel1mThroughputKBps()*1024d; + if (true) // measures throughput of individual tunnels + return profile.getPeakTunnelThroughputKBps()*1024d; + if (true) // measures throughput across all tunnels + return profile.getPeakThroughputKBps()*1024d; + if (true) + return calcAverage(profile); long threshold = getEventThreshold(); boolean tunnelTestOnly = getUseTunnelTestOnly(); diff --git a/router/java/src/net/i2p/router/tunnel/InboundEndpointProcessor.java b/router/java/src/net/i2p/router/tunnel/InboundEndpointProcessor.java index 7199dd556b765c9aa0e4648915e410d462fd04a9..c0bd298ae6f1c331033effac42a7436375547d7b 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundEndpointProcessor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundEndpointProcessor.java @@ -83,6 +83,7 @@ public class InboundEndpointProcessor { _log.debug("Received a " + length + "byte message through tunnel " + _config); for (int i = 0; i < _config.getLength(); i++) ctx.profileManager().tunnelDataPushed(_config.getPeer(i), rtt, length); + _config.incrementVerifiedBytesTransferred(length); } return true; diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index 39135a5e4166b23a0b7e453eb35f23662a02e0f0..d5ff4bbca43069c62d6c8561b5d1f55a16700a39 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -9,6 +9,7 @@ import net.i2p.data.Base64; import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.router.TunnelInfo; +import net.i2p.router.RouterContext; /** * Coordinate the info that the tunnel creator keeps track of, including what @@ -16,6 +17,7 @@ import net.i2p.router.TunnelInfo; * */ public class TunnelCreatorConfig implements TunnelInfo { + protected RouterContext _context; /** only necessary for client tunnels */ private Hash _destination; /** gateway first */ @@ -25,11 +27,13 @@ public class TunnelCreatorConfig implements TunnelInfo { private long _expiration; private boolean _isInbound; private long _messagesProcessed; + private volatile long _verifiedBytesTransferred; - public TunnelCreatorConfig(int length, boolean isInbound) { - this(length, isInbound, null); + public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound) { + this(ctx, length, isInbound, null); } - public TunnelCreatorConfig(int length, boolean isInbound, Hash destination) { + public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) { + _context = ctx; if (length <= 0) throw new IllegalArgumentException("0 length? 0 hop tunnels are 1 length!"); _config = new HopConfig[length]; @@ -40,6 +44,7 @@ public class TunnelCreatorConfig implements TunnelInfo { _isInbound = isInbound; _destination = destination; _messagesProcessed = 0; + _verifiedBytesTransferred = 0; } /** how many hops are there in the tunnel? */ @@ -83,6 +88,45 @@ public class TunnelCreatorConfig implements TunnelInfo { /** take note of a message being pumped through this tunnel */ public void incrementProcessedMessages() { _messagesProcessed++; } public long getProcessedMessagesCount() { return _messagesProcessed; } + + public void incrementVerifiedBytesTransferred(int bytes) { + _verifiedBytesTransferred += bytes; + _peakThroughputCurrentTotal += bytes; + long now = System.currentTimeMillis(); + long timeSince = now - _peakThroughputLastCoallesce; + if (timeSince >= 60*1000) { + long tot = _peakThroughputCurrentTotal; + double normalized = (double)tot * 60d*1000d / (double)timeSince; + _peakThroughputLastCoallesce = now; + _peakThroughputCurrentTotal = 0; + for (int i = 0; i < _peers.length; i++) + _context.profileManager().tunnelDataPushed1m(_peers[i], (int)normalized); + } + } + public long getVerifiedBytesTransferred() { return _verifiedBytesTransferred; } + + private static final int THROUGHPUT_COUNT = 3; + /** + * fastest 1 minute throughput, in bytes per minute, ordered with fastest + * first. + */ + private final double _peakThroughput[] = new double[THROUGHPUT_COUNT]; + private volatile long _peakThroughputCurrentTotal; + private volatile long _peakThroughputLastCoallesce = System.currentTimeMillis(); + public double getPeakThroughputKBps() { + double rv = 0; + for (int i = 0; i < THROUGHPUT_COUNT; i++) + rv += _peakThroughput[i]; + rv /= (60d*1024d*(double)THROUGHPUT_COUNT); + return rv; + } + public void setPeakThroughputKBps(double kBps) { + _peakThroughput[0] = kBps*60*1024; + //for (int i = 0; i < THROUGHPUT_COUNT; i++) + // _peakThroughput[i] = kBps*60; + } + + public String toString() { // H0:1235-->H1:2345-->H2:2345 diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index 133bfbbc5a59c399b3261a9b9207829ab2492af3..0dad70c92edbad23cac0d882bee89e207d791609 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -84,6 +84,8 @@ public class TunnelParticipant { + " for " + msg); _config.incrementProcessedMessages(); send(_config, msg, ri); + if (_config != null) + incrementThroughput(_config.getReceiveFrom()); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Lookup the nextHop (" + _config.getSendTo().toBase64().substring(0,4) @@ -98,6 +100,30 @@ public class TunnelParticipant { } } + private int _periodMessagesTransferred; + private long _lastCoallesced = System.currentTimeMillis(); + /** + * take note that the peers specified were able to push us data. hmm, is this safe? + * this could be easily gamed to get us to rank some peer of their choosing as quite + * fast. That peer would have to actually be quite fast, but having a remote peer + * influence who we spend our time profiling is dangerous, so this will be disabled for + * now. + */ + private void incrementThroughput(Hash prev) { + if (true) return; + long now = System.currentTimeMillis(); + long timeSince = now - _lastCoallesced; + if (timeSince >= 60*1000) { + int amount = 1024 * _periodMessagesTransferred; + int normalized = (int)((double)amount * 60d*1000d / (double)timeSince); + _periodMessagesTransferred = 0; + _lastCoallesced = now; + _context.profileManager().tunnelDataPushed1m(prev, normalized); + } else { + _periodMessagesTransferred++; + } + } + public int getCompleteCount() { if (_handler != null) return _handler.getCompleteCount(); diff --git a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java index f22e88200febbd361c2548bc892db1da0cd1d45e..0cd5457d349dd06059ee10cce6690eeef395e4ed 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/pool/PooledTunnelCreatorConfig.java @@ -10,7 +10,6 @@ import net.i2p.router.tunnel.TunnelCreatorConfig; * */ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { - private RouterContext _context; private TunnelPool _pool; private boolean _failed; private TestJob _testJob; @@ -23,8 +22,7 @@ public class PooledTunnelCreatorConfig extends TunnelCreatorConfig { this(ctx, length, isInbound, null); } public PooledTunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) { - super(length, isInbound, destination); - _context = ctx; + super(ctx, length, isInbound, destination); _failed = false; _pool = null; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index e6cb4ff0032d14d89bc774092e3f11b1c6614490..f676c2e5de2583a49d4cbfada0798ea3f733788f 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -339,6 +339,11 @@ public class TunnelPool { _lifetimeProcessed += info.getProcessedMessagesCount(); + long lifetimeConfirmed = info.getVerifiedBytesTransferred(); + long lifetime = 10*60*1000; + for (int i = 0; i < info.getLength(); i++) + _context.profileManager().tunnelLifetimePushed(info.getPeer(i), lifetime, lifetimeConfirmed); + if (_settings.isInbound() && (_settings.getDestination() != null) ) { if (ls != null) { _context.clientManager().requestLeaseSet(_settings.getDestination(), ls);