diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index cc324e08f..0fb4d97c2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -86,6 +86,8 @@ public class I2PSnarkUtil implements DisconnectListener { private boolean _areFilesPublic; private List _openTrackers; private DHT _dht; + private boolean _enableUDP = ENABLE_UDP_TRACKER; + private UDPTrackerClient _udpTracker; private long _startedTime; private final DisconnectListener _discon; private int _maxFilesPerTorrent = SnarkManager.DEFAULT_MAX_FILES_PER_TORRENT; @@ -99,6 +101,7 @@ public class I2PSnarkUtil implements DisconnectListener { public static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond"; public static final boolean DEFAULT_USE_DHT = true; public static final String EEPGET_USER_AGENT = "I2PSnark"; + private static final boolean ENABLE_UDP_TRACKER = true; private static final List HIDDEN_I2CP_OPTS = Arrays.asList(new String[] { PROP_MAX_BW, "inbound.length", "outbound.length", "inbound.quantity", "outbound.quantity" }); @@ -341,7 +344,7 @@ public class I2PSnarkUtil implements DisconnectListener { if (opts.getProperty(I2PClient.PROP_SIGTYPE) == null) opts.setProperty(I2PClient.PROP_SIGTYPE, "EdDSA_SHA512_Ed25519"); if (opts.getProperty("i2cp.leaseSetEncType") == null) - opts.setProperty("i2cp.leaseSetEncType", "4,0"); + opts.setProperty("i2cp.leaseSetEncType", "4"); // assume compressed content if (opts.getProperty(I2PClient.PROP_GZIP) == null) opts.setProperty(I2PClient.PROP_GZIP, "false"); @@ -355,6 +358,11 @@ public class I2PSnarkUtil implements DisconnectListener { } if (_shouldUseDHT && _manager != null && _dht == null) _dht = new KRPC(_context, _baseName, _manager.getSession()); + if (_enableUDP &&_manager != null) { + if (_udpTracker == null) + _udpTracker = new UDPTrackerClient(_context, _manager.getSession(), this); + _udpTracker.start(); + } return (_manager != null); } @@ -381,6 +389,12 @@ public class I2PSnarkUtil implements DisconnectListener { */ public DHT getDHT() { return _dht; } + /** + * @return null if disabled or not started + * @since 0.9.14 + */ + public UDPTrackerClient getUDPTrackerClient() { return _udpTracker; } + public boolean connected() { return _manager != null; } /** @since 0.9.1 */ @@ -403,6 +417,10 @@ public class I2PSnarkUtil implements DisconnectListener { _dht.stop(); _dht = null; } + if (_udpTracker != null) { + _udpTracker.stop(); + _udpTracker = null; + } _startedTime = 0; I2PSocketManager mgr = _manager; // FIXME this can cause race NPEs elsewhere @@ -753,6 +771,16 @@ public class I2PSnarkUtil implements DisconnectListener { return _shouldUseDHT; } + /** @since 0.9.67 */ + public void setUDPEnabled(boolean yes) { + _enableUDP = yes; + } + + /** @since 0.9.67 */ + public boolean udpEnabled() { + return _enableUDP; + } + /** @since 0.9.31 */ public void setRatingsEnabled(boolean yes) { _enableRatings = yes; diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 285ce26e5..96754d09c 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -3075,6 +3075,13 @@ public class SnarkManager implements CompleteListener, ClientApp, DisconnectList */ public List getSortedTrackers() { List rv = new ArrayList(_trackerMap.values()); + if (!_util.udpEnabled()) { + for (Iterator iter = rv.iterator(); iter.hasNext(); ) { + Tracker tr = iter.next(); + if (tr.announceURL.startsWith("udp://")) + iter.remove(); + } + } Collections.sort(rv, new IgnoreCaseComparator()); return rv; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index da285d80a..c7a369bb9 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -28,6 +28,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashSet; import java.util.Iterator; @@ -89,6 +90,7 @@ public class TrackerClient implements Runnable { /** No guidance in BEP 5; standard practice is K (=8) */ private static final int DHT_ANNOUNCE_PEERS = 4; public static final int PORT = 6881; + private static final int DEFAULT_UDP_TRACKER_PORT = 6969; private static final int MAX_TRACKERS = 12; // tracker.welterde.i2p private static final Hash DSA_ONLY_TRACKER = ConvertToHash.getHash("cfmqlafjfmgkzbt4r3jsfyhgsr5abgxryl6fnz3d3y5a365di5aa.b32.i2p"); @@ -291,7 +293,6 @@ public class TrackerClient implements Runnable { // followed by the secondary open trackers // It's painful, but try to make sure if an open tracker is also // the primary tracker, that we don't add it twice. - // todo: check for b32 matches as well String primary = null; if (meta != null) primary = meta.getAnnounce(); @@ -316,37 +317,31 @@ public class TrackerClient implements Runnable { // announce list // We completely ignore the BEP 12 processing rules if (meta != null && !meta.isPrivate()) { + List urls = new ArrayList(16); List> list = meta.getAnnounceList(); if (list != null) { for (List llist : list) { for (String url : llist) { - if (!isNewValidTracker(trackerHashes, url)) - continue; - trackers.add(new TCTracker(url, trackers.isEmpty())); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Additional announce (list): [" + url + "] for infoHash: " + infoHash); + urls.add(url); } } - if (trackers.size() > 2) { - // shuffle everything but the primary - TCTracker pri = trackers.remove(0); - Collections.shuffle(trackers, _util.getContext().random()); - trackers.add(0, pri); + } + // configured open trackers + urls.addAll(_util.getOpenTrackers()); + if (urls.size() > 1) { + Collections.shuffle(trackers, _util.getContext().random()); + if (_util.udpEnabled()) { + // sort the list to put udp first so it will trump http + Collections.sort(urls, new URLComparator()); } } - } - - // configured open trackers - if (meta == null || !meta.isPrivate()) { - List tlist = _util.getOpenTrackers(); - for (int i = 0; i < tlist.size(); i++) { - String url = tlist.get(i); + for (String url : urls) { if (!isNewValidTracker(trackerHashes, url)) continue; - // opentrackers are primary if we don't have primary - trackers.add(new TCTracker(url, trackers.isEmpty())); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash); + // first one is primary if we don't have a primary + trackers.add(new TCTracker(url, trackers.isEmpty())); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash); } } @@ -526,18 +521,30 @@ public class TrackerClient implements Runnable { if (len > 0 && downloaded > len) downloaded = len; left = coordinator.getLeft(); - String event; - if (!tr.started) { - event = STARTED_EVENT; - } else if (newlyCompleted) { - event = COMPLETED_EVENT; + TrackerInfo info; + if (tr.isUDP) { + int event; + if (!tr.started) { + event = UDPTrackerClient.EVENT_STARTED; + } else if (newlyCompleted) { + event = UDPTrackerClient.EVENT_COMPLETED; + } else { + event = UDPTrackerClient.EVENT_NONE; + } + info = doRequest(tr, uploaded, downloaded, left, event); } else { - event = NO_EVENT; + String event; + if (!tr.started) { + event = STARTED_EVENT; + } else if (newlyCompleted) { + event = COMPLETED_EVENT; + } else { + event = NO_EVENT; + } + info = doRequest(tr, infoHash, peerID, + uploaded, downloaded, left, + event); } - TrackerInfo info = doRequest(tr, infoHash, peerID, - uploaded, downloaded, left, - event); - snark.setTrackerProblems(null); tr.trackerProblems = null; tr.registerFails = 0; @@ -839,21 +846,27 @@ public class TrackerClient implements Runnable { if (len > 0 && downloaded > len) downloaded = len; long left = coordinator.getLeft(); - try - { + try { // Don't try to restart I2CP connection just to say goodbye - if (_util.connected()) { - if (tr.started && (!tr.stop) && tr.trackerProblems == null) - doRequest(tr, infoHash, peerID, uploaded, - downloaded, left, STOPPED_EVENT); - } - } + if (_util.connected()) { + if (tr.started && (!tr.stop) && tr.trackerProblems == null) { + if (tr.isUDP) { + doRequest(tr, uploaded, + downloaded, left, UDPTrackerClient.EVENT_STOPPED); + } else { + doRequest(tr, infoHash, peerID, uploaded, + downloaded, left, STOPPED_EVENT); + } + } + } + } catch(IOException ioe) { /* ignored */ } tr.reset(); } } /** + * HTTP - blocking * * Note: IOException message text gets displayed in the UI * @@ -922,6 +935,48 @@ public class TrackerClient implements Runnable { return info; } + /** + * UDP - blocking + * + * @return null if _fastUnannounce && event == STOPPED + * @since 0.9.54 + */ + private TrackerInfo doRequest(TCTracker tr, long uploaded, + long downloaded, long left, int event) throws IOException { + UDPTrackerClient udptc = _util.getUDPTrackerClient(); + if (udptc == null) + throw new IOException("no UDPTC"); + if (_log.shouldLog(Log.INFO)) + _log.info("Sending UDPTrackerClient request"); + + tr.lastRequestTime = System.currentTimeMillis(); + // Don't wait for a response to stopped when shutting down + boolean fast = _fastUnannounce && event == UDPTrackerClient.EVENT_STOPPED; + long maxWait = fast ? 5*1000 : 60*1000; + boolean small = left == 0 || event == UDPTrackerClient.EVENT_STOPPED || !coordinator.needOutboundPeers(); + int numWant = small ? 0 : _util.getMaxConnections(); + UDPTrackerClient.TrackerResponse fetched = udptc.announce(meta.getInfoHash(), snark.getID(), numWant, + maxWait, tr.host, tr.port, + downloaded, left, uploaded, event, fast); + if (fast) + return null; + if (fetched == null) + throw new IOException("UDP announce error to: " + tr.host); + + TrackerInfo info = new TrackerInfo(fetched.getPeers(), fetched.getInterval(), fetched.getSeedCount(), + fetched.getLeechCount(), fetched.getFailureReason(), + snark.getID(), snark.getInfoHash(), snark.getMetaInfo(), _util); + if (_log.shouldLog(Log.INFO)) + _log.info("TrackerClient response: " + info); + + String failure = info.getFailureReason(); + if (failure != null) + throw new IOException(failure); + + tr.interval = Math.max(MIN_TRACKER_ANNOUNCE_INTERVAL, info.getInterval() * 1000l); + return info; + } + /** * Very lazy byte[] to URL encoder. Just encodes almost everything, even * some "normal" chars. @@ -969,8 +1024,12 @@ public class TrackerClient implements Runnable { String path = url.getPath(); if (path == null || !path.startsWith("/")) return false; - return "http".equals(url.getScheme()) && url.getHost() != null && - (url.getHost().endsWith(".i2p") || url.getHost().equals("i2p")); + String scheme = url.getScheme(); + if (!("http".equals(scheme) || "udp".equals(scheme))) + return false; + String host = url.getHost(); + return host != null && + (host.endsWith(".i2p") || host.equals("i2p")); } /** @@ -980,14 +1039,15 @@ public class TrackerClient implements Runnable { * @return a Hash for i2p hosts only, null otherwise * @since 0.9.5 */ - private static Hash getHostHash(String ann) { + private Hash getHostHash(String ann) { URI url; try { url = new URI(ann); } catch (URISyntaxException use) { return null; } - if (!"http".equals(url.getScheme())) + String scheme = url.getScheme(); + if (!("http".equals(scheme) || (_util.udpEnabled() && "udp".equals(scheme)))) return null; String host = url.getHost(); if (host == null) { @@ -1022,11 +1082,30 @@ public class TrackerClient implements Runnable { return null; } + /** + * UDP before HTTP + * + * @since 0.9.67 + */ + private static class URLComparator implements Comparator { + public int compare(String l, String r) { + boolean ul = l.startsWith("udp://"); + boolean ur = r.startsWith("udp://"); + if (ul && !ur) + return -1; + if (ur && !ul) + return -1; + return 0; + } + } + private static class TCTracker { final String announce; final String host; final boolean isPrimary; + final boolean isUDP; + final int port; long interval; long lastRequestTime; String trackerProblems; @@ -1037,14 +1116,27 @@ public class TrackerClient implements Runnable { int seenPeers; /** - * @param a must be a valid http URL with a path + * @param a must be a valid http URL with a path, + * or a udp URL (path is ignored) * @param p true if primary */ public TCTracker(String a, boolean p) { announce = a; - String s = a.substring(7); - host = s.substring(0, s.indexOf('/')); + URI url; + try { + url = new URI(a); + isUDP = "udp".equals(url.getScheme()); + host = url.getHost(); + int pt = url.getPort(); + if (pt < 0) { + pt = isUDP ? DEFAULT_UDP_TRACKER_PORT : 80; + } + port = pt; + } catch (URISyntaxException use) { + // shouldn't happen, already validated + throw new IllegalArgumentException(use); + } isPrimary = p; interval = INITIAL_SLEEP; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerInfo.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerInfo.java index 9c39fa4d6..ecb647166 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerInfo.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerInfo.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import net.i2p.data.Hash; + import org.klomp.snark.bencode.BDecoder; import org.klomp.snark.bencode.BEValue; import org.klomp.snark.bencode.InvalidBEncodingException; @@ -123,6 +125,21 @@ class TrackerInfo } ******/ + /** + * To convert returned UDPTracker data to the standard structure + * @param hashes may be null + * @param error may be null + * @since 0.9.14 + */ + public TrackerInfo(Set hashes, int interval, int complete, int incomplete, String error, + byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util) { + peers = getPeers(hashes, my_id, infohash, metainfo, util); + this.interval = interval; + this.complete = complete; + this.incomplete = incomplete; + failure_reason = error; + } + /** List of Dictionaries or List of Strings */ private static Set getPeers(List l, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util) throws IOException @@ -179,6 +196,31 @@ class TrackerInfo return peers; } + /** + * From Hash to Peer + * @since 0.9.14 + */ + private static Set getPeers(Set hashes, byte[] my_id, + byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util) { + if (hashes == null) + return Collections.emptySet(); + Set peers = new HashSet(hashes.size()); + for (Hash h : hashes) { + PeerID peerID; + byte[] hash = new byte[HASH_LENGTH]; + System.arraycopy(h.getData(), 0, hash, 0, HASH_LENGTH); + try { + peerID = new PeerID(hash, util); + } catch (InvalidBEncodingException ibe) { + // won't happen + continue; + } + peers.add(new Peer(peerID, my_id, infohash, metainfo)); + } + + return peers; + } + public Set getPeers() { return peers; diff --git a/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java index ce0db9316..d73b83157 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/UDPTrackerClient.java @@ -1,5 +1,6 @@ package org.klomp.snark; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -11,8 +12,8 @@ import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionMuxedListener; import net.i2p.client.SendMessageOptions; -import net.i2p.client.datagram.I2PDatagramDissector; -import net.i2p.client.datagram.I2PDatagramMaker; +import net.i2p.client.datagram.Datagram2; +import net.i2p.client.datagram.Datagram3; import net.i2p.client.datagram.I2PInvalidDatagramException; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -21,6 +22,9 @@ import net.i2p.data.Hash; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; +import org.klomp.snark.I2PSnarkUtil; +import org.klomp.snark.TrackerClient; + /** * One of these for all trackers and info hashes. * Ref: BEP 15, proposal 160 @@ -28,15 +32,17 @@ import net.i2p.util.SimpleTimer2; * The main difference from BEP 15 is that the announce response * contains a 32-byte hash instead of a 4-byte IP and a 2-byte port. * - * This implements only "fast mode". - * We send only repliable datagrams, and - * receive only raw datagrams, as follows: + * We send both repliable and raw datagrams, but + * we only receive raw datagrams, as follows: * *
  *  client		tracker		type
  *  ------		-------		----
- *   announce  -->			repliable
- *            	<-- 	ann resp	raw
+ *   conn req	-->			(repliable to query port)
+ *          	<--	conn resp	(raw from resp port)
+ *   announce  -->			(raw to resp port)
+ *            	<-- 	ann resp	(raw from resp port)
+ *          	<--	error		(raw from resp port)
  *
* * @since 0.9.53, enabled in 0.9.54 @@ -48,6 +54,8 @@ class UDPTrackerClient implements I2PSessionMuxedListener { /** hook to inject and receive datagrams */ private final I2PSession _session; private final I2PSnarkUtil _util; + /** 20 byte random id */ + private final int _myKey; private final Hash _myHash; /** unsigned dgrams */ private final int _rPort; @@ -55,8 +63,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener { private final ConcurrentHashMap _trackers; /** our TID to tracker */ private final Map _sentQueries; + private final SimpleTimer2.TimedEvent _cleaner; private boolean _isRunning; + private static final long INIT_CONN_ID = 0x41727101980L; + public static final int EVENT_NONE = 0; public static final int EVENT_COMPLETED = 1; public static final int EVENT_STARTED = 2; @@ -70,6 +81,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { private static final int SEND_CRYPTO_TAGS = 8; private static final int LOW_CRYPTO_TAGS = 4; + private static final long CONN_EXPIRATION = 60*1000; // BEP 15 private static final long DEFAULT_TIMEOUT = 15*1000; private static final long DEFAULT_QUERY_TIMEOUT = 60*1000; private static final long CLEAN_TIME = 163*1000; @@ -79,8 +91,6 @@ class UDPTrackerClient implements I2PSessionMuxedListener { private static final int MIN_INTERVAL = 15*60; private static final int MAX_INTERVAL = 8*60*60; - private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL } - /** * */ @@ -90,9 +100,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener { _util = util; _log = ctx.logManager().getLog(UDPTrackerClient.class); _rPort = TrackerClient.PORT - 1; + _myKey = ctx.random().nextInt(); _myHash = session.getMyDestination().calculateHash(); _trackers = new ConcurrentHashMap(8); _sentQueries = new ConcurrentHashMap(32); + _cleaner = new Cleaner(); } @@ -104,6 +116,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { return; _session.addMuxedSessionListener(this, I2PSession.PROTO_DATAGRAM_RAW, _rPort); _isRunning = true; + _cleaner.schedule(7 * CLEAN_TIME); } /** @@ -114,6 +127,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { return; _isRunning = false; _session.removeListener(I2PSession.PROTO_DATAGRAM_RAW, _rPort); + _cleaner.cancel(); _trackers.clear(); for (ReplyWaiter w : _sentQueries.values()) { w.cancel(); @@ -128,7 +142,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { * * @param ih the Info Hash (torrent) * @param max maximum number of peers to return - * @param maxWait the maximum time to wait (ms) must be greater than 0 + * @param maxWait the maximum time to wait (ms) must be > 0 * @param fast if true, don't wait for dest, no retx, ... * @return null on fail or if fast is true */ @@ -138,7 +152,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { int event, boolean fast) { long now = _context.clock().now(); long end = now + maxWait; - if (toPort < 0) + if (toPort <= 0) throw new IllegalArgumentException(); Tracker tr = getTracker(toHost, toPort); if (tr.getDest(fast) == null) { @@ -154,6 +168,12 @@ class UDPTrackerClient implements I2PSessionMuxedListener { _log.info("out of time after resolving: " + tr); return null; } + Long cid = getConnection(tr, now + toWait); + if (cid == null) { + if (_log.shouldInfo()) + _log.info("no connection for: " + tr); + return null; + } if (fast) { toWait = 0; } else { @@ -164,7 +184,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { return null; } } - ReplyWaiter w = sendAnnounce(tr, 0, ih, peerID, + ReplyWaiter w = sendAnnounce(tr, cid.longValue(), ih, peerID, downloaded, left, uploaded, event, max, toWait); if (fast) return null; @@ -184,6 +204,55 @@ class UDPTrackerClient implements I2PSessionMuxedListener { //////// private below here + /** + * @return the connection ID, or null on fail + */ + private Long getConnection(Tracker tr, long untilTime) { + boolean shouldConnect = false; + synchronized(tr) { + boolean wasInProgress = false; + while(true) { + Long conn = tr.getConnection(); + if (conn != null) + return conn; + // don't resend right after somebody else failed + if (wasInProgress) + return null; + long now = _context.clock().now(); + long toWait = untilTime - now; + if (toWait <= 0) + return null; + if (tr.isConnInProgress()) { + wasInProgress = true; + try { + tr.wait(toWait); + } catch (InterruptedException ie) {} + } else { + shouldConnect = true; + tr.setConnInProgress(true); + break; + } + } + } + if (shouldConnect) { + long now = _context.clock().now(); + long toWait = untilTime - now; + if (toWait <= 1000) { + tr.setConnInProgress(false); + return null; + } + ReplyWaiter w = sendConnReq(tr, toWait); + if (w == null) { + tr.setConnInProgress(false); + return null; + } + boolean success = waitAndRetransmit(w, untilTime); + if (success) + return tr.getConnection(); + } + return null; + } + /** * @return non-null */ @@ -197,6 +266,40 @@ class UDPTrackerClient implements I2PSessionMuxedListener { ///// Sending..... + /** + * Send one time with a new tid + * @param toWait > 0 + * @return null on failure or if toWait <= 0 + */ + private ReplyWaiter sendConnReq(Tracker tr, long toWait) { + if (toWait <= 0) + throw new IllegalArgumentException(); + int tid = _context.random().nextInt(); + byte[] payload = sendConnReq(tr, tid); + if (payload != null) { + ReplyWaiter rv = new ReplyWaiter(tid, tr, ACTION_CONNECT, payload, toWait); + _sentQueries.put(Integer.valueOf(tid), rv); + if (_log.shouldInfo()) + _log.info("Sent: " + rv + " timeout: " + toWait); + return rv; + } + return null; + } + + /** + * Send one time with given tid + * @return the payload or null on failure + */ + private byte[] sendConnReq(Tracker tr, int tid) { + // same as BEP 15 + byte[] payload = new byte[16]; + DataHelper.toLong8(payload, 0, INIT_CONN_ID); + // next 4 bytes are already zero + DataHelper.toLong(payload, 12, 4, tid); + boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true); + return rv ? payload : null; + } + /** * Send one time with a new tid * @param toWait if <= 0 does not register @@ -210,7 +313,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { byte[] payload = sendAnnounce(tr, tid, connID, ih, id, downloaded, left, uploaded, event, numWant); if (payload != null) { if (toWait > 0) { - ReplyWaiter rv = new ReplyWaiter(tid, tr, payload, toWait); + ReplyWaiter rv = new ReplyWaiter(tid, tr, ACTION_ANNOUNCE, payload, toWait); _sentQueries.put(Integer.valueOf(tid), rv); if (_log.shouldInfo()) _log.info("Sent: " + rv + " timeout: " + toWait); @@ -242,7 +345,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { DataHelper.toLong(payload, 80, 4, event); DataHelper.toLong(payload, 92, 4, numWant); DataHelper.toLong(payload, 96, 2, TrackerClient.PORT); - boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, true); + boolean rv = sendMessage(tr.getDest(true), tr.getPort(), payload, false); return rv ? payload : null; } @@ -257,9 +360,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { if (toWait <= 0) return false; w.wait(toWait); - } catch (InterruptedException ie) { - return false; - } + } catch (InterruptedException ie) {} switch (w.getState()) { case INIT: continue; @@ -290,11 +391,12 @@ class UDPTrackerClient implements I2PSessionMuxedListener { * @return success */ private boolean resend(ReplyWaiter w, long toWait) { + boolean repliable = w.getExpectedAction() == ACTION_CONNECT; Tracker tr = w.getSentTo(); int port = tr.getPort(); if (_log.shouldInfo()) _log.info("Resending: " + w + " timeout: " + toWait); - boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), true); + boolean rv = sendMessage(tr.getDest(true), port, w.getPayload(), repliable); if (rv) { _sentQueries.put(Integer.valueOf(w.getID()), w); w.schedule(toWait); @@ -319,28 +421,35 @@ class UDPTrackerClient implements I2PSessionMuxedListener { _log.info("send failed, no dest"); return false; } - if (dest.calculateHash().equals(_myHash)) + Hash to = dest.calculateHash(); + if (to.equals(_myHash)) throw new IllegalArgumentException("don't send to ourselves"); if (repliable) { - I2PDatagramMaker dgMaker = new I2PDatagramMaker(_session); - payload = dgMaker.makeI2PDatagram(payload); - if (payload == null) { + try { + payload = Datagram2.make(_context, _session, payload, to); + } catch (DataFormatException dfe) { if (_log.shouldWarn()) - _log.warn("DGM fail"); + _log.warn("DG2 fail", dfe); + return false; + } + } else { + try { + payload = Datagram3.make(_context, _session, payload); + } catch (DataFormatException dfe) { + if (_log.shouldWarn()) + _log.warn("DG3 fail", dfe); return false; } } SendMessageOptions opts = new SendMessageOptions(); opts.setDate(_context.clock().now() + 60*1000); - opts.setTagsToSend(SEND_CRYPTO_TAGS); - opts.setTagThreshold(LOW_CRYPTO_TAGS); if (!repliable) opts.setSendLeaseSet(false); try { boolean success = _session.sendMessage(dest, payload, 0, payload.length, - repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW, + repliable ? I2PSession.PROTO_DATAGRAM2 : I2PSession.PROTO_DATAGRAM3, _rPort, toPort, opts); if (success) { // ... @@ -376,36 +485,69 @@ class UDPTrackerClient implements I2PSessionMuxedListener { _log.info("Rcvd msg with no one waiting: " + tid); return; } + int expect = waiter.getExpectedAction(); + if (expect != action && action != ACTION_ERROR) { + if (_log.shouldInfo()) + _log.info("Got action " + action + " but wanted " + expect + " for: " + waiter); + waiter.gotReply(false); + return; + } - if (action == ACTION_ANNOUNCE) { + switch (action) { + case ACTION_CONNECT: + receiveConnection(waiter, payload, fromPort); + break; + + case ACTION_ANNOUNCE: receiveAnnounce(waiter, payload); - } else if (action == ACTION_ERROR) { - receiveError(waiter, payload); - } else { - // includes ACTION_CONNECT + break; + + case ACTION_ERROR: + receiveError(waiter, payload, expect); + break; + + default: if (_log.shouldInfo()) _log.info("Rcvd msg with unknown action: " + action + " for: " + waiter); waiter.gotReply(false); Tracker tr = waiter.getSentTo(); tr.gotError(); + break; + } + } + + /** + * @param lifetime ms + */ + private void receiveConnection(ReplyWaiter waiter, byte[] payload, int fromPort) { + Tracker tr = waiter.getSentTo(); + if (payload.length >= 16) { + long cid = DataHelper.fromLong8(payload, 8); + long lifetime; + if (payload.length >= 18) { + // extension to BEP 15 + lifetime = DataHelper.fromLong(payload, 16, 2) * 1000; + } else { + lifetime = CONN_EXPIRATION; + } + if (_log.shouldInfo()) + _log.info("Rcvd connect response, id = " + cid + " lifetime = " + (lifetime / 1000) + " from " + tr); + tr.setConnection(cid, fromPort, lifetime); + waiter.gotReply(true); + } else { + waiter.gotReply(false); + tr.gotError(); } } private void receiveAnnounce(ReplyWaiter waiter, byte[] payload) { Tracker tr = waiter.getSentTo(); - if (payload.length >= 22) { + if (payload.length >= 20) { int interval = Math.min(MAX_INTERVAL, Math.max(MIN_INTERVAL, (int) DataHelper.fromLong(payload, 8, 4))); int leeches = (int) DataHelper.fromLong(payload, 12, 4); int seeds = (int) DataHelper.fromLong(payload, 16, 4); - int peers = (int) DataHelper.fromLong(payload, 20, 2); - if (22 + (peers * Hash.HASH_LENGTH) > payload.length) { - if (_log.shouldWarn()) - _log.warn("Short reply"); - waiter.gotReply(false); - tr.gotError(); - return; - } + int peers = (payload.length - 20) / Hash.HASH_LENGTH; if (_log.shouldInfo()) _log.info("Rcvd " + peers + " peers from " + tr); Set hashes; @@ -426,7 +568,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { } } - private void receiveError(ReplyWaiter waiter, byte[] payload) { + private void receiveError(ReplyWaiter waiter, byte[] payload, int expected) { String msg; if (payload.length > 8) { msg = DataHelper.getUTF8(payload, 8, payload.length - 8); @@ -437,6 +579,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener { waiter.gotResponse(resp); Tracker tr = waiter.getSentTo(); tr.gotError(); + if (waiter.getExpectedAction() == ACTION_ANNOUNCE) { + // TODO if we were waiting for an announce reply, fire off a new connection request + } } // I2PSessionMuxedListener interface ---------------- @@ -488,6 +633,43 @@ class UDPTrackerClient implements I2PSessionMuxedListener { _log.warn("UDPTC got error msg: ", error); } + /** + * Cleaner-upper + */ + private class Cleaner extends SimpleTimer2.TimedEvent { + + public Cleaner() { + super(SimpleTimer2.getInstance(), 7 * CLEAN_TIME); + } + + public void timeReached() { + if (!_isRunning) + return; + long now = _context.clock().now(); +/******** + if (_log.shouldLog(Log.DEBUG)) + _log.debug("UDPTC cleaner starting with " + + _blacklist.size() + " in blacklist, " + + _outgoingTokens.size() + " sent Tokens, " + + _incomingTokens.size() + " rcvd Tokens"); + long expire = now - MAX_TOKEN_AGE; + for (Iterator iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) { + Token tok = iter.next(); + if (tok.lastSeen() < expire) + iter.remove(); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("UDPTC cleaner done, now with " + + _blacklist.size() + " in blacklist, " + + _outgoingTokens.size() + " sent Tokens, " + + _incomingTokens.size() + " rcvd Tokens, " + + _knownNodes.size() + " known peers, " + + _sentQueries.size() + " queries awaiting response"); +*******/ + schedule(CLEAN_TIME); + } + } + public static class TrackerResponse { private final int interval, complete, incomplete; @@ -578,20 +760,27 @@ class UDPTrackerClient implements I2PSessionMuxedListener { } } + private enum ConnState { INVALID, IN_PROGRESS, VALID } + private class Tracker extends HostPort { private final Object destLock = new Object(); private Destination dest; + // we store as a Long because all values are valid, so null is unset + private Long cid; private long expires; private long lastHeardFrom; private long lastFailed; private int consecFails; + private int responsePort; private int interval = DEFAULT_INTERVAL; + private ConnState state = ConnState.INVALID; private static final long DELAY = 15*1000; public Tracker(String host, int port) { super(host, port); + responsePort = port; } /** @@ -606,12 +795,57 @@ class UDPTrackerClient implements I2PSessionMuxedListener { } } + public synchronized void setConnInProgress(boolean yes) { + if (yes) + state = ConnState.IN_PROGRESS; + else if (state == ConnState.IN_PROGRESS) + state = ConnState.INVALID; + } + + public synchronized boolean isConnInProgress() { + return state == ConnState.IN_PROGRESS; + } + + public synchronized boolean isConnValid() { + return state == ConnState.VALID && + expires > _context.clock().now(); + } + + public synchronized void connFailed() { + replyTimeout(); + expires = 0; + state = ConnState.INVALID; + } + /** does not change state */ public synchronized void replyTimeout() { consecFails++; lastFailed = _context.clock().now(); } + /** + * sets heardFrom + * @param lifetime ms + */ + public synchronized void setConnection(long cid, int rport, long lifetime) { + this.cid = Long.valueOf(cid); + responsePort = rport; + long now = _context.clock().now(); + lastHeardFrom = now; + expires = now + lifetime; + consecFails = 0; + state = ConnState.VALID; + } + + /** + * @return null if invalid + */ + public synchronized Long getConnection() { + if (isConnValid()) + return cid; + return null; + } + public synchronized int getInterval() { return interval; } @@ -629,7 +863,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener { public synchronized void gotError() { long now = _context.clock().now(); lastHeardFrom = now; - consecFails = 0; + consecFails++; + state = ConnState.INVALID; + cid = null; this.notifyAll(); } @@ -640,16 +876,20 @@ class UDPTrackerClient implements I2PSessionMuxedListener { @Override public String toString() { - return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null); + return "UDP Tracker " + host + ':' + port + " hasDest? " + (dest != null) + + "valid? " + isConnValid() + " conn ID: " + (cid != null ? cid : "none") + ' ' + state; } } + private enum WaitState { INIT, SUCCESS, TIMEOUT, FAIL } + /** * Callback for replies */ private class ReplyWaiter extends SimpleTimer2.TimedEvent { private final int tid; private final Tracker sentTo; + private final int action; private final byte[] data; private TrackerResponse replyObject; private WaitState state = WaitState.INIT; @@ -659,10 +899,11 @@ class UDPTrackerClient implements I2PSessionMuxedListener { * Any sent data to be remembered may be stored by setSentObject(). * Reply object may be in getReplyObject(). */ - public ReplyWaiter(int tid, Tracker tracker, byte[] payload, long toWait) { + public ReplyWaiter(int tid, Tracker tracker, int action, byte[] payload, long toWait) { super(SimpleTimer2.getInstance(), toWait); this.tid = tid; sentTo = tracker; + this.action = action; data = payload; } @@ -674,6 +915,10 @@ class UDPTrackerClient implements I2PSessionMuxedListener { return sentTo; } + public int getExpectedAction() { + return action; + } + public byte[] getPayload() { return data; } @@ -719,7 +964,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { */ public synchronized void gotResponse(TrackerResponse resp) { replyObject = resp; - gotReply(true); + gotReply(resp.error == null); } /** @@ -736,9 +981,9 @@ class UDPTrackerClient implements I2PSessionMuxedListener { // don't trump success or failure if (state != WaitState.INIT) return; - //if (action == ACTION_CONNECT) - // sentTo.connFailed(); - //else + if (action == ACTION_CONNECT) + sentTo.connFailed(); + else sentTo.replyTimeout(); setState(WaitState.TIMEOUT); if (_log.shouldWarn()) @@ -747,7 +992,7 @@ class UDPTrackerClient implements I2PSessionMuxedListener { @Override public String toString() { - return "Waiting for ID: " + tid + " to: " + sentTo + " state: " + state; + return "Message type: " + action + " ID: " + tid + " to: " + sentTo + " state: " + state; } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 4c97852f7..74739cd8f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -2472,6 +2472,8 @@ public class I2PSnarkServlet extends BasicServlet { String trackerLinkUrl = getTrackerLinkUrl(announce, infohash); if (announce.startsWith("http://")) announce = announce.substring(7); + else if (announce.startsWith("udp://")) + announce = announce.substring(6); // strip path int slsh = announce.indexOf('/'); if (slsh > 0) @@ -4097,11 +4099,11 @@ public class I2PSnarkServlet extends BasicServlet { * Just to hide non-i2p trackers from the details page. * @since 0.9.46 */ - private static boolean isI2PTracker(String url) { + private boolean isI2PTracker(String url) { try { URI uri = new URI(url); String method = uri.getScheme(); - if (!"http".equals(method) && !"https".equals(method)) + if (!("http".equals(method) || (_manager.util().udpEnabled() && "udp".equals(method)))) return false; String host = uri.getHost(); if (host == null || !host.endsWith(".i2p"))