From 8063889d239a030ccfc97b0b02eaa2cd03d99314 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Sat, 30 Apr 2005 03:14:09 +0000
Subject: [PATCH] udp updates: * more stats. including per-peer KBps (updated
 every second) * improved blocking/timeout situations on the send queue *
 added drop simulation hook * provide logical RTO limits

---
 .../i2p/router/transport/udp/ACKSender.java   |  2 +-
 .../transport/udp/EstablishmentManager.java   |  8 ++++
 .../udp/InboundMessageFragments.java          |  5 +--
 .../transport/udp/InboundMessageState.java    | 13 ++++--
 .../udp/OutboundMessageFragments.java         | 15 ++++---
 .../router/transport/udp/PacketPusher.java    |  2 +-
 .../i2p/router/transport/udp/PeerState.java   | 44 ++++++++++++++++---
 .../i2p/router/transport/udp/UDPReceiver.java |  9 ++++
 .../i2p/router/transport/udp/UDPSender.java   | 10 +++--
 .../router/transport/udp/UDPTransport.java    | 30 ++++++++++---
 10 files changed, 109 insertions(+), 29 deletions(-)

diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
index 7b344b4962..aaa99447bd 100644
--- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
@@ -22,7 +22,7 @@ public class ACKSender implements Runnable {
     private boolean _alive;
     
     /** how frequently do we want to send ACKs to a peer? */
-    private static final int ACK_FREQUENCY = 400;
+    static final int ACK_FREQUENCY = 400;
     
     public ACKSender(RouterContext ctx, UDPTransport transport) {
         _context = ctx;
diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
index e4d7e7d0fe..b030a8b6e1 100644
--- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
+++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
@@ -42,6 +42,10 @@ public class EstablishmentManager {
         _inboundStates = new HashMap(32);
         _outboundStates = new HashMap(32);
         _activityLock = new Object();
+        _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+        _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+        _context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+        _context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
     }
     
     public void startup() {
@@ -221,6 +225,7 @@ public class EstablishmentManager {
         
         _transport.addRemotePeerState(peer);
         
+        _context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
         sendOurInfo(peer);
     }
     
@@ -250,6 +255,7 @@ public class EstablishmentManager {
         
         _transport.addRemotePeerState(peer);
         
+        _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
         sendOurInfo(peer);
         
         while (true) {
@@ -343,6 +349,7 @@ public class EstablishmentManager {
                 } else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
                     // took too long, fuck 'em
                     iter.remove();
+                    _context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Removing expired inbound state");
                 } else {
@@ -430,6 +437,7 @@ public class EstablishmentManager {
                     // took too long, fuck 'em
                     iter.remove();
                     outboundState = cur;
+                    _context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Removing expired outbound: " + cur);
                     break;
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
index cbb75c91aa..eb952eac6a 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
@@ -36,7 +36,6 @@ public class InboundMessageFragments {
     private MessageReceiver _messageReceiver;
     private boolean _alive;
     
-    private static final int RECENTLY_COMPLETED_SIZE = 100;
     /** decay the recently completed every 2 minutes */
     private static final int DECAY_PERIOD = 120*1000;
         
@@ -113,7 +112,7 @@ public class InboundMessageFragments {
             
                 if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
                     _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
-                    from.messageFullyReceived(messageId);
+                    from.messageFullyReceived(messageId, -1);
                     _ackSender.ackPeer(from);
                     if (_log.shouldLog(Log.WARN))
                         _log.warn("Message received is a dup: " + messageId + " dups: " 
@@ -143,7 +142,7 @@ public class InboundMessageFragments {
 
                     _messageReceiver.receiveMessage(state);
                     
-                    from.messageFullyReceived(messageId);
+                    from.messageFullyReceived(messageId, state.getCompleteSize());
                     _ackSender.ackPeer(from);
                     
                     if (_log.shouldLog(Log.INFO))
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
index a03bf47ebe..03173dd3e0 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
@@ -25,6 +25,7 @@ public class InboundMessageState {
      */
     private int _lastFragment;
     private long _receiveBegin;
+    private int _completeSize;
     
     /** expire after 30s */
     private static final long MAX_RECEIVE_TIME = 10*1000;
@@ -39,6 +40,7 @@ public class InboundMessageState {
         _from = from;
         _fragments = new ByteArray[MAX_FRAGMENTS];
         _lastFragment = -1;
+        _completeSize = -1;
         _receiveBegin = ctx.clock().now();
     }
     
@@ -86,10 +88,13 @@ public class InboundMessageState {
     public Hash getFrom() { return _from; }
     public long getMessageId() { return _messageId; }
     public synchronized int getCompleteSize() {
-        int size = 0;
-        for (int i = 0; i <= _lastFragment; i++)
-            size += _fragments[i].getValid();
-        return size;
+        if (_completeSize < 0) {
+            int size = 0;
+            for (int i = 0; i <= _lastFragment; i++)
+                size += _fragments[i].getValid();
+            _completeSize = size;
+        }
+        return _completeSize;
     }
     
     public void releaseResources() {
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index de2330f04b..4d0fc25714 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -83,7 +83,7 @@ public class OutboundMessageFragments {
                     else if (_allowExcess)
                         return true;
                     else
-                        _activeMessages.wait();
+                        _activeMessages.wait(1000);
                 }
             } catch (InterruptedException ie) {}
         }
@@ -177,9 +177,10 @@ public class OutboundMessageFragments {
                             _nextPacketMessage = 0;
                     }
                     i--;
-                }
-            }
-        }
+                } // end (pushCount > maxVolleys)
+            } // end iterating over active
+            _activeMessages.notifyAll();
+        } // end synchronized
     }
     
     private static final long SECOND_MASK = 1023l;
@@ -286,6 +287,7 @@ public class OutboundMessageFragments {
                 if (currentFragment < 0) {
                     if (nextSend <= 0) {
                         try {
+                            _activeMessages.notifyAll();
                             _activeMessages.wait();
                         } catch (InterruptedException ie) {}
                     } else {
@@ -301,6 +303,8 @@ public class OutboundMessageFragments {
                             _activeMessages.wait(delay);
                         } catch (InterruptedException ie) {}
                     }
+                } else {
+                    _activeMessages.notifyAll();
                 }
                 _allowExcess = false;
             } // end of the synchronized block
@@ -344,6 +348,7 @@ public class OutboundMessageFragments {
                         Hash expectedBy = msg.getTarget().getIdentity().getHash();
                         if (!expectedBy.equals(ackedBy)) {
                             state = null;
+                            _activeMessages.notifyAll();
                             return 0;
                         }
                     }
@@ -355,12 +360,12 @@ public class OutboundMessageFragments {
                         if (_nextPacketMessage < 0)
                             _nextPacketMessage = 0;
                     }
-                    _activeMessages.notifyAll();
                     break;
                 } else {
                     state = null;
                 }
             }
+            _activeMessages.notifyAll();
         }
         
         if (state != null) {
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
index 392f702991..7ac8199918 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
@@ -37,7 +37,7 @@ public class PacketPusher implements Runnable {
         while (_alive) {
             UDPPacket packet = _fragments.getNextPacket();
             if (packet != null)
-                _sender.add(packet, true); // blocks
+                _sender.add(packet, 1000); // blocks for up to a second
         }
     }
 }
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index 5c4e55b574..e160245764 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -85,6 +85,11 @@ public class PeerState {
     /** how many bytes can we send to the peer in the current second */
     private volatile int _sendWindowBytesRemaining;
     private long _lastSendRefill;
+    private int _sendBps;
+    private int _sendBytes;
+    private int _receiveBps;
+    private int _receiveBytes;
+    private long _receivePeriodBegin;
     private volatile long _lastCongestionOccurred;
     /** 
      * when sendWindowBytes is below this, grow the window size quickly,
@@ -131,6 +136,8 @@ public class PeerState {
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
     private static final int DEFAULT_MTU = 1472;
+    private static final int MIN_RTO = ACKSender.ACK_FREQUENCY + 100;
+    private static final int MAX_RTO = 5000;
     
     public PeerState(I2PAppContext ctx) {
         _context = ctx;
@@ -154,6 +161,10 @@ public class PeerState {
         _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
         _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
         _lastSendRefill = _context.clock().now();
+        _receivePeriodBegin = _lastSendRefill;
+        _sendBps = 0;
+        _sendBytes = 0;
+        _receiveBps = 0;
         _lastCongestionOccurred = -1;
         _remoteIP = null;
         _remotePort = -1;
@@ -309,6 +320,9 @@ public class PeerState {
     public void setLastSendTime(long when) { _lastSendTime = when; }
     /** when did we last receive a packet from them? */
     public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
+    /** return the smoothed send transfer rate */
+    public int getSendBps() { return _sendBps; }
+    public int getReceiveBps() { return _receiveBps; }
     public int incrementConsecutiveFailedSends() { 
         long now = _context.clock().now()/(10*1000);
         if (_lastFailedSendPeriod >= now) {
@@ -333,13 +347,18 @@ public class PeerState {
      */
     public boolean allocateSendingBytes(int size) { 
         long now = _context.clock().now();
-        if (_lastSendRefill + 1000 <= now) {
+        long duration = now - _lastSendRefill;
+        if (duration >= 1000) {
             _sendWindowBytesRemaining = _sendWindowBytes;
+            _sendBytes += size;
+            _sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration)));
+            _sendBytes = 0;
             _lastSendRefill = now;
         }
         //if (true) return true;
         if (size <= _sendWindowBytesRemaining) {
             _sendWindowBytesRemaining -= size; 
+            _sendBytes += size;
             _lastSendTime = now;
             return true;
         } else {
@@ -374,10 +393,21 @@ public class PeerState {
     public int getSlowStartThreshold() { return _slowStartThreshold; }
     
     /** we received the message specified completely */
-    public void messageFullyReceived(Long messageId) {
+    public void messageFullyReceived(Long messageId, int bytes) {
+        if (bytes > 0)
+            _receiveBytes += bytes;
+        
+        long now = _context.clock().now();
+        long duration = now - _receivePeriodBegin;
+        if (duration >= 1000) {
+            _receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
+            _receiveBytes = 0;
+            _receivePeriodBegin = now;
+        }
+        
         synchronized (_currentACKs) {
             if (_wantACKSendSince <= 0)
-                _wantACKSendSince = _context.clock().now();
+                _wantACKSendSince = now;
             if (!_currentACKs.contains(messageId))
                 _currentACKs.add(messageId);
         }
@@ -454,10 +484,10 @@ public class PeerState {
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
                        + " rttDev=" + _rttDeviation + " rto=" + _rto);
-        if (_rto < 1000)
-            _rto = 1000;
-        if (_rto > 5000)
-            _rto = 5000;
+        if (_rto < MIN_RTO)
+            _rto = MIN_RTO;
+        if (_rto > MAX_RTO)
+            _rto = MAX_RTO;
     }
     /** we are resending a packet, so lets jack up the rto */
     public void messageRetransmitted() { 
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
index 144ae839d4..3931815eef 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
@@ -66,7 +66,16 @@ public class UDPReceiver {
     /** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */
     private static final long MAX_QUEUE_PERIOD = 2*1000;
     
+    private static final float ARTIFICIAL_DROP_PROBABILITY = 0f; //0.02f;
+    
     private void receive(UDPPacket packet) {
+        if (ARTIFICIAL_DROP_PROBABILITY > 0) { 
+            // the first check is to let the compiler optimize away this 
+            // random block on the live system when the probability is == 0
+            if (_context.random().nextFloat() <= ARTIFICIAL_DROP_PROBABILITY)
+                return;
+        }
+        
         synchronized (_inboundQueue) {
             int queueSize = _inboundQueue.size();
             if (queueSize > 0) {
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
index c7b5fcaee6..9511ca7f90 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
@@ -66,9 +66,11 @@ public class UDPSender {
      * Add the packet to the queue.  This may block until there is space
      * available, if requested, otherwise it returns immediately
      *
+     * @param blockTime how long to block
      * @return number of packets queued
      */
-    public int add(UDPPacket packet, boolean blocking) {
+    public int add(UDPPacket packet, int blockTime) {
+        long expiration = _context.clock().now() + blockTime;
         int remaining = -1;
         while ( (_keepRunning) && (remaining < 0) ) {
             try {
@@ -78,10 +80,12 @@ public class UDPSender {
                         remaining = _outboundQueue.size();
                         _outboundQueue.notifyAll();
                     } else {
-                        if (blocking) {
-                            _outboundQueue.wait();
+                        long remainingTime = expiration - _context.clock().now();
+                        if (remainingTime > 0) {
+                            _outboundQueue.wait(remainingTime);
                         } else {
                             remaining = _outboundQueue.size();
+                            _outboundQueue.notifyAll();
                         }
                     }
                 }
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 6536a23755..c46457a8f0 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -7,6 +7,8 @@ import java.net.UnknownHostException;
 import java.io.IOException;
 import java.io.Writer;
 
+import java.text.DecimalFormat;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -108,6 +110,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         _fragments = new OutboundMessageFragments(_context, this);
         _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
         _flooder = new UDPFlooder(_context, this);
+        
+        _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
     }
     
     public void startup() {
@@ -322,6 +326,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         if (_log.shouldLog(Log.WARN))
             _log.debug("Dropping remote peer: " + peer);
         if (peer.getRemotePeer() != null) {
+            long now = _context.clock().now();
+            _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
             _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
             synchronized (_peersByIdent) {
                 _peersByIdent.remove(peer.getRemotePeer());
@@ -531,7 +537,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         StringBuffer buf = new StringBuffer(512);
         buf.append("<b>UDP connections: ").append(peers.size()).append("</b><br />\n");
         buf.append("<table border=\"1\">\n");
-        buf.append(" <tr><td><b>peer</b></td><td><b>activity (in/out)</b></td>\n");
+        buf.append(" <tr><td><b>peer</b></td><td><b>activity (in/out)</b></td>");
+        buf.append("     <td><b>transfer (in/out)</b></td>\n");
         buf.append("     <td><b>uptime</b></td><td><b>skew</b></td>\n");
         buf.append("     <td><b>cwnd</b></td><td><b>ssthresh</b></td>\n");
         buf.append("     <td><b>rtt</b></td><td><b>dev</b></td><td><b>rto</b></td>\n");
@@ -567,10 +574,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
             buf.append("</td>");
             
             buf.append("<td>");
-            buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime()));
-            buf.append("/");
-            buf.append(DataHelper.formatDuration(now-peer.getLastSendTime()));
-            buf.append("</td>");
+            buf.append((now-peer.getLastReceiveTime())/1000);
+            buf.append("s/");
+            buf.append((now-peer.getLastSendTime())/1000);
+            buf.append("s</td>");
+    
+            buf.append("<td>");
+            buf.append(formatKBps(peer.getReceiveBps()));
+            buf.append("KBps/");
+            buf.append(formatKBps(peer.getSendBps()));
+            buf.append("KBps</td>");
 
             buf.append("<td>");
             buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime()));
@@ -616,6 +629,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         out.write("</table>\n");
     }
 
+    private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
+    private static final String formatKBps(int bps) {
+        synchronized (_fmt) {
+            return _fmt.format((float)bps/1024);
+        }
+    }
+    
     /**
      * Cache the bid to reduce object churn
      */
-- 
GitLab