Newer
Older
/* TrackerClient - Class that informs a tracker and gets new peers.
Copyright (C) 2003 Mark J. Wielaard
This file is part of Snark.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.klomp.snark;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.Set;
import net.i2p.util.ConvertToHash;
/**
* Informs metainfo tracker of events and gets new peers for peer
* coordinator.
*
* start() creates a thread and starts it.
* At the end of each run, a TimedEvent is queued on the SimpleTimer2 queue.
* The TimedEvent creates a new thread and starts it, so it does not
* clog SimpleTimer2.
*
* The thread runs one pass through the trackers, the PEX, and the DHT,
* then queues a new TimedEvent and exits.
*
* Thus there are only threads that are actively announcing, not one thread per torrent forever.
*
* start() may be called again after halt().
*
* @author Mark Wielaard (mark@klomp.org)
*/
// Values for the "&event=" announce parameter built in doRequest();
// NO_EVENT (empty) means the parameter is left out of the request.
private static final String NO_EVENT = "";
private static final String STARTED_EVENT = "started";
private static final String COMPLETED_EVENT = "completed";
private static final String STOPPED_EVENT = "stopped";
// Prefix looked for (after lower-casing) in a tracker's failure message.
private static final String NOT_REGISTERED = "torrent not registered"; //bytemonsoon
/** this is our equivalent to router.utorrent.com for bootstrap */
private static final String DEFAULT_BACKUP_TRACKER = "http://tracker.welterde.i2p/a";
// In minutes; loop() multiplies it by 60*1000 to get a delay in milliseconds.
private final static int SLEEP = 5; // 5 minutes.
private final static int MAX_REGISTER_FAILS = 10; // * INITIAL_SLEEP = 15m to register
// Milliseconds; used directly as the requeue delay in loop().
private final static int INITIAL_SLEEP = 90*1000;
private final static int MAX_CONSEC_FAILS = 5; // slow down after this
// Milliseconds; a tracker's interval is raised to at least this once it
// reaches MAX_CONSEC_FAILS consecutive failures.
private final static int LONG_SLEEP = 30*60*1000; // sleep a while after lots of fails
// Milliseconds; getPeersFromDHT() does nothing until this much time has
// passed since lastDHTAnnounce.
private final static long MIN_DHT_ANNOUNCE_INTERVAL = 10*60*1000;
private final MetaInfo meta;
private final PeerCoordinator coordinator;
// Sent as the "&port=" announce parameter.
private final int port;
// Base name for the announce threads; a run counter is appended.
private final String _threadName;
// True while not running. start() only proceeds when this is true, and
// the announce code checks it to bail out early.
private volatile boolean stop = true;
private volatile boolean started;
// Set by run() once the first-time setup has been done.
private volatile boolean _initialized;
// Incremented every time a new announce thread is created.
private volatile int _runCount;
// running thread so it can be interrupted
private volatile Thread _thread;
// queued event so it can be cancelled
private volatile SimpleTimer2.TimedEvent _event;
// these 2 used in loop()
private volatile boolean runStarted;
private volatile int consecutiveFails;
// Context clock time recorded in getPeersFromDHT() when the DHT returned peers.
private long lastDHTAnnounce;
// Trackers announced to on every pass.
private final List<TCTracker> trackers;
// Only contacted by loop() when trackers is empty and dht.size() < 16.
private final List<TCTracker> backupTrackers;
/**
 * @param additionalTrackerURL may be null, from the ?tr= param in magnet mode, otherwise ignored
 */
public TrackerClient(I2PSnarkUtil util, MetaInfo meta, String additionalTrackerURL,
PeerCoordinator coordinator, Snark snark)
{
// NOTE(review): the final fields _threadName and backupTrackers are not
// assigned in the statements visible here, and util/additionalTrackerURL
// are not stored - this listing appears to be missing lines; confirm
// against the full source before relying on it.
// Set unique name.
String id = urlencode(snark.getID());
_log = util.getContext().logManager().getLog(TrackerClient.class);
this.meta = meta;
this.coordinator = coordinator;
// Fixed port number reported to trackers; the commented-out expression
// is an earlier way of choosing it.
this.port = 6881; //(port == -1) ? 9 : port;
// URL-encoded once here; these strings are passed to doRequest() for
// the info_hash= and peer_id= parameters of every announce.
this.infoHash = urlencode(snark.getInfoHash());
this.peerID = urlencode(snark.getID());
// Starts empty with an initial capacity of 2.
this.trackers = new ArrayList(2);
}
public synchronized void start() {
if (!stop) {
if (_log.shouldLog(Log.WARN))
_log.warn("Already started: " + _threadName);
return;
}
stop = false;
consecutiveFails = 0;
runStarted = false;
_thread = new I2PAppThread(this, _threadName + " #" + (++_runCount), true);
_thread.start();
/**
* Interrupts this Thread to stop it.
*/
boolean wasStopped = stop;
if (wasStopped) {
if (_log.shouldLog(Log.WARN))
_log.warn("Already stopped: " + _threadName);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Stopping: " + _threadName);
stop = true;
}
SimpleTimer2.TimedEvent e = _event;
if (e != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelling next announce " + _threadName);
e.cancel();
_event = null;
}
Thread t = _thread;
if (t != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Interrupting " + t.getName());
t.interrupt();
}
if (!wasStopped)
unannounce();
}
/**
 * Creates the timer event for the next announce pass and remembers it in
 * {@code _event} so that it can be cancelled later.
 *
 * @param delay handed unchanged to the Runner constructor; loop() passes
 *              a value in milliseconds
 */
private void queueLoop(long delay) {
    final Runner next = new Runner(delay);
    _event = next;
}
/**
 * Timer event that launches the next announce pass on a fresh thread, so
 * the announce work does not run on the timer's own thread.
 */
private class Runner extends SimpleTimer2.TimedEvent {
    /**
     * @param delay passed to the TimedEvent constructor together with the
     *              context's SimpleTimer2; queueLoop() supplies milliseconds
     */
    public Runner(long delay) {
        super(_util.getContext().simpleTimer2(), delay);
    }

    /**
     * Clears the queued-event reference, then creates, publishes and starts
     * a new announce thread running the enclosing TrackerClient.
     */
    public void timeReached() {
        _event = null;
        // Start through a local reference: _thread is volatile and run()'s
        // finally block sets it to null from another thread, so re-reading
        // the field for start() could throw a NullPointerException.
        Thread t = new I2PAppThread(TrackerClient.this, _threadName + " #" + (++_runCount), true);
        _thread = t;
        t.start();
    }
}
if (!ok) {
try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
}
}
/**
* Setup the first time only,
* then one pass (usually) through the trackers, PEX, and DHT.
* This will take several seconds to several minutes.
*/
public void run() {
long begin = _util.getContext().clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Start " + Thread.currentThread().getName());
try {
if (!_initialized) {
setup();
}
if (trackers.isEmpty() && _util.getDHT() == null) {
stop = true;
this.snark.addMessage(_util.getString("No valid trackers for {0} - enable opentrackers or DHT?",
this.snark.getBaseName()));
_log.error("No valid trackers for " + this.snark.getBaseName());
this.snark.stopTorrent();
return;
}
if (!_initialized) {
_initialized = true;
// FIXME only when starting everybody at once, not for a single torrent
try {
Thread.sleep(delay);
} catch (InterruptedException ie) {}
}
loop();
} finally {
// don't hold ref
_thread = null;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Finish " + Thread.currentThread().getName() +
" after " + DataHelper.formatDuration(_util.getContext().clock().now() - begin));
/**
* Do this one time only (not every time it is started).
* @since 0.9.1
*/
// Construct the list of trackers for this torrent,
// starting with the primary one listed in the metainfo,
// followed by the secondary open trackers
// It's painful, but try to make sure if an open tracker is also
// the primary tracker, that we don't add it twice.
else if (additionalTrackerURL != null)
primary = additionalTrackerURL;
Set<Hash> trackerHashes = new HashSet(8);
// primary tracker
if (isNewValidTracker(trackerHashes, primary)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Announce: [" + primary + "] infoHash: " + infoHash);
if (_log.shouldLog(Log.WARN))
_log.warn("Skipping invalid or non-i2p announce: " + primary);
// announce list
if (meta != null && !meta.isPrivate()) {
List<List<String>> list = meta.getAnnounceList();
if (list != null) {
for (List<String> llist : list) {
for (String url : llist) {
if (!isNewValidTracker(trackerHashes, url))
continue;
trackers.add(new TCTracker(url, trackers.isEmpty()));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Additional announce (list): [" + url + "] for infoHash: " + infoHash);
}
}
}
}
// configured open trackers
if (meta == null || !meta.isPrivate()) {
List<String> tlist = _util.getOpenTrackers();
String url = tlist.get(i);
if (!isNewValidTracker(trackerHashes, url))
// opentrackers are primary if we don't have primary
trackers.add(new TCTracker(url, trackers.isEmpty()));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash);
// backup trackers if DHT needs bootstrapping
if (trackers.isEmpty() && (meta == null || !meta.isPrivate())) {
List<String> tlist = _util.getBackupTrackers();
for (int i = 0; i < tlist.size(); i++) {
String url = tlist.get(i);
if (!isNewValidTracker(trackerHashes, url))
backupTrackers.add(new TCTracker(url, false));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Backup announce: [" + url + "] for infoHash: " + infoHash);
if (backupTrackers.isEmpty()) {
backupTrackers.add(new TCTracker(DEFAULT_BACKUP_TRACKER, false));
/**
 * Checks an announce URL and records its host hash if not seen before.
 * Logs an error when no host hash can be derived from the URL, and an
 * INFO message when the hash is already present.
 *
 * @param existing the ones we already know about
 * @param ann an announce URL non-null
 * @return true if ann is valid and new; adds to existing if returns true
 * @since 0.9.5
 */
private boolean isNewValidTracker(Set<Hash> existing, String ann) {
    final Hash hostHash = getHostHash(ann);
    if (hostHash != null) {
        // Set.add() reports whether the hash was newly inserted
        if (existing.add(hostHash))
            return true;
        if (_log.shouldLog(Log.INFO))
            _log.info("Dup announce URL: [" + ann + ']');
        return false;
    }
    _log.error("Bad announce URL: [" + ann + ']');
    return false;
}
/**
* Announce to all the trackers, get peers from PEX and DHT, then queue up a SimpleTimer2 event.
* This will take several seconds to several minutes.
* @since 0.9.1
*/
private void loop() {
try
{
while(!stop)
{
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
int maxSeenPeers = 0;
if (!trackers.isEmpty())
maxSeenPeers = getPeersFromTrackers(trackers);
int p = getPeersFromPEX();
if (p > maxSeenPeers)
maxSeenPeers = p;
p = getPeersFromDHT();
if (p > maxSeenPeers)
maxSeenPeers = p;
// backup if DHT needs bootstrapping
if (trackers.isEmpty() && !backupTrackers.isEmpty() && dht != null && dht.size() < 16) {
p = getPeersFromTrackers(backupTrackers);
if (p > maxSeenPeers)
maxSeenPeers = p;
}
// we could try and total the unique peers but that's too hard for now
snark.setTrackerSeenPeers(maxSeenPeers);
if (stop)
return;
try {
// Sleep some minutes...
// Sleep the minimum interval for all the trackers, but 60s minimum
int delay;
Random r = _util.getContext().random();
int random = r.nextInt(120*1000);
if (completed && runStarted)
delay = 3*SLEEP*60*1000 + random;
else if (snark.getTrackerProblems() != null && ++consecutiveFails < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP;
else if ((!runStarted) && _runCount < MAX_CONSEC_FAILS)
delay = INITIAL_SLEEP;
else
// sleep a while, when we wake up we will contact only the trackers whose intervals have passed
delay = SLEEP*60*1000 + random;
if (delay > 20*1000) {
// put ourselves on SimpleTimer2
if (_log.shouldLog(Log.DEBUG))
_log.debug("Requeueing in " + DataHelper.formatDuration(delay) + ": " + Thread.currentThread().getName());
queueLoop(delay);
return;
} else if (delay > 0) {
Thread.sleep(delay);
}
} catch(InterruptedException interrupt) {}
} // *** end of while loop
} // try
catch (Throwable t)
{
_log.error("TrackerClient: " + t, t);
if (t instanceof OutOfMemoryError)
throw (OutOfMemoryError)t;
}
}
/**
* @return max peers seen
*/
private int getPeersFromTrackers(List<TCTracker> trckrs) {
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft(); // -1 in magnet mode
// First time we got a complete download?
String event;
if (!completed && left == 0)
{
completed = true;
event = COMPLETED_EVENT;
}
else
event = NO_EVENT;
(completed || coordinator.needOutboundPeers() || !tr.started) &&
(event.equals(COMPLETED_EVENT) || System.currentTimeMillis() > tr.lastRequestTime + tr.interval))
{
try
{
if (!tr.started)
event = STARTED_EVENT;
TrackerInfo info = doRequest(tr, infoHash, peerID,
uploaded, downloaded, left,
event);
snark.setTrackerProblems(null);
tr.trackerProblems = null;
tr.registerFails = 0;
tr.consecutiveFails = 0;
if (tr.isPrimary)
consecutiveFails = 0;
if (snark.getTrackerSeenPeers() < tr.seenPeers) // update rising number quickly
snark.setTrackerSeenPeers(tr.seenPeers);
// pass everybody over to our tracker
dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash());
// we only want to talk to new people if we need things
// from them (duh)
List<Peer> ordered = new ArrayList(peers);
}
catch (IOException ioe)
{
// Probably not fatal (if it doesn't last too long...)
if (_log.shouldLog(Log.WARN))
_log.warn
("WARNING: Could not contact tracker at '"
+ tr.announce + "': " + ioe);
tr.trackerProblems = ioe.getMessage();
// don't show secondary tracker problems to the user
if (tr.isPrimary)
snark.setTrackerProblems(tr.trackerProblems);
if (tr.trackerProblems.toLowerCase(Locale.US).startsWith(NOT_REGISTERED)) {
//if (trckrs.size() == 1) {
// stop = true;
// snark.stopTorrent();
//} else { // hopefully each on the opentrackers list is really open
if (++tr.consecutiveFails == MAX_CONSEC_FAILS) {
tr.seenPeers = 0;
if (tr.interval < LONG_SLEEP)
tr.interval = LONG_SLEEP; // slow down
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Not announcing to " + tr.announce + " last announce was " +
new Date(tr.lastRequestTime) + " interval is " + DataHelper.formatDuration(tr.interval));
}
if ((!tr.stop) && maxSeenPeers < tr.seenPeers)
maxSeenPeers = tr.seenPeers;
} // *** end of trackers loop here
return maxSeenPeers;
}
/**
* @return max peers seen
*/
private int getPeersFromPEX() {
if (coordinator.needOutboundPeers() && (meta == null || !meta.isPrivate()) && !stop) {
Set<PeerID> pids = coordinator.getPEXPeers();
if (!pids.isEmpty()) {
if (_log.shouldLog(Log.INFO))
_log.info("Got " + pids.size() + " from PEX");
List<Peer> peers = new ArrayList(pids.size());
for (PeerID pID : pids) {
peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo()));
}
Collections.shuffle(peers, r);
Iterator<Peer> it = peers.iterator();
Peer cur = it.next();
if (coordinator.addPeer(cur) && it.hasNext()) {
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Not getting PEX peers");
/**
* @return max peers seen
*/
private int getPeersFromDHT() {
// Get peers from DHT
// FIXME this needs to be in its own thread
if (dht != null && (meta == null || !meta.isPrivate()) && (!stop) &&
_util.getContext().clock().now() > lastDHTAnnounce + MIN_DHT_ANNOUNCE_INTERVAL) {
numwant = 1;
else
numwant = _util.getMaxConnections();
Collection<Hash> hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000);
if (!hashes.isEmpty()) {
lastDHTAnnounce = _util.getContext().clock().now();
if (_log.shouldLog(Log.INFO))
_log.info("Got " + hashes + " from DHT");
// announce ourselves while the token is still good
// FIXME this needs to be in its own thread
if (!stop) {

zzz
committed
// announce only to the 1 closest
int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000);
if (_log.shouldLog(Log.INFO))
_log.info("Sent " + good + " good announces to DHT");
}
// now try these peers
if ((!stop) && !hashes.isEmpty()) {
List<Peer> peers = new ArrayList(hashes.size());
for (Hash h : hashes) {
try {
PeerID pID = new PeerID(h.getData(), _util);
peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo()));
} catch (InvalidBEncodingException ibe) {}
Collections.shuffle(peers, r);
Iterator<Peer> it = peers.iterator();
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
}
}
}
if (_log.shouldLog(Log.INFO))
_log.info("Not getting DHT peers");
/**
* Creates a thread for each tracker in parallel if tunnel is still open
* @since 0.9.1
*/
private void unannounce() {
// Local DHT tracker unannounce
DHT dht = _util.getDHT();
if (dht != null)
dht.unannounce(snark.getInfoHash());
if (_util.connected() &&
tr.started && (!tr.stop) && tr.trackerProblems == null) {
try {
(new I2PAppThread(new Unannouncer(tr), _threadName + " U" + (++i), true)).start();
} catch (OutOfMemoryError oom) {
// probably ran out of threads, ignore
tr.reset();
}
} else {
tr.reset();
}
}
}
/**
* Send "stopped" to a single tracker
* @since 0.9.1
*/
private class Unannouncer implements Runnable {
this.tr = tr;
}
public void run() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running unannounce " + _threadName + " to " + tr.announce);
long uploaded = coordinator.getUploaded();
long downloaded = coordinator.getDownloaded();
long left = coordinator.getLeft();
try
{
// Don't try to restart I2CP connection just to say goodbye
if (_util.connected()) {
if (tr.started && (!tr.stop) && tr.trackerProblems == null)
doRequest(tr, infoHash, peerID, uploaded,
downloaded, left, STOPPED_EVENT);
}
catch(IOException ioe) { /* ignored */ }
}
private TrackerInfo doRequest(TCTracker tr, String infoHash,
String peerID, long uploaded,
long downloaded, long left, String event)
throws IOException
{
StringBuilder buf = new StringBuilder(512);
buf.append(tr.announce);
if (tr.announce.contains("?"))
buf.append('&');
else
buf.append('?');
buf.append("info_hash=").append(infoHash)
.append("&peer_id=").append(peerID)
.append("&port=").append(port)
.append("&ip=" ).append(_util.getOurIPString()).append(".i2p")
.append("&uploaded=").append(uploaded)
.append("&downloaded=").append(downloaded)
.append("&left=");
// What do we send for left in magnet mode? Can we omit it?
if (left >= 0)
buf.append(left);
else
buf.append('1');
buf.append("&compact=1"); // NOTE: opentracker will return 400 for &compact alone
if (! event.equals(NO_EVENT))
buf.append("&event=").append(event);
buf.append("&numwant=");
boolean small = left == 0 || event.equals(STOPPED_EVENT) || !coordinator.needOutboundPeers();
if (small)
buf.append('0');
buf.append(_util.getMaxConnections());
String s = buf.toString();
if (_log.shouldLog(Log.INFO))
_log.info("Sending TrackerClient request: " + s);
// Don't wait for a response to stopped when shutting down
boolean fast = _fastUnannounce && event.equals(STOPPED_EVENT);
byte[] fetched = _util.get(s, true, fast ? -1 : 0, small ? 128 : 1024, small ? 1024 : 8*1024);
if (fetched == null) {
throw new IOException("Error fetching " + s);
}
TrackerInfo info = new TrackerInfo(in, snark.getID(),
snark.getInfoHash(), snark.getMetaInfo(), _util);
if (_log.shouldLog(Log.INFO))
_log.info("TrackerClient response: " + info);
String failure = info.getFailureReason();
if (failure != null)
throw new IOException(failure);
tr.interval = Math.max(MIN_TRACKER_ANNOUNCE_INTERVAL, info.getInterval() * 1000l);
}
/**
* Very lazy byte[] to URL encoder. Just encodes almost everything, even
* some "normal" chars.
* By not encoding about 1/4 of the chars, we make random data like hashes about 16% smaller.
*
* RFC1738: 0-9a-zA-Z$-_.+!*'(),
* Us: 0-9a-zA-Z
*
*/
{
StringBuilder sb = new StringBuilder(bs.length*3);
for (int i = 0; i < bs.length; i++)
{
int c = bs[i] & 0xFF;
if ((c >= '0' && c <= '9') ||
(c >= 'A' && c <= 'Z') ||
(c >= 'a' && c <= 'z')) {
sb.append((char)c);
} else {
sb.append('%');
if (c < 16)
sb.append('0');
sb.append(Integer.toHexString(c));
}
}
return sb.toString();
}
* @param ann an announce URL
URL url;
try {
url = new URL(ann);
} catch (MalformedURLException mue) {
return false;
}
return url.getProtocol().equals("http") &&
(url.getHost().endsWith(".i2p") || url.getHost().equals("i2p")) &&
url.getPort() < 0;
}
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
/**
 * Derives the destination Hash from an announce URL.
 * Only plain "http" URLs without an explicit port are accepted. Two host
 * forms are handled:
 * a host ending in ".i2p", which is passed to ConvertToHash as-is, and
 * the literal host "i2p", where the first path component (at least 516
 * characters after the leading slash) is passed to ConvertToHash.
 *
 * @param ann an announce URL non-null
 * @return a Hash for i2p hosts only, null otherwise
 * @since 0.9.5
 */
private static Hash getHostHash(String ann) {
    URL url;
    try {
        url = new URL(ann);
    } catch (MalformedURLException mue) {
        return null;
    }
    if (url.getPort() >= 0 || !url.getProtocol().equals("http"))
        return null;
    String host = url.getHost();
    if (host.endsWith(".i2p"))
        return ConvertToHash.getHash(host);
    if (host.equals("i2p")) {
        String path = url.getPath();
        if (path == null || path.length() < 517 ||
            !path.startsWith("/"))
            return null;
        // Cut at the first '/', '?', '&' or ';' after the leading slash.
        // This must be a character class: the bare pattern "/?&;" is the
        // regex "optional '/', then the literal text '&;'" and would never
        // split an ordinary path such as /<dest>/announce.php.
        String[] parts = path.substring(1).split("[/?&;]", 2);
        return ConvertToHash.getHash(parts[0]);
    }
    return null;
}
final String announce;
final boolean isPrimary;
long interval;
long lastRequestTime;
String trackerProblems;
boolean stop;
boolean started;
int registerFails;
int consecutiveFails;
int seenPeers;
}
/**
 * Call before restarting.
 * Puts the per-tracker state back to its initial values: clears the
 * flags, the recorded problem text, the last request time and the
 * counters. The interval is left as it is.
 * @since 0.9.1
 */
public void reset() {
    // flags
    stop = started = false;
    // last reported problem and request timestamp
    trackerProblems = null;
    lastRequestTime = 0;
    // counters
    registerFails = consecutiveFails = seenPeers = 0;
}
}