From ad47bf5da3b2ee730ddeb744054739aa2f897f5c Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Thu, 7 Jul 2005 22:27:44 +0000
Subject: [PATCH] * moved the inbound partial messages to the PeerState itself,
 reducing lock contention in the InboundMessageFragments and transparently
 dropping failed messages when we drop old peer states

---
 .../i2p/router/transport/udp/ACKSender.java   |   2 +-
 .../udp/InboundMessageFragments.java          | 114 +++++++-----------
 .../i2p/router/transport/udp/PeerState.java   |  47 +++++++-
 .../i2p/router/transport/udp/UDPFlooder.java  |  63 ++++++----
 .../router/transport/udp/UDPTransport.java    |  11 --
 5 files changed, 129 insertions(+), 108 deletions(-)

diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
index 986a9bd45a..c119bd2d00 100644
--- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
@@ -91,7 +91,7 @@ public class ACKSender implements Runnable {
             if (peer != null) {
                 long lastSend = peer.getLastACKSend();
                 long wanted = peer.getWantedACKSendSince();
-                List ackBitfields = peer.retrieveACKBitfields(_transport.getPartialACKSource());
+                List ackBitfields = peer.retrieveACKBitfields();
                 
                 if (wanted < 0)
                     _log.error("wtf, why are we acking something they dont want?  remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
index d221e417d5..4fdb536ded 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
@@ -20,16 +20,10 @@ import net.i2p.util.Log;
  * up in the router we have full blown replay detection, its nice to have a
  * basic line of defense here).
  *
- * TODO: add in some sensible code to drop expired fragments from peers we 
- * don't hear from again (either a periodic culling for expired peers, or
- * a scheduled event)
- *
  */
-public class InboundMessageFragments implements UDPTransport.PartialACKSource {
+public class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
     private RouterContext _context;
     private Log _log;
-    /** Map of peer (Hash) to a Map of messageId (Long) to InboundMessageState objects */
-    private Map _inboundMessages;
     /** list of message IDs recently received, so we can ignore in flight dups */
     private DecayingBloomFilter _recentlyCompletedMessages;
     private OutboundMessageFragments _outbound;
@@ -44,7 +38,7 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
     public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) {
         _context = ctx;
         _log = ctx.logManager().getLog(InboundMessageFragments.class);
-        _inboundMessages = new HashMap(64);
+        //_inboundMessages = new HashMap(64);
         _outbound = outbound;
         _transport = transport;
         _ackSender = new ACKSender(_context, _transport);
@@ -73,9 +67,6 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
         _recentlyCompletedMessages = null;
         _ackSender.shutdown();
         _messageReceiver.shutdown();
-        synchronized (_inboundMessages) {
-            _inboundMessages.clear();
-        }
     }
     public boolean isAlive() { return _alive; }
 
@@ -103,72 +94,76 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
     private void receiveMessages(PeerState from, UDPPacketReader.DataReader data) {
         int fragments = data.readFragmentCount();
         if (fragments <= 0) return;
-        synchronized (_inboundMessages) { // XXX: CHOKE POINT (to what extent?)
-            Map messages = (Map)_inboundMessages.get(from.getRemotePeer());
-            if (messages == null) {
-                messages = new HashMap(fragments);
-                _inboundMessages.put(from.getRemotePeer(), messages);
-            }
-        
-            for (int i = 0; i < fragments; i++) {
-                Long messageId = new Long(data.readMessageId(i));
+        Hash fromPeer = from.getRemotePeer();
             
-                if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
-                    _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
-                    from.messageFullyReceived(messageId, -1);
-                    _ackSender.ackPeer(from);
-                    if (_log.shouldLog(Log.WARN))
-                        _log.warn("Message received is a dup: " + messageId + " dups: " 
-                                  + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " 
-                                  + _recentlyCompletedMessages.getInsertedCount());
-                    continue;
-                }
+        Map messages = from.getInboundMessages();
             
-                int size = data.readMessageFragmentSize(i);
-                InboundMessageState state = null;
-                boolean messageComplete = false;
-                boolean messageExpired = false;
-                boolean fragmentOK = false;
+        for (int i = 0; i < fragments; i++) {
+            Long messageId = new Long(data.readMessageId(i));
+
+            if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
+                _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
+                from.messageFullyReceived(messageId, -1);
+                _ackSender.ackPeer(from);
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Message received is a dup: " + messageId + " dups: " 
+                              + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " 
+                              + _recentlyCompletedMessages.getInsertedCount());
+                continue;
+            }
+            
+            int size = data.readMessageFragmentSize(i);
+            InboundMessageState state = null;
+            boolean messageComplete = false;
+            boolean messageExpired = false;
+            boolean fragmentOK = false;
+            boolean partialACK = false;
+         
+            // perhaps compact the synchronized block further by synchronizing on the
+            // particular state once its found?
+            synchronized (messages) {
                 state = (InboundMessageState)messages.get(messageId);
                 if (state == null) {
-                    state = new InboundMessageState(_context, messageId.longValue(), from.getRemotePeer());
+                    state = new InboundMessageState(_context, messageId.longValue(), fromPeer);
                     messages.put(messageId, state);
                 }
+                
                 fragmentOK = state.receiveFragment(data, i);
+             
                 if (state.isComplete()) {
                     messageComplete = true;
                     messages.remove(messageId);
-                    if (messages.size() <= 0)
-                        _inboundMessages.remove(from.getRemotePeer());
-                    
-                    _recentlyCompletedMessages.add(messageId.longValue());
+                } else if (state.isExpired()) {
+                    messageExpired = true;
+                    messages.remove(messageId);
+                } else {
+                    partialACK = true;
+                }
 
+                if (messageComplete) {
+                    _recentlyCompletedMessages.add(messageId.longValue());
                     _messageReceiver.receiveMessage(state);
-                    
+
                     from.messageFullyReceived(messageId, state.getCompleteSize());
                     _ackSender.ackPeer(from);
-                    
+
                     if (_log.shouldLog(Log.INFO))
                         _log.info("Message received completely!  " + state);
 
                     _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
                     _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
-                } else if (state.isExpired()) {
-                    messageExpired = true;
-                    messages.remove(messageId);
-                    if (messages.size() <= 0)
-                        _inboundMessages.remove(from.getRemotePeer());
+                } else if (messageExpired) {
+                    state.releaseResources();
                     if (_log.shouldLog(Log.WARN))
                         _log.warn("Message expired while only being partially read: " + state);
-                    state.releaseResources();
-                } else {
+                } else if (partialACK) {
                     // not expired but not yet complete... lets queue up a partial ACK
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
                     from.messagePartiallyReceived();
                     _ackSender.ackPeer(from);
                 }
-                
+
                 if (!fragmentOK)
                     break;
             }
@@ -209,23 +204,4 @@ public class InboundMessageFragments implements UDPTransport.PartialACKSource {
         else
             from.dataReceived();
     }
-    
-    public void fetchPartialACKs(Hash fromPeer, List ackBitfields) {
-        synchronized (_inboundMessages) {
-            Map messages = (Map)_inboundMessages.get(fromPeer);
-            if (messages == null)
-                return;
-            for (Iterator iter = messages.values().iterator(); iter.hasNext(); ) {
-                InboundMessageState state = (InboundMessageState)iter.next();
-                if (state.isExpired()) {
-                    iter.remove();
-                } else {
-                    if (!state.isComplete())
-                        ackBitfields.add(state.createACKBitfield());
-                }
-            }
-            if (messages.size() <= 0)
-                _inboundMessages.remove(fromPeer);
-        }
-    }
 }
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index 85794a56ef..d5a49fc768 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -1,7 +1,10 @@
 package net.i2p.router.transport.udp;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -144,6 +147,9 @@ public class PeerState {
     private long _packetsReceivedDuplicate;
     private long _packetsReceived;
     
+    /** Message (Long) to InboundMessageState for active message */
+    private Map _inboundMessages;
+    
     private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
@@ -202,6 +208,7 @@ public class PeerState {
         _retransmissionPeriodStart = 0;
         _packetsReceived = 0;
         _packetsReceivedDuplicate = 0;
+        _inboundMessages = new HashMap(8);
         _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
@@ -456,6 +463,12 @@ public class PeerState {
             _wantACKSendSince = _context.clock().now();
     }
     
+    /** 
+     * Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages.
+     * Access to this map must be synchronized explicitly!
+     */
+    public Map getInboundMessages() { return _inboundMessages; }
+    
     /** 
      * either they told us to back off, or we had to resend to get 
      * the data through.  
@@ -489,7 +502,7 @@ public class PeerState {
      * will be unchanged if there are ACKs remaining.
      *
      */
-    public List retrieveACKBitfields(UDPTransport.PartialACKSource partialACKSource) {
+    public List retrieveACKBitfields() {
         List rv = new ArrayList(16);
         int bytesRemaining = countMaxACKData();
         synchronized (_currentACKs) {
@@ -502,12 +515,12 @@ public class PeerState {
         }
             
         int partialIncluded = 0;
-        if ( (bytesRemaining > 4) && (partialACKSource != null) ) {
+        if (bytesRemaining > 4) {
             // ok, there's room to *try* to fit in some partial ACKs, so
             // we should try to find some packets to partially ACK 
             // (preferably the ones which have the most received fragments)
             List partial = new ArrayList();
-            partialACKSource.fetchPartialACKs(_remotePeer, partial);
+            fetchPartialACKs(partial);
             // we may not be able to use them all, but lets try...
             for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) {
                 ACKBitfield bitfield = (ACKBitfield)partial.get(i);
@@ -528,6 +541,34 @@ public class PeerState {
         return rv;
     }
     
+    private void fetchPartialACKs(List rv) {
+        InboundMessageState states[] = null;
+        int curState = 0;
+        synchronized (_inboundMessages) {
+            int numMessages = _inboundMessages.size();
+            if (numMessages <= 0) 
+                return;
+            for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
+                InboundMessageState state = (InboundMessageState)iter.next();
+                if (state.isExpired()) {
+                    iter.remove();
+                } else {
+                    if (!state.isComplete()) {
+                        if (states == null)
+                            states = new InboundMessageState[numMessages];
+                        states[curState++] = state;
+                    }
+                }
+            }
+        }
+        if (states != null) {
+            for (int i = curState-1; i >= 0; i--) {
+                if (states[i] != null)
+                    rv.add(states[i].createACKBitfield());
+            }
+        }
+    }
+    
     /** represent a full ACK of a message */
     private class FullACKBitfield implements ACKBitfield {
         private long _msgId;
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java
index 5a4cdc6865..240d372fee 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java
@@ -59,6 +59,7 @@ class UDPFlooder implements Runnable {
     }
     
     public void run() {
+        long nextSend = _context.clock().now();
         while (_alive) {
             try {
                 synchronized (_peers) {
@@ -67,33 +68,47 @@ class UDPFlooder implements Runnable {
                 }
             } catch (InterruptedException ie) {}
             
-            // peers always grows, so this is fairly safe
-            for (int i = 0; i < _peers.size(); i++) {
-                PeerState peer = (PeerState)_peers.get(i);
-                DataMessage m = new DataMessage(_context);
-                byte data[] = _floodData; // new byte[4096];
-                //_context.random().nextBytes(data);
-                m.setData(data);
-                m.setMessageExpiration(_context.clock().now() + 10*1000);
-                m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
-                if (true) {
-                    OutNetMessage msg = new OutNetMessage(_context);
-                    msg.setMessage(m);
-                    msg.setExpiration(m.getMessageExpiration());
-                    msg.setPriority(500);
-                    RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
-                    if (to == null)
-                        continue;
-                    msg.setTarget(to);
-                    _context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0);
+            long now = _context.clock().now();
+            if (now >= nextSend) {
+                // peers always grows, so this is fairly safe
+                for (int i = 0; i < _peers.size(); i++) {
+                    PeerState peer = (PeerState)_peers.get(i);
+                    DataMessage m = new DataMessage(_context);
+                    byte data[] = _floodData; // new byte[4096];
+                    //_context.random().nextBytes(data);
+                    m.setData(data);
+                    m.setMessageExpiration(_context.clock().now() + 10*1000);
+                    m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
+                    if (true) {
+                        OutNetMessage msg = new OutNetMessage(_context);
+                        msg.setMessage(m);
+                        msg.setExpiration(m.getMessageExpiration());
+                        msg.setPriority(500);
+                        RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer());
+                        if (to == null)
+                            continue;
+                        msg.setTarget(to);
+                        _context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0);
 
-                    _transport.send(msg);
-                } else {
-                    _transport.send(m, peer);
+                        _transport.send(msg);
+                    } else {
+                        _transport.send(m, peer);
+                    }
                 }
+                nextSend = now + calcFloodDelay();
+            }
+            
+            long delay = nextSend - now;
+            if (delay > 0) {
+                if (delay > 10*1000) {
+                    long fd = calcFloodDelay();
+                    if (delay > fd) {
+                        nextSend = now + fd;
+                        delay = fd;
+                    }
+                }
+                try { Thread.sleep(delay); } catch (InterruptedException ie) {}
             }
-            long floodDelay = calcFloodDelay();
-            try { Thread.sleep(floodDelay); } catch (InterruptedException ie) {}
         }
     }
     
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 848a028d44..95921150b9 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -666,17 +666,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
         
         out.write("</table>\n");
     }
-
-    public PartialACKSource getPartialACKSource() { return _inboundFragments; }
-    
-    /** help us grab partial ACKs */
-    public interface PartialACKSource {
-        /** 
-         * build partial ACKs of messages received from the peer and store
-         * them in the ackBitfields
-         */
-        public void fetchPartialACKs(Hash fromPeer, List ackBitfields);
-    }
     
     private static final DecimalFormat _fmt = new DecimalFormat("#,##0.00");
     private static final String formatKBps(int bps) {
-- 
GitLab