From 049d6b2fa8fa6fce1055fedf3f4f72a0b8a92fe2 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 12 Nov 2008 20:10:39 +0000 Subject: [PATCH] * Streaming: - Add more info to Connection.toString() for debugging - Fix lifetimeMessages{Sent,Received} stats - Reduce RTT damping to 0.875 (was 0.9) - Add a stream.con.initialRTT.{in,out} stats --- .../net/i2p/client/streaming/Connection.java | 28 ++++++++++++++----- .../client/streaming/ConnectionManager.java | 12 ++++++-- .../client/streaming/ConnectionOptions.java | 3 +- .../streaming/ConnectionPacketHandler.java | 12 +++++++- 4 files changed, 44 insertions(+), 11 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 45e51e4f8b..e23ecb3df4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -44,6 +44,7 @@ public class Connection { private int _unackedPacketsReceived; private long _congestionWindowEnd; private long _highestAckedThrough; + private boolean _isInbound; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private Map _outboundPackets; private PacketQueue _outboundQueue; @@ -118,6 +119,7 @@ public class Connection { _connectLock = new Object(); _activeResends = 0; _resetSentOn = -1; + _isInbound = false; _connectionEvent = new ConEvent(); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -470,6 +472,8 @@ public class Connection { } public boolean getResetReceived() { return _resetReceived; } + public void setInbound() { _isInbound = true; } + public boolean isInbound() { return _isInbound; } public boolean getIsConnected() { return _connected; } public boolean getHardDisconnected() { return _hardDisconnected; } public boolean getResetSent() { return _resetSent; } @@ -908,6 +912,15 @@ public class Connection { buf.append(Packet.toId(_sendStreamId)); else buf.append("unknown"); + if (_isInbound) + buf.append(" from "); + else + buf.append(" to "); + if (_remotePeerSet) + buf.append(_remotePeer.calculateHash().toBase64().substring(0,4)); + else + buf.append("unknown"); + buf.append(" up ").append(DataHelper.formatDuration(_context.clock().now() - _createdOn)); buf.append(" wsize: ").append(_options.getWindowSize()); buf.append(" cwin: ").append(_congestionWindowEnd - _highestAckedThrough); buf.append(" rtt: ").append(_options.getRTT()); @@ -925,14 +938,13 @@ public class Connection { } */ buf.append("unacked in: ").append(getUnackedPacketsReceived()); + int missing = 0; if (_inputStream != null) { - buf.append(" [high "); - buf.append(_inputStream.getHighestBlockId()); long nacks[] = _inputStream.getNacks(); - if (nacks != null) - for (int i = 0; i < nacks.length; i++) - buf.append(" ").append(nacks[i]); - buf.append("]"); + if (nacks != null) { + missing = nacks.length; + buf.append(" [").append(missing).append(" missing]"); + } } if (getResetSent()) @@ -947,7 +959,9 @@ public class Connection { } if (getCloseReceivedOn() > 0) buf.append(" close received"); - buf.append(" acked: ").append(getAckedPackets()); + buf.append(" sent: ").append(1 + _lastSendId); + if (_inputStream != null) + buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 5caf341f4e..da2b1ab127 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -127,6 +127,7 @@ public class ConnectionManager { */ public Connection receiveConnection(Packet synPacket) { Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; boolean reject = false; int active = 0; @@ -311,8 +312,15 @@ public class ConnectionManager { _connectionLock.notifyAll(); } if (removed) { - _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime()); - _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime()); + _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime()); + MessageInputStream stream = con.getInputStream(); + if (stream != null) { + long rcvd = 1 + stream.getHighestBlockId(); + long nacks[] = stream.getNacks(); + if (nacks != null) + rcvd -= nacks.length; + _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime()); + } _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index f397521d78..07915a51ec 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -355,7 +355,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { } /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ - private static final double RTT_DAMPENING = 0.9; + /** This is the value specified in RFC 2988, let's try it */ + private static final double RTT_DAMPENING = 0.875; public void updateRTT(int measuredValue) { _rttDev = _rttDev + (int)(0.25d*(Math.abs(measuredValue-_rtt)-_rttDev)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 2d41aad8b4..6a062d4a6c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -23,10 +23,12 @@ public class ConnectionPacketHandler { _log = context.logManager().getLog(ConnectionPacketHandler.class); _context.statManager().createRateStat("stream.con.receiveMessageSize", "Size of a message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.receiveDuplicateSize", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Size of a duplicate message received on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Avg number of acks in a message", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.initialRTT.in", "What is the actual RTT for the first packet of an inbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.initialRTT.out", "What is the actual RTT for the first packet of an outbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); } /** distribute a packet to the connection specified */ @@ -220,6 +222,8 @@ public class ConnectionPacketHandler { //if ( (nacks != null) && (nacks.length > 0) ) // con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); + boolean firstAck = isNew && con.getHighestAckedThrough() < 0; + int numResends = 0; List acked = null; // if we don't know the streamIds for both sides of the connection, there's no way we @@ -265,6 +269,12 @@ public class ConnectionPacketHandler { } if (highestRTT > 0) { con.getOptions().updateRTT(highestRTT); + if (firstAck) { + if (con.isInbound()) + _context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0); + else + _context.statManager().addRateData("stream.con.initialRTT.out", highestRTT, 0); + } } _context.statManager().addRateData("stream.con.packetsAckedPerMessageReceived", acked.size(), highestRTT); } -- GitLab