From d2569fa446f1bfd47f91a3cbbf85b6f5f43e094a Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Tue, 25 Oct 2016 22:30:55 +0000
Subject: [PATCH] i2psnark: Don't count unchoked but uninterested peers as
 uploaders when calculating global uploader limit, to allow more upload slots,
 especially when some torrents are stalled. Convert some shared fields to
 atomics for concurrency.

---
 .../src/org/klomp/snark/PeerCheckerTask.java  |  58 +++++---
 .../src/org/klomp/snark/PeerCoordinator.java  | 130 +++++++++++++++---
 .../java/src/org/klomp/snark/Snark.java       |   2 +-
 history.txt                                   |   5 +
 .../src/net/i2p/router/RouterVersion.java     |   2 +-
 5 files changed, 156 insertions(+), 41 deletions(-)

diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java
index c2abf66609..e0ac005f5d 100644
--- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java
+++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java
@@ -65,6 +65,7 @@ class PeerCheckerTask implements Runnable
         Peer worstDownloader = null;
 
         int uploaders = 0;
+        int interestedUploaders = 0;
         int removedCount = 0;
 
         long uploaded = 0;
@@ -76,7 +77,9 @@ class PeerCheckerTask implements Runnable
         int uploadLimit = coordinator.allowedUploaders();
         boolean overBWLimit = coordinator.overUpBWLimit();
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("peers: " + peerList.size() + " limit: " + uploadLimit + " overBW? " + overBWLimit);
+            _log.debug("START peers: " + peerList.size() + " uploaders: " + coordinator.getUploaders() +
+                       " interested: " + coordinator.getInterestedUploaders() +
+                       " limit: " + uploadLimit + " overBW? " + overBWLimit);
         DHT dht = _util.getDHT();
         for (Peer peer : peerList) {
 
@@ -98,7 +101,9 @@ class PeerCheckerTask implements Runnable
                 continue;
             }
 
-            if (!peer.isChoking())
+            // we only count choking AND interested, so as not to steal a slot
+            // from some other torrent
+            if (peer.isInterested() && !peer.isChoking())
               uploaders++;
 
             long upload = peer.getUploaded();
@@ -128,20 +133,23 @@ class PeerCheckerTask implements Runnable
             // If we are at our max uploaders and we have lots of other
             // interested peers try to make some room.
             // (Note use of coordinator.uploaders)
-            if (((coordinator.uploaders == uploadLimit
-                && coordinator.interestedAndChoking > 0)
-                || coordinator.uploaders > uploadLimit
-                || overBWLimitChoke)
+            int cup = coordinator.getUploaders();
+            if (((cup == uploadLimit
+                  && coordinator.getInterestedAndChoking() > 0)
+                 || cup > uploadLimit
+                 || overBWLimitChoke)
                 && !peer.isChoking())
               {
                 // Check if it still wants pieces from us.
                 if (!peer.isInterested())
                   {
+                    // Note that we only choke if we are over our limits,
+                    // so a peer may remain unchoked even if uninterested.
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Choke uninterested peer: " + peer);
                     peer.setChoking(true);
                     uploaders--;
-                    coordinator.uploaders--;
+                    coordinator.decrementUploaders(false);
                     
                     // Put it at the back of the list
                     removed.add(peer);
@@ -152,7 +160,8 @@ class PeerCheckerTask implements Runnable
                         _log.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer);
                     peer.setChoking(true);
                     uploaders--;
-                    coordinator.uploaders--;
+                    interestedUploaders--;
+                    coordinator.decrementUploaders(true);
                     removedCount++;
 
                     // Put it at the back of the list for fairness, even though we won't be unchoking this time
@@ -165,7 +174,8 @@ class PeerCheckerTask implements Runnable
                         _log.debug("Choke choking peer: " + peer);
                     peer.setChoking(true);
                     uploaders--;
-                    coordinator.uploaders--;
+                    interestedUploaders--;
+                    coordinator.decrementUploaders(true);
                     removedCount++;
                     
                     // Put it at the back of the list
@@ -178,7 +188,8 @@ class PeerCheckerTask implements Runnable
                         _log.debug("Choke uninteresting peer: " + peer);
                     peer.setChoking(true);
                     uploaders--;
-                    coordinator.uploaders--;
+                    interestedUploaders--;
+                    coordinator.decrementUploaders(true);
                     removedCount++;
                     
                     // Put it at the back of the list
@@ -193,7 +204,8 @@ class PeerCheckerTask implements Runnable
                         _log.debug("Choke downloader that doesn't deliver: " + peer);
                     peer.setChoking(true);
                     uploaders--;
-                    coordinator.uploaders--;
+                    interestedUploaders--;
+                    coordinator.decrementUploaders(true);
                     removedCount++;
                     
                     // Put it at the back of the list
@@ -230,11 +242,11 @@ class PeerCheckerTask implements Runnable
 
         // Resync actual uploaders value
         // (can shift a bit by disconnecting peers)
-        coordinator.uploaders = uploaders;
+        coordinator.setUploaders(uploaders, interestedUploaders);
 
         // Remove the worst downloader if needed. (uploader if seeding)
         if (((uploaders == uploadLimit
-            && coordinator.interestedAndChoking > 0)
+            && coordinator.getInterestedAndChoking() > 0)
             || uploaders > uploadLimit)
             && worstDownloader != null)
           {
@@ -242,28 +254,34 @@ class PeerCheckerTask implements Runnable
                 _log.debug("Choke worst downloader: " + worstDownloader);
 
             worstDownloader.setChoking(true);
-            coordinator.uploaders--;
+            coordinator.decrementUploaders(worstDownloader.isInterested());
             removedCount++;
 
             // Put it at the back of the list
             removed.add(worstDownloader);
           }
         
-        // Optimistically unchoke a peer
-        if ((!overBWLimit) && !coordinator.overUpBWLimit(uploaded))
-            coordinator.unchokePeer();
-
-        // Put peers back at the end of the list that we removed earlier.
+        boolean coordOver = coordinator.overUpBWLimit(uploaded);
         synchronized (coordinator.peers) {
+            if ((!overBWLimit) && !coordOver) {
+                // Optimistically unchoke a peer
+                // must be called inside synch
+                coordinator.unchokePeer();
+            }
+            // Put peers back at the end of the list that we removed earlier.
             for(Peer peer : removed) { 
                 if (coordinator.peers.remove(peer))
                     coordinator.peers.add(peer);
             }
         }
-        coordinator.interestedAndChoking += removedCount;
+
+        coordinator.addInterestedAndChoking(removedCount);
 
 	// store the rates
 	coordinator.setRateHistory(uploaded, downloaded);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("END peers: " + peerList.size() + " uploaders: " + uploaders +
+                       " interested: " + interestedUploaders);
 
         // close out unused files, but we don't need to do it every time
         Storage storage = coordinator.getStorage();
diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java
index 2542f8aa8c..9e4df703c4 100644
--- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java
+++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import net.i2p.I2PAppContext;
@@ -74,16 +75,23 @@ class PeerCoordinator implements PeerListener
   public static final long MAX_INACTIVE = 8*60*1000;
 
   /**
-   * Approximation of the number of current uploaders.
+   * Approximation of the number of current uploaders (unchoked peers),
+   * whether interested or not.
+   * Resynced by PeerChecker once in a while.
+   */
+  private final AtomicInteger uploaders = new AtomicInteger();
+
+  /**
+   * Approximation of the number of current uploaders (unchoked peers),
+   * that are interested.
    * Resynced by PeerChecker once in a while.
-   * External use by PeerCheckerTask only.
    */
-  int uploaders;
+  private final AtomicInteger interestedUploaders = new AtomicInteger();
 
   /**
    * External use by PeerCheckerTask only.
    */
-  int interestedAndChoking;
+  private final AtomicInteger interestedAndChoking = new AtomicInteger();
 
   // final static int MAX_DOWNLOADERS = MAX_CONNECTIONS;
   // int downloaders = 0;
@@ -627,8 +635,9 @@ class PeerCoordinator implements PeerListener
     return false;
   }
 
-
-  // (Optimistically) unchoke. Should be called with peers synchronized
+  /**
+   * (Optimistically) unchoke. Must be called with peers synchronized
+   */
   void unchokePeer()
   {
     // linked list will contain all interested peers that we choke.
@@ -645,7 +654,7 @@ class PeerCoordinator implements PeerListener
             if (peer.isChoking() && peer.isInterested())
               {
                 count++;
-                if (uploaders < maxUploaders)
+                if (uploaders.get() < maxUploaders)
                   {
                     if (peer.isInteresting() && !peer.isChoked())
                       interested.add(unchokedCount++, peer);
@@ -655,20 +664,22 @@ class PeerCoordinator implements PeerListener
               }
           }
 
-        while (uploaders < maxUploaders && !interested.isEmpty())
+        int up = uploaders.get();
+        while (up < maxUploaders && !interested.isEmpty())
           {
             Peer peer = interested.remove(0);
             if (_log.shouldLog(Log.DEBUG))
               _log.debug("Unchoke: " + peer);
             peer.setChoking(false);
-            uploaders++;
+            up = uploaders.incrementAndGet();
+            interestedUploaders.incrementAndGet();
             count--;
             // Put peer back at the end of the list.
             peers.remove(peer);
             peers.add(peer);
             peerCount = peers.size();
           }
-        interestedAndChoking = count;
+        interestedAndChoking.set(count);
   }
 
   /**
@@ -1098,11 +1109,12 @@ class PeerCoordinator implements PeerListener
   {
     if (interest)
       {
-            if (uploaders < allowedUploaders())
+            if (uploaders.get() < allowedUploaders())
               {
                 if(peer.isChoking())
                   {
-                    uploaders++;
+                    uploaders.incrementAndGet();
+                    interestedUploaders.incrementAndGet();
                     peer.setChoking(false);
                     if (_log.shouldLog(Log.INFO))
                         _log.info("Unchoke: " + peer);
@@ -1487,22 +1499,102 @@ class PeerCoordinator implements PeerListener
    */
   public int allowedUploaders()
   {
-    if (listener != null && listener.overUploadLimit(uploaders)) {
+    int up = uploaders.get();
+    if (listener != null && listener.overUploadLimit(interestedUploaders.get())) {
            if (_log.shouldLog(Log.DEBUG))
-             _log.debug("Over limit, uploaders was: " + uploaders);
-        return uploaders - 1;
-    } else if (uploaders < MAX_UPLOADERS)
-        return uploaders + 1;
-    else
+             _log.debug("Over limit, uploaders was: " + up);
+        return up - 1;
+    } else if (up < MAX_UPLOADERS) {
+        return up + 1;
+    } else {
         return MAX_UPLOADERS;
+    }
   }
 
   /**
+   *  Uploaders whether interested or not
+   *  Use this for per-torrent limits.
+   *
    *  @return current
    *  @since 0.8.4
    */
   public int getUploaders() {
-      return uploaders;
+      int rv = uploaders.get();
+      if (rv > 0) {
+          int max = getPeers();
+          if (rv > max)
+              rv = max;
+      }
+      return rv;
+  }
+
+  /**
+   *  Uploaders, interested only.
+   *  Use this to calculate the global total, so that
+   *  unchoked but uninterested peers don't count against the global limit.
+   *
+   *  @return current
+   *  @since 0.9.28
+   */
+  public int getInterestedUploaders() {
+      int rv = interestedUploaders.get();
+      if (rv > 0) {
+          int max = getPeers();
+          if (rv > max)
+              rv = max;
+      }
+      return rv;
+  }
+
+  /**
+   *  Set the uploaders and interestedUploaders counts
+   *
+   *  @since 0.9.28
+   *  @param upl whether interested or not
+   *  @param inter interested only
+   */
+  public void setUploaders(int upl, int inter) {
+      if (upl < 0)
+          upl = 0;
+      else if (upl > MAX_UPLOADERS)
+          upl = MAX_UPLOADERS;
+      uploaders.set(upl);
+      if (inter < 0)
+          inter = 0;
+      else if (inter > MAX_UPLOADERS)
+          inter = MAX_UPLOADERS;
+      interestedUploaders.set(inter);
+  }
+
+  /**
+   *  Decrement the uploaders and (if set) the interestedUploaders counts
+   *
+   *  @since 0.9.28
+   */
+  public void decrementUploaders(boolean isInterested) {
+      int up = uploaders.decrementAndGet();
+      if (up < 0)
+          uploaders.set(0);
+      if (isInterested) {
+          up = interestedUploaders.decrementAndGet();
+          if (up < 0)
+              interestedUploaders.set(0);
+      }
+  }
+
+  /**
+   *  @return current
+   *  @since 0.9.28
+   */
+  public int getInterestedAndChoking() {
+      return interestedAndChoking.get();
+  }
+
+  /**
+   *  @since 0.9.28
+   */
+  public void addInterestedAndChoking(int toAdd) {
+      interestedAndChoking.addAndGet(toAdd);
   }
 
   public boolean overUpBWLimit()
diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java
index 78e3b33e90..9bced500f2 100644
--- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java
+++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java
@@ -1337,7 +1337,7 @@ public class Snark
     int totalUploaders = 0;
     for (PeerCoordinator c : _peerCoordinatorSet) {
       if (!c.halted())
-        totalUploaders += c.uploaders;
+        totalUploaders += c.getInterestedUploaders();
     }
     int limit = _util.getMaxUploaders();
     if (_log.shouldLog(Log.DEBUG))
diff --git a/history.txt b/history.txt
index 7e0a9bbb32..7f0ad1ad57 100644
--- a/history.txt
+++ b/history.txt
@@ -1,3 +1,8 @@
+2016-10-25 zzz
+ * i2psnark: Better calculation of total upload limit
+ * SSU: Increase max IPv6 MTU (proposal #127)
+ * Zxing 3.3.0
+
 2016-10-23 zzz
  * Crypto: Create keystore directory when making SSL keys (ticket #1866)
 
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index 0725033fa6..f6e6df3a5c 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -18,7 +18,7 @@ public class RouterVersion {
     /** deprecated */
     public final static String ID = "Monotone";
     public final static String VERSION = CoreVersion.VERSION;
-    public final static long BUILD = 3;
+    public final static long BUILD = 4;
 
     /** for example "-test" */
     public final static String EXTRA = "";
-- 
GitLab