multihop load tests (testing everyone with a fixed set of fast other peers).

not for general purpose use, so you can probably safely ignore this code, but its
useful for testing
This commit is contained in:
jrandom
2005-12-13 21:32:50 +00:00
committed by zzz
parent 77c818a0b1
commit 6e5114a4c2

View File

@@ -65,7 +65,10 @@ public class LoadTestManager {
}
}
/** 10 peers at a time */
private static final int CONCURRENT_PEERS = 10;
/** 4 messages per peer at a time */
private static final int CONCURRENT_MESSAGES = 4;
public void runTest() {
if ( (_untestedPeers == null) || (_untestedPeers.size() <= 0) ) {
@@ -92,14 +95,31 @@ public class LoadTestManager {
return rv;
}
private int getPeerMessages() {
int rv = CONCURRENT_MESSAGES;
try {
rv = Integer.parseInt(_context.getProperty("router.loadTestMessagesPerPeer", CONCURRENT_MESSAGES+""));
} catch (NumberFormatException nfe) {
rv = CONCURRENT_MESSAGES;
}
if (rv < 1)
rv = 1;
if (rv > 50)
rv = 50;
return rv;
}
/**
* Actually send the messages through the given tunnel
*/
private void runTest(LoadTestTunnelConfig tunnel) {
log(tunnel, "start");
sendTestMessage(tunnel, 0);
int peerMessages = getPeerMessages();
for (int i = 0; i < peerMessages; i++)
sendTestMessage(tunnel);
}
private void sendTestMessage(LoadTestTunnelConfig tunnel, long count) {
private void sendTestMessage(LoadTestTunnelConfig tunnel) {
if (_context.clock().now() > tunnel.getExpiration())
return;
RouterInfo target = _context.netDb().lookupRouterInfoLocally(tunnel.getPeer(0));
@@ -117,11 +137,11 @@ public class LoadTestManager {
OutNetMessage om = new OutNetMessage(_context);
om.setMessage(tm);
SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false, count+1);
SendAgain failed = new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), false);
om.setOnFailedReplyJob(failed);
om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true, count+1));
om.setOnReplyJob(new SendAgain(_context, tunnel, payloadMessage.getUniqueId(), true));
//om.setOnFailedSendJob(failed);
om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId(), count+1));
om.setReplySelector(new Selector(tunnel, payloadMessage.getUniqueId()));
om.setTarget(target);
om.setExpiration(tm.getMessageExpiration());
om.setPriority(40);
@@ -160,15 +180,13 @@ public class LoadTestManager {
private LoadTestTunnelConfig _cfg;
private long _messageId;
private boolean _ok;
private long _count;
private boolean _run;
private long _dontStartUntil;
public SendAgain(RouterContext ctx, LoadTestTunnelConfig cfg, long messageId, boolean ok, long count) {
public SendAgain(RouterContext ctx, LoadTestTunnelConfig cfg, long messageId, boolean ok) {
super(ctx);
_cfg = cfg;
_messageId = messageId;
_ok = ok;
_count = count;
_run = false;
_dontStartUntil = ctx.clock().now() + 10*1000;
}
@@ -176,10 +194,10 @@ public class LoadTestManager {
public void runJob() {
if (!_ok) {
if (!_run) {
log(_cfg, _messageId + " " + _count + " TIMEOUT");
log(_cfg, _messageId + " " + _cfg.getFullMessageCount() + " TIMEOUT");
getContext().statManager().addRateData("test.timeoutAfter", _cfg.getFullMessageCount(), 0);
if (getContext().clock().now() >= _dontStartUntil) {
sendTestMessage(_cfg, (_ok ? _count : _count-1));
sendTestMessage(_cfg);
_cfg.incrementFailed();
} else {
getTiming().setStartAfter(_dontStartUntil);
@@ -188,7 +206,7 @@ public class LoadTestManager {
}
_run = true;
} else {
sendTestMessage(_cfg, (_ok ? _count : _count-1));
sendTestMessage(_cfg);
}
}
@@ -198,22 +216,21 @@ public class LoadTestManager {
private class Selector implements MessageSelector {
private LoadTestTunnelConfig _cfg;
private long _messageId;
private long _count;
public Selector(LoadTestTunnelConfig cfg, long messageId, long count) {
public Selector(LoadTestTunnelConfig cfg, long messageId) {
_cfg = cfg;
_messageId = messageId;
_count = count;
}
public boolean continueMatching() { return false; }
public long getExpiration() { return _cfg.getExpiration(); }
public boolean isMatch(I2NPMessage message) {
if (message.getUniqueId() == _messageId) {
long count = _cfg.getFullMessageCount();
_cfg.incrementFull();
long period = _context.clock().now() - (message.getMessageExpiration() - 10*1000);
log(_cfg, _messageId + " " + _count + " after " + period);
_context.statManager().addRateData("test.rtt", period, _count);
log(_cfg, _messageId + " " + count + " after " + period);
_context.statManager().addRateData("test.rtt", period, count);
if (period > 2000)
_context.statManager().addRateData("test.rttHigh", period, _count);
_context.statManager().addRateData("test.rttHigh", period, count);
return true;
}
return false;
@@ -222,19 +239,23 @@ public class LoadTestManager {
private void log(LoadTestTunnelConfig tunnel, String msg) {
StringBuffer buf = new StringBuffer(128);
Hash peer = tunnel.getPeer(0);
if (peer != null)
buf.append(peer.toBase64());
else
buf.append("[unknown_peer]");
buf.append(" ");
TunnelId id = tunnel.getReceiveTunnelId(0);
if (id != null)
buf.append(id.getTunnelId());
else
buf.append("[unknown_tunnel]");
buf.append(" ");
buf.append(_context.clock().now()).append(" ").append(msg).append("\n");
for (int i = 0; i < tunnel.getLength()-1; i++) {
Hash peer = tunnel.getPeer(i);
if ( (peer != null) && (peer.equals(_context.routerHash())) )
continue;
else if (peer != null)
buf.append(peer.toBase64());
else
buf.append("[unknown_peer]");
buf.append(" ");
TunnelId id = tunnel.getReceiveTunnelId(i);
if (id != null)
buf.append(id.getTunnelId());
else
buf.append("[unknown_tunnel]");
buf.append(" ");
buf.append(_context.clock().now()).append(" hop ").append(i).append(" ").append(msg).append("\n");
}
try {
synchronized (_out) {
_out.write(buf.toString());
@@ -244,7 +265,19 @@ public class LoadTestManager {
}
}
private boolean getBuildOneHop() {
return Boolean.valueOf(_context.getProperty("router.loadTestOneHop", "false")).booleanValue();
}
private void buildTestTunnel(Hash peer) {
if (getBuildOneHop()) {
buildOneHop(peer);
} else {
buildLonger(peer);
}
}
private void buildOneHop(Hash peer) {
long expiration = _context.clock().now() + 10*60*1000;
LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 2, true);
@@ -272,6 +305,78 @@ public class LoadTestManager {
_context.jobQueue().addJob(req);
}
private Hash pickFastPeer(Hash skipPeer) {
String peers = _context.getProperty("router.loadTestFastPeers");
if (peers != null) {
StringTokenizer tok = new StringTokenizer(peers.trim(), ", \t");
List peerList = new ArrayList();
while (tok.hasMoreTokens()) {
String str = tok.nextToken();
try {
Hash h = new Hash();
h.fromBase64(str);
peerList.add(h);
} catch (DataFormatException dfe) {
// ignore
}
}
Collections.shuffle(peerList);
while (peerList.size() > 0) {
Hash cur = (Hash)peerList.remove(0);
if (!cur.equals(skipPeer))
return cur;
}
}
return null;
}
private void buildLonger(Hash peer) {
long expiration = _context.clock().now() + 10*60*1000;
LoadTestTunnelConfig cfg = new LoadTestTunnelConfig(_context, 3, true);
// cfg.getPeer() is ordered gateway first
cfg.setPeer(0, peer);
HopConfig hop = cfg.getConfig(0);
hop.setExpiration(expiration);
hop.setIVKey(_context.keyGenerator().generateSessionKey());
hop.setLayerKey(_context.keyGenerator().generateSessionKey());
// now lets put in a fast peer
Hash fastPeer = pickFastPeer(peer);
if (fastPeer == null) {
if (_log.shouldLog(Log.INFO))
_log.info("Unable to pick a fast peer for the load test of " + peer.toBase64());
buildOneHop(peer);
return;
} else if (fastPeer.equals(peer)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Can't test the peer with themselves, going one hop for " + peer.toBase64());
buildOneHop(peer);
return;
}
cfg.setPeer(1, fastPeer);
hop = cfg.getConfig(1);
hop.setExpiration(expiration);
hop.setIVKey(_context.keyGenerator().generateSessionKey());
hop.setLayerKey(_context.keyGenerator().generateSessionKey());
// now for ourselves
cfg.setPeer(2, _context.routerHash());
hop = cfg.getConfig(2);
hop.setExpiration(expiration);
hop.setIVKey(_context.keyGenerator().generateSessionKey());
hop.setLayerKey(_context.keyGenerator().generateSessionKey());
cfg.setExpiration(expiration);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Config for " + peer.toBase64() + " with fastPeer: " + fastPeer.toBase64() + ": " + cfg);
CreatedJob onCreated = new CreatedJob(_context, cfg);
FailedJob fail = new FailedJob(_context, cfg);
RequestTunnelJob req = new RequestTunnelJob(_context, cfg, onCreated, fail, cfg.getLength()-1, false, true);
_context.jobQueue().addJob(req);
}
private class CreatedJob extends JobImpl {
private LoadTestTunnelConfig _cfg;
public CreatedJob(RouterContext ctx, LoadTestTunnelConfig cfg) {