From 6e5114a4c232e998c2704601068a7485569315bf Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Tue, 13 Dec 2005 21:32:50 +0000 Subject: [PATCH] multihop load tests (testing everyone with a fixed set of fast other peers). not for general purpose use, so you can probably safely ignore this code, but its useful for testing --- .../src/net/i2p/router/LoadTestManager.java | 165 ++++++++++++++---- 1 file changed, 135 insertions(+), 30 deletions(-) diff --git a/router/java/src/net/i2p/router/LoadTestManager.java b/router/java/src/net/i2p/router/LoadTestManager.java index aa8378ab7f..e31751de04 100644 --- a/router/java/src/net/i2p/router/LoadTestManager.java +++ b/router/java/src/net/i2p/router/LoadTestManager.java @@ -65,7 +65,10 @@ public class LoadTestManager { } } + /** 10 peers at a time */ private static final int CONCURRENT_PEERS = 10; + /** 4 messages per peer at a time */ + private static final int CONCURRENT_MESSAGES = 4; public void runTest() { if ( (_untestedPeers == null) || (_untestedPeers.size() <= 0) ) { @@ -92,14 +95,31 @@ public class LoadTestManager { return rv; } + private int getPeerMessages() { + int rv = CONCURRENT_MESSAGES; + try { + rv = Integer.parseInt(_context.getProperty("router.loadTestMessagesPerPeer", CONCURRENT_MESSAGES+"")); + } catch (NumberFormatException nfe) { + rv = CONCURRENT_MESSAGES; + } + if (rv < 1) + rv = 1; + if (rv > 50) + rv = 50; + return rv; + } + /** * Actually send the messages through the given tunnel */ private void runTest(LoadTestTunnelConfig tunnel) { log(tunnel, "start"); - sendTestMessage(tunnel, 0); + int peerMessages = getPeerMessages(); + for (int i = 0; i < peerMessages; i++) + sendTestMessage(tunnel); } - private void sendTestMessage(LoadTestTunnelConfig tunnel, long count) { + + private void sendTestMessage(LoadTestTunnelConfig tunnel) { if (_context.clock().now() > tunnel.getExpiration()) return; RouterInfo target = _context.netDb().lookupRouterInfoLocally(tunnel.getPeer(0)); @@ -117,11 +137,11 @@ public class LoadTestManager { OutNetMessage om = new OutNetMessage(_context); om.setMessage(tm); - SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false, count+1); + SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false); om.setOnFailedReplyJob(failed); - om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true, count+1)); + om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true)); //om.setOnFailedSendJob(failed); - om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId(), count+1)); + om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId())); om.setTarget(target); om.setExpiration(tm.getMessageExpiration()); om.setPriority(40); @@ -160,15 +180,13 @@ public class LoadTestManager { private LoadTestTunnelConfig _cfg; private long _messageId; private boolean _ok; - private long _count; private boolean _run; private long _dontStartUntil; - public SendAgain(RouterContext ctx, LoadTestTunnelConfig cfg, long messageId, boolean ok, long count) { + public SendAgain(RouterContext ctx, LoadTestTunnelConfig cfg, long messageId, boolean ok) { super(ctx); _cfg = cfg; _messageId = messageId; _ok = ok; - _count = count; _run = false; _dontStartUntil = ctx.clock().now() + 10*1000; } @@ -176,10 +194,10 @@ public class LoadTestManager { public void runJob() { if (!_ok) { if (!_run) { - log(_cfg, _messageId + " " + _count + " TIMEOUT"); + log(_cfg, _messageId + " " + _cfg.getFullMessageCount() + " TIMEOUT"); getContext().statManager().addRateData("test.timeoutAfter", _cfg.getFullMessageCount(), 0); if (getContext().clock().now() >= _dontStartUntil) { - sendTestMessage(_cfg, (_ok ? _count : _count-1)); + sendTestMessage(_cfg); _cfg.incrementFailed(); } else { getTiming().setStartAfter(_dontStartUntil); @@ -188,7 +206,7 @@ public class LoadTestManager { } _run = true; } else { - sendTestMessage(_cfg, (_ok ? _count : _count-1)); + sendTestMessage(_cfg); } } @@ -198,22 +216,21 @@ public class LoadTestManager { private class Selector implements MessageSelector { private LoadTestTunnelConfig _cfg; private long _messageId; - private long _count; - public Selector(LoadTestTunnelConfig cfg, long messageId, long count) { + public Selector(LoadTestTunnelConfig cfg, long messageId) { _cfg = cfg; _messageId = messageId; - _count = count; } public boolean continueMatching() { return false; } public long getExpiration() { return _cfg.getExpiration(); } public boolean isMatch(I2NPMessage message) { if (message.getUniqueId() == _messageId) { + long count = _cfg.getFullMessageCount(); _cfg.incrementFull(); long period = _context.clock().now() - (message.getMessageExpiration() - 10*1000); - log(_cfg, _messageId + " " + _count + " after " + period); - _context.statManager().addRateData("test.rtt", period, _count); + log(_cfg, _messageId + " " + count + " after " + period); + _context.statManager().addRateData("test.rtt", period, count); if (period > 2000) - _context.statManager().addRateData("test.rttHigh", period, _count); + _context.statManager().addRateData("test.rttHigh", period, count); return true; } return false; @@ -222,19 +239,23 @@ public class LoadTestManager { private void log(LoadTestTunnelConfig tunnel, String msg) { StringBuffer buf = new StringBuffer(128); - Hash peer = tunnel.getPeer(0); - if (peer != null) - buf.append(peer.toBase64()); - else - buf.append("[unknown_peer]"); - buf.append(" "); - TunnelId id = tunnel.getReceiveTunnelId(0); - if (id != null) - buf.append(id.getTunnelId()); - else - buf.append("[unknown_tunnel]"); - buf.append(" "); - buf.append(_context.clock().now()).append(" ").append(msg).append("\n"); + for (int i = 0; i < tunnel.getLength()-1; i++) { + Hash peer = tunnel.getPeer(i); + if ( (peer != null) && (peer.equals(_context.routerHash())) ) + continue; + else if (peer != null) + buf.append(peer.toBase64()); + else + buf.append("[unknown_peer]"); + buf.append(" "); + TunnelId id = tunnel.getReceiveTunnelId(i); + if (id != null) + buf.append(id.getTunnelId()); + else + buf.append("[unknown_tunnel]"); + buf.append(" "); + buf.append(_context.clock().now()).append(" hop ").append(i).append(" ").append(msg).append("\n"); + } try { synchronized (_out) { _out.write(buf.toString()); @@ -244,7 +265,19 @@ public class LoadTestManager { } } + + private boolean getBuildOneHop() { + return Boolean.valueOf(_context.getProperty("router.loadTestOneHop", "false")).booleanValue(); + } + private void buildTestTunnel(Hash peer) { + if (getBuildOneHop()) { + buildOneHop(peer); + } else { + buildLonger(peer); + } + } + private void buildOneHop(Hash peer) { long expiration = _context.clock().now() + 10*60*1000; LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 2, true); @@ -272,6 +305,78 @@ public class LoadTestManager { _context.jobQueue().addJob(req); } + private Hash pickFastPeer(Hash skipPeer) { + String peers = _context.getProperty("router.loadTestFastPeers"); + if (peers != null) { + StringTokenizer tok = new StringTokenizer(peers.trim(), ", \t"); + List peerList = new ArrayList(); + while (tok.hasMoreTokens()) { + String str = tok.nextToken(); + try { + Hash h = new Hash(); + h.fromBase64(str); + peerList.add(h); + } catch (DataFormatException dfe) { + // ignore + } + } + Collections.shuffle(peerList); + while (peerList.size() > 0) { + Hash cur = (Hash)peerList.remove(0); + if (!cur.equals(skipPeer)) + return cur; + } + } + return null; + } + + private void buildLonger(Hash peer) { + long expiration = _context.clock().now() + 10*60*1000; + + LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 3, true); + // cfg.getPeer() is ordered gateway first + cfg.setPeer(0, peer); + HopConfig hop = cfg.getConfig(0); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + // now lets put in a fast peer + Hash fastPeer = pickFastPeer(peer); + if (fastPeer == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Unable to pick a fast peer for the load test of " + peer.toBase64()); + buildOneHop(peer); + return; + } else if (fastPeer.equals(peer)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Can't test the peer with themselves, going one hop for " + peer.toBase64()); + buildOneHop(peer); + return; + } + cfg.setPeer(1, fastPeer); + hop = cfg.getConfig(1); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + // now for ourselves + cfg.setPeer(2, _context.routerHash()); + hop = cfg.getConfig(2); + hop.setExpiration(expiration); + hop.setIVKey(_context.keyGenerator().generateSessionKey()); + hop.setLayerKey(_context.keyGenerator().generateSessionKey()); + + cfg.setExpiration(expiration); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Config for " + peer.toBase64() + " with fastPeer: " + fastPeer.toBase64() + ": " + cfg); + + CreatedJob onCreated = new CreatedJob(_context, cfg); + FailedJob fail = new FailedJob(_context, cfg); + RequestTunnelJob req = new RequestTunnelJob(_context, cfg, onCreated, fail, cfg.getLength()-1, false, true); + _context.jobQueue().addJob(req); + } + + private class CreatedJob extends JobImpl { private LoadTestTunnelConfig _cfg; public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) { -- GitLab