From ed8eced9dd999a0a3ca028778939e2168209f14b Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Fri, 19 Nov 2004 00:00:05 +0000 Subject: [PATCH] * stats if you want to get this data and you're running outside the router, just toss on a "-Dstat.logFilters=* -Dstat.logFile=proxy.stats" to the java command line. the resulting file can be parsed w/ java -cp lib/i2p.jar net.i2p.stat.StatLogSplitter proxy.stats, then fed into gnuplot or whatever --- .../net/i2p/client/streaming/Connection.java | 23 +++++++++++++++++- .../client/streaming/ConnectionManager.java | 24 +++++++++++++++++++ .../streaming/ConnectionPacketHandler.java | 15 ++++++++++++ .../i2p/client/streaming/MessageHandler.java | 3 +++ .../net/i2p/client/streaming/PacketLocal.java | 1 + .../net/i2p/client/streaming/PacketQueue.java | 15 +++++++++++- 6 files changed, 79 insertions(+), 2 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index fcd388369e..656b3be16e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -64,6 +64,11 @@ public class Connection { /** how many messages have been resent and not yet ACKed? */ private int _activeResends; + private long _lifetimeBytesSent; + private long _lifetimeBytesReceived; + private long _lifetimeDupMessageSent; + private long _lifetimeDupMessageReceived; + public static final long MAX_RESEND_DELAY = 60*1000; public static final long MIN_RESEND_DELAY = 20*1000; @@ -106,6 +111,7 @@ public class Connection { _ackSinceCongestion = true; _connectLock = new Object(); _activeResends = 0; + _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } public long getNextOutboundPacketNum() { @@ -454,10 +460,24 @@ public class Connection { public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } - public long getLifetime() { return _context.clock().now() - _createdOn; } + public long getLifetime() { + if (_closeSentOn <= 0) + return _context.clock().now() - _createdOn; + else + return _closeSentOn - _createdOn; + } public ConnectionPacketHandler getPacketHandler() { return _handler; } + public long getLifetimeBytesSent() { return _lifetimeBytesSent; } + public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; } + public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; } + public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; } + public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; } + public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; } + public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; } + public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; } + /** * Time when the scheduler next want to send a packet, or -1 if * never. This should be set when we want to send on timeout, for @@ -720,6 +740,7 @@ public class Connection { // shrink the window int newWindowSize = getOptions().getWindowSize(); congestionOccurred(); + _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime()); newWindowSize /= 2; if (newWindowSize <= 0) newWindowSize = 1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 92b302337b..0ef882f1b6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -57,6 +57,15 @@ public class ConnectionManager { _allowIncoming = false; _maxConcurrentStreams = maxConcurrent; _numWaiting = 0; + _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } Connection getConnectionByInboundId(byte[] id) { @@ -140,6 +149,8 @@ public class ConnectionManager { } return null; } + + _context.statManager().addRateData("stream.connectionReceived", 1, 0); return con; } @@ -207,6 +218,8 @@ public class ConnectionManager { } if (_numWaiting > 0) _numWaiting--; + + _context.statManager().addRateData("stream.connectionCreated", 1, 0); return con; } @@ -261,6 +274,17 @@ public class ConnectionManager { _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); _connectionLock.notifyAll(); } + if (removed) { + _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime()); + } } public Set listConnections() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 1417e11584..84f7d8e682 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -21,6 +21,10 @@ public class ConnectionPacketHandler { public ConnectionPacketHandler(I2PAppContext context) { _context = context; _log = context.logManager().getLog(ConnectionPacketHandler.class); + _context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); } /** distribute a packet to the connection specified */ @@ -28,6 +32,7 @@ public class ConnectionPacketHandler { boolean ok = verifyPacket(packet, con); if (!ok) return; con.packetReceived(); + if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) { if (_log.shouldLog(Log.WARN)) _log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet); @@ -35,6 +40,9 @@ public class ConnectionPacketHandler { return; } con.getOptions().setChoke(0); + + _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0); + boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { @@ -49,6 +57,8 @@ public class ConnectionPacketHandler { if (isNew) { con.incrementUnackedPacketsReceived(); + con.incrementBytesReceived(packet.getPayloadSize()); + if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling immediate ack for " + packet); @@ -63,6 +73,9 @@ public class ConnectionPacketHandler { } } else { if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) { + _context.statManager().addRateData("stream.con.receiveDuplicateSize", packet.getPayloadSize(), 0); + con.incrementDupMessagesReceived(1); + // take note of congestion //con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); @@ -96,6 +109,7 @@ public class ConnectionPacketHandler { //if (p.getNumSends() <= 1) highestRTT = p.getAckTime(); } + _context.statManager().addRateData("stream.sendsBeforeAck", p.getNumSends(), p.getAckTime()); if (p.getNumSends() > 1) numResends++; @@ -113,6 +127,7 @@ public class ConnectionPacketHandler { if (highestRTT > 0) { con.getOptions().updateRTT(highestRTT); } + _context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT); } boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 9d558f6c4e..0da98791fb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -18,6 +18,7 @@ public class MessageHandler implements I2PSessionListener { _manager = mgr; _context = ctx; _log = ctx.logManager().getLog(MessageHandler.class); + _context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } /** Instruct the client that the given session has received a message with @@ -31,6 +32,7 @@ public class MessageHandler implements I2PSessionListener { try { data = session.receiveMessage(msgId); } catch (I2PSessionException ise) { + _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); if (_log.shouldLog(Log.WARN)) _log.warn("Error receiving the message", ise); return; @@ -40,6 +42,7 @@ public class MessageHandler implements I2PSessionListener { packet.readPacket(data, 0, data.length); _manager.getPacketHandler().receivePacket(packet); } catch (IllegalArgumentException iae) { + _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); if (_log.shouldLog(Log.WARN)) _log.warn("Received an invalid packet", iae); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index b04f6f1e18..814bfcd2a3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -67,6 +67,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat } public long getCreatedOn() { return _createdOn; } + public long getLifetime() { return _context.clock().now() - _createdOn; } public void incrementSends() { _numSends++; _lastSend = _context.clock().now(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 8c3d30a5d7..f3d8359198 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -29,6 +29,8 @@ class PacketQueue { _connectionManager = mgr; _buf = _cache.acquire().getData(); // new byte[36*1024]; _log = context.logManager().getLog(PacketQueue.class); + _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } protected void finalize() throws Throwable { @@ -62,9 +64,9 @@ class PacketQueue { long end = 0; boolean sent = false; try { + int size = 0; synchronized (this) { Arrays.fill(_buf, (byte)0x0); - int size = 0; if (packet.shouldSign()) size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); else @@ -75,6 +77,17 @@ class PacketQueue { sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); end = _context.clock().now(); } + _context.statManager().addRateData("stream.con.sendMessageSize", size, packet.getLifetime()); + if (packet.getNumSends() > 1) + _context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime()); + + + Connection con = packet.getConnection(); + if (con != null) { + con.incrementBytesSent(size); + if (packet.getNumSends() > 1) + con.incrementDupMessagesSent(1); + } } catch (I2PSessionException ise) { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send the packet " + packet, ise); -- GitLab