From 8e3e8ada32d4d69e6cecb58d0e253687b5a02877 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Mon, 23 Aug 2004 03:54:55 +0000
Subject: [PATCH] * refactored and revamped the capacity threshold calculation
 to take into account various skew situations and the capacity growth constant
 with the intent of producing a higher quality threshold whenever possible *
 increased the minimum # of fast peers from 4 to 8 (yay), which means we'll
 try to have at least some peers to choose from * added a new router config
 option - "router.maxParticipatingTunnels".  This is useful for gracefully
 shutting down the router (aka set it to 0 and wait until the router is no
 longer participating in tunnels, then shutdown).  You can probably also come
 up with other situations where this is useful, but I don't want to spoil all
 the fun ;)

---
 .../net/i2p/router/RouterThrottleImpl.java    |  23 +-
 .../peermanager/CapacityCalculator.java       |   2 +-
 .../router/peermanager/ProfileOrganizer.java  | 198 ++++++++++++++----
 3 files changed, 183 insertions(+), 40 deletions(-)

diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java
index 9bcbeda27b..dd33d23c48 100644
--- a/router/java/src/net/i2p/router/RouterThrottleImpl.java
+++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java
@@ -29,6 +29,8 @@ class RouterThrottleImpl implements RouterThrottle {
      */
     private static int THROTTLE_EVENT_LIMIT = 300;
     
+    private static final String PROP_MAX_TUNNELS = "router.maxParticipatingTunnels";
+    
     public RouterThrottleImpl(RouterContext context) {
         _context = context;
         _log = context.logManager().getLog(RouterThrottleImpl.class);
@@ -38,6 +40,7 @@ class RouterThrottleImpl implements RouterThrottle {
         _context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("router.throttleTunnelProcessingTime1m", "How long it takes to process a message (1 minute average) when we throttle a tunnel?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("router.throttleTunnelProcessingTime10m", "How long it takes to process a message (10 minute average) when we throttle a tunnel?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
+        _context.statManager().createRateStat("router.throttleTunnelMaxExceeded", "How many tunnels we are participating in when we refuse one due to excees?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
     }
     
     public boolean acceptNetworkMessage() {
@@ -116,10 +119,28 @@ class RouterThrottleImpl implements RouterThrottle {
         double bytesPerMsg = (r != null ? r.getAverageValue() : 0);
         double bytesPerTunnel = msgsPerTunnel * bytesPerMsg;
 
-
         int numTunnels = _context.tunnelManager().getParticipatingCount();
         double bytesAllocated =  (numTunnels + 1) * bytesPerTunnel;
 
+        // the max # tunnels throttle is useful for shutting down the router - 
+        // set this to 0, wait a few minutes, and the router can be shut off 
+        // without killing anyone's tunnels
+        String maxTunnels = _context.getProperty(PROP_MAX_TUNNELS);
+        if (maxTunnels != null) {
+            try {
+                int max = Integer.parseInt(maxTunnels);
+                if (numTunnels >= max) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Refusing tunnel request since we are already participating in " 
+                                  + numTunnels + " (our max is " + max + ")");
+                    _context.statManager().addRateData("router.throttleTunnelMaxExceeded", numTunnels, 0);
+                    return false;
+                }
+            } catch (NumberFormatException nfe) {
+                // no default, ignore it
+            }
+        }
+        
         _context.statManager().addRateData("tunnel.bytesAllocatedAtAccept", (long)bytesAllocated, msg.getTunnelDurationSeconds()*1000);
         // todo: um, throttle (include bw usage of the netDb, our own tunnels, the clients,
         // and check to see that they are less than the bandwidth limits
diff --git a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java
index a6c3f74030..81426567f7 100644
--- a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java
+++ b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java
@@ -39,7 +39,7 @@ public class CapacityCalculator extends Calculator {
     }
     
     /** used to adjust each period so that we keep trying to expand the peer's capacity */
-    private static long GROWTH_FACTOR = 5;
+    static long GROWTH_FACTOR = 5;
     
     /** the calculator estimates over a 1 hour period */
     private static long ESTIMATE_PERIOD = 60*60*1000;
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
index d0a107aad2..c33f718497 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
@@ -66,7 +66,15 @@ public class ProfileOrganizer {
      *
      */
     public static final String PROP_MINIMUM_FAST_PEERS = "profileOrganizer.minFastPeers";
-    public static final int DEFAULT_MINIMUM_FAST_PEERS = 4;
+    public static final int DEFAULT_MINIMUM_FAST_PEERS = 8;
+    /**
+     * Defines the minimum number of 'high capacity' peers that the organizer should 
+     * select when using the mean - if less than this many are available, select the 
+     * capacity by the median.  
+     *
+     */
+    public static final String PROP_MINIMUM_HIGH_CAPACITY_PEERS = "profileOrganizer.minHighCapacityPeers";
+    public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10;
     
     /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */
     private Object _reorganizeLock = new Object();
@@ -118,12 +126,12 @@ public class ProfileOrganizer {
             
             boolean rightBigger = rval > lval;
 
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("The capacity of " + right.getPeer().toBase64() 
-                           + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
-                           + " as larger: r=" + right.getCapacityValue() 
-                           + " l="
-                           + left.getCapacityValue());
+            //if (_log.shouldLog(Log.DEBUG))
+            //    _log.debug("The capacity of " + right.getPeer().toBase64() 
+            //               + " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
+            //               + " as larger: r=" + right.getCapacityValue() 
+            //               + " l="
+            //               + left.getCapacityValue());
                            
             if (rightBigger)
                 return 1;
@@ -399,7 +407,13 @@ public class ProfileOrganizer {
             }
             _strictCapacityOrder = reordered;
             
-            calculateThresholds(allPeers);
+            locked_calculateThresholds(allPeers);
+            
+            _failingPeers.clear();
+            _fastPeers.clear();
+            _highCapacityPeers.clear();
+            _notFailingPeers.clear();
+            _wellIntegratedPeers.clear();
             
             for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
                 PeerProfile profile = (PeerProfile)iter.next();
@@ -501,13 +515,16 @@ public class ProfileOrganizer {
     
     /**
      * Update the thresholds based on the profiles in this set.  currently
-     * implements the capacity threshold based on the median capacity (ignoring
-     * failing or inactive peers), using the median speed from that group to 
-     * define the speed threshold, and use the mean integration value from the 
+     * implements the capacity threshold based on the mean capacity of active
+     * and nonfailing peers (falling back on the median if that results in too
+     * few peers.  We then use the median speed from that group to define the 
+     * speed threshold, and use the mean integration value from the 
      * high capacity group to define the integration threshold.
      *
      */
-    private void calculateThresholds(Set allPeers) {
+    private void locked_calculateThresholds(Set allPeers) {
+        double totalCapacity = 0;
+        double totalIntegration = 0;
         Set reordered = new TreeSet(_comp);
         for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
             PeerProfile profile = (PeerProfile)iter.next();
@@ -518,37 +535,100 @@ public class ProfileOrganizer {
             if (profile.getIsFailing() || (!profile.getIsActive()))
                 continue;
         
+            totalCapacity += profile.getCapacityValue();
+            totalIntegration += profile.getIntegrationValue();
             reordered.add(profile);
         }
         
+        locked_calculateCapacityThreshold(totalCapacity, reordered);
+        locked_calculateSpeedThreshold(reordered);
+        
+        _thresholdIntegrationValue = 1.0d * avg(totalIntegration, reordered.size());
+    }
+    
+    
+    /**
+     * Update the _thresholdCapacityValue by using a few simple formulas run 
+     * against the specified peers.  Ideally, we set the threshold capacity to
+     * the mean, as long as that gives us enough peers and the mean is a "growth"
+     * value.  We fall back on the capacity of the top K-th capacity, or the 
+     * mean, or the base growth value, depending on various circumstances.
+     *
+     * @param reordered ordered set of PeerProfile objects, ordered by capacity
+     *                  (highest first) for active nonfailing peers
+     */
+    private void locked_calculateCapacityThreshold(double totalCapacity, Set reordered) {
         int numNotFailing = reordered.size();
-        // how many are in the "top half" of the high capacity peers?
-        int i = 0;
-        int threshold = 0;
-        if (numNotFailing > 0)
-            threshold = numNotFailing / 2;
-        for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
+        
+        double meanCapacity = avg(totalCapacity, numNotFailing);
+        
+        long baseline = CapacityCalculator.GROWTH_FACTOR;
+        int minHighCapacityPeers = getMinimumHighCapacityPeers();
+        
+        int numExceedingMean = 0;
+        int numExceedingBaseline = 0;
+        double thresholdAtMedian = 0;
+        double thresholdAtMinHighCap = 0;
+        int cur = 0;
+        for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
             PeerProfile profile = (PeerProfile)iter.next();
-            if (i >= threshold) {
-                _thresholdCapacityValue = profile.getCapacityValue();
-                break;
+            double val = profile.getCapacityValue();
+            if (val > meanCapacity)
+                numExceedingMean++;
+            if (val > baseline)
+                numExceedingBaseline++;
+            if (cur == reordered.size()/2)
+                thresholdAtMedian = val;
+            if (cur == minHighCapacityPeers)
+                thresholdAtMinHighCap = val;
+            cur++;
+        }
+        
+        if (meanCapacity > baseline) {
+            // our average is doing well (growing, not recovering from failures)
+            if (numExceedingMean > minHighCapacityPeers) {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Our average capacity is doing well [" + meanCapacity 
+                              + "], and includes " + numExceedingMean);
+                _thresholdCapacityValue = meanCapacity;
+            } else {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Our average capacity is doing well [" + meanCapacity 
+                              + "], but it is skewed to only have " + numExceedingMean
+                              + " so falling back on the top few to " + thresholdAtMinHighCap);
+                _thresholdCapacityValue = thresholdAtMinHighCap;
+            }
+        } else {
+            // our average isn't doing well (its recovering from failures)
+            if (numExceedingBaseline > minHighCapacityPeers) {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Our average capacity isn't doing well [" + meanCapacity 
+                              + "], but the baseline has " + numExceedingBaseline);
+                _thresholdCapacityValue = baseline+.0001;
+            } else {
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("Our average capacity isn't doing well [" + meanCapacity 
+                              + "], and the baseline has " + numExceedingBaseline 
+                              + " so falling back on the median of " + thresholdAtMedian);
+                _thresholdCapacityValue = thresholdAtMedian;
             }
         }
-
+    }
+    
+    /**
+     * Update the _thresholdSpeedValue by calculating the median speed of all
+     * high capacity peers. 
+     *
+     * @param reordered ordered set of PeerProfile objects, ordered by capacity
+     *                  (highest first) for active nonfailing peers
+     */
+    private void locked_calculateSpeedThreshold(Set reordered) {
         Set speeds = new TreeSet();
-        int numActive = 0;
-        double totalIntegration = 0;
-        double totalSpeed = 0;
-        for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
+        for (Iterator iter = reordered.iterator(); iter.hasNext(); ) {
             PeerProfile profile = (PeerProfile)iter.next();
             if (profile.getCapacityValue() >= _thresholdCapacityValue) {
-                if (profile.getIsActive()) {
-                    numActive++;
-                    if (profile.getIntegrationValue() > 0)
-                        totalIntegration += profile.getIntegrationValue();
-                    // duplicates being clobbered is fine by us
-                    speeds.add(new Double(0-profile.getSpeedValue()));
-                }
+                // duplicates being clobbered is fine by us
+                speeds.add(new Double(0-profile.getSpeedValue()));
             } else {
                 // its ordered
                 break;
@@ -556,7 +636,7 @@ public class ProfileOrganizer {
         }
 
         // calc the median speed of high capacity peers
-        i = 0;
+        int i = 0;
         for (Iterator iter = speeds.iterator(); iter.hasNext(); i++) {
             Double speed = (Double)iter.next();
             if (i >= (speeds.size() / 2)) {
@@ -564,10 +644,8 @@ public class ProfileOrganizer {
                 break;
             }
         }
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Threshold value for speed: " + _thresholdSpeedValue + " with speeds: " + speeds);
-        
-        _thresholdIntegrationValue = 1.0d * avg(totalIntegration, numActive);
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Threshold value for speed: " + _thresholdSpeedValue + " out of speeds: " + speeds);
     }
     
     /** simple average, or 0 if NaN */
@@ -808,6 +886,50 @@ public class ProfileOrganizer {
         return DEFAULT_MINIMUM_FAST_PEERS;
     }
     
+    
+    /**
+     * Defines the minimum number of 'fast' peers that the organizer should select.  If
+     * the profile calculators derive a threshold that does not select at least this many peers,
+     * the threshold will be overridden to make sure this many peers are in the fast+reliable group.
+     * This parameter should help deal with a lack of diversity in the tunnels created when some 
+     * peers are particularly fast.
+     *
+     * @return minimum number of peers to be placed in the 'fast' group
+     */
+    protected int getMinimumHighCapacityPeers() {
+        if (_context.router() != null) {
+            String val = _context.router().getConfigSetting(PROP_MINIMUM_HIGH_CAPACITY_PEERS);
+            if (val != null) {
+                try {
+                    int rv = Integer.parseInt(val);
+                    if (_log.shouldLog(Log.DEBUG)) 
+                        _log.debug("router config said " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + '=' + val);
+                    return rv;
+                } catch (NumberFormatException nfe) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Minimum high capacity peers improperly set in the router config [" + val + "]", nfe);
+                }
+            }
+        }
+        String val = _context.getProperty(PROP_MINIMUM_HIGH_CAPACITY_PEERS, ""+DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS);
+        if (val != null) {
+            try {
+                int rv = Integer.parseInt(val);
+                if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug("router context said " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + '=' + val);
+                return rv;
+            } catch (NumberFormatException nfe) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Minimum high capacity peers improperly set in the router environment [" + val + "]", nfe);
+            }
+        }
+        
+        if (_log.shouldLog(Log.DEBUG)) 
+            _log.debug("no config for " + PROP_MINIMUM_HIGH_CAPACITY_PEERS + ", using " + DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS);
+        return DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS;
+    }
+    
+    
     private final static DecimalFormat _fmt = new DecimalFormat("###,##0.00", new DecimalFormatSymbols(Locale.UK));
     private final static String num(double num) { synchronized (_fmt) { return _fmt.format(num); } }
     
-- 
GitLab