i2psnark: Bandwidth limiter improvements

This commit is contained in:
zzz
2024-01-15 10:05:46 +00:00
parent 399507805f
commit db52cc7811
20 changed files with 908 additions and 339 deletions

View File

@@ -6,6 +6,7 @@ package org.klomp.snark;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PSession;
@@ -32,7 +33,9 @@ class BWLimits {
session.connect();
rv = session.bandwidthLimits();
session.destroySession();
} catch (I2PSessionException ise) {}
} catch (I2PSessionException ise) {
I2PAppContext.getGlobalContext().logManager().getLog(BWLimits.class).warn("BWL fail", ise);
}
return rv;
}

View File

@@ -0,0 +1,63 @@
package org.klomp.snark;
/**
* Bandwidth and bandwidth limits
*
* Maintain three bandwidth estimators:
* Sent, received, and requested.
*
* @since 0.9.62
*/
public interface BandwidthListener {
/**
* The average rate in Bps
*/
public long getUploadRate();
/**
* The average rate in Bps
*/
public long getDownloadRate();
/**
* We unconditionally sent this many bytes
*/
public void uploaded(int size);
/**
* We unconditionally received this many bytes
*/
public void downloaded(int size);
/**
* Should we send this many bytes?
* Do NOT call uploaded() if this returns true.
*/
public boolean shouldSend(int size);
/**
* Should we request this many bytes?
*/
public boolean shouldRequest(Peer peer, int size);
/**
* Current limit in BPS
*/
public long getUpBWLimit();
/**
* Current limit in BPS
*/
public long getDownBWLimit();
/**
* Are we currently over the limit?
*/
public boolean overUpBWLimit();
/**
* Are we currently over the limit?
*/
public boolean overDownBWLimit();
}

View File

@@ -0,0 +1,179 @@
package org.klomp.snark;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import net.i2p.util.SyntheticREDQueue;
/**
* Bandwidth and bandwidth limits
*
* Maintain three bandwidth estimators:
* Sent, received, and requested.
*
* There are three layers of BandwidthListeners:
*<pre>
* BandwidthManager (total)
* PeerCoordinator (per-torrent)
* Peer/WebPeer (per-connection)
*<pre>
*
* Here at the top, we use SyntheticRedQueues for accurate
* and current moving averages of up, down, and requested bandwidth.
*
* At the lower layers, simple weighted moving averages of
* three buckets of 40 seconds each (2 minutes total) are used
* for up and down, and requested is delegated here.
*
* The lower layers must report to the next-higher layer.
*
* At the Peer layer, we report inbound piece data per-read,
* not per-piece, to get a smoother inbound estimate.
*
* Only the following data are counted by the BandwidthListeners:
*<ul><li>Pieces (both Peer and WebPeer)
*<li>ut_metadata
*</ul>
*
* No overhead at any layer is accounted for.
*
* @since 0.9.62
*/
public class BandwidthManager implements BandwidthListener {
private final I2PAppContext _context;
private final Log _log;
private SyntheticREDQueue _up, _down, _req;
BandwidthManager(I2PAppContext ctx, int upLimit, int downLimit) {
_context = ctx;
_log = ctx.logManager().getLog(BandwidthManager.class);
_up = new SyntheticREDQueue(ctx, upLimit);
_down = new SyntheticREDQueue(ctx, downLimit);
// Allow down limit a little higher based on testing
// Allow req limit a little higher still because it uses RED
// so it actually kicks in sooner.
_req = new SyntheticREDQueue(ctx, downLimit * 110 / 100);
}
/**
* Current limit in Bps
*/
void setUpBWLimit(long upLimit) {
int limit = (int) Math.min(upLimit, Integer.MAX_VALUE);
if (limit != getUpBWLimit())
_up = new SyntheticREDQueue(_context, limit);
}
/**
* Current limit in Bps
*/
void setDownBWLimit(long downLimit) {
int limit = (int) Math.min(downLimit, Integer.MAX_VALUE);
if (limit != getDownBWLimit()) {
_down = new SyntheticREDQueue(_context, limit);
_req = new SyntheticREDQueue(_context, limit * 110 / 100);
}
}
/**
* The average rate in Bps
*/
long getRequestRate() {
return (long) (1000f * _req.getBandwidthEstimate());
}
// begin BandwidthListener interface
/**
* The average rate in Bps
*/
public long getUploadRate() {
return (long) (1000f * _up.getBandwidthEstimate());
}
/**
* The average rate in Bps
*/
public long getDownloadRate() {
return (long) (1000f * _down.getBandwidthEstimate());
}
/**
* We unconditionally sent this many bytes
*/
public void uploaded(int size) {
_up.addSample(size);
}
/**
* We received this many bytes
*/
public void downloaded(int size) {
_down.addSample(size);
}
/**
* Should we send this many bytes?
* Do NOT call uploaded() if this returns true.
*/
public boolean shouldSend(int size) {
boolean rv = _up.offer(size, 1.0f);
if (!rv && _log.shouldWarn())
_log.warn("Deny sending " + size + " bytes, upload rate " + DataHelper.formatSize(getUploadRate()) + "Bps");
return rv;
}
/**
* Should we request this many bytes?
*
* @param peer ignored
*/
public boolean shouldRequest(Peer peer, int size) {
boolean rv = !overDownBWLimit() && _req.offer(size, 1.0f);
if (!rv && _log.shouldWarn())
_log.warn("Deny requesting " + size + " bytes, download rate " + DataHelper.formatSize(getDownloadRate()) + "Bps" +
", request rate " + DataHelper.formatSize(getRequestRate()) + "Bps");
return rv;
}
/**
* Current limit in BPS
*/
public long getUpBWLimit() {
return _up.getMaxBandwidth();
}
/**
* Current limit in BPS
*/
public long getDownBWLimit() {
return _down.getMaxBandwidth();
}
/**
* Are we currently over the limit?
*/
public boolean overUpBWLimit() {
return getUploadRate() > getUpBWLimit();
}
/**
* Are we currently over the limit?
*/
public boolean overDownBWLimit() {
return getDownloadRate() > getDownBWLimit();
}
/**
* In HTML for debug page
*/
@Override
public String toString() {
return "<br><b>Bandwidth Limiters</b><br><b>Up:</b> " + _up +
"<br><b>Down:</b> " + _down +
"<br><b>Req:</b> " + _req +
"<br>";
}
}

View File

@@ -83,4 +83,9 @@ public interface CompleteListener {
* @since 0.9.42
*/
public boolean shouldAutoStart();
/**
* @since 0.9.62
*/
public BandwidthListener getBandwidthListener();
}

View File

@@ -42,15 +42,5 @@ interface CoordinatorListener
*/
public boolean overUploadLimit(int uploaders);
/**
* Is i2psnark as a whole over its limit?
*/
public boolean overUpBWLimit();
/**
* Is a particular peer who has this recent download rate (in Bps) over our upstream bandwidth limit?
*/
public boolean overUpBWLimit(long total);
public void addMessage(String message);
}

View File

@@ -190,6 +190,8 @@ abstract class ExtensionHandler {
}
if (log.shouldLog(Log.INFO))
log.info("Request chunk " + chk + " from " + peer);
// ignore the rv, always request
peer.shouldRequest(state.chunkSize(chk));
sendRequest(peer, chk);
}
} catch (Exception e) {
@@ -230,7 +232,6 @@ abstract class ExtensionHandler {
sendPiece(peer, piece, pc, totalSize);
// Do this here because PeerConnectionOut only reports for PIECE messages
peer.uploaded(pc.length);
listener.uploaded(peer, pc.length);
} else if (type == TYPE_DATA) {
// On close reading of BEP 9, this is the total metadata size.
// Prior to 0.9.21, we sent the piece size, so we can't count on it.
@@ -245,7 +246,6 @@ abstract class ExtensionHandler {
return;
int len = is.available();
peer.downloaded(len);
listener.downloaded(peer, len);
// this checks the size
done = state.saveChunk(piece, bs, bs.length - len, len);
if (log.shouldLog(Log.INFO))
@@ -264,6 +264,8 @@ abstract class ExtensionHandler {
// get the next chunk
if (log.shouldLog(Log.INFO))
log.info("Request chunk " + chk + " from " + peer);
// ignore the rv, always request
peer.shouldRequest(state.chunkSize(chk));
sendRequest(peer, chk);
}
} else if (type == TYPE_REJECT) {

View File

@@ -2,6 +2,7 @@ package org.klomp.snark;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -43,6 +44,7 @@ class PartialPiece implements Comparable<PartialPiece> {
private RandomAccessFile raf;
private final int pclen;
private final File tempDir;
private final BitField bitfield;
private static final int BUFSIZE = PeerState.PARTSIZE;
private static final ByteCache _cache = ByteCache.getInstance(16, BUFSIZE);
@@ -66,6 +68,7 @@ class PartialPiece implements Comparable<PartialPiece> {
this.pclen = len;
//this.createdTime = 0;
this.tempDir = tempDir;
bitfield = new BitField((len + PeerState.PARTSIZE - 1) / PeerState.PARTSIZE);
// temps for finals
byte[] tbs = null;
@@ -98,22 +101,27 @@ class PartialPiece implements Comparable<PartialPiece> {
//tfile = SecureFile.createTempFile("piece", null, tempDir);
// debug
tempfile = SecureFile.createTempFile("piece_" + piece.getId() + '_', null, tempDir);
//I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Created " + tempfile);
// tfile.deleteOnExit() ???
raf = new RandomAccessFile(tempfile, "rw");
// Do not preallocate the file space.
// Not necessary to call setLength(), file is extended when written
//traf.setLength(len);
}
/**
* Convert this PartialPiece to a request for the next chunk.
* Used by PeerState only. This depends on the downloaded value
* as set by setDownloaded() or read().
* Used by PeerState only.
*
* @return null if complete
*/
public synchronized Request getRequest() {
return new Request(this, this.off, Math.min(this.pclen - this.off, PeerState.PARTSIZE));
int chunk = off / PeerState.PARTSIZE;
int sz = bitfield.size();
for (int i = chunk; i < sz; i++) {
if (!bitfield.get(i))
return new Request(this, off, Math.min(pclen - off, PeerState.PARTSIZE));
if (i == sz - 1)
off = pclen;
else
off += PeerState.PARTSIZE;
}
return null;
}
/** piece number */
@@ -129,28 +137,20 @@ class PartialPiece implements Comparable<PartialPiece> {
}
/**
* How many bytes are good - as set by setDownloaded() or read()
* @since 0.9.62
*/
public synchronized boolean isComplete() {
return bitfield.complete();
}
/**
* How many consecutive bytes are good - as set by read().
* There may be more good bytes after "holes".
*/
public synchronized int getDownloaded() {
return this.off;
}
/**
* Call this if necessary before returning a PartialPiece to the PeerCoordinator.
* We do not use a bitmap to track individual chunks received.
* Any chunks after a 'hole' will be lost.
* @since 0.9.1
*/
public synchronized void setDownloaded(int offset) {
this.off = offset;
}
/****
public long getCreated() {
return this.createdTime;
}
****/
/**
* Piece must be complete.
* The SHA1 hash of the completely read data.
@@ -200,13 +200,47 @@ class PartialPiece implements Comparable<PartialPiece> {
*
* @since 0.9.1
*/
public void read(DataInputStream din, int offset, int len) throws IOException {
public void read(DataInputStream din, int offset, int len, BandwidthListener bwl) throws IOException {
if (offset % PeerState.PARTSIZE != 0)
throw new IOException("Bad offset " + offset);
int chunk = offset / PeerState.PARTSIZE;
// We read the data before checking if we have the chunk,
// because otherwise we'd have to break the peer connection
if (bs != null) {
din.readFully(bs, offset, len);
// Don't use readFully() so we may update the BandwidthListener as we go
//in.readFully(bs, offset, len);
int offs = offset;
int toRead = len;
while (toRead > 0) {
int numRead = din.read(bs, offs, toRead);
if (numRead < 0)
throw new EOFException();
offs += numRead;
toRead -= numRead;
bwl.downloaded(numRead);
}
synchronized (this) {
// only works for in-order chunks
if (this.off == offset)
this.off += len;
if (bitfield.get(chunk)) {
warn("Already have chunk " + chunk + " on " + this);
} else {
bitfield.set(chunk);
if (this.off == offset) {
this.off += len;
// if this filled in a hole, advance off
int sz = bitfield.size();
for (int i = chunk + 1; i < sz; i++) {
if (!bitfield.get(i))
break;
warn("Hole filled in before chunk " + i + " on " + this + ' ' + bitfield);
if (i == sz - 1)
off = pclen;
else
off += PeerState.PARTSIZE;
}
} else {
warn("Out of order chunk " + chunk + " on " + this + ' ' + bitfield);
}
}
}
} else {
// read in fully before synching on raf
@@ -219,15 +253,46 @@ class PartialPiece implements Comparable<PartialPiece> {
ba = null;
tmp = new byte[len];
}
din.readFully(tmp);
// Don't use readFully() so we may update the BandwidthListener as we go
//din.readFully(tmp);
int offs = 0;
int toRead = len;
while (toRead > 0) {
int numRead = din.read(tmp, offs, toRead);
if (numRead < 0)
throw new EOFException();
offs += numRead;
toRead -= numRead;
bwl.downloaded(numRead);
}
synchronized (this) {
if (raf == null)
createTemp();
raf.seek(offset);
raf.write(tmp);
// only works for in-order chunks
if (this.off == offset)
this.off += len;
if (bitfield.get(chunk)) {
warn("Already have chunk " + chunk + " on " + this);
} else {
if (raf == null)
createTemp();
raf.seek(offset);
raf.write(tmp);
bitfield.set(chunk);
if (this.off == offset) {
this.off += len;
// if this filled in a hole, advance off
int sz = bitfield.size();
for (int i = chunk + 1; i < sz; i++) {
if (!bitfield.get(i))
break;
warn("Hole filled in before chunk " + i + " on " + this + ' ' + bitfield);
if (i == sz - 1)
off = pclen;
else
off += PeerState.PARTSIZE;
}
} else {
warn("Out of order chunk " + chunk + " on " + this + ' ' + bitfield);
}
}
}
if (ba != null)
_cache.release(ba, false);
@@ -300,7 +365,6 @@ class PartialPiece implements Comparable<PartialPiece> {
try {
raf.close();
} catch (IOException ioe) {
I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn("Error closing " + raf, ioe);
}
tempfile.delete();
}
@@ -339,4 +403,11 @@ class PartialPiece implements Comparable<PartialPiece> {
public String toString() {
return "Partial(" + piece.getId() + ',' + off + ',' + pclen + ')';
}
/**
* @since 0.9.62
*/
public static void warn(String s) {
I2PAppContext.getGlobalContext().logManager().getLog(PartialPiece.class).warn(s);
}
}

View File

@@ -40,7 +40,7 @@ import net.i2p.util.Log;
import org.klomp.snark.bencode.BEValue;
import org.klomp.snark.bencode.InvalidBEncodingException;
public class Peer implements Comparable<Peer>
public class Peer implements Comparable<Peer>, BandwidthListener
{
protected final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
// Identifying property, the peer id of the other side.
@@ -239,7 +239,7 @@ public class Peer implements Comparable<Peer>
*
* @param uploadOnly if we are complete with skipped files, i.e. a partial seed
*/
public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField bitfield,
public void runConnection(I2PSnarkUtil util, PeerListener listener, BandwidthListener bwl, BitField bitfield,
MagnetState mState, boolean uploadOnly) {
if (state != null)
throw new IllegalStateException("Peer already started");
@@ -288,7 +288,7 @@ public class Peer implements Comparable<Peer>
PeerConnectionIn in = new PeerConnectionIn(this, din);
PeerConnectionOut out = new PeerConnectionOut(this, dout);
PeerState s = new PeerState(this, listener, metainfo, in, out);
PeerState s = new PeerState(this, listener, bwl, metainfo, in, out);
if ((options & OPTION_EXTENSION) != 0) {
if (_log.shouldLog(Log.DEBUG))
@@ -651,12 +651,17 @@ public class Peer implements Comparable<Peer>
return (s == null) || s.choked;
}
/////// begin BandwidthListener interface ///////
/**
* Increment the counter.
* @since 0.8.4
*/
public void downloaded(int size) {
downloaded.addAndGet(size);
PeerState s = state;
if (s != null)
s.getBandwidthListener().downloaded(size);
}
/**
@@ -665,6 +670,9 @@ public class Peer implements Comparable<Peer>
*/
public void uploaded(int size) {
uploaded.addAndGet(size);
PeerState s = state;
if (s != null)
s.getBandwidthListener().uploaded(size);
}
/**
@@ -688,13 +696,115 @@ public class Peer implements Comparable<Peer>
}
/**
* Returns the average rate in Bps
*/
public long getUploadRate()
{
return PeerCoordinator.getRate(uploaded_old);
}
public long getDownloadRate()
{
return PeerCoordinator.getRate(downloaded_old);
}
/**
* Should we send this many bytes?
* Do NOT call uploaded() after this.
* @since 0.9.62
*/
public boolean shouldSend(int size) {
PeerState s = state;
if (s != null) {
boolean rv = s.getBandwidthListener().shouldSend(size);
if (rv)
uploaded.addAndGet(size);
return rv;
}
return false;
}
/**
* Should we request this many bytes?
* @since 0.9.62
*/
public boolean shouldRequest(int size) {
PeerState s = state;
if (s != null)
return s.getBandwidthListener().shouldRequest(this, size);
return false;
}
/**
* Should we request this many bytes?
* @since 0.9.62
*/
public boolean shouldRequest(Peer peer, int size) {
if (peer != this)
return false;
PeerState s = state;
if (s != null)
return s.getBandwidthListener().shouldRequest(this, size);
return false;
}
/**
* Current limit in Bps
* @since 0.9.62
*/
public long getUpBWLimit() {
PeerState s = state;
if (s != null)
return s.getBandwidthListener().getUpBWLimit();
return Integer.MAX_VALUE;
}
/**
* Is snark as a whole over its limit?
* @since 0.9.62
*/
public boolean overUpBWLimit()
{
PeerState s = state;
if (s != null)
return s.getBandwidthListener().overUpBWLimit();
return false;
}
/**
* Current limit in Bps
* @since 0.9.62
*/
public long getDownBWLimit() {
PeerState s = state;
if (s != null)
return s.getBandwidthListener().getDownBWLimit();
return Integer.MAX_VALUE;
}
/**
* Are we currently over the limit?
* @since 0.9.62
*/
public boolean overDownBWLimit() {
PeerState s = state;
if (s != null)
return s.getBandwidthListener().overDownBWLimit();
return false;
}
/**
* Push the total uploaded/downloaded onto a RATE_DEPTH deep stack
* Resets the downloaded and uploaded counters to zero.
*/
public void resetCounters()
{
downloaded.set(0);
uploaded.set(0);
void setRateHistory() {
long up = uploaded.getAndSet(0);
PeerCoordinator.setRate(up, uploaded_old);
long down = downloaded.getAndSet(0);
PeerCoordinator.setRate(down, downloaded_old);
}
/////// end BandwidthListener interface ///////
public long getInactiveTime() {
PeerState s = state;
@@ -761,28 +871,6 @@ public class Peer implements Comparable<Peer>
return s.bitfield.complete();
}
/**
* Push the total uploaded/downloaded onto a RATE_DEPTH deep stack
*/
public void setRateHistory(long up, long down)
{
PeerCoordinator.setRate(up, uploaded_old);
PeerCoordinator.setRate(down, downloaded_old);
}
/**
* Returns the 4-minute-average rate in Bps
*/
public long getUploadRate()
{
return PeerCoordinator.getRate(uploaded_old);
}
public long getDownloadRate()
{
return PeerCoordinator.getRate(downloaded_old);
}
/** @since 0.9.31 */
int getTotalCommentsSent() {
return _totalCommentsSent;

View File

@@ -113,8 +113,7 @@ class PeerCheckerTask implements Runnable
uploaded += upload;
long download = peer.getDownloaded();
downloaded += download;
peer.setRateHistory(upload, download);
peer.resetCounters();
peer.setRateHistory();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(peer + ":"
@@ -127,10 +126,9 @@ class PeerCheckerTask implements Runnable
}
// Choke a percentage of them rather than all so it isn't so drastic...
// unless this torrent is over the limit all by itself.
// choke 5/8 of the time when seeding and 3/8 when leeching
// choke 3/8 of the time when seeding and 1/4 when leeching
boolean overBWLimitChoke = upload > 0 &&
((overBWLimit && (random.nextInt(8) > (coordinator.completed() ? 2 : 4))) ||
((overBWLimit && (random.nextInt(8) > (coordinator.completed() ? 4 : 5))) ||
(coordinator.overUpBWLimit(uploaded)));
// If we are at our max uploaders and we have lots of other

View File

@@ -157,7 +157,7 @@ class PeerConnectionIn implements Runnable
Request req = ps.getOutstandingRequest(piece, begin, len);
if (req != null)
{
req.read(din);
req.read(din, peer);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received data(" + piece + "," + begin + ") from " + peer);
ps.pieceMessage(req);

View File

@@ -73,6 +73,7 @@ class PeerConnectionOut implements Runnable
{
try
{
boolean shouldThrottleRequests = false;
while (!quit && peer.isConnected())
{
Message m = null;
@@ -89,7 +90,7 @@ class PeerConnectionOut implements Runnable
synchronized(sendQueue)
{
while (!quit && peer.isConnected() && sendQueue.isEmpty())
while (!quit && peer.isConnected() && (shouldThrottleRequests || sendQueue.isEmpty()))
{
try
{
@@ -98,12 +99,13 @@ class PeerConnectionOut implements Runnable
// dout.flush();
// Wait till more data arrives.
sendQueue.wait(60*1000);
sendQueue.wait(shouldThrottleRequests ? 5000 : 60*1000);
}
catch (InterruptedException ie)
{
/* ignored */
}
shouldThrottleRequests = false;
}
state = peer.state;
if (!quit && state != null && peer.isConnected())
@@ -125,7 +127,6 @@ class PeerConnectionOut implements Runnable
{
if (state.choking) {
it.remove();
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
if (peer.supportsFast()) {
Message r = new Message(Message.REJECT, nm.piece, nm.begin, nm.length);
if (_log.shouldLog(Log.DEBUG))
@@ -135,23 +136,50 @@ class PeerConnectionOut implements Runnable
}
nm = null;
}
else if (nm.type == Message.REQUEST && state.choked)
else if (nm.type == Message.REQUEST)
{
it.remove();
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
nm = null;
if (state.choked) {
it.remove();
nm = null;
} else if (shouldThrottleRequests) {
// previous request in queue throttled, skip this one too
if (_log.shouldWarn())
_log.warn("Additional throttle: " + nm + " to " + peer);
nm = null;
} else if (!peer.shouldRequest(nm.length)) {
// request throttle, skip this and all others in this loop
if (_log.shouldWarn())
_log.warn("Throttle: " + nm + " to " + peer);
shouldThrottleRequests = true;
nm = null;
}
}
if (nm != null)
{
m = nm;
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
it.remove();
}
}
if (m == null) {
m = sendQueue.poll();
//SimpleTimer.getInstance().removeEvent(m.expireEvent);
m = sendQueue.peek();
if (m != null && m.type == Message.PIECE) {
// bandwidth limiting
// Pieces are the last thing in the queue to be sent so we can
// simply wait right here and then loop
if (!peer.shouldSend(Math.min(m.length, PeerState.PARTSIZE))) {
if (_log.shouldWarn())
_log.warn("Throttle: " + m + " to " + peer);
try {
sendQueue.wait(5000);
} catch (InterruptedException ie) {}
continue;
}
} else if (m != null && m.type == Message.REQUEST) {
if (shouldThrottleRequests)
continue;
}
m = sendQueue.poll();
}
}
}
@@ -178,17 +206,14 @@ class PeerConnectionOut implements Runnable
// only count the rest of the upload after sendMessage().
int remainder = 0;
if (m.type == Message.PIECE) {
if (m.len <= PeerState.PARTSIZE) {
state.uploaded(m.len);
} else {
state.uploaded(PeerState.PARTSIZE);
// first PARTSIZE was signalled in shouldSend() above
if (m.len > PeerState.PARTSIZE)
remainder = m.len - PeerState.PARTSIZE;
}
}
m.sendMessage(dout);
if (remainder > 0)
state.uploaded(remainder);
peer.uploaded(remainder);
m = null;
}
}

View File

@@ -55,7 +55,7 @@ import org.klomp.snark.dht.DHT;
/**
* Coordinates what peer does what.
*/
class PeerCoordinator implements PeerListener
class PeerCoordinator implements PeerListener, BandwidthListener
{
private final Log _log;
@@ -124,6 +124,12 @@ class PeerCoordinator implements PeerListener
/** Timer to handle all periodical tasks. */
private final CheckEvent timer;
// RerequestEvent and related values
private final SimpleTimer2.TimedEvent rerequestTimer;
private final Object rerequestLock = new Object();
private boolean wasRequestAllowed;
private boolean isRerequestScheduled;
private final byte[] id;
private final byte[] infohash;
@@ -145,6 +151,7 @@ class PeerCoordinator implements PeerListener
private final MagnetState magnetState;
private final CoordinatorListener listener;
private final BandwidthListener bwListener;
private final I2PSnarkUtil _util;
private final RandomSource _random;
@@ -163,7 +170,7 @@ class PeerCoordinator implements PeerListener
* @param storage null if in magnet mode
*/
public PeerCoordinator(I2PSnarkUtil util, byte[] id, byte[] infohash, MetaInfo metainfo, Storage storage,
CoordinatorListener listener, Snark torrent)
CoordinatorListener listener, Snark torrent, BandwidthListener bwl)
{
_util = util;
_random = util.getContext().random();
@@ -174,6 +181,7 @@ class PeerCoordinator implements PeerListener
this.storage = storage;
this.listener = listener;
this.snark = torrent;
bwListener = bwl;
wantedPieces = new ArrayList<Piece>();
setWantedPieces();
@@ -188,6 +196,9 @@ class PeerCoordinator implements PeerListener
timer = new CheckEvent(_util.getContext(), new PeerCheckerTask(_util, this));
timer.schedule((CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD));
// NOT scheduled until needed
rerequestTimer = new RerequestEvent();
// we don't store the last-requested time, so just delay a random amount
_commentsLastRequested.set(util.getContext().clock().now() - (COMMENT_REQ_INTERVAL - _random.nextLong(COMMENT_REQ_DELAY)));
}
@@ -208,6 +219,42 @@ class PeerCoordinator implements PeerListener
}
}
/**
* Rerequest after unthrottled
* @since 0.9.62
*/
private class RerequestEvent extends SimpleTimer2.TimedEvent {
/** caller must schedule */
public RerequestEvent() {
super(_util.getContext().simpleTimer2());
}
public void timeReached() {
if (bwListener.shouldRequest(null, 0)) {
if (_log.shouldWarn())
_log.warn("Now unthrottled, rerequest timer poking all peers");
// so shouldRequest() won't fire us up again
synchronized(rerequestLock) {
wasRequestAllowed = true;
}
for (Peer p : peers) {
if (p.isInteresting() && !p.isChoked())
p.request();
}
synchronized(rerequestLock) {
isRerequestScheduled = false;
}
} else {
if (_log.shouldWarn())
_log.warn("Still throttled, rerequest timer reschedule");
synchronized(rerequestLock) {
wasRequestAllowed = false;
}
schedule(2*1000);
}
}
}
/**
* Only called externally from Storage after the double-check fails.
* Sets wantedBytes too.
@@ -328,6 +375,78 @@ class PeerCoordinator implements PeerListener
return downloaded.get();
}
/////// begin BandwidthListener interface ///////
/**
* Called when a peer has uploaded some bytes of a piece.
* @since 0.9.62 params changed
*/
public void uploaded(int size) {
uploaded.addAndGet(size);
bwListener.uploaded(size);
}
/**
* Called when a peer has downloaded some bytes of a piece.
* @since 0.9.62 params changed
*/
public void downloaded(int size) {
downloaded.addAndGet(size);
bwListener.downloaded(size);
}
/**
* Should we send this many bytes?
* Do NOT call uploaded() if this returns true.
* @since 0.9.62
*/
public boolean shouldSend(int size) {
boolean rv = bwListener.shouldSend(size);
if (rv)
uploaded.addAndGet(size);
return rv;
}
/**
* Should we request this many bytes?
* @since 0.9.62
*/
public boolean shouldRequest(Peer peer, int size) {
boolean rv;
synchronized(rerequestLock) {
rv = bwListener.shouldRequest(peer, size);
if (!wasRequestAllowed && rv) {
// we weren't allowed and now we are
if (isRerequestScheduled) {
// just let the timer run when scheduled, do not pull it in
// to prevent thrashing
//if (_log.shouldWarn())
// _log.warn("Now unthrottled, BUT DON'T reschedule rerequest timer");
} else {
// schedule the timer
// we still have to throw it to the timer so we don't loop
if (_log.shouldWarn())
_log.warn("Now unthrottled, schedule rerequest timer");
isRerequestScheduled = true;
// no rush, wait at little while
rerequestTimer.reschedule(1000);
}
wasRequestAllowed = true;
} else if (wasRequestAllowed && !rv) {
// we were allowed and now we aren't
if (!isRerequestScheduled) {
// schedule the timer
if (_log.shouldWarn())
_log.warn("Now throttled, schedule rerequest timer");
isRerequestScheduled = true;
rerequestTimer.schedule(3*1000);
}
wasRequestAllowed = false;
}
}
return rv;
}
/**
* Push the total uploaded/downloaded onto a RATE_DEPTH deep stack
*/
@@ -402,6 +521,49 @@ class PeerCoordinator implements PeerListener
return rate / (factor * CHECK_PERIOD / 1000);
}
/**
* Current limit in Bps
* @since 0.9.62
*/
public long getUpBWLimit() {
return bwListener.getUpBWLimit();
}
/**
* Is snark as a whole over its limit?
*/
public boolean overUpBWLimit()
{
return bwListener.overUpBWLimit();
}
/**
* Is a particular peer who has downloaded this many bytes from us
* in the last CHECK_PERIOD over its limit?
*/
public boolean overUpBWLimit(long total)
{
return total * 1000 / CHECK_PERIOD > getUpBWLimit();
}
/**
* Current limit in Bps
* @since 0.9.62
*/
public long getDownBWLimit() {
return bwListener.getDownBWLimit();
}
/**
* Are we currently over the limit?
* @since 0.9.62
*/
public boolean overDownBWLimit() {
return bwListener.overDownBWLimit();
}
/////// end BandwidthListener interface ///////
public MetaInfo getMetaInfo()
{
return metainfo;
@@ -659,7 +821,7 @@ class PeerCoordinator implements PeerListener
{
public void run()
{
peer.runConnection(_util, listener, bitfield, magnetState, partialComplete);
peer.runConnection(_util, listener, PeerCoordinator.this, bitfield, magnetState, partialComplete);
}
};
String threadName = "Snark peer " + peer.toString();
@@ -683,6 +845,8 @@ class PeerCoordinator implements PeerListener
{
if (storage == null || storage.getBitField().size() == 0)
return;
if (overUpBWLimit())
return;
// linked list will contain all interested peers that we choke.
// At the start are the peers that have us unchoked at the end the
@@ -780,15 +944,6 @@ class PeerCoordinator implements PeerListener
*/
private static final int MAX_PARALLEL_REQUESTS = 4;
/**
* Returns one of pieces in the given BitField that is still wanted or
* -1 if none of the given pieces are wanted.
*/
public int wantPiece(Peer peer, BitField havePieces) {
Piece pc = wantPiece(peer, havePieces, true);
return pc != null ? pc.getId() : -1;
}
/**
* Returns one of pieces in the given BitField that is still wanted or
* null if none of the given pieces are wanted.
@@ -1005,28 +1160,6 @@ class PeerCoordinator implements PeerListener
}
}
/**
* Called when a peer has uploaded some bytes of a piece.
*/
public void uploaded(Peer peer, int size)
{
uploaded.addAndGet(size);
//if (listener != null)
// listener.peerChange(this, peer);
}
/**
* Called when a peer has downloaded some bytes of a piece.
*/
public void downloaded(Peer peer, int size)
{
downloaded.addAndGet(size);
//if (listener != null)
// listener.peerChange(this, peer);
}
/**
* Returns false if the piece is no good (according to the hash).
* In that case the peer that supplied the piece should probably be
@@ -1161,7 +1294,7 @@ class PeerCoordinator implements PeerListener
}
if (uploaders.get() < allowedUploaders())
{
if(peer.isChoking())
if(peer.isChoking() && !overUpBWLimit())
{
uploaders.incrementAndGet();
interestedUploaders.incrementAndGet();
@@ -1219,7 +1352,7 @@ class PeerCoordinator implements PeerListener
* Also mark the piece unrequested if this peer was the only one.
*
* @param peer partials, must include the zero-offset (empty) ones too.
* No dup pieces, piece.setDownloaded() must be set.
* No dup pieces.
* len field in Requests is ignored.
* @since 0.8.2
*/
@@ -1740,27 +1873,6 @@ class PeerCoordinator implements PeerListener
interestedAndChoking.addAndGet(toAdd);
}
/**
* Is snark as a whole over its limit?
*/
public boolean overUpBWLimit()
{
if (listener != null)
return listener.overUpBWLimit();
return false;
}
/**
* Is a particular peer who has downloaded this many bytes from us
* in the last CHECK_PERIOD over its limit?
*/
public boolean overUpBWLimit(long total)
{
if (listener != null)
return listener.overUpBWLimit(total * 1000 / CHECK_PERIOD);
return false;
}
/**
* Convenience
* @since 0.9.2

View File

@@ -120,36 +120,6 @@ interface PeerListener
*/
ByteArray gotRequest(Peer peer, int piece, int off, int len);
/**
* Called when a (partial) piece has been downloaded from the peer.
*
* @param peer the Peer from which size bytes where downloaded.
* @param size the number of bytes that where downloaded.
*/
void downloaded(Peer peer, int size);
/**
* Called when a (partial) piece has been uploaded to the peer.
*
* @param peer the Peer to which size bytes where uploaded.
* @param size the number of bytes that where uploaded.
*/
void uploaded(Peer peer, int size);
/**
* Called when we are downloading from the peer and need to ask for
* a new piece. Might be called multiple times before
* <code>gotPiece()</code> is called.
*
* @param peer the Peer that will be asked to provide the piece.
* @param bitfield a BitField containing the pieces that the other
* side has.
*
* @return one of the pieces from the bitfield that we want or -1 if
* we are no longer interested in the peer.
*/
int wantPiece(Peer peer, BitField bitfield);
/**
* Called when we are downloading from the peer and may need to ask for
* a new piece. Returns true if wantPiece() or getPartialPiece() would return a piece.

View File

@@ -41,6 +41,7 @@ class PeerState implements DataLoader
private final Peer peer;
/** Fixme, used by Peer.disconnect() to get to the coordinator */
final PeerListener listener;
private final BandwidthListener bwListener;
/** Null before we have it. locking: this */
private MetaInfo metainfo;
/** Null unless needed. Contains -1 for all. locking: this */
@@ -67,6 +68,7 @@ class PeerState implements DataLoader
private final List<Request> outstandingRequests = new ArrayList<Request>();
/** the tail (NOT the head) of the request queue */
private Request lastRequest = null;
private int currentMaxPipeline;
// FIXME if piece size < PARTSIZE, pipeline could be bigger
/** @since 0.9.47 */
@@ -81,17 +83,23 @@ class PeerState implements DataLoader
/**
* @param metainfo null if in magnet mode
*/
PeerState(Peer peer, PeerListener listener, MetaInfo metainfo,
PeerState(Peer peer, PeerListener listener, BandwidthListener bwl, MetaInfo metainfo,
PeerConnectionIn in, PeerConnectionOut out)
{
this.peer = peer;
this.listener = listener;
bwListener = bwl;
this.metainfo = metainfo;
this.in = in;
this.out = out;
}
/**
* @since 0.9.62
*/
BandwidthListener getBandwidthListener() { return bwListener; }
// NOTE Methods that inspect or change the state synchronize (on this).
void keepAliveMessage()
@@ -389,7 +397,6 @@ class PeerState implements DataLoader
void uploaded(int size)
{
peer.uploaded(size);
listener.uploaded(peer, size);
}
// This is used to flag that we have to back up from the firstOutstandingRequest
@@ -408,8 +415,8 @@ class PeerState implements DataLoader
void pieceMessage(Request req)
{
int size = req.len;
peer.downloaded(size);
listener.downloaded(peer, size);
// Now reported byte-by-byte in PartialPiece
//peer.downloaded(size);
if (_log.shouldLog(Log.DEBUG))
_log.debug("got end of Chunk("
@@ -417,11 +424,11 @@ class PeerState implements DataLoader
+ peer);
// Last chunk needed for this piece?
// FIXME if priority changed to skip, we will think we're done when we aren't
if (getFirstOutstandingRequest(req.getPiece()) == -1)
PartialPiece pp = req.getPartialPiece();
if (pp.isComplete())
{
// warning - may block here for a while
if (listener.gotPiece(peer, req.getPartialPiece()))
if (listener.gotPiece(peer, pp))
{
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got " + req.getPiece() + ": " + peer);
@@ -445,6 +452,9 @@ class PeerState implements DataLoader
}
/**
* TODO this is how we tell we got all the chunks in pieceMessage() above.
*
*
* @return index in outstandingRequests or -1
*/
synchronized private int getFirstOutstandingRequest(int piece)
@@ -873,65 +883,91 @@ class PeerState implements DataLoader
* By PeerCoordinator.updatePiecePriorities()
*</pre>
*/
synchronized void addRequest()
void addRequest()
{
// no bitfield yet? nothing to request then.
if (bitfield == null)
return;
if (metainfo == null)
return;
boolean more_pieces = true;
while (more_pieces)
{
more_pieces = outstandingRequests.size() < peer.getMaxPipeline();
// We want something and we don't have outstanding requests?
if (more_pieces && lastRequest == null) {
// we have nothing in the queue right now
if (!interesting) {
// If we need something, set interesting but delay pulling
// a request from the PeerCoordinator until unchoked.
if (listener.needPiece(this.peer, bitfield)) {
setInteresting(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we need something, setting interesting, delaying requestNextPiece()");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() needs nothing");
}
return;
}
if (choked) {
// If choked, delay pulling
// a request from the PeerCoordinator until unchoked.
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we are choked, delaying requestNextPiece()");
return;
}
// huh? rv unused
more_pieces = requestNextPiece();
} else if (more_pieces) // We want something
// Initial bw check. We do the actual accounting in PeerConnectionOut.
// Implement a simple AIMD slow-start on the request queue size with
// currentMaxPipeline counter.
// Avoid cross-peer deadlocks from PeerCoordinator, call this outside the lock
if (!bwListener.shouldRequest(peer, 0)) {
synchronized(this) {
// Due to changes elsewhere we can let this go down to zero now
currentMaxPipeline /= 2;
}
if (_log.shouldWarn())
_log.warn(peer + " throttle request, interesting? " + interesting + " choked? " + choked +
" reqq: " + outstandingRequests.size() + " maxp: " + currentMaxPipeline);
return;
}
synchronized(this) {
// adjust currentMaxPipeline
long rate = bwListener.getDownloadRate();
long limit = bwListener.getDownBWLimit();
if (rate < limit * 7 / 10) {
if (currentMaxPipeline < peer.getMaxPipeline())
currentMaxPipeline++;
} else if (rate > limit * 9 / 10) {
currentMaxPipeline = 1;
} else if (currentMaxPipeline < 2) {
currentMaxPipeline++;
}
boolean more_pieces = true;
while (more_pieces)
{
int pieceLength;
boolean isLastChunk;
pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
isLastChunk = lastRequest.off + lastRequest.len == pieceLength;
// Last part of a piece?
if (isLastChunk)
more_pieces = outstandingRequests.size() < currentMaxPipeline;
// We want something and we don't have outstanding requests?
if (more_pieces && lastRequest == null) {
// we have nothing in the queue right now
if (!interesting) {
// If we need something, set interesting but delay pulling
// a request from the PeerCoordinator until unchoked.
if (listener.needPiece(this.peer, bitfield)) {
setInteresting(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we need something, setting interesting, delaying requestNextPiece()");
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() needs nothing");
}
return;
}
if (choked) {
// If choked, delay pulling
// a request from the PeerCoordinator until unchoked.
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " addRequest() we are choked, delaying requestNextPiece()");
return;
}
more_pieces = requestNextPiece();
else
} else if (more_pieces) // We want something
{
PartialPiece nextPiece = lastRequest.getPartialPiece();
int nextBegin = lastRequest.off + PARTSIZE;
int maxLength = pieceLength - nextBegin;
int nextLength = maxLength > PARTSIZE ? PARTSIZE
: maxLength;
Request req
= new Request(nextPiece,nextBegin, nextLength);
outstandingRequests.add(req);
if (!choked)
out.sendRequest(req);
lastRequest = req;
int pieceLength;
boolean isLastChunk;
pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
isLastChunk = lastRequest.off + lastRequest.len == pieceLength;
// Last part of a piece?
if (isLastChunk)
more_pieces = requestNextPiece();
else
{
PartialPiece nextPiece = lastRequest.getPartialPiece();
int nextBegin = lastRequest.off + PARTSIZE;
int maxLength = pieceLength - nextBegin;
int nextLength = maxLength > PARTSIZE ? PARTSIZE
: maxLength;
Request req
= new Request(nextPiece,nextBegin, nextLength);
outstandingRequests.add(req);
if (!choked)
out.sendRequest(req);
lastRequest = req;
}
}
}
}
@@ -972,47 +1008,6 @@ class PeerState implements DataLoader
pp.release();
}
}
/******* getPartialPiece() does it all now
// Note that in addition to the bitfield, PeerCoordinator uses
// its request tracking and isRequesting() to determine
// what piece to give us next.
int nextPiece = listener.wantPiece(peer, bitfield);
if (nextPiece != -1
&& (lastRequest == null || lastRequest.getPiece() != nextPiece)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " want piece " + nextPiece);
// Fail safe to make sure we are interested
// When we transition into the end game we may not be interested...
if (!interesting) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " transition to end game, setting interesting");
interesting = true;
out.sendInterest(true);
}
int piece_length = metainfo.getPieceLength(nextPiece);
//Catch a common place for OOMs esp. on 1MB pieces
byte[] bs;
try {
bs = new byte[piece_length];
} catch (OutOfMemoryError oom) {
_log.warn("Out of memory, can't request piece " + nextPiece, oom);
return false;
}
int length = Math.min(piece_length, PARTSIZE);
Request req = new Request(nextPiece, bs, 0, length);
outstandingRequests.add(req);
if (!choked)
out.sendRequest(req);
lastRequest = req;
return true;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " no more pieces to request");
}
*******/
}
// failsafe

View File

@@ -74,8 +74,8 @@ class Request
/**
* @since 0.9.1
*/
public void read(DataInputStream din) throws IOException {
piece.read(din, off, len);
public void read(DataInputStream din, BandwidthListener bwl) throws IOException {
piece.read(din, off, len, bwl);
}
/**

View File

@@ -608,7 +608,7 @@ public class Snark
if (_log.shouldLog(Log.INFO))
_log.info("Starting PeerCoordinator, ConnectionAcceptor, and TrackerClient");
activity = "Collecting pieces";
coordinator = new PeerCoordinator(_util, id, infoHash, meta, storage, this, this);
coordinator = new PeerCoordinator(_util, id, infoHash, meta, storage, this, this, completeListener.getBandwidthListener());
coordinator.setUploaded(savedUploaded);
if (_peerCoordinatorSet != null) {
// multitorrent
@@ -1470,31 +1470,6 @@ public class Snark
return totalUploaders > limit;
}
/**
* Is i2psnark as a whole over its limit?
*/
public boolean overUpBWLimit() {
if (_peerCoordinatorSet == null)
return false;
long total = 0;
for (PeerCoordinator c : _peerCoordinatorSet) {
if (!c.halted())
total += c.getCurrentUploadRate();
}
long limit = 1024l * _util.getMaxUpBW();
if (_log.shouldLog(Log.INFO))
_log.info("Total up bw: " + total + " Limit: " + limit);
return total > limit;
}
/**
* Is a particular peer who has this recent download rate (in Bps) over our upstream bandwidth limit?
*/
public boolean overUpBWLimit(long total) {
long limit = 1024l * _util.getMaxUpBW();
return total > limit;
}
/**
* A unique ID for this torrent, useful for RPC
* @return positive value unless you wrap around

View File

@@ -89,6 +89,7 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
private final I2PSnarkUtil _util;
private final PeerCoordinatorSet _peerCoordinatorSet;
private final ConnectionAcceptor _connectionAcceptor;
private final BandwidthManager _bwManager;
private Thread _monitor;
private volatile boolean _running;
private volatile boolean _stopping;
@@ -104,6 +105,8 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
//public static final String PROP_EEP_PORT = "i2psnark.eepPort";
public static final String PROP_UPLOADERS_TOTAL = "i2psnark.uploaders.total";
public static final String PROP_UPBW_MAX = "i2psnark.upbw.max";
/** @since 0.9.62 */
public static final String PROP_DOWNBW_MAX = "i2psnark.downbw.max";
public static final String PROP_DIR = "i2psnark.dir";
private static final String PROP_META_PREFIX = "i2psnark.zmeta.";
private static final String PROP_META_RUNNING = "running";
@@ -168,7 +171,9 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
public static final String PROP_MAX_FILES_PER_TORRENT = "i2psnark.maxFilesPerTorrent";
public static final int MIN_UP_BW = 10;
public static final int MIN_DOWN_BW = 2 * MIN_UP_BW;
public static final int DEFAULT_MAX_UP_BW = 25;
private static final int DEFAULT_MAX_DOWN_BW = 200;
public static final int DEFAULT_STARTUP_DELAY = 3;
public static final int DEFAULT_REFRESH_DELAY_SECS = 15;
private static final int DEFAULT_PAGE_SIZE = 50;
@@ -281,6 +286,7 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
_util = new I2PSnarkUtil(_context, ctxName, this);
_peerCoordinatorSet = new PeerCoordinatorSet();
_connectionAcceptor = new ConnectionAcceptor(_util, _peerCoordinatorSet);
_bwManager = new BandwidthManager(ctx, DEFAULT_MAX_UP_BW * 1024, DEFAULT_MAX_DOWN_BW * 1024);
DEFAULT_AUTO_START = !ctx.isRouterContext();
String cfile = ctxName + CONFIG_FILE_SUFFIX;
File configFile = new File(cfile);
@@ -459,6 +465,14 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
/** hook to I2PSnarkUtil for the servlet */
public I2PSnarkUtil util() { return _util; }
/**
* The BandwidthManager.
* @since 0.9.62
*/
public BandwidthListener getBandwidthListener() {
return _bwManager;
}
/**
* Use if it does not include a link.
* Escapes '&lt;' and '&gt;' before queueing
@@ -968,13 +982,19 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
* @return true if we got a response from the router
*/
private boolean getBWLimit() {
boolean shouldSet = !_config.containsKey(PROP_UPBW_MAX);
if (shouldSet || !_context.isRouterContext()) {
int[] limits = BWLimits.getBWLimits(_util.getI2CPHost(), _util.getI2CPPort());
if (limits == null)
return false;
if (shouldSet && limits[1] > 0)
_util.setMaxUpBW(limits[1]);
int[] limits = BWLimits.getBWLimits(_util.getI2CPHost(), _util.getI2CPPort());
if (limits == null)
return false;
int up = limits[1];
if (up > 0) {
int maxup = getInt(PROP_UPBW_MAX, DEFAULT_MAX_UP_BW);
_util.setMaxUpBW(up);
_bwManager.setUpBWLimit(Math.min(up, maxup) * 1000L);
}
int down = limits[0];
if (down > 0) {
int maxdown = getInt(PROP_DOWNBW_MAX, DEFAULT_MAX_DOWN_BW);
_bwManager.setDownBWLimit(Math.min(down, maxdown) * 1000L);
}
return true;
}
@@ -1046,13 +1066,13 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
public void updateConfig(String dataDir, boolean filesPublic, boolean autoStart, boolean smartSort, String refreshDelay,
String startDelay, String pageSize, String seedPct, String eepHost,
String eepPort, String i2cpHost, String i2cpPort, String i2cpOpts,
String upLimit, String upBW, boolean useOpenTrackers, boolean useDHT, String theme,
String upLimit, String upBW, String downBW, boolean useOpenTrackers, boolean useDHT, String theme,
String lang, boolean enableRatings, boolean enableComments, String commentName, boolean collapsePanels) {
synchronized(_configLock) {
locked_updateConfig(dataDir, filesPublic, autoStart, smartSort, refreshDelay,
startDelay, pageSize, seedPct, eepHost,
eepPort, i2cpHost, i2cpPort, i2cpOpts,
upLimit, upBW, useOpenTrackers, useDHT, theme,
upLimit, upBW, downBW, useOpenTrackers, useDHT, theme,
lang, enableRatings, enableComments, commentName, collapsePanels);
}
}
@@ -1060,7 +1080,7 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
private void locked_updateConfig(String dataDir, boolean filesPublic, boolean autoStart, boolean smartSort, String refreshDelay,
String startDelay, String pageSize, String seedPct, String eepHost,
String eepPort, String i2cpHost, String i2cpPort, String i2cpOpts,
String upLimit, String upBW, boolean useOpenTrackers, boolean useDHT, String theme,
String upLimit, String upBW, String downBW, boolean useOpenTrackers, boolean useDHT, String theme,
String lang, boolean enableRatings, boolean enableComments, String commentName, boolean collapsePanels) {
boolean changed = false;
boolean interruptMonitor = false;
@@ -1098,6 +1118,7 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
if ( limit != _util.getMaxUpBW()) {
if ( limit >= MIN_UP_BW ) {
_util.setMaxUpBW(limit);
_bwManager.setUpBWLimit(limit * 1000L);
changed = true;
_config.setProperty(PROP_UPBW_MAX, Integer.toString(limit));
addMessage(_t("Up BW limit changed to {0}KBps", limit));
@@ -1106,6 +1127,20 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
}
}
}
if (downBW != null) {
int limit = (int) (_bwManager.getDownBWLimit() / 1024);
try { limit = Integer.parseInt(downBW.trim()); } catch (NumberFormatException nfe) {}
if ( limit != _bwManager.getDownBWLimit()) {
if ( limit >= MIN_DOWN_BW ) {
_bwManager.setDownBWLimit(limit * 1000L);
changed = true;
_config.setProperty(PROP_DOWNBW_MAX, Integer.toString(limit));
//addMessage(_t("Up BW limit changed to {0}KBps", limit));
} else {
//addMessage(_t("Minimum up bandwidth limit is {0}KBps", MIN_UP_BW));
}
}
}
if (startDelay != null && _context.isRouterContext()) {
int minutes = _util.getStartupDelay();
@@ -1259,7 +1294,9 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
Properties p = new Properties();
p.putAll(opts);
_util.setI2CPConfig(i2cpHost, port, p);
_util.setMaxUpBW(getInt(PROP_UPBW_MAX, DEFAULT_MAX_UP_BW));
int max = getInt(PROP_UPBW_MAX, DEFAULT_MAX_UP_BW);
_util.setMaxUpBW(max);
_bwManager.setUpBWLimit(max * 1000);
addMessage(_t("I2CP and tunnel changes will take effect after stopping all torrents"));
} else if (!reconnect) {
// The usual case, the other two are if not in router context
@@ -1274,7 +1311,9 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
}
addMessage(_t("I2CP settings changed to {0}", i2cpHost + ':' + port + ' ' + i2cpOpts));
_util.setI2CPConfig(i2cpHost, port, opts);
_util.setMaxUpBW(getInt(PROP_UPBW_MAX, DEFAULT_MAX_UP_BW));
int max = getInt(PROP_UPBW_MAX, DEFAULT_MAX_UP_BW);
_util.setMaxUpBW(max);
_bwManager.setUpBWLimit(max * 1000);
boolean ok = _util.connect();
if (!ok) {
addMessage(_t("Unable to connect with the new settings, reverting to the old I2CP settings"));
@@ -2644,7 +2683,8 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList
}
}
if (routerOK)
addMessage(_t("Up bandwidth limit is {0} KBps", _util.getMaxUpBW()));
addMessage(_t("Down bandwidth limit is {0} KBps", _bwManager.getUpBWLimit() / 1024) + "; " +
_t("Up bandwidth limit is {0} KBps", _util.getMaxUpBW()));
}
} else {
autostart = false;

View File

@@ -319,6 +319,13 @@ class UpdateRunner implements UpdateTask, CompleteListener {
return _smgr.shouldAutoStart();
}
/**
* @since 0.9.62
*/
public BandwidthListener getBandwidthListener() {
return _smgr.getBandwidthListener();
}
//////// end CompleteListener methods
private static String linkify(String url) {

View File

@@ -122,7 +122,7 @@ class WebPeer extends Peer implements EepGet.StatusListener {
* @param uploadOnly if we are complete with skipped files, i.e. a partial seed
*/
@Override
public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField ignore,
public void runConnection(I2PSnarkUtil util, PeerListener listener, BandwidthListener bwl, BitField ignore,
MagnetState mState, boolean uploadOnly) {
if (uploadOnly)
return;
@@ -204,6 +204,8 @@ class WebPeer extends Peer implements EepGet.StatusListener {
if (i >= maxRequests)
break;
Request r = outstandingRequests.get(i);
if (!shouldRequest(r.len))
break;
if (r.getPiece() == piece &&
lastRequest.off + lastRequest.len == r.off) {
requests.add(r);
@@ -340,7 +342,7 @@ class WebPeer extends Peer implements EepGet.StatusListener {
Request req = iter.next();
if (dis.available() < req.len)
break;
req.read(dis);
req.read(dis, this);
iter.remove();
if (_log.shouldWarn())
_log.warn("Saved chunk " + req + " recvd before failure");
@@ -363,7 +365,7 @@ class WebPeer extends Peer implements EepGet.StatusListener {
_log.debug("Fetch of piece: " + piece + " chunks: " + requests.size() + " offset: " + off + " torrent offset: " + toff + " len: " + tlen + " successful");
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
for (Request req : requests) {
req.read(dis);
req.read(dis, this);
}
PartialPiece pp = last.getPartialPiece();
@@ -519,6 +521,29 @@ class WebPeer extends Peer implements EepGet.StatusListener {
return false;
}
// begin BandwidthListener interface overrides
// Because super doesn't have a PeerState
/**
* @since 0.9.62
*/
@Override
public void downloaded(int size) {
super.downloaded(size);
_coordinator.downloaded(size);
}
/**
* Should we request this many bytes?
* @since 0.9.62
*/
@Override
public boolean shouldRequest(int size) {
return _coordinator.shouldRequest(this, size);
}
// end BandwidthListener interface overrides
// private methods below here implementing parts of PeerState
private synchronized void addRequest() {
@@ -548,7 +573,8 @@ class WebPeer extends Peer implements EepGet.StatusListener {
Request req = new Request(nextPiece,nextBegin, nextLength);
outstandingRequests.add(req);
lastRequest = req;
this.notifyAll();
if (shouldRequest(maxLength))
this.notifyAll();
}
}
}
@@ -567,7 +593,8 @@ class WebPeer extends Peer implements EepGet.StatusListener {
Request r = pp.getRequest();
outstandingRequests.add(r);
lastRequest = r;
this.notifyAll();
if (shouldRequest(r.len))
this.notifyAll();
return true;
} else {
if (_log.shouldLog(Log.WARN))
@@ -653,8 +680,6 @@ class WebPeer extends Peer implements EepGet.StatusListener {
public void bytesTransferred(long alreadyTransferred, int currentWrite, long bytesTransferred, long bytesRemaining, String url) {
lastRcvd = System.currentTimeMillis();
downloaded(currentWrite);
listener.downloaded(this, currentWrite);
}
public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause) {}

View File

@@ -48,6 +48,7 @@ import net.i2p.util.SystemVersion;
import net.i2p.util.Translate;
import net.i2p.util.UIMessages;
import org.klomp.snark.BandwidthListener;
import org.klomp.snark.I2PSnarkUtil;
import org.klomp.snark.MagnetURI;
import org.klomp.snark.MetaInfo;
@@ -835,6 +836,7 @@ public class I2PSnarkServlet extends BasicServlet {
out.write(' ');
out.write(_t("Dht Debug"));
out.write("</label><div id=\"dhtDebugInner\">");
out.write(_manager.getBandwidthListener().toString());
out.write(dht.renderStatusHTML());
out.write("</div></div></th>");
}
@@ -1452,6 +1454,7 @@ public class I2PSnarkServlet extends BasicServlet {
String i2cpOpts = buildI2CPOpts(req);
String upLimit = req.getParameter("upLimit");
String upBW = req.getParameter("upBW");
String downBW = req.getParameter("downBW");
String refreshDel = req.getParameter("refreshDelay");
String startupDel = req.getParameter("startupDelay");
String pageSize = req.getParameter("pageSize");
@@ -1467,7 +1470,7 @@ public class I2PSnarkServlet extends BasicServlet {
boolean collapsePanels = req.getParameter("collapsePanels") != null;
_manager.updateConfig(dataDir, filesPublic, autoStart, smartSort, refreshDel, startupDel, pageSize,
seedPct, eepHost, eepPort, i2cpHost, i2cpPort, i2cpOpts,
upLimit, upBW, useOpenTrackers, useDHT, theme,
upLimit, upBW, downBW, useOpenTrackers, useDHT, theme,
lang, ratings, comments, commentsName, collapsePanels);
// update servlet
try {
@@ -2826,7 +2829,7 @@ public class I2PSnarkServlet extends BasicServlet {
out.write(":<td><input type=\"text\" name=\"upBW\" class=\"r\" value=\""
+ _manager.util().getMaxUpBW() + "\" size=\"4\" maxlength=\"4\""
+ " title=\"");
out.write(_t("Maximum bandwidth allocated for uploading"));
out.write(_t("Maximum bandwidth allocated"));
out.write("\"> KBps <td id=\"bwHelp\"><i>");
out.write(_t("Half available bandwidth recommended."));
if (_context.isRouterContext()) {
@@ -2836,6 +2839,24 @@ public class I2PSnarkServlet extends BasicServlet {
out.write(_t("Configure"));
out.write("]</a>");
}
out.write("\n" +
"<tr><td>");
out.write(_t("Down bandwidth limit"));
out.write(":<td><input type=\"text\" name=\"downBW\" class=\"r\" value=\""
+ (_manager.getBandwidthListener().getDownBWLimit() / 1000) + "\" size=\"4\" maxlength=\"4\""
+ " title=\"");
out.write(_t("Maximum bandwidth allocated"));
out.write("\"> KBps <td id=\"bwHelp\"><i>");
out.write(_t("Half available bandwidth recommended."));
if (_context.isRouterContext()) {
out.write("</i> <a href=\"/config.jsp\" target=\"blank\" title=\"");
out.write(_t("View or change router bandwidth"));
out.write("\">[");
out.write(_t("Configure"));
out.write("]</a>");
}
out.write("\n<tr><td><label for=\"useOpenTrackers\">");
out.write(_t("Use open trackers also"));
out.write(":</label><td colspan=\"2\"><input type=\"checkbox\" class=\"optbox\" name=\"useOpenTrackers\" id=\"useOpenTrackers\" value=\"true\" "