* i2psnark:

- Reduce TrackerClient threads
   - Reduce delay between peer adds for faster startup
   - Thread the announces and reduce timeout when stopping
This commit is contained in:
zzz
2012-06-11 19:38:33 +00:00
parent 30e2f73d5f
commit 6e077ee621
8 changed files with 276 additions and 98 deletions

View File

@@ -35,18 +35,32 @@ import java.util.Random;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.util.Clock;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Informs metainfo tracker of events and gets new peers for peer
* coordinator.
*
* start() creates a thread and starts it.
* At the end of each run, a TimedEvent is queued on the SimpleTimer2 queue.
* The TimedEvent creates a new thread and starts it, so it does not
* clog SimpleTimer2.
*
* The thread runs one pass through the trackers, the PEX, and the DHT,
* then queues a new TimedEvent and exits.
*
* Thus there are only threads that are actively announcing, not one thread per torrent forever.
*
* start() may be called again after halt().
*
* @author Mark Wielaard (mark@klomp.org)
*/
public class TrackerClient extends I2PAppThread
{
public class TrackerClient implements Runnable {
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(TrackerClient.class);
private static final String NO_EVENT = "";
private static final String STARTED_EVENT = "started";
@@ -56,25 +70,39 @@ public class TrackerClient extends I2PAppThread
private final static int SLEEP = 5; // 5 minutes.
private final static int DELAY_MIN = 2000; // 2 secs.
private final static int DELAY_MUL = 1500; // 1.5 secs.
private final static int DELAY_RAND = 6*1000;
private final static int MAX_REGISTER_FAILS = 10; // * INITIAL_SLEEP = 15m to register
private final static int INITIAL_SLEEP = 90*1000;
private final static int MAX_CONSEC_FAILS = 5; // slow down after this
private final static int LONG_SLEEP = 30*60*1000; // sleep a while after lots of fails
private I2PSnarkUtil _util;
private final I2PSnarkUtil _util;
private final MetaInfo meta;
private final String infoHash;
private final String peerID;
private final String additionalTrackerURL;
private final PeerCoordinator coordinator;
private final Snark snark;
private final int port;
private final String _threadName;
private boolean stop;
private boolean started;
private volatile boolean stop = true;
private volatile boolean started;
private volatile boolean _initialized;
private volatile int _runCount;
// running thread so it can be interrupted
private volatile Thread _thread;
// queued event so it can be cancelled
private volatile SimpleTimer2.TimedEvent _event;
// these 2 used in loop()
private volatile boolean runStarted;
private volatile int consecutiveFails;
private List<Tracker> trackers;
private final List<Tracker> trackers;
/**
* Call start() to start it.
*
* @param meta null if in magnet mode
* @param additionalTrackerURL may be null, from the ?tr= param in magnet mode, otherwise ignored
*/
@@ -84,7 +112,7 @@ public class TrackerClient extends I2PAppThread
super();
// Set unique name.
String id = urlencode(snark.getID());
setName("TrackerClient " + id.substring(id.length() - 12));
_threadName = "TrackerClient " + id.substring(id.length() - 12);
_util = util;
this.meta = meta;
this.additionalTrackerURL = additionalTrackerURL;
@@ -92,12 +120,22 @@ public class TrackerClient extends I2PAppThread
this.snark = snark;
this.port = 6881; //(port == -1) ? 9 : port;
this.infoHash = urlencode(snark.getInfoHash());
this.peerID = urlencode(snark.getID());
this.trackers = new ArrayList(2);
}
@Override
public void start() {
if (stop) throw new RuntimeException("Dont rerun me, create a copy");
super.start();
public synchronized void start() {
if (!stop) {
if (_log.shouldLog(Log.WARN))
_log.warn("Already started: " + _threadName);
return;
}
stop = false;
consecutiveFails = 0;
runStarted = false;
_thread = new I2PAppThread(this, _threadName + " #" + (++_runCount), true);
_thread.start();
started = true;
}
@@ -107,10 +145,47 @@ public class TrackerClient extends I2PAppThread
/**
* Interrupts this Thread to stop it.
*/
public void halt()
{
stop = true;
this.interrupt();
public synchronized void halt() {
boolean wasStopped = stop;
if (wasStopped) {
if (_log.shouldLog(Log.WARN))
_log.warn("Already stopped: " + _threadName);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Stopping: " + _threadName);
stop = true;
}
SimpleTimer2.TimedEvent e = _event;
if (e != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelling next announce " + _threadName);
e.cancel();
_event = null;
}
Thread t = _thread;
if (t != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Interrupting " + t.getName());
t.interrupt();
}
if (!wasStopped)
unannounce();
}
private void queueLoop(long delay) {
_event = new Runner(delay);
}
private class Runner extends SimpleTimer2.TimedEvent {
public Runner(long delay) {
super(SimpleTimer2.getInstance(), delay);
}
public void timeReached() {
_event = null;
_thread = new I2PAppThread(TrackerClient.this, _threadName + " #" + (++_runCount), true);
_thread.start();
}
}
private boolean verifyConnected() {
@@ -123,20 +198,51 @@ public class TrackerClient extends I2PAppThread
return !stop && _util.connected();
}
@Override
public void run()
{
String infoHash = urlencode(snark.getInfoHash());
String peerID = urlencode(snark.getID());
/**
* Setup the first time only,
* then one pass (usually) through the trackers, PEX, and DHT.
* This will take several seconds to several minutes.
*/
public void run() {
long begin = Clock.getInstance().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Start " + Thread.currentThread().getName());
try {
if (!_initialized) {
setup();
// FIXME dht
if (trackers.isEmpty()) {
stop = true;
return;
}
_initialized = true;
// FIXME only when starting everybody at once, not for a single torrent
long delay = I2PAppContext.getGlobalContext().random().nextInt(30*1000);
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {}
}
loop();
} finally {
// don't hold ref
_thread = null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Finish " + Thread.currentThread().getName() +
" after " + DataHelper.formatDuration(Clock.getInstance().now() - begin));
}
}
/**
* Do this one time only (not every time it is started).
* @since 0.9.1
*/
public void setup() {
// Construct the list of trackers for this torrent,
// starting with the primary one listed in the metainfo,
// followed by the secondary open trackers
// It's painful, but try to make sure if an open tracker is also
// the primary tracker, that we don't add it twice.
// todo: check for b32 matches as well
trackers = new ArrayList(2);
String primary = null;
if (meta != null)
primary = meta.getAnnounce();
@@ -192,58 +298,32 @@ public class TrackerClient extends I2PAppThread
this.snark.stopTorrent();
return;
}
}
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft();
boolean completed = (left == 0);
/**
* Announce to all the trackers, get peers from PEX and DHT, then queue up a SimpleTimer2 event.
* This will take several seconds to several minutes.
* @since 0.9.1
*/
private void loop() {
try
{
if (!verifyConnected()) return;
boolean runStarted = false;
boolean firstTime = true;
int consecutiveFails = 0;
Random r = I2PAppContext.getGlobalContext().random();
while(!stop)
{
if (!verifyConnected()) {
stop = true;
return;
}
// Local DHT tracker announce
if (_util.getDHT() != null)
_util.getDHT().announce(snark.getInfoHash());
try
{
// Sleep some minutes...
// Sleep the minimum interval for all the trackers, but 60s minimum
// except for the first time...
int delay;
int random = r.nextInt(120*1000);
if (firstTime) {
delay = r.nextInt(30*1000);
firstTime = false;
} else if (completed && runStarted)
delay = 3*SLEEP*60*1000 + random;
else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP;
else
// sleep a while, when we wake up we will contact only the trackers whose intervals have passed
delay = SLEEP*60*1000 + random;
if (delay > 0)
Thread.sleep(delay);
}
catch(InterruptedException interrupt)
{
// ignore
}
if (stop)
break;
if (!verifyConnected()) return;
uploaded = coordinator.getUploaded();
downloaded = coordinator.getDownloaded();
left = coordinator.getLeft(); // -1 in magnet mode
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft(); // -1 in magnet mode
boolean completed = (left == 0);
// First time we got a complete download?
String event;
@@ -303,7 +383,7 @@ public class TrackerClient extends I2PAppThread
// FIXME if id == us || dest == us continue;
// only delay if we actually make an attempt to add peer
if(coordinator.addPeer(cur) && it.hasNext()) {
int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN;
int delay = r.nextInt(DELAY_RAND) + DELAY_MIN;
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
}
@@ -354,7 +434,7 @@ public class TrackerClient extends I2PAppThread
while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) {
Peer cur = it.next();
if (coordinator.addPeer(cur) && it.hasNext()) {
int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN;
int delay = r.nextInt(DELAY_RAND) + DELAY_MIN;
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
}
@@ -390,7 +470,7 @@ public class TrackerClient extends I2PAppThread
while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) {
Peer cur = it.next();
if (coordinator.addPeer(cur) && it.hasNext()) {
int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN;
int delay = r.nextInt(DELAY_RAND) + DELAY_MIN;
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
}
@@ -400,8 +480,36 @@ public class TrackerClient extends I2PAppThread
// we could try and total the unique peers but that's too hard for now
snark.setTrackerSeenPeers(maxSeenPeers);
if (stop)
return;
if (!runStarted)
_util.debug(" Retrying in one minute...", Snark.DEBUG);
try {
// Sleep some minutes...
// Sleep the minimum interval for all the trackers, but 60s minimum
int delay;
int random = r.nextInt(120*1000);
if (completed && runStarted)
delay = 3*SLEEP*60*1000 + random;
else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP;
else
// sleep a while, when we wake up we will contact only the trackers whose intervals have passed
delay = SLEEP*60*1000 + random;
if (delay > 20*1000) {
// put ourselves on SimpleTimer2
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requeueing in " + DataHelper.formatDuration(delay) + ": " + Thread.currentThread().getName());
queueLoop(delay);
return;
} else if (delay > 0) {
Thread.sleep(delay);
}
} catch(InterruptedException interrupt) {}
} // *** end of while loop
} // try
catch (Throwable t)
@@ -410,26 +518,61 @@ public class TrackerClient extends I2PAppThread
if (t instanceof OutOfMemoryError)
throw (OutOfMemoryError)t;
}
finally
{
// Local DHT tracker unannounce
if (_util.getDHT() != null)
_util.getDHT().unannounce(snark.getInfoHash());
}
/**
* Creates a thread for each tracker in parallel if tunnel is still open
* @since 0.9.1
*/
private void unannounce() {
// Local DHT tracker unannounce
if (_util.getDHT() != null)
_util.getDHT().unannounce(snark.getInfoHash());
int i = 0;
for (Tracker tr : trackers) {
if (_util.connected() &&
tr.started && (!tr.stop) && tr.trackerProblems == null) {
try {
(new I2PAppThread(new Unannouncer(tr), _threadName + " Unannounce " + (++i), true)).start();
} catch (OutOfMemoryError oom) {
// probably ran out of threads, ignore
tr.reset();
}
} else {
tr.reset();
}
}
}
/**
* Send "stopped" to a single tracker
* @since 0.9.1
*/
private class Unannouncer implements Runnable {
private final Tracker tr;
public Unannouncer(Tracker tr) {
this.tr = tr;
}
public void run() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running unannounce " + _threadName + " to " + tr.announce);
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft();
try
{
// try to contact everybody we can
// Don't try to restart I2CP connection just to say goodbye
for (Iterator iter = trackers.iterator(); iter.hasNext(); ) {
if (!_util.connected()) return;
Tracker tr = (Tracker)iter.next();
if (tr.started && (!tr.stop) && tr.trackerProblems == null)
doRequest(tr, infoHash, peerID, uploaded,
if (_util.connected()) {
if (tr.started && (!tr.stop) && tr.trackerProblems == null)
doRequest(tr, infoHash, peerID, uploaded,
downloaded, left, STOPPED_EVENT);
}
}
}
catch(IOException ioe) { /* ignored */ }
}
tr.reset();
}
}
private TrackerInfo doRequest(Tracker tr, String infoHash,
@@ -467,7 +610,8 @@ public class TrackerClient extends I2PAppThread
_util.debug("Sending TrackerClient request: " + s, Snark.INFO);
tr.lastRequestTime = System.currentTimeMillis();
File fetched = _util.get(s);
// Don't wait for a response to stopped.
File fetched = _util.get(s, true, event.equals(STOPPED_EVENT) ? -1 : 0);
if (fetched == null) {
throw new IOException("Error fetching " + s);
}
@@ -556,6 +700,13 @@ public class TrackerClient extends I2PAppThread
announce = a;
isPrimary = p;
interval = INITIAL_SLEEP;
}
/**
* Call before restarting
* @since 0.9.1
*/
public void reset() {
lastRequestTime = 0;
trackerProblems = null;
stop = false;