diff --git a/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java
new file mode 100644
index 000000000..3f57d232f
--- /dev/null
+++ b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java
@@ -0,0 +1,753 @@
+package org.klomp.snark;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import net.i2p.I2PAppContext;
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionException;
+import net.i2p.client.I2PSessionMuxedListener;
+import net.i2p.client.SendMessageOptions;
+import net.i2p.client.datagram.I2PDatagramDissector;
+import net.i2p.client.datagram.I2PDatagramMaker;
+import net.i2p.client.datagram.I2PInvalidDatagramException;
+import net.i2p.data.DataFormatException;
+import net.i2p.data.DataHelper;
+import net.i2p.data.Destination;
+import net.i2p.data.Hash;
+import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer2;
+
+/**
+ * One of these for all trackers and info hashes.
+ * Ref: BEP 15, proposal 160
+ *
+ * The main difference from BEP 15 is that the announce response
+ * contains a 32-byte hash instead of a 4-byte IP and a 2-byte port.
+ *
+ * This implements only "fast mode".
+ * We send only repliable datagrams, and
+ * receive only raw datagrams, as follows:
+ *
+ *
+ * client tracker type
+ * ------ ------- ----
+ * announce --> repliable
+ * <-- ann resp raw
+ *
+ *
+ * @since 0.9.53, enabled in 0.9.54
+ */
+class UDPTrackerClient implements I2PSessionMuxedListener {
+
+ private final I2PAppContext _context;
+ private final Log _log;
+ /** hook to inject and receive datagrams */
+ private final I2PSession _session;
+ private final I2PSnarkUtil _util;
+ private final Hash _myHash;
+ /** unsigned dgrams */
+ private final int _rPort;
+ /** dest and port to tracker data */
+ private final ConcurrentHashMap _trackers;
+ /** our TID to tracker */
+ private final Map _sentQueries;
+ private boolean _isRunning;
+
+ public static final int EVENT_NONE = 0;
+ public static final int EVENT_COMPLETED = 1;
+ public static final int EVENT_STARTED = 2;
+ public static final int EVENT_STOPPED = 3;
+
+ private static final int ACTION_CONNECT = 0;
+ private static final int ACTION_ANNOUNCE = 1;
+ private static final int ACTION_SCRAPE = 2;
+ private static final int ACTION_ERROR = 3;
+
+ private static final int SEND_CRYPTO_TAGS = 8;
+ private static final int LOW_CRYPTO_TAGS = 4;
+
+ private static final long DEFAULT_TIMEOUT = 15*1000;
+ private static final long DEFAULT_QUERY_TIMEOUT = 60*1000;
+ private static final long CLEAN_TIME = 163*1000;
+
+ /** in seconds */
+ private static final int DEFAULT_INTERVAL = 60*60;
+ private static final int MIN_INTERVAL = 15*60;
+ private static final int MAX_INTERVAL = 8*60*60;
+
+ private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL }
+
+ /**
+ *
+ */
+ public UDPTrackerClient(I2PAppContext ctx, I2PSession session, I2PSnarkUtil util) {
+ _context = ctx;
+ _session = session;
+ _util = util;
+ _log = ctx.logManager().getLog(UDPTrackerClient.class);
+ _rPort = TrackerClient.PORT - 1;
+ _myHash = session.getMyDestination().calculateHash();
+ _trackers = new ConcurrentHashMap(8);
+ _sentQueries = new ConcurrentHashMap(32);
+ }
+
+
+ /**
+ * Can't be restarted after stopping?
+ */
+ public synchronized void start() {
+ if (_isRunning)
+ return;
+ _session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort);
+ _isRunning = true;
+ }
+
+ /**
+ * Stop everything.
+ */
+ public synchronized void stop() {
+ if (!_isRunning)
+ return;
+ _isRunning = false;
+ _session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort);
+ _trackers.clear();
+ for (ReplyWaiter w : _sentQueries.values()) {
+ w.cancel();
+ }
+ _sentQueries.clear();
+ }
+
+ /**
+ * Announce and get peers for a torrent.
+ * Blocking!
+ * Caller should run in a thread.
+ *
+ * @param ih the Info Hash (torrent)
+ * @param max maximum number of peers to return
+ * @param maxWait the maximum time to wait (ms) must be > 0
+ * @param fast if true, don't wait for dest, no retx, ...
+ * @return null on fail or if fast is true
+ */
+ public TrackerResponse announce(byte[] ih, byte[] peerID, int max, long maxWait,
+ String toHost, int toPort,
+ long downloaded, long left, long uploaded,
+ int event, boolean fast) {
+ long now = _context.clock().now();
+ long end = now + maxWait;
+ if (toPort < 0)
+ throw new IllegalArgumentException();
+ Tracker tr = getTracker(toHost, toPort);
+ if (tr.getDest(fast) == null) {
+ if (_log.shouldInfo())
+ _log.info("cannot resolve " + tr);
+ return null;
+ }
+ long toWait = end - now;
+ if (!fast)
+ toWait = toWait * 3 / 4;
+ if (toWait < 1000) {
+ if (_log.shouldInfo())
+ _log.info("out of time after resolving: " + tr);
+ return null;
+ }
+ if (fast) {
+ toWait = 0;
+ } else {
+ toWait = end - now;
+ if (toWait < 1000) {
+ if (_log.shouldInfo())
+ _log.info("out of time after getting conn: " + tr);
+ return null;
+ }
+ }
+ ReplyWaiter w = sendAnnounce(tr, 0, ih, peerID,
+ downloaded, left, uploaded, event, max, toWait);
+ if (fast)
+ return null;
+ if (w == null) {
+ if (_log.shouldInfo())
+ _log.info("initial announce failed: " + tr);
+ return null;
+ }
+ boolean success = waitAndRetransmit(w, end);
+ _sentQueries.remove(w.getID());
+ if (success)
+ return w.getReplyObject();
+ if (_log.shouldInfo())
+ _log.info("announce failed after retx: " + tr);
+ return null;
+ }
+
+ //////// private below here
+
+ /**
+ * @return non-null
+ */
+ private Tracker getTracker(String host, int port) {
+ Tracker ndp = new Tracker(host, port);
+ Tracker odp = _trackers.putIfAbsent(ndp, ndp);
+ if (odp != null)
+ ndp = odp;
+ return ndp;
+ }
+
+ ///// Sending.....
+
+ /**
+ * Send one time with a new tid
+ * @param toWait if <= 0 does not register
+ * @return null on failure or if toWait <= 0
+ */
+ private ReplyWaiter sendAnnounce(Tracker tr, long connID,
+ byte[] ih, byte[] id,
+ long downloaded, long left, long uploaded,
+ int event, int numWant, long toWait) {
+ int tid = _context.random().nextInt();
+ byte[] payload = sendAnnounce(tr, tid, connID, ih, id, downloaded, left, uploaded, event, numWant);
+ if (payload != null) {
+ if (toWait > 0) {
+ ReplyWaiter rv = new ReplyWaiter(tid, tr, payload, toWait);
+ _sentQueries.put(Integer.valueOf(tid), rv);
+ if (_log.shouldInfo())
+ _log.info("Sent: " + rv + " timeout: " + toWait);
+ return rv;
+ }
+ if (_log.shouldInfo())
+ _log.info("Sent annc " + event + " to " + tr + " no wait");
+ }
+ return null;
+ }
+
+ /**
+ * Send one time with given tid
+ * @return the payload or null on failure
+ */
+ private byte[] sendAnnounce(Tracker tr, int tid, long connID,
+ byte[] ih, byte[] id,
+ long downloaded, long left, long uploaded,
+ int event, int numWant) {
+ byte[] payload = new byte[98];
+ DataHelper.toLong8(payload, 0, connID);
+ DataHelper.toLong(payload, 8, 4, ACTION_ANNOUNCE);
+ DataHelper.toLong(payload, 12, 4, tid);
+ System.arraycopy(ih, 0, payload, 16, 20);
+ System.arraycopy(id, 0, payload, 36, 20);
+ DataHelper.toLong(payload, 56, 8, downloaded);
+ DataHelper.toLong(payload, 64, 8, left);
+ DataHelper.toLong(payload, 72, 8, uploaded);
+ DataHelper.toLong(payload, 80, 4, event);
+ DataHelper.toLong(payload, 92, 4, numWant);
+ DataHelper.toLong(payload, 96, 2, TrackerClient.PORT);
+ boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true);
+ return rv ? payload : null;
+ }
+
+ /**
+ * wait after initial send, resend if necessary
+ */
+ private boolean waitAndRetransmit(ReplyWaiter w, long untilTime) {
+ synchronized(w) {
+ while(true) {
+ try {
+ long toWait = untilTime - _context.clock().now();
+ if (toWait <= 0)
+ return false;
+ w.wait(toWait);
+ } catch (InterruptedException ie) {
+ return false;
+ }
+ switch (w.getState()) {
+ case INIT:
+ continue;
+
+ case SUCCESS:
+ return true;
+
+ case FAIL:
+ return false;
+
+ case TIMEOUT:
+ if (_log.shouldInfo())
+ _log.info("Timeout: " + w);
+ long toWait = untilTime - _context.clock().now();
+ if (toWait <= 1000)
+ return false;
+ boolean ok = resend(w, Math.min(toWait, w.getSentTo().getTimeout()));
+ if (!ok)
+ return false;
+ continue;
+ }
+ }
+ }
+ }
+
+ /**
+ * Resend the stored payload
+ * @return success
+ */
+ private boolean resend(ReplyWaiter w, long toWait) {
+ Tracker tr = w.getSentTo();
+ int port = tr.getPort();
+ if (_log.shouldInfo())
+ _log.info("Resending: " + w + " timeout: " + toWait);
+ boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), true);
+ if (rv) {
+ _sentQueries.put(Integer.valueOf(w.getID()), w);
+ w.schedule(toWait);
+ }
+ return rv;
+ }
+
+ /**
+ * Lowest-level send message call.
+ * @param dest may be null, returns false
+ * @param repliable true for conn request, false for announce
+ * @return success
+ */
+ private boolean sendMessage(Destination dest, int toPort, byte[] payload, boolean repliable) {
+ if (!_isRunning) {
+ if (_log.shouldInfo())
+ _log.info("send failed, not running");
+ return false;
+ }
+ if (dest == null) {
+ if (_log.shouldInfo())
+ _log.info("send failed, no dest");
+ return false;
+ }
+ if (dest.calculateHash().equals(_myHash))
+ throw new IllegalArgumentException("don't send to ourselves");
+
+ if (repliable) {
+ I2PDatagramMaker dgMaker = new I2PDatagramMaker(_session);
+ payload = dgMaker.makeI2PDatagram(payload);
+ if (payload == null) {
+ if (_log.shouldWarn())
+ _log.warn("DGM fail");
+ return false;
+ }
+ }
+
+ SendMessageOptions opts = new SendMessageOptions();
+ opts.setDate(_context.clock().now() + 60*1000);
+ opts.setTagsToSend(SEND_CRYPTO_TAGS);
+ opts.setTagThreshold(LOW_CRYPTO_TAGS);
+ if (!repliable)
+ opts.setSendLeaseSet(false);
+ try {
+ boolean success = _session.sendMessage(dest, payload, 0, payload.length,
+ repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW,
+ _rPort, toPort, opts);
+ if (success) {
+ // ...
+ } else {
+ if (_log.shouldWarn())
+ _log.warn("sendMessage fail");
+ }
+ return success;
+ } catch (I2PSessionException ise) {
+ if (_log.shouldWarn())
+ _log.warn("sendMessage fail", ise);
+ return false;
+ }
+ }
+
+ ///// Reception.....
+
+ /**
+ * @param from dest or null if it didn't come in on signed port
+ */
+ private void receiveMessage(Destination from, int fromPort, byte[] payload) {
+ if (payload.length < 8) {
+ if (_log.shouldInfo())
+ _log.info("Got short message: " + payload.length + " bytes");
+ return;
+ }
+
+ int action = (int) DataHelper.fromLong(payload, 0, 4);
+ int tid = (int) DataHelper.fromLong(payload, 4, 4);
+ ReplyWaiter waiter = _sentQueries.remove(Integer.valueOf(tid));
+ if (waiter == null) {
+ if (_log.shouldInfo())
+ _log.info("Rcvd msg with no one waiting: " + tid);
+ return;
+ }
+
+ if (action == ACTION_ANNOUNCE) {
+ receiveAnnounce(waiter, payload);
+ } else if (action == ACTION_ERROR) {
+ receiveError(waiter, payload);
+ } else {
+ // includes ACTION_CONNECT
+ if (_log.shouldInfo())
+ _log.info("Rcvd msg with unknown action: " + action + " for: " + waiter);
+ waiter.gotReply(false);
+ Tracker tr = waiter.getSentTo();
+ tr.gotError();
+ }
+ }
+
+ private void receiveAnnounce(ReplyWaiter waiter, byte[] payload) {
+ Tracker tr = waiter.getSentTo();
+ if (payload.length >= 22) {
+ int interval = Math.min(MAX_INTERVAL, Math.max(MIN_INTERVAL,
+ (int) DataHelper.fromLong(payload, 8, 4)));
+ int leeches = (int) DataHelper.fromLong(payload, 12, 4);
+ int seeds = (int) DataHelper.fromLong(payload, 16, 4);
+ int peers = (int) DataHelper.fromLong(payload, 20, 2);
+ if (22 + (peers * Hash.HASH_LENGTH) > payload.length) {
+ if (_log.shouldWarn())
+ _log.warn("Short reply");
+ waiter.gotReply(false);
+ tr.gotError();
+ return;
+ }
+ if (_log.shouldInfo())
+ _log.info("Rcvd " + peers + " peers from " + tr);
+ Set hashes;
+ if (peers > 0) {
+ hashes = new HashSet(peers);
+ for (int off = 20; off < payload.length; off += Hash.HASH_LENGTH) {
+ hashes.add(Hash.create(payload, off));
+ }
+ } else {
+ hashes = Collections.emptySet();
+ }
+ TrackerResponse resp = new TrackerResponse(interval, seeds, leeches, hashes);
+ waiter.gotResponse(resp);
+ tr.setInterval(interval);
+ } else {
+ waiter.gotReply(false);
+ tr.gotError();
+ }
+ }
+
+ private void receiveError(ReplyWaiter waiter, byte[] payload) {
+ String msg;
+ if (payload.length > 8) {
+ msg = DataHelper.getUTF8(payload, 8, payload.length - 8);
+ } else {
+ msg = "";
+ }
+ TrackerResponse resp = new TrackerResponse(msg);
+ waiter.gotResponse(resp);
+ Tracker tr = waiter.getSentTo();
+ tr.gotError();
+ }
+
+ // I2PSessionMuxedListener interface ----------------
+
+ /**
+ * Instruct the client that the given session has received a message
+ *
+ * Will be called only if you register via addMuxedSessionListener().
+ * Will be called only for the proto(s) and toPort(s) you register for.
+ *
+ * @param session session to notify
+ * @param msgId message number available
+ * @param size size of the message - why it's a long and not an int is a mystery
+ * @param proto 1-254 or 0 for unspecified
+ * @param fromPort 1-65535 or 0 for unspecified
+ * @param toPort 1-65535 or 0 for unspecified
+ */
+ public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
+ // TODO throttle
+ try {
+ byte[] payload = session.receiveMessage(msgId);
+ if (payload == null)
+ return;
+ if (toPort == _rPort) {
+ // raw
+ receiveMessage(null, fromPort, payload);
+ } else {
+ if (_log.shouldWarn())
+ _log.warn("msg on bad port");
+ }
+ } catch (I2PSessionException e) {
+ if (_log.shouldWarn())
+ _log.warn("bad msg");
+ }
+ }
+
+ /** for non-muxed */
+ public void messageAvailable(I2PSession session, int msgId, long size) {}
+
+ public void reportAbuse(I2PSession session, int severity) {}
+
+ public void disconnected(I2PSession session) {
+ if (_log.shouldWarn())
+ _log.warn("UDPTC disconnected");
+ }
+
+ public void errorOccurred(I2PSession session, String message, Throwable error) {
+ if (_log.shouldWarn())
+ _log.warn("UDPTC got error msg: ", error);
+ }
+
+ public static class TrackerResponse {
+
+ private final int interval, complete, incomplete;
+ private final String error;
+ private final Set peers;
+
+ /** success */
+ public TrackerResponse(int interval, int seeds, int leeches, Set peers) {
+ this.interval = interval;
+ complete = seeds;
+ incomplete = leeches;
+ this.peers = peers;
+ error = null;
+ }
+
+ /** failure */
+ public TrackerResponse(String errorMsg) {
+ interval = DEFAULT_INTERVAL;
+ complete = 0;
+ incomplete = 0;
+ peers = null;
+ error = errorMsg;
+ }
+
+ public Set getPeers() {
+ return peers;
+ }
+
+ public int getPeerCount() {
+ int pc = peers == null ? 0 : peers.size();
+ return Math.max(pc, complete + incomplete - 1);
+ }
+
+ public int getSeedCount() {
+ return complete;
+ }
+
+ public int getLeechCount() {
+ return incomplete;
+ }
+
+ public String getFailureReason() {
+ return error;
+ }
+
+ /** in seconds */
+ public int getInterval() {
+ return interval;
+ }
+ }
+
+ private static class HostPort {
+
+ protected final String host;
+ protected final int port;
+
+ /**
+ * @param port the announce port
+ */
+ public HostPort(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
+ * @return the announce port
+ */
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public int hashCode() {
+ return host.hashCode() ^ port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof HostPort))
+ return false;
+ HostPort dp = (HostPort) o;
+ return port == dp.port && host.equals(dp.host);
+ }
+
+ @Override
+ public String toString() {
+ return "UDP Tracker " + host + ':' + port;
+ }
+ }
+
+ private class Tracker extends HostPort {
+
+ private final Object destLock = new Object();
+ private Destination dest;
+ private long expires;
+ private long lastHeardFrom;
+ private long lastFailed;
+ private int consecFails;
+ private int interval = DEFAULT_INTERVAL;
+
+ private static final long DELAY = 15*1000;
+
+ public Tracker(String host, int port) {
+ super(host, port);
+ }
+
+ /**
+ * @param fast if true, do not lookup
+ * @return dest or null
+ */
+ public Destination getDest(boolean fast) {
+ synchronized(destLock) {
+ if (dest == null && !fast)
+ dest = _util.getDestination(host);
+ return dest;
+ }
+ }
+
+ /** does not change state */
+ public synchronized void replyTimeout() {
+ consecFails++;
+ lastFailed = _context.clock().now();
+ }
+
+ public synchronized int getInterval() {
+ return interval;
+ }
+
+ /** sets heardFrom; calls notify */
+ public synchronized void setInterval(int interval) {
+ long now = _context.clock().now();
+ lastHeardFrom = now;
+ consecFails = 0;
+ this.interval = interval;
+ this.notifyAll();
+ }
+
+ /** sets heardFrom; calls notify */
+ public synchronized void gotError() {
+ long now = _context.clock().now();
+ lastHeardFrom = now;
+ consecFails = 0;
+ this.notifyAll();
+ }
+
+ /** doubled for each consecutive failure */
+ public synchronized long getTimeout() {
+ return DEFAULT_TIMEOUT << Math.min(consecFails, 3);
+ }
+
+ @Override
+ public String toString() {
+ return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null);
+ }
+ }
+
+ /**
+ * Callback for replies
+ */
+ private class ReplyWaiter extends SimpleTimer2.TimedEvent {
+ private final int tid;
+ private final Tracker sentTo;
+ private final byte[] data;
+ private TrackerResponse replyObject;
+ private WaitState state = WaitState.INIT;
+
+ /**
+ * Either wait on this object with a timeout, or use non-null Runnables.
+ * Any sent data to be remembered may be stored by setSentObject().
+ * Reply object may be in getReplyObject().
+ */
+ public ReplyWaiter(int tid, Tracker tracker, byte[] payload, long toWait) {
+ super(SimpleTimer2.getInstance(), toWait);
+ this.tid = tid;
+ sentTo = tracker;
+ data = payload;
+ }
+
+ public int getID() {
+ return tid;
+ }
+
+ public Tracker getSentTo() {
+ return sentTo;
+ }
+
+ public byte[] getPayload() {
+ return data;
+ }
+
+ /**
+ * @return may be null depending on what happened. Cast to expected type.
+ */
+ public synchronized TrackerResponse getReplyObject() {
+ return replyObject;
+ }
+
+ /**
+ * If true, we got a reply, and getReplyObject() may contain something.
+ */
+ public synchronized WaitState getState() {
+ return state;
+ }
+
+ /**
+ * Will notify this.
+ * Also removes from _sentQueries and calls heardFrom().
+ * Sets state to SUCCESS or FAIL.
+ */
+ public synchronized void gotReply(boolean success) {
+ cancel();
+ _sentQueries.remove(Integer.valueOf(tid));
+ setState(success ? WaitState.SUCCESS : WaitState.FAIL);
+ }
+
+ /**
+ * Will notify this and run onReply.
+ * Also removes from _sentQueries and calls heardFrom().
+ */
+ private synchronized void setState(WaitState state) {
+ this.state = state;
+ this.notifyAll();
+ }
+
+ /**
+ * Will notify this.
+ * Also removes from _sentQueries and calls heardFrom().
+ * Sets state to SUCCESS.
+ */
+ public synchronized void gotResponse(TrackerResponse resp) {
+ replyObject = resp;
+ gotReply(true);
+ }
+
+ /**
+ * Sets state to INIT.
+ */
+ @Override
+ public synchronized void schedule(long toWait) {
+ state = WaitState.INIT;
+ super.schedule(toWait);
+ }
+
+ /** timer callback on timeout */
+ public synchronized void timeReached() {
+ // don't trump success or failure
+ if (state != WaitState.INIT)
+ return;
+ //if (action == ACTION_CONNECT)
+ // sentTo.connFailed();
+ //else
+ sentTo.replyTimeout();
+ setState(WaitState.TIMEOUT);
+ if (_log.shouldWarn())
+ _log.warn("timeout waiting for reply from " + sentTo);
+ }
+
+ @Override
+ public String toString() {
+ return "Waiting for ID: " + tid + " to: " + sentTo + " state: " + state;
+ }
+ }
+}