Merge branch 'tracking-server-side'

This commit is contained in:
Zlatin Balevsky
2020-04-29 12:37:06 +01:00
27 changed files with 1187 additions and 10 deletions

4
.gitignore vendored
View File

@@ -2,7 +2,7 @@
**/.settings
**/build
.gradle
.project
.classpath
**/.project
**/.classpath
**/*.rej
**/*.orig

View File

@@ -85,6 +85,7 @@ import com.muwire.core.upload.UploadManager
import com.muwire.core.util.MuWireLogManager
import com.muwire.core.content.ContentControlEvent
import com.muwire.core.content.ContentManager
import com.muwire.core.tracker.TrackerResponder
import groovy.util.logging.Log
import net.i2p.I2PAppContext
@@ -112,7 +113,7 @@ public class Core {
final Properties i2pOptions
final MuWireSettings muOptions
private final I2PSession i2pSession;
final I2PSession i2pSession;
final TrustService trustService
final TrustSubscriber trustSubscriber
private final PersisterService persisterService
@@ -136,6 +137,7 @@ public class Core {
private final FeedClient feedClient
private final WatchedDirectoryConverter watchedDirectoryConverter
final WatchedDirectoryManager watchedDirectoryManager
private final TrackerResponder trackerResponder
private final Router router
@@ -163,9 +165,9 @@ public class Core {
i2pOptionsFile.withInputStream { i2pOptions.load(it) }
if (!i2pOptions.containsKey("inbound.nickname"))
i2pOptions["inbound.nickname"] = "MuWire"
i2pOptions["inbound.nickname"] = tunnelName
if (!i2pOptions.containsKey("outbound.nickname"))
i2pOptions["outbound.nickname"] = "MuWire"
i2pOptions["outbound.nickname"] = tunnelName
}
if (!(i2pOptions.hasProperty("i2np.ntcp.port")
&& i2pOptions.hasProperty("i2np.udp.port")
@@ -371,6 +373,9 @@ public class Core {
log.info("initializing upload manager")
uploadManager = new UploadManager(eventBus, fileManager, meshManager, downloadManager, persisterFolderService, props)
log.info("initializing tracker responder")
trackerResponder = new TrackerResponder(i2pSession, props, fileManager, downloadManager, meshManager, trustService, me)
log.info("initializing connection establisher")
connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache)
@@ -450,6 +455,7 @@ public class Core {
updateClient?.start()
feedManager.start()
feedClient.start()
trackerResponder.start()
}
public void shutdown() {
@@ -489,6 +495,8 @@ public class Core {
feedManager.stop()
log.info("shutting down feed client")
feedClient.stop()
log.info("shutting down tracker responder")
trackerResponder.stop()
log.info("shutting down connection manager")
connectionManager.shutdown()
log.info("killing i2p session")

View File

@@ -31,6 +31,7 @@ class MuWireSettings {
boolean shareHiddenFiles
boolean searchComments
boolean browseFiles
boolean allowTracking
boolean fileFeed
boolean advertiseFeed
@@ -92,6 +93,7 @@ class MuWireSettings {
outBw = Integer.valueOf(props.getProperty("outBw","128"))
searchComments = Boolean.valueOf(props.getProperty("searchComments","true"))
browseFiles = Boolean.valueOf(props.getProperty("browseFiles","true"))
allowTracking = Boolean.valueOf(props.getProperty("allowTracking","true"))
// feed settings
fileFeed = Boolean.valueOf(props.getProperty("fileFeed","true"))
@@ -157,6 +159,7 @@ class MuWireSettings {
props.setProperty("outBw", String.valueOf(outBw))
props.setProperty("searchComments", String.valueOf(searchComments))
props.setProperty("browseFiles", String.valueOf(browseFiles))
props.setProperty("allowTracking", String.valueOf(allowTracking))
// feed settings
props.setProperty("fileFeed", String.valueOf(fileFeed))

View File

@@ -2,7 +2,7 @@ package com.muwire.core.download
class Pieces {
private final BitSet done, claimed
private final int nPieces
final int nPieces
private final float ratio
private final Random random = new Random()
private final Map<Integer,Integer> partials = new HashMap<>()

View File

@@ -10,7 +10,7 @@ import net.i2p.util.ConcurrentHashSet
class Mesh {
private final InfoHash infoHash
private final Set<Persona> sources = new ConcurrentHashSet<>()
private final Pieces pieces
final Pieces pieces
Mesh(InfoHash infoHash, Pieces pieces) {
this.infoHash = infoHash

View File

@@ -0,0 +1,214 @@
package com.muwire.core.tracker
import java.util.concurrent.ConcurrentHashMap
import java.util.logging.Level
import java.util.stream.Collectors
import com.muwire.core.Constants
import com.muwire.core.InfoHash
import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.download.DownloadManager
import com.muwire.core.download.Pieces
import com.muwire.core.files.FileManager
import com.muwire.core.mesh.Mesh
import com.muwire.core.mesh.MeshManager
import com.muwire.core.trust.TrustLevel
import com.muwire.core.trust.TrustService
import com.muwire.core.util.DataUtil
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log
import net.i2p.client.I2PSession
import net.i2p.client.I2PSessionMuxedListener
import net.i2p.client.SendMessageOptions
import net.i2p.client.datagram.I2PDatagramDissector
import net.i2p.client.datagram.I2PDatagramMaker
import net.i2p.data.Base64
@Log
class TrackerResponder {
private final I2PSession i2pSession
private final MuWireSettings muSettings
private final FileManager fileManager
private final DownloadManager downloadManager
private final MeshManager meshManager
private final TrustService trustService
private final Persona me
private final Map<UUID,Long> uuids = new HashMap<>()
private final Timer expireTimer = new Timer("tracker-responder-timer", true)
private static final long UUID_LIFETIME = 10 * 60 * 1000
TrackerResponder(I2PSession i2pSession, MuWireSettings muSettings,
FileManager fileManager, DownloadManager downloadManager,
MeshManager meshManager, TrustService trustService,
Persona me) {
this.i2pSession = i2pSession
this.muSettings = muSettings
this.fileManager = fileManager
this.downloadManager = downloadManager
this.meshManager = meshManager
this.trustService = trustService
this.me = me
}
void start() {
i2pSession.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT)
expireTimer.schedule({expireUUIDs()} as TimerTask, UUID_LIFETIME, UUID_LIFETIME)
}
void stop() {
expireTimer.cancel()
}
private void expireUUIDs() {
final long now = System.currentTimeMillis()
synchronized(uuids) {
for (Iterator<UUID> iter = uuids.keySet().iterator(); iter.hasNext();) {
UUID uuid = iter.next();
Long time = uuids.get(uuid)
if (now - time > UUID_LIFETIME)
iter.remove()
}
}
}
private void respond(host, json) {
log.info("responding to host $host with json $json")
def message = JsonOutput.toJson(json)
def maker = new I2PDatagramMaker(i2pSession)
message = maker.makeI2PDatagram(message.bytes)
def options = new SendMessageOptions()
options.setSendLeaseSet(false)
i2pSession.sendMessage(host, message, 0, message.length, I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT, Constants.TRACKER_PORT, options)
}
class Listener implements I2PSessionMuxedListener {
@Override
public void messageAvailable(I2PSession session, int msgId, long size) {
}
@Override
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) {
if (proto != I2PSession.PROTO_DATAGRAM) {
log.warning "Received unexpected protocol $proto"
return
}
byte[] payload = session.receiveMessage(msgId)
def dissector = new I2PDatagramDissector()
try {
dissector.loadI2PDatagram(payload)
def sender = dissector.getSender()
log.info("got a tracker datagram from ${sender.toBase32()}")
// if not trusted, just drop it
TrustLevel trustLevel = trustService.getLevel(sender)
if (trustLevel == TrustLevel.DISTRUSTED ||
(trustLevel == TrustLevel.NEUTRAL && !muSettings.allowUntrusted)) {
log.info("dropping, untrusted")
return
}
payload = dissector.getPayload()
def slurper = new JsonSlurper()
def json = slurper.parse(payload)
if (json.type != "TrackerPing") {
log.warning("unknown type $json.type")
return
}
def response = [:]
response.type = "TrackerPong"
response.me = me.toBase64()
if (json.infoHash == null) {
log.warning("infoHash missing")
return
}
if (json.uuid == null) {
log.warning("uuid missing")
return
}
UUID uuid = UUID.fromString(json.uuid)
synchronized(uuids) {
if (uuids.containsKey(uuid)) {
log.warning("duplicate uuid $uuid")
return
}
uuids.put(uuid, System.currentTimeMillis())
}
response.uuid = json.uuid
if (!muSettings.allowTracking) {
response.code = 403
respond(sender, response)
return
}
if (json.version != 1) {
log.warning("unknown version $json.version")
response.code = 400
response.message = "I only support version 1"
respond(sender,response)
return
}
byte[] infoHashBytes = Base64.decode(json.infoHash)
InfoHash infoHash = new InfoHash(infoHashBytes)
log.info("servicing request for infoHash ${json.infoHash} with uuid ${json.uuid}")
if (!(fileManager.isShared(infoHash) || downloadManager.isDownloading(infoHash))) {
response.code = 404
respond(sender, response)
return
}
Mesh mesh = meshManager.get(infoHash)
if (fileManager.isShared(infoHash))
response.code = 200
else if (mesh != null) {
response.code = 206
Pieces pieces = mesh.getPieces()
response.xHave = DataUtil.encodeXHave(pieces, pieces.getnPieces())
}
if (mesh != null)
response.altlocs = mesh.getRandom(10, me).stream().map({it.toBase64()}).collect(Collectors.toList())
respond(sender,response)
} catch (Exception e) {
log.log(Level.WARNING, "invalid datagram", e)
}
}
@Override
public void reportAbuse(I2PSession session, int severity) {
}
@Override
public void disconnected(I2PSession session) {
log.severe("session disconnected")
}
@Override
public void errorOccurred(I2PSession session, String message, Throwable error) {
log.log(Level.SEVERE, message, error)
}
}
}

View File

@@ -2,6 +2,7 @@ package com.muwire.core.update
import java.util.logging.Level
import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.InfoHash
import com.muwire.core.MuWireSettings
@@ -63,7 +64,7 @@ class UpdateClient {
}
void start() {
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, 2)
session.addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT)
timer.schedule({checkUpdate()} as TimerTask, 60000, 60 * 60 * 1000)
}
@@ -108,7 +109,7 @@ class UpdateClient {
ping = maker.makeI2PDatagram(ping.bytes)
def options = new SendMessageOptions()
options.setSendLeaseSet(true)
session.sendMessage(UpdateServers.UPDATE_SERVER, ping, 0, ping.length, I2PSession.PROTO_DATAGRAM, 2, 0, options)
session.sendMessage(UpdateServers.UPDATE_SERVER, ping, 0, ping.length, I2PSession.PROTO_DATAGRAM, Constants.UPDATE_PORT, 0, options)
}
class Listener implements I2PSessionMuxedListener {

View File

@@ -17,5 +17,8 @@ public class Constants {
public static final int MAX_COMMENT_LENGTH = 0x1 << 15;
public static final long MAX_QUERY_AGE = 5 * 60 * 1000L;
public static final long MAX_QUERY_AGE = 5 * 60 * 1000L;
public static final int UPDATE_PORT = 2;
public static final int TRACKER_PORT = 3;
}

View File

@@ -104,6 +104,10 @@ class OptionsController {
model.browseFiles = browseFiles
settings.browseFiles = browseFiles
boolean allowTracking = view.allowTrackingCheckbox.model.isSelected()
model.allowTracking = allowTracking
settings.allowTracking = allowTracking
text = view.speedSmoothSecondsField.text
model.speedSmoothSeconds = Integer.valueOf(text)
settings.speedSmoothSeconds = Integer.valueOf(text)

View File

@@ -18,6 +18,7 @@ class OptionsModel {
@Observable String incompleteLocation
@Observable boolean searchComments
@Observable boolean browseFiles
@Observable boolean allowTracking
@Observable int speedSmoothSeconds
@Observable int totalUploadSlots
@Observable int uploadSlotsPerUser
@@ -83,6 +84,7 @@ class OptionsModel {
incompleteLocation = settings.incompleteLocation.getAbsolutePath()
searchComments = settings.searchComments
browseFiles = settings.browseFiles
allowTracking = settings.allowTracking
speedSmoothSeconds = settings.speedSmoothSeconds
totalUploadSlots = settings.totalUploadSlots
uploadSlotsPerUser = settings.uploadSlotsPerUser

View File

@@ -43,6 +43,7 @@ class OptionsView {
def shareHiddenCheckbox
def searchCommentsCheckbox
def browseFilesCheckbox
def allowTrackingCheckbox
def speedSmoothSecondsField
def totalUploadSlotsField
def uploadSlotsPerUserField
@@ -107,6 +108,10 @@ class OptionsView {
fill : GridBagConstraints.HORIZONTAL, weightx: 100))
browseFilesCheckbox = checkBox(selected : bind {model.browseFiles}, constraints : gbc(gridx : 1, gridy : 1,
anchor : GridBagConstraints.LINE_END, fill : GridBagConstraints.HORIZONTAL, weightx: 0))
label(text : "Allow tracking", constraints : gbc(gridx: 0, gridy: 2, anchor: GridBagConstraints.LINE_START,
fill : GridBagConstraints.HORIZONTAL, weightx: 100))
allowTrackingCheckbox = checkBox(selected : bind {model.allowTracking}, constraints : gbc(gridx: 1, gridy : 2,
anchor : GridBagConstraints.LINE_END, fill : GridBagConstraints.HORIZONTAL, weightx : 0))
}
panel (border : titledBorder(title : "Download Settings", border : etchedBorder(), titlePosition : TitledBorder.TOP,

View File

@@ -5,5 +5,6 @@ include 'core'
include 'gui'
include 'cli'
include 'cli-lanterna'
include 'tracker'
// include 'webui'
// include 'plug'

47
tracker/build.gradle Normal file
View File

@@ -0,0 +1,47 @@
buildscript {
repositories {
jcenter()
mavenLocal()
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0'
}
}
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
}
apply plugin : 'application'
apply plugin : 'io.spring.dependency-management'
application {
mainClassName = 'com.muwire.tracker.Tracker'
applicationDefaultJvmArgs = ['-Djava.util.logging.config.file=logging.properties','-Xmx256M',"-Dbuild.version=${project.version}"]
applicationName = 'mwtrackerd'
}
apply plugin : 'com.github.johnrengelman.shadow'
springBoot {
buildInfo {
properties {
version = "${project.version}"
name = "mwtrackerd"
}
}
}
dependencies {
compile project(":core")
compile 'com.github.briandilley.jsonrpc4j:jsonrpc4j:1.5.3'
compile 'org.springframework.boot:spring-boot-starter'
compile 'org.springframework.boot:spring-boot-starter-actuator'
compile 'org.springframework.boot:spring-boot-starter-web'
runtime 'javax.jws:jsr181-api:1.0-MR1'
}

View File

@@ -0,0 +1,28 @@
package com.muwire.tracker
import com.muwire.core.Persona
/**
* A participant in a swarm. The same persona can be a member of multiple
* swarms, but in that case it would have multiple Host objects
*/
class Host {
final Persona persona
long lastPinged
long lastResponded
int failures
volatile String xHave
Host(Persona persona) {
this.persona = persona
}
boolean isExpired(long cutoff, int maxFailures) {
lastPinged > lastResponded && lastResponded <= cutoff && failures >= maxFailures
}
@Override
public String toString() {
"Host:[${persona.getHumanReadableName()} lastPinged:$lastPinged lastResponded:$lastResponded failures:$failures xHave:$xHave]"
}
}

View File

@@ -0,0 +1,182 @@
package com.muwire.tracker
import java.util.concurrent.ConcurrentHashMap
import java.util.logging.Level
import javax.annotation.PostConstruct
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import com.muwire.core.Constants
import com.muwire.core.Core
import com.muwire.core.Persona
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log
import net.i2p.client.I2PSession
import net.i2p.client.I2PSessionMuxedListener
import net.i2p.client.SendMessageOptions
import net.i2p.client.datagram.I2PDatagramDissector
import net.i2p.client.datagram.I2PDatagramMaker
import net.i2p.data.Base64
@Component
@Log
class Pinger {
@Autowired
private Core core
@Autowired
private SwarmManager swarmManager
@Autowired
private TrackerProperties trackerProperties
private final Map<UUID, PingInProgress> inFlight = new ConcurrentHashMap<>()
private final Timer expiryTimer = new Timer("pinger-timer",true)
@PostConstruct
private void registerListener() {
core.getI2pSession().addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT)
expiryTimer.schedule({expirePings()} as TimerTask, 1000, 1000)
}
private void expirePings() {
final long now = System.currentTimeMillis()
for(Iterator<UUID> iter = inFlight.keySet().iterator(); iter.hasNext();) {
UUID uuid = iter.next()
PingInProgress ping = inFlight.get(uuid)
if (now - ping.pingTime > trackerProperties.getSwarmParameters().getPingTimeout() * 1000L) {
iter.remove()
swarmManager.fail(ping.target)
}
}
}
void ping(SwarmManager.HostAndIH target, long now) {
UUID uuid = UUID.randomUUID()
def ping = new PingInProgress(target, now)
inFlight.put(uuid, ping)
def message = [:]
message.type = "TrackerPing"
message.version = 1
message.infoHash = Base64.encode(target.getInfoHash().getRoot())
message.uuid = uuid.toString()
message = JsonOutput.toJson(message)
def maker = new I2PDatagramMaker(core.getI2pSession())
message = maker.makeI2PDatagram(message.bytes)
def options = new SendMessageOptions()
options.setSendLeaseSet(true)
core.getI2pSession().sendMessage(target.getHost().getPersona().getDestination(), message, 0, message.length, I2PSession.PROTO_DATAGRAM,
Constants.TRACKER_PORT, Constants.TRACKER_PORT, options)
}
private static class PingInProgress {
private final SwarmManager.HostAndIH target
private final long pingTime
PingInProgress(SwarmManager.HostAndIH target, long pingTime) {
this.target = target
this.pingTime = pingTime
}
}
private class Listener implements I2PSessionMuxedListener {
@Override
public void messageAvailable(I2PSession session, int msgId, long size) {
}
@Override
public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) {
if (proto != I2PSession.PROTO_DATAGRAM) {
log.warning("received unexpected protocol $proto")
return
}
byte [] payload = session.receiveMessage(msgId)
def dissector = new I2PDatagramDissector()
try {
dissector.loadI2PDatagram(payload)
def sender = dissector.getSender()
log.info("got a response from ${sender.toBase32()}")
payload = dissector.getPayload()
def slurper = new JsonSlurper()
def json = slurper.parse(payload)
if (json.type != "TrackerPong") {
log.warning("unknown type ${json.type}")
return
}
if (json.me == null) {
log.warning("sender persona missing")
return
}
Persona senderPersona = new Persona(new ByteArrayInputStream(Base64.decode(json.me)))
if (sender != senderPersona.getDestination()) {
log.warning("persona in payload does not match sender ${senderPersona.getHumanReadableName()}")
return
}
if (json.uuid == null) {
log.warning("uuid missing")
return
}
UUID uuid = UUID.fromString(json.uuid)
def ping = inFlight.remove(uuid)
if (ping == null) {
log.warning("no ping in progress for $uuid")
return
}
if (json.code == null) {
log.warning("no code")
return
}
int code = json.code
if (json.xHave != null)
ping.target.host.xHave = json.xHave
Set<Persona> altlocs = new HashSet<>()
json.altlocs?.collect(altlocs,{ new Persona(new ByteArrayInputStream(Base64.decode(it))) })
log.info("For ${ping.target.infoHash} received code $code and altlocs ${altlocs.size()}")
swarmManager.handleResponse(ping.target, code, altlocs)
} catch (Exception e) {
log.log(Level.WARNING,"invalid datagram",e)
}
}
@Override
public void reportAbuse(I2PSession session, int severity) {
log.warning("reportabuse $session $severity")
}
@Override
public void disconnected(I2PSession session) {
log.severe("disconnected")
}
@Override
public void errorOccurred(I2PSession session, String message, Throwable error) {
log.log(Level.SEVERE,message,error)
}
}
}

View File

@@ -0,0 +1,71 @@
package com.muwire.tracker
class SetupWizard {
private final File home
SetupWizard(File home) {
this.home = home
}
Properties performSetup() {
println "**** Welcome to mwtrackerd setup wizard *****"
println "This wizard ask you some questions and configure the settings for the MuWire tracker daemon."
println "The settings will be saved in ${home.getAbsolutePath()} where you can edit them manually if you wish."
println "You can re-run this wizard by launching mwtrackerd with the \"setup\" argument."
println "*****************"
Scanner scanner = new Scanner(System.in)
Properties rv = new Properties()
// nickname
while(true) {
println "Please select a nickname for your tracker"
String nick = scanner.nextLine()
if (nick.trim().length() == 0) {
println "nickname cannot be empty"
continue
}
rv['nickname'] = nick
break
}
// i2cp host and port
println "Enter the address of an I2P or I2Pd router to connect to. (default is 127.0.0.1)"
String i2cpHost = scanner.nextLine()
if (i2cpHost.trim().length() == 0)
i2cpHost = "127.0.0.1"
rv['i2cp.tcp.host'] = i2cpHost
println "Enter the port of the I2CP interface of the I2P[d] router (default is 7654)"
String i2cpPort = scanner.nextLine()
if (i2cpPort.trim().length() == 0)
i2cpPort = "7654"
rv['i2cp.tcp.port'] = i2cpPort
// json-rpc interface
println "Enter the address to which to bind the JSON-RPC interface of the tracker."
println "Default is 127.0.0.1. If you want to allow JSON-RPC connections from other hosts you can enter 0.0.0.0"
String jsonRpcIface = scanner.nextLine()
if (jsonRpcIface.trim().length() == 0)
jsonRpcIface = "127.0.0.1"
rv['jsonrpc.iface'] = jsonRpcIface
println "Enter the port on which the JSON-RPC interface should listen. (default is 12345)"
String jsonRpcPort = scanner.nextLine()
if (jsonRpcPort.trim().length() == 0)
jsonRpcPort = "12345"
rv['jsonrpc.port'] = jsonRpcPort
// that's all
println "*****************"
println "That's all the setup that's required to get the tracker up and running."
println "The tracker has many other settings which can be changed in the config files."
println "Refer to the documentation for their description."
println "*****************"
rv
}
}

View File

@@ -0,0 +1,182 @@
package com.muwire.tracker
import java.util.function.Function
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import groovy.util.logging.Log
/**
* A swarm for a given file
*/
@Log
class Swarm {
final InfoHash infoHash
/**
* Invariant: these four collections are mutually exclusive.
* A given host can be only in one of them at the same time.
*/
private final Map<Persona,Host> seeds = new HashMap<>()
private final Map<Persona,Host> leeches = new HashMap<>()
private final Map<Persona,Host> unknown = new HashMap<>()
private final Set<Persona> negative = new HashSet<>()
/**
* hosts which are currently being pinged. Hosts can be in here
* and in the collections above, except for negative.
*/
private final Map<Persona, Host> inFlight = new HashMap<>()
/**
* Last time a query was made to the MW network for this hash
*/
private long lastQueryTime
/**
* Last time a batch of hosts was pinged
*/
private long lastPingTime
Swarm(InfoHash infoHash) {
this.infoHash = infoHash
}
/**
* @param cutoff expire hosts older than this
*/
synchronized void expire(long cutoff, int maxFailures) {
doExpire(cutoff, maxFailures, seeds)
doExpire(cutoff, maxFailures, leeches)
doExpire(cutoff, maxFailures, unknown)
}
private static void doExpire(long cutoff, int maxFailures, Map<Persona,Host> map) {
for (Iterator<Persona> iter = map.keySet().iterator(); iter.hasNext();) {
Persona p = iter.next()
Host h = map.get(p)
if (h.isExpired(cutoff, maxFailures))
iter.remove()
}
}
synchronized boolean shouldQuery(long queryCutoff, long now) {
if (!(seeds.isEmpty() &&
leeches.isEmpty() &&
inFlight.isEmpty() &&
unknown.isEmpty()))
return false
if (lastQueryTime <= queryCutoff) {
lastQueryTime = now
return true
}
false
}
synchronized boolean isHealthy() {
!seeds.isEmpty()
// TODO add xHave accumulation of leeches
}
synchronized void add(Persona p) {
if (!(seeds.containsKey(p) || leeches.containsKey(p) ||
negative.contains(p) || inFlight.containsKey(p)))
unknown.computeIfAbsent(p, {new Host(it)} as Function)
}
synchronized void handleResponse(Host responder, int code) {
Host h = inFlight.remove(responder.persona)
if (responder != h)
log.warning("received a response mismatch from host $responder vs $h")
responder.lastResponded = System.currentTimeMillis()
responder.failures = 0
switch(code) {
case 200: addSeed(responder); break
case 206 : addLeech(responder); break;
default :
addNegative(responder)
}
}
synchronized void fail(Host failed) {
Host h = inFlight.remove(failed.persona)
if (h != failed)
log.warning("failed a host that wasn't in flight $failed vs $h")
h.failures++
}
private void addSeed(Host h) {
leeches.remove(h.persona)
unknown.remove(h.persona)
seeds.put(h.persona, h)
}
private void addLeech(Host h) {
unknown.remove(h.persona)
seeds.remove(h.persona)
leeches.put(h.persona, h)
}
private void addNegative(Host h) {
unknown.remove(h.persona)
seeds.remove(h.persona)
leeches.remove(h.persona)
negative.add(h.persona)
}
/**
* @param max number of hosts to give back
* @param now what time is it now
* @param cutoff only consider hosts which have been pinged before this time
* @return hosts to be pinged
*/
synchronized List<Host> getBatchToPing(int max, long now, long cutOff) {
List<Host> rv = new ArrayList<>()
rv.addAll(unknown.values())
rv.addAll(seeds.values())
rv.addAll(leeches.values())
rv.removeAll(inFlight.values())
rv.removeAll { it.lastPinged >= cutOff }
Collections.sort(rv, {l, r ->
Long.compare(l.lastPinged, r.lastPinged)
} as Comparator<Host>)
if (rv.size() > max)
rv = rv[0..(max-1)]
rv.each {
it.lastPinged = now
inFlight.put(it.persona, it)
}
if (!rv.isEmpty())
lastPingTime = now
rv
}
synchronized long getLastPingTime() {
lastPingTime
}
public Info info() {
List<String> seeders = seeds.keySet().collect { it.getHumanReadableName() }
List<String> leechers = leeches.keySet().collect { it.getHumanReadableName() }
return new Info(seeders, leechers, unknown.size(), negative.size())
}
public static class Info {
final List<String> seeders, leechers
final int unknown, negative
Info(List<String> seeders, List<String> leechers, int unknown, int negative) {
this.seeders = seeders
this.leechers = leechers
this.unknown = unknown
this.negative = negative
}
}
}

View File

@@ -0,0 +1,165 @@
package com.muwire.tracker
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
import javax.annotation.PostConstruct
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import com.muwire.core.Core
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.SearchEvent
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.util.DataUtil
import groovy.util.logging.Log
import net.i2p.crypto.DSAEngine
import net.i2p.data.Signature
@Component
@Log
class SwarmManager {
@Autowired
private Core core
@Autowired
private Pinger pinger
@Autowired
private TrackerProperties trackerProperties
private final Map<InfoHash, Swarm> swarms = new ConcurrentHashMap<>()
private final Map<UUID, InfoHash> queries = new ConcurrentHashMap<>()
private final Timer swarmTimer = new Timer("swarm-timer",true)
@PostConstruct
public void postConstruct() {
core.eventBus.register(UIResultBatchEvent.class, this)
swarmTimer.schedule({trackSwarms()} as TimerTask, 10 * 1000, 10 * 1000)
}
void onUIResultBatchEvent(UIResultBatchEvent e) {
InfoHash stored = queries.get(e.uuid)
InfoHash ih = e.results[0].infohash
if (ih != stored) {
log.warning("infohash mismatch in result $ih vs $stored")
return
}
Swarm swarm = swarms.get(ih)
if (swarm == null) {
log.warning("no swarm found for result with infoHash $ih")
return
}
log.info("got a result with uuid ${e.uuid} for infoHash $ih")
swarm.add(e.results[0].sender)
}
int countSwarms() {
swarms.size()
}
private void trackSwarms() {
final long now = System.currentTimeMillis()
final long expiryCutoff = now - trackerProperties.getSwarmParameters().getExpiry() * 60 * 1000L
final int maxFailures = trackerProperties.getSwarmParameters().getMaxFailures()
swarms.values().each { it.expire(expiryCutoff, maxFailures) }
final long queryCutoff = now - trackerProperties.getSwarmParameters().getQueryInterval() * 60 * 60 * 1000L
swarms.values().each {
if (it.shouldQuery(queryCutoff, now))
query(it)
}
List<Swarm> swarmList = new ArrayList<>(swarms.values())
Collections.sort(swarmList,{Swarm x, Swarm y ->
Long.compare(x.getLastPingTime(), y.getLastPingTime())
} as Comparator<Swarm>)
List<HostAndIH> toPing = new ArrayList<>()
final int amount = trackerProperties.getSwarmParameters().getPingParallel()
final long pingCutoff = now - trackerProperties.getSwarmParameters().getPingInterval() * 60 * 1000L
for(int i = 0; i < swarmList.size() && toPing.size() < amount; i++) {
Swarm s = swarmList.get(i)
List<Host> hostsFromSwarm = s.getBatchToPing(amount - toPing.size(), now, pingCutoff)
hostsFromSwarm.collect(toPing, { host -> new HostAndIH(host, s.getInfoHash())})
}
log.info("will ping $toPing")
toPing.each { pinger.ping(it, now) }
}
private void query(Swarm swarm) {
InfoHash infoHash = swarm.getInfoHash()
cleanQueryMap(infoHash)
UUID uuid = UUID.randomUUID()
queries.put(uuid, infoHash)
log.info("will query MW network for $infoHash with uuid $uuid")
def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me)
byte [] payload = infoHash.getRoot()
boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop
Signature sig = DSAEngine.getInstance().sign(payload, core.spk)
long timestamp = System.currentTimeMillis()
core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,
replyTo: core.me.destination, receivedOn: core.me.destination,
originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk)))
}
void track(InfoHash infoHash) {
swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function)
}
boolean forget(InfoHash infoHash) {
Swarm swarm = swarms.remove(infoHash)
if (swarm != null) {
cleanQueryMap(infoHash)
return true
} else
return false
}
private void cleanQueryMap(InfoHash infoHash) {
queries.values().removeAll {it == infoHash}
}
Swarm.Info info(InfoHash infoHash) {
swarms.get(infoHash)?.info()
}
void fail(HostAndIH target) {
log.info("failing $target")
swarms.get(target.infoHash)?.fail(target.host)
}
void handleResponse(HostAndIH target, int code, Set<Persona> altlocs) {
Swarm swarm = swarms.get(target.infoHash)
swarm?.handleResponse(target.host, code)
altlocs.each {
swarm?.add(it)
}
}
public static class HostAndIH {
final Host host
final InfoHash infoHash
HostAndIH(Host host, InfoHash infoHash) {
this.host = host
this.infoHash = infoHash
}
@Override
public String toString() {
"$host:$infoHash"
}
}
}

View File

@@ -0,0 +1,10 @@
package com.muwire.tracker;
public class TrackRequest {
String infoHash;
@Override
public String toString() {
return "infoHash: " +infoHash;
}
}

View File

@@ -0,0 +1,119 @@
package com.muwire.tracker
import java.nio.charset.StandardCharsets
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.web.server.ConfigurableWebServerFactory
import org.springframework.boot.web.server.WebServerFactoryCustomizer
import org.springframework.context.annotation.Bean
import com.googlecode.jsonrpc4j.spring.JsonServiceExporter
import com.muwire.core.Core
import com.muwire.core.MuWireSettings
import com.muwire.core.UILoadedEvent
import com.muwire.core.files.AllFilesLoadedEvent
@SpringBootApplication
class Tracker {
private static final String VERSION = System.getProperty("build.version")
private static Core core
private static TrackerService trackerService
public static void main(String [] args) {
println "Launching MuWire Tracker version $VERSION"
File home = new File(System.getProperty("user.home"))
home = new File(home, ".mwtrackerd")
home.mkdir()
File mwProps = new File(home, "MuWire.properties")
File i2pProps = new File(home, "i2p.properties")
File trackerProps = new File(home, "tracker.properties")
boolean launchSetup = false
if (args.length > 0 && args[0] == "setup") {
println "Setup requested, entering setup wizard"
launchSetup = true
} else if (!(mwProps.exists() && i2pProps.exists() && trackerProps.exists())) {
println "Config files not found, entering setup wizard"
launchSetup = true
}
if (launchSetup) {
SetupWizard wizard = new SetupWizard(home)
Properties props = wizard.performSetup()
// nickname goes to mw.props
MuWireSettings mwSettings = new MuWireSettings()
mwSettings.nickname = props['nickname']
mwProps.withPrintWriter("UTF-8", {
mwSettings.write(it)
})
// i2cp host & port go in i2p.properties
def i2cpProps = new Properties()
i2cpProps['i2cp.tcp.port'] = props['i2cp.tcp.port']
i2cpProps['i2cp.tcp.host'] = props['i2cp.tcp.host']
i2cpProps['inbound.nickname'] = "MuWire Tracker"
i2cpProps['outbound.nickname'] = "MuWire Tracker"
i2pProps.withPrintWriter { i2cpProps.store(it, "") }
// json rcp props go in tracker.properties
def jsonProps = new Properties()
jsonProps['tracker.jsonRpc.iface'] = props['jsonrpc.iface']
jsonProps['tracker.jsonRpc.port'] = props['jsonrpc.port']
trackerProps.withPrintWriter { jsonProps.store(it, "") }
}
Properties p = new Properties()
mwProps.withReader("UTF-8", { p.load(it) } )
MuWireSettings muSettings = new MuWireSettings(p)
p = new Properties()
trackerProps.withInputStream { p.load(it) }
core = new Core(muSettings, home, VERSION)
// init json service object
trackerService = new TrackerServiceImpl(core)
core.eventBus.with {
register(UILoadedEvent.class, trackerService)
}
Thread coreStarter = new Thread({
core.startServices()
core.eventBus.publish(new UILoadedEvent())
} as Runnable)
coreStarter.start()
System.setProperty("spring.config.location", trackerProps.getAbsolutePath())
SpringApplication.run(Tracker.class, args)
}
@Bean
Core core() {
core
}
@Bean
public TrackerService trackerService() {
trackerService
}
@Bean(name = '/tracker')
public JsonServiceExporter jsonServiceExporter() {
def exporter = new JsonServiceExporter()
exporter.setService(trackerService())
exporter.setServiceInterface(TrackerService.class)
exporter
}
}

View File

@@ -0,0 +1,33 @@
package com.muwire.tracker
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.stereotype.Component
@Component
@ConfigurationProperties("tracker")
class TrackerProperties {
final JsonRpc jsonRpc = new JsonRpc()
public static class JsonRpc {
InetAddress iface
int port
}
final SwarmParameters swarmParameters = new SwarmParameters()
public static class SwarmParameters {
/** how often to kick of queries on the MW net, in hours */
int queryInterval = 1
/** how many hosts to ping in parallel */
int pingParallel = 5
/** interval of time between pinging the same host, in minutes */
int pingInterval = 15
/** how long to wait before declaring a host is dead, in minutes */
int expiry = 60
/** how long to wait for a host to respond to a ping, in seconds */
int pingTimeout = 20
/** Do not expire a host until it has failed this many times */
int maxFailures = 3
}
}

View File

@@ -0,0 +1,8 @@
package com.muwire.tracker;
public interface TrackerService {
public TrackerStatus status();
public void track(String infoHash);
public boolean forget(String infoHash);
public Swarm.Info info(String infoHash);
}

View File

@@ -0,0 +1,54 @@
package com.muwire.tracker
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import com.muwire.core.Core
import com.muwire.core.InfoHash
import com.muwire.core.UILoadedEvent
import net.i2p.data.Base64
@Component
class TrackerServiceImpl implements TrackerService {
private final TrackerStatus status = new TrackerStatus()
private final Core core
@Autowired
private SwarmManager swarmManager
TrackerServiceImpl(Core core) {
this.core = core
status.status = "Starting"
}
public TrackerStatus status() {
status.connections = core.getConnectionManager().getConnections().size()
status.swarms = swarmManager.countSwarms()
status
}
void onUILoadedEvent(UILoadedEvent e) {
status.status = "Running"
}
@Override
public void track(String infoHash) {
InfoHash ih = new InfoHash(Base64.decode(infoHash))
swarmManager.track(ih)
}
@Override
public boolean forget(String infoHash) {
InfoHash ih = new InfoHash(Base64.decode(infoHash))
swarmManager.forget(ih)
}
@Override
public Swarm.Info info(String infoHash) {
InfoHash ih = new InfoHash(Base64.decode(infoHash))
swarmManager.info(ih)
}
}

View File

@@ -0,0 +1,7 @@
package com.muwire.tracker
class TrackerStatus {
volatile String status
int connections
int swarms
}

View File

@@ -0,0 +1,20 @@
package com.muwire.tracker
import org.springframework.boot.web.server.ConfigurableWebServerFactory
import org.springframework.boot.web.server.WebServerFactoryCustomizer
import org.springframework.stereotype.Component
@Component
class WebServerConfiguration implements WebServerFactoryCustomizer<ConfigurableWebServerFactory> {
private final TrackerProperties trackerProperties
WebServerConfiguration(TrackerProperties trackerProperties) {
this.trackerProperties = trackerProperties;
}
@Override
public void customize(ConfigurableWebServerFactory factory) {
factory.setAddress(trackerProperties.jsonRpc.getIface())
factory.setPort(trackerProperties.jsonRpc.port)
}
}

View File

@@ -82,6 +82,7 @@ public class ConfigurationServlet extends HttpServlet {
core.getMuOptions().setAutoPublishSharedFiles(false);
core.getMuOptions().setDefaultFeedAutoDownload(false);
core.getMuOptions().setDefaultFeedSequential(false);
core.getMuOptions().setAllowTracking(false);
}
private void update(String name, String value) throws Exception {
@@ -99,6 +100,7 @@ public class ConfigurationServlet extends HttpServlet {
case "shareHiddenFiles" : core.getMuOptions().setShareHiddenFiles(true); break;
case "searchComments" : core.getMuOptions().setSearchComments(true); break;
case "browseFiles" : core.getMuOptions().setBrowseFiles(true); break;
case "allowTracking" : core.getMuOptions().setAllowTracking(true); break;
case "speedSmoothSeconds" : core.getMuOptions().setSpeedSmoothSeconds(Integer.parseInt(value)); break;
case "inbound.length" : core.getI2pOptions().setProperty(name, value); break;
case "inbound.quantity" : core.getI2pOptions().setProperty(name, value); break;

View File

@@ -59,6 +59,14 @@ Exception error = (Exception) application.getAttribute("MWConfigError");
</td>
<td><p align="right"><input type="checkbox" <% if (core.getMuOptions().getBrowseFiles()) out.write("checked"); %> name="browseFiles" value="true"></p></td>
</tr>
<tr>
<td>
<div class="tooltip"><%=Util._t("Allow tracking")%>
<span class="tooltiptext"><%=Util._t("Allow trackers to track your shared files?")%></span>
</div>
</td>
<td><p align="right"><input type="checkbox" <% if (core.getMuOptions().getAllowTracking()) out.write("checked"); %> name="allowTracking" value="true"></p></td>
</tr>
</table>
</div>
<div class="configuration-section">