From 0b058c0ffd3a7fed6b58e0e9c9181db732ca34b4 Mon Sep 17 00:00:00 2001 From: zzz <zzz@i2pmail.org> Date: Sun, 16 Jan 2022 08:58:03 -0500 Subject: [PATCH] i2psnark: Add UDPTrackerClient WIP - not hooked in, untested - target 1.8.0 / 0.9.54 Requires significant changes to TrackerClient Adapted from mtn branch i2p.i2p.zzz.udpsnark (2014) Ref: Proposal 160 --- .../src/org/klomp/snark/UDPTrackerClient.java | 753 ++++++++++++++++++ 1 file changed, 753 insertions(+) create mode 100644 apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java diff --git a/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java new file mode 100644 index 0000000000..3f57d232f5 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java @@ -0,0 +1,753 @@ +package org.klomp.snark; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.client.I2PSessionMuxedListener; +import net.i2p.client.SendMessageOptions; +import net.i2p.client.datagram.I2PDatagramDissector; +import net.i2p.client.datagram.I2PDatagramMaker; +import net.i2p.client.datagram.I2PInvalidDatagramException; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.data.Hash; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; + +/** + * One of these for all trackers and info hashes. + * Ref: BEP 15, proposal 160 + * + * The main difference from BEP 15 is that the announce response + * contains a 32-byte hash instead of a 4-byte IP and a 2-byte port. + * + * This implements only "fast mode". + * We send only repliable datagrams, and + * receive only raw datagrams, as follows: + * + *<pre> + * client tracker type + * ------ ------- ---- + * announce --> repliable + * <-- ann resp raw + *</pre> + * + * @since 0.9.53, enabled in 0.9.54 + */ +class UDPTrackerClient implements I2PSessionMuxedListener { + + private final I2PAppContext _context; + private final Log _log; + /** hook to inject and receive datagrams */ + private final I2PSession _session; + private final I2PSnarkUtil _util; + private final Hash _myHash; + /** unsigned dgrams */ + private final int _rPort; + /** dest and port to tracker data */ + private final ConcurrentHashMap<HostPort, Tracker> _trackers; + /** our TID to tracker */ + private final Map<Integer, ReplyWaiter> _sentQueries; + private boolean _isRunning; + + public static final int EVENT_NONE = 0; + public static final int EVENT_COMPLETED = 1; + public static final int EVENT_STARTED = 2; + public static final int EVENT_STOPPED = 3; + + private static final int ACTION_CONNECT = 0; + private static final int ACTION_ANNOUNCE = 1; + private static final int ACTION_SCRAPE = 2; + private static final int ACTION_ERROR = 3; + + private static final int SEND_CRYPTO_TAGS = 8; + private static final int LOW_CRYPTO_TAGS = 4; + + private static final long DEFAULT_TIMEOUT = 15*1000; + private static final long DEFAULT_QUERY_TIMEOUT = 60*1000; + private static final long CLEAN_TIME = 163*1000; + + /** in seconds */ + private static final int DEFAULT_INTERVAL = 60*60; + private static final int MIN_INTERVAL = 15*60; + private static final int MAX_INTERVAL = 8*60*60; + + private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL } + + /** + * + */ + public UDPTrackerClient(I2PAppContext ctx, I2PSession session, I2PSnarkUtil util) { + _context = ctx; + _session = session; + _util = util; + _log = ctx.logManager().getLog(UDPTrackerClient.class); + _rPort = TrackerClient.PORT - 1; + _myHash = session.getMyDestination().calculateHash(); + _trackers = new ConcurrentHashMap<HostPort, Tracker>(8); + _sentQueries = new ConcurrentHashMap<Integer, ReplyWaiter>(32); + } + + + /** + * Can't be restarted after stopping? + */ + public synchronized void start() { + if (_isRunning) + return; + _session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort); + _isRunning = true; + } + + /** + * Stop everything. + */ + public synchronized void stop() { + if (!_isRunning) + return; + _isRunning = false; + _session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort); + _trackers.clear(); + for (ReplyWaiter w : _sentQueries.values()) { + w.cancel(); + } + _sentQueries.clear(); + } + + /** + * Announce and get peers for a torrent. + * Blocking! + * Caller should run in a thread. + * + * @param ih the Info Hash (torrent) + * @param max maximum number of peers to return + * @param maxWait the maximum time to wait (ms) must be > 0 + * @param fast if true, don't wait for dest, no retx, ... + * @return null on fail or if fast is true + */ + public TrackerResponse announce(byte[] ih, byte[] peerID, int max, long maxWait, + String toHost, int toPort, + long downloaded, long left, long uploaded, + int event, boolean fast) { + long now = _context.clock().now(); + long end = now + maxWait; + if (toPort < 0) + throw new IllegalArgumentException(); + Tracker tr = getTracker(toHost, toPort); + if (tr.getDest(fast) == null) { + if (_log.shouldInfo()) + _log.info("cannot resolve " + tr); + return null; + } + long toWait = end - now; + if (!fast) + toWait = toWait * 3 / 4; + if (toWait < 1000) { + if (_log.shouldInfo()) + _log.info("out of time after resolving: " + tr); + return null; + } + if (fast) { + toWait = 0; + } else { + toWait = end - now; + if (toWait < 1000) { + if (_log.shouldInfo()) + _log.info("out of time after getting conn: " + tr); + return null; + } + } + ReplyWaiter w = sendAnnounce(tr, 0, ih, peerID, + downloaded, left, uploaded, event, max, toWait); + if (fast) + return null; + if (w == null) { + if (_log.shouldInfo()) + _log.info("initial announce failed: " + tr); + return null; + } + boolean success = waitAndRetransmit(w, end); + _sentQueries.remove(w.getID()); + if (success) + return w.getReplyObject(); + if (_log.shouldInfo()) + _log.info("announce failed after retx: " + tr); + return null; + } + + //////// private below here + + /** + * @return non-null + */ + private Tracker getTracker(String host, int port) { + Tracker ndp = new Tracker(host, port); + Tracker odp = _trackers.putIfAbsent(ndp, ndp); + if (odp != null) + ndp = odp; + return ndp; + } + + ///// Sending..... + + /** + * Send one time with a new tid + * @param toWait if <= 0 does not register + * @return null on failure or if toWait <= 0 + */ + private ReplyWaiter sendAnnounce(Tracker tr, long connID, + byte[] ih, byte[] id, + long downloaded, long left, long uploaded, + int event, int numWant, long toWait) { + int tid = _context.random().nextInt(); + byte[] payload = sendAnnounce(tr, tid, connID, ih, id, downloaded, left, uploaded, event, numWant); + if (payload != null) { + if (toWait > 0) { + ReplyWaiter rv = new ReplyWaiter(tid, tr, payload, toWait); + _sentQueries.put(Integer.valueOf(tid), rv); + if (_log.shouldInfo()) + _log.info("Sent: " + rv + " timeout: " + toWait); + return rv; + } + if (_log.shouldInfo()) + _log.info("Sent annc " + event + " to " + tr + " no wait"); + } + return null; + } + + /** + * Send one time with given tid + * @return the payload or null on failure + */ + private byte[] sendAnnounce(Tracker tr, int tid, long connID, + byte[] ih, byte[] id, + long downloaded, long left, long uploaded, + int event, int numWant) { + byte[] payload = new byte[98]; + DataHelper.toLong8(payload, 0, connID); + DataHelper.toLong(payload, 8, 4, ACTION_ANNOUNCE); + DataHelper.toLong(payload, 12, 4, tid); + System.arraycopy(ih, 0, payload, 16, 20); + System.arraycopy(id, 0, payload, 36, 20); + DataHelper.toLong(payload, 56, 8, downloaded); + DataHelper.toLong(payload, 64, 8, left); + DataHelper.toLong(payload, 72, 8, uploaded); + DataHelper.toLong(payload, 80, 4, event); + DataHelper.toLong(payload, 92, 4, numWant); + DataHelper.toLong(payload, 96, 2, TrackerClient.PORT); + boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true); + return rv ? payload : null; + } + + /** + * wait after initial send, resend if necessary + */ + private boolean waitAndRetransmit(ReplyWaiter w, long untilTime) { + synchronized(w) { + while(true) { + try { + long toWait = untilTime - _context.clock().now(); + if (toWait <= 0) + return false; + w.wait(toWait); + } catch (InterruptedException ie) { + return false; + } + switch (w.getState()) { + case INIT: + continue; + + case SUCCESS: + return true; + + case FAIL: + return false; + + case TIMEOUT: + if (_log.shouldInfo()) + _log.info("Timeout: " + w); + long toWait = untilTime - _context.clock().now(); + if (toWait <= 1000) + return false; + boolean ok = resend(w, Math.min(toWait, w.getSentTo().getTimeout())); + if (!ok) + return false; + continue; + } + } + } + } + + /** + * Resend the stored payload + * @return success + */ + private boolean resend(ReplyWaiter w, long toWait) { + Tracker tr = w.getSentTo(); + int port = tr.getPort(); + if (_log.shouldInfo()) + _log.info("Resending: " + w + " timeout: " + toWait); + boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), true); + if (rv) { + _sentQueries.put(Integer.valueOf(w.getID()), w); + w.schedule(toWait); + } + return rv; + } + + /** + * Lowest-level send message call. + * @param dest may be null, returns false + * @param repliable true for conn request, false for announce + * @return success + */ + private boolean sendMessage(Destination dest, int toPort, byte[] payload, boolean repliable) { + if (!_isRunning) { + if (_log.shouldInfo()) + _log.info("send failed, not running"); + return false; + } + if (dest == null) { + if (_log.shouldInfo()) + _log.info("send failed, no dest"); + return false; + } + if (dest.calculateHash().equals(_myHash)) + throw new IllegalArgumentException("don't send to ourselves"); + + if (repliable) { + I2PDatagramMaker dgMaker = new I2PDatagramMaker(_session); + payload = dgMaker.makeI2PDatagram(payload); + if (payload == null) { + if (_log.shouldWarn()) + _log.warn("DGM fail"); + return false; + } + } + + SendMessageOptions opts = new SendMessageOptions(); + opts.setDate(_context.clock().now() + 60*1000); + opts.setTagsToSend(SEND_CRYPTO_TAGS); + opts.setTagThreshold(LOW_CRYPTO_TAGS); + if (!repliable) + opts.setSendLeaseSet(false); + try { + boolean success = _session.sendMessage(dest, payload, 0, payload.length, + repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW, + _rPort, toPort, opts); + if (success) { + // ... + } else { + if (_log.shouldWarn()) + _log.warn("sendMessage fail"); + } + return success; + } catch (I2PSessionException ise) { + if (_log.shouldWarn()) + _log.warn("sendMessage fail", ise); + return false; + } + } + + ///// Reception..... + + /** + * @param from dest or null if it didn't come in on signed port + */ + private void receiveMessage(Destination from, int fromPort, byte[] payload) { + if (payload.length < 8) { + if (_log.shouldInfo()) + _log.info("Got short message: " + payload.length + " bytes"); + return; + } + + int action = (int) DataHelper.fromLong(payload, 0, 4); + int tid = (int) DataHelper.fromLong(payload, 4, 4); + ReplyWaiter waiter = _sentQueries.remove(Integer.valueOf(tid)); + if (waiter == null) { + if (_log.shouldInfo()) + _log.info("Rcvd msg with no one waiting: " + tid); + return; + } + + if (action == ACTION_ANNOUNCE) { + receiveAnnounce(waiter, payload); + } else if (action == ACTION_ERROR) { + receiveError(waiter, payload); + } else { + // includes ACTION_CONNECT + if (_log.shouldInfo()) + _log.info("Rcvd msg with unknown action: " + action + " for: " + waiter); + waiter.gotReply(false); + Tracker tr = waiter.getSentTo(); + tr.gotError(); + } + } + + private void receiveAnnounce(ReplyWaiter waiter, byte[] payload) { + Tracker tr = waiter.getSentTo(); + if (payload.length >= 22) { + int interval = Math.min(MAX_INTERVAL, Math.max(MIN_INTERVAL, + (int) DataHelper.fromLong(payload, 8, 4))); + int leeches = (int) DataHelper.fromLong(payload, 12, 4); + int seeds = (int) DataHelper.fromLong(payload, 16, 4); + int peers = (int) DataHelper.fromLong(payload, 20, 2); + if (22 + (peers * Hash.HASH_LENGTH) > payload.length) { + if (_log.shouldWarn()) + _log.warn("Short reply"); + waiter.gotReply(false); + tr.gotError(); + return; + } + if (_log.shouldInfo()) + _log.info("Rcvd " + peers + " peers from " + tr); + Set<Hash> hashes; + if (peers > 0) { + hashes = new HashSet<Hash>(peers); + for (int off = 20; off < payload.length; off += Hash.HASH_LENGTH) { + hashes.add(Hash.create(payload, off)); + } + } else { + hashes = Collections.emptySet(); + } + TrackerResponse resp = new TrackerResponse(interval, seeds, leeches, hashes); + waiter.gotResponse(resp); + tr.setInterval(interval); + } else { + waiter.gotReply(false); + tr.gotError(); + } + } + + private void receiveError(ReplyWaiter waiter, byte[] payload) { + String msg; + if (payload.length > 8) { + msg = DataHelper.getUTF8(payload, 8, payload.length - 8); + } else { + msg = ""; + } + TrackerResponse resp = new TrackerResponse(msg); + waiter.gotResponse(resp); + Tracker tr = waiter.getSentTo(); + tr.gotError(); + } + + // I2PSessionMuxedListener interface ---------------- + + /** + * Instruct the client that the given session has received a message + * + * Will be called only if you register via addMuxedSessionListener(). + * Will be called only for the proto(s) and toPort(s) you register for. + * + * @param session session to notify + * @param msgId message number available + * @param size size of the message - why it's a long and not an int is a mystery + * @param proto 1-254 or 0 for unspecified + * @param fromPort 1-65535 or 0 for unspecified + * @param toPort 1-65535 or 0 for unspecified + */ + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { + // TODO throttle + try { + byte[] payload = session.receiveMessage(msgId); + if (payload == null) + return; + if (toPort == _rPort) { + // raw + receiveMessage(null, fromPort, payload); + } else { + if (_log.shouldWarn()) + _log.warn("msg on bad port"); + } + } catch (I2PSessionException e) { + if (_log.shouldWarn()) + _log.warn("bad msg"); + } + } + + /** for non-muxed */ + public void messageAvailable(I2PSession session, int msgId, long size) {} + + public void reportAbuse(I2PSession session, int severity) {} + + public void disconnected(I2PSession session) { + if (_log.shouldWarn()) + _log.warn("UDPTC disconnected"); + } + + public void errorOccurred(I2PSession session, String message, Throwable error) { + if (_log.shouldWarn()) + _log.warn("UDPTC got error msg: ", error); + } + + public static class TrackerResponse { + + private final int interval, complete, incomplete; + private final String error; + private final Set<Hash> peers; + + /** success */ + public TrackerResponse(int interval, int seeds, int leeches, Set<Hash> peers) { + this.interval = interval; + complete = seeds; + incomplete = leeches; + this.peers = peers; + error = null; + } + + /** failure */ + public TrackerResponse(String errorMsg) { + interval = DEFAULT_INTERVAL; + complete = 0; + incomplete = 0; + peers = null; + error = errorMsg; + } + + public Set<Hash> getPeers() { + return peers; + } + + public int getPeerCount() { + int pc = peers == null ? 0 : peers.size(); + return Math.max(pc, complete + incomplete - 1); + } + + public int getSeedCount() { + return complete; + } + + public int getLeechCount() { + return incomplete; + } + + public String getFailureReason() { + return error; + } + + /** in seconds */ + public int getInterval() { + return interval; + } + } + + private static class HostPort { + + protected final String host; + protected final int port; + + /** + * @param port the announce port + */ + public HostPort(String host, int port) { + this.host = host; + this.port = port; + } + + /** + * @return the announce port + */ + public int getPort() { + return port; + } + + @Override + public int hashCode() { + return host.hashCode() ^ port; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof HostPort)) + return false; + HostPort dp = (HostPort) o; + return port == dp.port && host.equals(dp.host); + } + + @Override + public String toString() { + return "UDP Tracker " + host + ':' + port; + } + } + + private class Tracker extends HostPort { + + private final Object destLock = new Object(); + private Destination dest; + private long expires; + private long lastHeardFrom; + private long lastFailed; + private int consecFails; + private int interval = DEFAULT_INTERVAL; + + private static final long DELAY = 15*1000; + + public Tracker(String host, int port) { + super(host, port); + } + + /** + * @param fast if true, do not lookup + * @return dest or null + */ + public Destination getDest(boolean fast) { + synchronized(destLock) { + if (dest == null && !fast) + dest = _util.getDestination(host); + return dest; + } + } + + /** does not change state */ + public synchronized void replyTimeout() { + consecFails++; + lastFailed = _context.clock().now(); + } + + public synchronized int getInterval() { + return interval; + } + + /** sets heardFrom; calls notify */ + public synchronized void setInterval(int interval) { + long now = _context.clock().now(); + lastHeardFrom = now; + consecFails = 0; + this.interval = interval; + this.notifyAll(); + } + + /** sets heardFrom; calls notify */ + public synchronized void gotError() { + long now = _context.clock().now(); + lastHeardFrom = now; + consecFails = 0; + this.notifyAll(); + } + + /** doubled for each consecutive failure */ + public synchronized long getTimeout() { + return DEFAULT_TIMEOUT << Math.min(consecFails, 3); + } + + @Override + public String toString() { + return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null); + } + } + + /** + * Callback for replies + */ + private class ReplyWaiter extends SimpleTimer2.TimedEvent { + private final int tid; + private final Tracker sentTo; + private final byte[] data; + private TrackerResponse replyObject; + private WaitState state = WaitState.INIT; + + /** + * Either wait on this object with a timeout, or use non-null Runnables. + * Any sent data to be remembered may be stored by setSentObject(). + * Reply object may be in getReplyObject(). + */ + public ReplyWaiter(int tid, Tracker tracker, byte[] payload, long toWait) { + super(SimpleTimer2.getInstance(), toWait); + this.tid = tid; + sentTo = tracker; + data = payload; + } + + public int getID() { + return tid; + } + + public Tracker getSentTo() { + return sentTo; + } + + public byte[] getPayload() { + return data; + } + + /** + * @return may be null depending on what happened. Cast to expected type. + */ + public synchronized TrackerResponse getReplyObject() { + return replyObject; + } + + /** + * If true, we got a reply, and getReplyObject() may contain something. + */ + public synchronized WaitState getState() { + return state; + } + + /** + * Will notify this. + * Also removes from _sentQueries and calls heardFrom(). + * Sets state to SUCCESS or FAIL. + */ + public synchronized void gotReply(boolean success) { + cancel(); + _sentQueries.remove(Integer.valueOf(tid)); + setState(success ? WaitState.SUCCESS : WaitState.FAIL); + } + + /** + * Will notify this and run onReply. + * Also removes from _sentQueries and calls heardFrom(). + */ + private synchronized void setState(WaitState state) { + this.state = state; + this.notifyAll(); + } + + /** + * Will notify this. + * Also removes from _sentQueries and calls heardFrom(). + * Sets state to SUCCESS. + */ + public synchronized void gotResponse(TrackerResponse resp) { + replyObject = resp; + gotReply(true); + } + + /** + * Sets state to INIT. + */ + @Override + public synchronized void schedule(long toWait) { + state = WaitState.INIT; + super.schedule(toWait); + } + + /** timer callback on timeout */ + public synchronized void timeReached() { + // don't trump success or failure + if (state != WaitState.INIT) + return; + //if (action == ACTION_CONNECT) + // sentTo.connFailed(); + //else + sentTo.replyTimeout(); + setState(WaitState.TIMEOUT); + if (_log.shouldWarn()) + _log.warn("timeout waiting for reply from " + sentTo); + } + + @Override + public String toString() { + return "Waiting for ID: " + tid + " to: " + sentTo + " state: " + state; + } + } +} -- GitLab