diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index b93aa2a7..9b347612 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -85,6 +85,7 @@ import com.muwire.core.upload.UploadManager import com.muwire.core.util.MuWireLogManager import com.muwire.core.content.ContentControlEvent import com.muwire.core.content.ContentManager +import com.muwire.core.tracker.TrackerResponder import groovy.util.logging.Log import net.i2p.I2PAppContext @@ -136,6 +137,7 @@ public class Core { private final FeedClient feedClient private final WatchedDirectoryConverter watchedDirectoryConverter final WatchedDirectoryManager watchedDirectoryManager + private final TrackerResponder trackerResponder private final Router router @@ -371,6 +373,9 @@ public class Core { log.info("initializing upload manager") uploadManager = new UploadManager(eventBus, fileManager, meshManager, downloadManager, persisterFolderService, props) + + log.info("initializing tracker responder") + trackerResponder = new TrackerResponder(i2pSession, props, fileManager, downloadManager, meshManager, trustService, me) log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) @@ -450,6 +455,7 @@ public class Core { updateClient?.start() feedManager.start() feedClient.start() + trackerResponder.start() } public void shutdown() { @@ -489,6 +495,8 @@ public class Core { feedManager.stop() log.info("shutting down feed client") feedClient.stop() + log.info("shutting down tracker responder") + trackerResponder.stop() log.info("shutting down connection manager") connectionManager.shutdown() log.info("killing i2p session") diff --git a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy index a290b745..f4aafc43 100644 --- a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy +++ b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy @@ -31,6 +31,7 @@ class MuWireSettings { boolean shareHiddenFiles boolean searchComments boolean browseFiles + boolean allowTracking boolean fileFeed boolean advertiseFeed @@ -92,6 +93,7 @@ class MuWireSettings { outBw = Integer.valueOf(props.getProperty("outBw","128")) searchComments = Boolean.valueOf(props.getProperty("searchComments","true")) browseFiles = Boolean.valueOf(props.getProperty("browseFiles","true")) + allowTracking = Boolean.valueOf(props.getProperty("allowTracking","true")) // feed settings fileFeed = Boolean.valueOf(props.getProperty("fileFeed","true")) @@ -157,6 +159,7 @@ class MuWireSettings { props.setProperty("outBw", String.valueOf(outBw)) props.setProperty("searchComments", String.valueOf(searchComments)) props.setProperty("browseFiles", String.valueOf(browseFiles)) + props.setProperty("allowTracking", String.valueOf(allowTracking)) // feed settings props.setProperty("fileFeed", String.valueOf(fileFeed)) diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 1c026d55..99609dd7 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -2,7 +2,7 @@ package com.muwire.core.download class Pieces { private final BitSet done, claimed - private final int nPieces + final int nPieces private final float ratio private final Random random = new Random() private final Map partials = new HashMap<>() diff --git a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy index 95f0de87..227bc4d4 100644 --- a/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy +++ b/core/src/main/groovy/com/muwire/core/mesh/Mesh.groovy @@ -10,7 +10,7 @@ import net.i2p.util.ConcurrentHashSet class Mesh { private final InfoHash infoHash private final Set sources = new ConcurrentHashSet<>() - private final Pieces pieces + final Pieces pieces Mesh(InfoHash infoHash, Pieces pieces) { this.infoHash = infoHash diff --git a/core/src/main/groovy/com/muwire/core/tracker/TrackerResponder.groovy b/core/src/main/groovy/com/muwire/core/tracker/TrackerResponder.groovy new file mode 100644 index 00000000..ff535039 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/tracker/TrackerResponder.groovy @@ -0,0 +1,175 @@ +package com.muwire.core.tracker + +import java.util.logging.Level +import java.util.stream.Collectors + +import com.muwire.core.Constants +import com.muwire.core.InfoHash +import com.muwire.core.MuWireSettings +import com.muwire.core.Persona +import com.muwire.core.download.DownloadManager +import com.muwire.core.download.Pieces +import com.muwire.core.files.FileManager +import com.muwire.core.mesh.Mesh +import com.muwire.core.mesh.MeshManager +import com.muwire.core.trust.TrustLevel +import com.muwire.core.trust.TrustService +import com.muwire.core.util.DataUtil + +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.util.logging.Log +import net.i2p.client.I2PSession +import net.i2p.client.I2PSessionMuxedListener +import net.i2p.client.SendMessageOptions +import net.i2p.client.datagram.I2PDatagramDissector +import net.i2p.client.datagram.I2PDatagramMaker +import net.i2p.data.Base64 + +@Log +class TrackerResponder { + private final I2PSession i2pSession + private final MuWireSettings muSettings + private final FileManager fileManager + private final DownloadManager downloadManager + private final MeshManager meshManager + private final TrustService trustService + private final Persona me + + TrackerResponder(I2PSession i2pSession, MuWireSettings muSettings, + FileManager fileManager, DownloadManager downloadManager, + MeshManager meshManager, TrustService trustService, + Persona me) { + this.i2pSession = i2pSession + this.muSettings = muSettings + this.fileManager = fileManager + this.downloadManager = downloadManager + this.meshManager = meshManager + this.trustService = trustService + this.me = me + } + + void start() { + i2pSession.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT) + } + + void stop() { + // ??? + } + + private void respond(host, json) { + log.info("responding to host $host with json $json") + + def message = JsonOutput.toJson(json) + def maker = new I2PDatagramMaker(i2pSession) + message = maker.makeI2PDatagram(message.bytes) + def options = new SendMessageOptions() + options.setSendLeaseSet(false) + i2pSession.sendMessage(host, message, 0, message.length, I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT, Constants.TRACKER_PORT, options) + } + + class Listener implements I2PSessionMuxedListener { + + @Override + public void messageAvailable(I2PSession session, int msgId, long size) { + } + + @Override + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) { + if (proto != I2PSession.PROTO_DATAGRAM) { + log.warning "Received unexpected protocol $proto" + return + } + + byte[] payload = session.receiveMessage(msgId) + def dissector = new I2PDatagramDissector() + try { + dissector.loadI2PDatagram(payload) + def sender = dissector.getSender() + + log.info("got a tracker datagram from ${sender.toBase32()}") + + // if not trusted, just drop it + TrustLevel trustLevel = trustService.getLevel(sender) + + if (trustLevel == TrustLevel.DISTRUSTED || + (trustLevel == TrustLevel.NEUTRAL && !muSettings.allowUntrusted)) { + log.info("dropping, untrusted") + return + } + + payload = dissector.getPayload() + def slurper = new JsonSlurper() + def json = slurper.parse(payload) + + if (json.type != "TrackerPing") { + log.warning("unknown type $json.type") + return + } + + def response = [:] + if (!muSettings.allowTracking) { + response.code = 403 + respond(sender, response) + return + } + + if (json.version != 1) { + log.warning("unknown version $json.version") + response.code = 400 + response.message = "I only support version 1" + respond(sender,response) + return + } + + if (json.infoHash == null) { + log.warning("infoHash missing") + return + } + + byte[] infoHashBytes = Base64.decode(json.infoHash) + InfoHash infoHash = new InfoHash(infoHashBytes) + + if (!(fileManager.isShared(infoHash) || downloadManager.isDownloading(infoHash))) { + response.code = 404 + respond(sender, response) + return + } + + Mesh mesh = meshManager.get(infoHash) + + if (fileManager.isShared(infoHash)) + response.code = 200 + else if (mesh != null) { + response.code = 206 + Pieces pieces = mesh.getPieces() + response.xHave = DataUtil.encodeXHave(pieces, pieces.getnPieces()) + } + + if (mesh != null) + response.altlocs = mesh.getRandom(10, me).stream().map({it.toBase64()}).collect(Collectors.toList()) + + respond(sender,response) + } catch (Exception e) { + log.log(Level.WARNING, "invalid datagram", e) + } + } + + @Override + public void reportAbuse(I2PSession session, int severity) { + } + + @Override + public void disconnected(I2PSession session) { + log.severe("session disconnected") + } + + @Override + public void errorOccurred(I2PSession session, String message, Throwable error) { + log.log(Level.SEVERE, message, error) + } + + } + + +} diff --git a/core/src/main/groovy/com/muwire/core/update/UpdateClient.groovy b/core/src/main/groovy/com/muwire/core/update/UpdateClient.groovy index 9175db04..87f62c82 100644 --- a/core/src/main/groovy/com/muwire/core/update/UpdateClient.groovy +++ b/core/src/main/groovy/com/muwire/core/update/UpdateClient.groovy @@ -2,6 +2,7 @@ package com.muwire.core.update import java.util.logging.Level +import com.muwire.core.Constants import com.muwire.core.EventBus import com.muwire.core.InfoHash import com.muwire.core.MuWireSettings @@ -63,7 +64,7 @@ class UpdateClient { } void start() { - session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, 2) + session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT) timer.schedule({checkUpdate()} as TimerTask, 60000, 60 * 60 * 1000) } @@ -108,7 +109,7 @@ class UpdateClient { ping = maker.makeI2PDatagram(ping.bytes) def options = new SendMessageOptions() options.setSendLeaseSet(true) - session.sendMessage(UpdateServers.UPDATE_SERVER, ping, 0, ping.length, I2PSession.PROTO_DATAGRAM, 2, 0, options) + session.sendMessage(UpdateServers.UPDATE_SERVER, ping, 0, ping.length, I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT, 0, options) } class Listener implements I2PSessionMuxedListener { diff --git a/core/src/main/java/com/muwire/core/Constants.java b/core/src/main/java/com/muwire/core/Constants.java index eef95fcc..5e92f6ef 100644 --- a/core/src/main/java/com/muwire/core/Constants.java +++ b/core/src/main/java/com/muwire/core/Constants.java @@ -17,5 +17,8 @@ public class Constants { public static final int MAX_COMMENT_LENGTH = 0x1 << 15; - public static final long MAX_QUERY_AGE = 5 * 60 * 1000L; + public static final long MAX_QUERY_AGE = 5 * 60 * 1000L; + + public static final int UPDATE_PORT = 2; + public static final int TRACKER_PORT = 3; }