I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit d2569fa4 authored by zzz's avatar zzz
Browse files

i2psnark: Don't count unchoked but uninterested peers as uploaders

when calculating global uploader limit, to allow more upload slots,
especially when some torrents are stalled.
Convert some shared fields to atomics for concurrency.
parent 8a845229
No related branches found
No related tags found
No related merge requests found
......@@ -65,6 +65,7 @@ class PeerCheckerTask implements Runnable
Peer worstDownloader = null;
int uploaders = 0;
int interestedUploaders = 0;
int removedCount = 0;
long uploaded = 0;
......@@ -76,7 +77,9 @@ class PeerCheckerTask implements Runnable
int uploadLimit = coordinator.allowedUploaders();
boolean overBWLimit = coordinator.overUpBWLimit();
if (_log.shouldLog(Log.DEBUG))
_log.debug("peers: " + peerList.size() + " limit: " + uploadLimit + " overBW? " + overBWLimit);
_log.debug("START peers: " + peerList.size() + " uploaders: " + coordinator.getUploaders() +
" interested: " + coordinator.getInterestedUploaders() +
" limit: " + uploadLimit + " overBW? " + overBWLimit);
DHT dht = _util.getDHT();
for (Peer peer : peerList) {
......@@ -98,7 +101,9 @@ class PeerCheckerTask implements Runnable
continue;
}
if (!peer.isChoking())
// we only count choking AND interested, so as not to steal a slot
// from some other torrent
if (peer.isInterested() && !peer.isChoking())
uploaders++;
long upload = peer.getUploaded();
......@@ -128,20 +133,23 @@ class PeerCheckerTask implements Runnable
// If we are at our max uploaders and we have lots of other
// interested peers try to make some room.
// (Note use of coordinator.uploaders)
if (((coordinator.uploaders == uploadLimit
&& coordinator.interestedAndChoking > 0)
|| coordinator.uploaders > uploadLimit
|| overBWLimitChoke)
int cup = coordinator.getUploaders();
if (((cup == uploadLimit
&& coordinator.getInterestedAndChoking() > 0)
|| cup > uploadLimit
|| overBWLimitChoke)
&& !peer.isChoking())
{
// Check if it still wants pieces from us.
if (!peer.isInterested())
{
// Note that we only choke if we are over our limits,
// so a peer may remain unchoked even if uninterested.
if (_log.shouldLog(Log.DEBUG))
_log.debug("Choke uninterested peer: " + peer);
peer.setChoking(true);
uploaders--;
coordinator.uploaders--;
coordinator.decrementUploaders(false);
// Put it at the back of the list
removed.add(peer);
......@@ -152,7 +160,8 @@ class PeerCheckerTask implements Runnable
_log.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer);
peer.setChoking(true);
uploaders--;
coordinator.uploaders--;
interestedUploaders--;
coordinator.decrementUploaders(true);
removedCount++;
// Put it at the back of the list for fairness, even though we won't be unchoking this time
......@@ -165,7 +174,8 @@ class PeerCheckerTask implements Runnable
_log.debug("Choke choking peer: " + peer);
peer.setChoking(true);
uploaders--;
coordinator.uploaders--;
interestedUploaders--;
coordinator.decrementUploaders(true);
removedCount++;
// Put it at the back of the list
......@@ -178,7 +188,8 @@ class PeerCheckerTask implements Runnable
_log.debug("Choke uninteresting peer: " + peer);
peer.setChoking(true);
uploaders--;
coordinator.uploaders--;
interestedUploaders--;
coordinator.decrementUploaders(true);
removedCount++;
// Put it at the back of the list
......@@ -193,7 +204,8 @@ class PeerCheckerTask implements Runnable
_log.debug("Choke downloader that doesn't deliver: " + peer);
peer.setChoking(true);
uploaders--;
coordinator.uploaders--;
interestedUploaders--;
coordinator.decrementUploaders(true);
removedCount++;
// Put it at the back of the list
......@@ -230,11 +242,11 @@ class PeerCheckerTask implements Runnable
// Resync actual uploaders value
// (can shift a bit by disconnecting peers)
coordinator.uploaders = uploaders;
coordinator.setUploaders(uploaders, interestedUploaders);
// Remove the worst downloader if needed. (uploader if seeding)
if (((uploaders == uploadLimit
&& coordinator.interestedAndChoking > 0)
&& coordinator.getInterestedAndChoking() > 0)
|| uploaders > uploadLimit)
&& worstDownloader != null)
{
......@@ -242,28 +254,34 @@ class PeerCheckerTask implements Runnable
_log.debug("Choke worst downloader: " + worstDownloader);
worstDownloader.setChoking(true);
coordinator.uploaders--;
coordinator.decrementUploaders(worstDownloader.isInterested());
removedCount++;
// Put it at the back of the list
removed.add(worstDownloader);
}
// Optimistically unchoke a peer
if ((!overBWLimit) && !coordinator.overUpBWLimit(uploaded))
coordinator.unchokePeer();
// Put peers back at the end of the list that we removed earlier.
boolean coordOver = coordinator.overUpBWLimit(uploaded);
synchronized (coordinator.peers) {
if ((!overBWLimit) && !coordOver) {
// Optimistically unchoke a peer
// must be called inside synch
coordinator.unchokePeer();
}
// Put peers back at the end of the list that we removed earlier.
for(Peer peer : removed) {
if (coordinator.peers.remove(peer))
coordinator.peers.add(peer);
}
}
coordinator.interestedAndChoking += removedCount;
coordinator.addInterestedAndChoking(removedCount);
// store the rates
coordinator.setRateHistory(uploaded, downloaded);
if (_log.shouldLog(Log.DEBUG))
_log.debug("END peers: " + peerList.size() + " uploaders: " + uploaders +
" interested: " + interestedUploaders);
// close out unused files, but we don't need to do it every time
Storage storage = coordinator.getStorage();
......
......@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
......@@ -74,16 +75,23 @@ class PeerCoordinator implements PeerListener
public static final long MAX_INACTIVE = 8*60*1000;
/**
* Approximation of the number of current uploaders.
* Approximation of the number of current uploaders (unchoked peers),
* whether interested or not.
* Resynced by PeerChecker once in a while.
*/
private final AtomicInteger uploaders = new AtomicInteger();
/**
* Approximation of the number of current uploaders (unchoked peers),
* that are interested.
* Resynced by PeerChecker once in a while.
* External use by PeerCheckerTask only.
*/
int uploaders;
private final AtomicInteger interestedUploaders = new AtomicInteger();
/**
* External use by PeerCheckerTask only.
*/
int interestedAndChoking;
private final AtomicInteger interestedAndChoking = new AtomicInteger();
// final static int MAX_DOWNLOADERS = MAX_CONNECTIONS;
// int downloaders = 0;
......@@ -627,8 +635,9 @@ class PeerCoordinator implements PeerListener
return false;
}
// (Optimistically) unchoke. Should be called with peers synchronized
/**
* (Optimistically) unchoke. Must be called with peers synchronized
*/
void unchokePeer()
{
// linked list will contain all interested peers that we choke.
......@@ -645,7 +654,7 @@ class PeerCoordinator implements PeerListener
if (peer.isChoking() && peer.isInterested())
{
count++;
if (uploaders < maxUploaders)
if (uploaders.get() < maxUploaders)
{
if (peer.isInteresting() && !peer.isChoked())
interested.add(unchokedCount++, peer);
......@@ -655,20 +664,22 @@ class PeerCoordinator implements PeerListener
}
}
while (uploaders < maxUploaders && !interested.isEmpty())
int up = uploaders.get();
while (up < maxUploaders && !interested.isEmpty())
{
Peer peer = interested.remove(0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unchoke: " + peer);
peer.setChoking(false);
uploaders++;
up = uploaders.incrementAndGet();
interestedUploaders.incrementAndGet();
count--;
// Put peer back at the end of the list.
peers.remove(peer);
peers.add(peer);
peerCount = peers.size();
}
interestedAndChoking = count;
interestedAndChoking.set(count);
}
/**
......@@ -1098,11 +1109,12 @@ class PeerCoordinator implements PeerListener
{
if (interest)
{
if (uploaders < allowedUploaders())
if (uploaders.get() < allowedUploaders())
{
if(peer.isChoking())
{
uploaders++;
uploaders.incrementAndGet();
interestedUploaders.incrementAndGet();
peer.setChoking(false);
if (_log.shouldLog(Log.INFO))
_log.info("Unchoke: " + peer);
......@@ -1487,22 +1499,102 @@ class PeerCoordinator implements PeerListener
*/
public int allowedUploaders()
{
if (listener != null && listener.overUploadLimit(uploaders)) {
int up = uploaders.get();
if (listener != null && listener.overUploadLimit(interestedUploaders.get())) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Over limit, uploaders was: " + uploaders);
return uploaders - 1;
} else if (uploaders < MAX_UPLOADERS)
return uploaders + 1;
else
_log.debug("Over limit, uploaders was: " + up);
return up - 1;
} else if (up < MAX_UPLOADERS) {
return up + 1;
} else {
return MAX_UPLOADERS;
}
}
/**
* Uploaders whether interested or not
* Use this for per-torrent limits.
*
* @return current
* @since 0.8.4
*/
public int getUploaders() {
return uploaders;
int rv = uploaders.get();
if (rv > 0) {
int max = getPeers();
if (rv > max)
rv = max;
}
return rv;
}
/**
* Uploaders, interested only.
* Use this to calculate the global total, so that
* unchoked but uninterested peers don't count against the global limit.
*
* @return current
* @since 0.9.28
*/
public int getInterestedUploaders() {
int rv = interestedUploaders.get();
if (rv > 0) {
int max = getPeers();
if (rv > max)
rv = max;
}
return rv;
}
/**
* Set the uploaders and interestedUploaders counts
*
* @since 0.9.28
* @param upl whether interested or not
* @param inter interested only
*/
public void setUploaders(int upl, int inter) {
if (upl < 0)
upl = 0;
else if (upl > MAX_UPLOADERS)
upl = MAX_UPLOADERS;
uploaders.set(upl);
if (inter < 0)
inter = 0;
else if (inter > MAX_UPLOADERS)
inter = MAX_UPLOADERS;
interestedUploaders.set(inter);
}
/**
* Decrement the uploaders and (if set) the interestedUploaders counts
*
* @since 0.9.28
*/
public void decrementUploaders(boolean isInterested) {
int up = uploaders.decrementAndGet();
if (up < 0)
uploaders.set(0);
if (isInterested) {
up = interestedUploaders.decrementAndGet();
if (up < 0)
interestedUploaders.set(0);
}
}
/**
* @return current
* @since 0.9.28
*/
public int getInterestedAndChoking() {
return interestedAndChoking.get();
}
/**
* @since 0.9.28
*/
public void addInterestedAndChoking(int toAdd) {
interestedAndChoking.addAndGet(toAdd);
}
public boolean overUpBWLimit()
......
......@@ -1337,7 +1337,7 @@ public class Snark
int totalUploaders = 0;
for (PeerCoordinator c : _peerCoordinatorSet) {
if (!c.halted())
totalUploaders += c.uploaders;
totalUploaders += c.getInterestedUploaders();
}
int limit = _util.getMaxUploaders();
if (_log.shouldLog(Log.DEBUG))
......
2016-10-25 zzz
* i2psnark: Better calculation of total upload limit
* SSU: Increase max IPv6 MTU (proposal #127)
* Zxing 3.3.0
2016-10-23 zzz
* Crypto: Create keystore directory when making SSL keys (ticket #1866)
 
......
......@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 3;
public final static long BUILD = 4;
/** for example "-test" */
public final static String EXTRA = "";
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment