diff --git a/tracker/src/main/groovy/com/muwire/tracker/Host.groovy b/tracker/src/main/groovy/com/muwire/tracker/Host.groovy new file mode 100644 index 00000000..8b38920c --- /dev/null +++ b/tracker/src/main/groovy/com/muwire/tracker/Host.groovy @@ -0,0 +1,27 @@ +package com.muwire.tracker + +import com.muwire.core.Persona + +/** + * A participant in a swarm. The same persona can be a member of multiple + * swarms, but in that case it would have multiple Host objects + */ +class Host { + final Persona persona + long lastPinged + long lastResponded + volatile String xHave + + Host(Persona persona) { + this.persona = persona + } + + boolean isExpired(long cutoff) { + lastPinged > lastResponded && lastResponded <= cutoff + } + + @Override + public String toString() { + "Host:[${persona.getHumanReadableName()} lastPinged:$lastPinged lastResponded:$lastResponded xHave:$xHave]" + } +} diff --git a/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy b/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy new file mode 100644 index 00000000..44073fe1 --- /dev/null +++ b/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy @@ -0,0 +1,14 @@ +package com.muwire.tracker + +import org.springframework.stereotype.Component + +import com.muwire.core.Core + +@Component +class Pinger { + private final Core core + + Pinger(Core core) { + this.core = core + } +} diff --git a/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy b/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy new file mode 100644 index 00000000..21a46587 --- /dev/null +++ b/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy @@ -0,0 +1,133 @@ +package com.muwire.tracker + +import java.util.function.Function + +import com.muwire.core.InfoHash +import com.muwire.core.Persona + +import groovy.util.logging.Log + +/** + * A swarm for a given file + */ +@Log +class Swarm { + private final InfoHash infoHash + + /** + * Invariant: these four collections are mutually exclusive. + * A given host can be only in one of them at the same time. + */ + private final Map seeds = new HashMap<>() + private final Map leeches = new HashMap<>() + private final Map unknown = new HashMap<>() + private final Set negative = new HashSet<>() + + /** + * hosts which are currently being pinged. Hosts can be in here + * and in the collections above, except for negative. + */ + private final Map inFlight = new HashMap<>() + + Swarm(InfoHash infoHash) { + this.infoHash = infoHash + } + + /** + * @param cutoff expire hosts older than this + */ + synchronized void expire(long cutoff) { + doExpire(cutoff, seeds) + doExpire(cutoff, leeches) + doExpire(cutoff, unknown) + } + + private static void doExpire(long cutoff, Map map) { + for (Iterator iter = map.keySet().iterator(); iter.hasNext();) { + Persona p = iter.next() + Host h = map.get(p) + if (h.isExpired(cutoff)) + iter.remove() + } + } + + synchronized boolean needsQuery() { + seeds.isEmpty() && + leeches.isEmpty() && + inFlight.isEmpty() && + unknown.isEmpty() + } + + synchronized boolean isHealthy() { + !seeds.isEmpty() + // TODO add xHave accumulation of leeches + } + + synchronized void add(Persona p) { + if (!(seeds.containsKey(p) || leeches.containsKey(p) || + negative.contains(p) || inFlight.containsKey(p))) + unknown.computeIfAbsent(p, {new Host(it)} as Function) + } + + synchronized void handleResponse(Host responder, int code) { + Host h = inFlight.remove(responder.persona) + if (responder != h) + log.warning("received a response mismatch from host $responder vs $h") + + responder.lastResponded = System.currentTimeMillis() + switch(code) { + case 200: addSeed(responder); break + case 206 : addLeech(responder); break; + default : + addNegative(responder) + } + } + + synchronized void fail(Host failed) { + Host h = inFlight.remove(failed.persona) + if (h != failed) + log.warning("failed a host that wasn't in flight $failed vs $h") + } + + private void addSeed(Host h) { + leeches.remove(h.persona) + unknown.remove(h.persona) + seeds.put(h.persona, h) + } + + private void addLeech(Host h) { + unknown.remove(h.persona) + seeds.remove(h.persona) + leeches.put(h.persona, h) + } + + private void addNegative(Host h) { + unknown.remove(h.persona) + seeds.remove(h.persona) + leeches.remove(h.persona) + negative.add(h.persona) + } + + synchronized List getBatchToPing(int max) { + List rv = new ArrayList<>() + rv.addAll(unknown.values()) + rv.addAll(seeds.values()) + rv.addAll(leeches.values()) + rv.removeAll(inFlight.values()) + + Collections.sort(rv, {l, r -> + Long.compare(l.lastPinged, r.lastPinged) + } as Comparator) + + if (rv.size() > max) + rv = rv[0..(max-1)] + + final long now = System.currentTimeMillis() + rv.each { + it.lastPinged = now + inFlight.put(it.persona, it) + } + + rv + } +} diff --git a/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy new file mode 100644 index 00000000..3f4cf8bc --- /dev/null +++ b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy @@ -0,0 +1,64 @@ +package com.muwire.tracker + +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function + +import javax.annotation.PostConstruct + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component + +import com.muwire.core.Core +import com.muwire.core.InfoHash +import com.muwire.core.search.QueryEvent +import com.muwire.core.search.SearchEvent +import com.muwire.core.search.UIResultBatchEvent +import com.muwire.core.util.DataUtil + +import groovy.util.logging.Log +import net.i2p.crypto.DSAEngine +import net.i2p.data.Signature + +@Component +@Log +class SwarmManager { + @Autowired + private Core core + + @Autowired + private Pinger pinger + + private final Map swarms = new ConcurrentHashMap<>() + + @PostConstruct + public void postConstruct() { + core.eventBus.register(UIResultBatchEvent.class, this) + } + + void onUIResultBatchEvent(UIResultBatchEvent e) { + InfoHash ih = e.results[0].infohash + Swarm swarm = swarms.get(ih) + if (swarm == null) { + log.warning("no swarm found for result with infoHash $ih") + return + } + + swarm.add(e.results[0].sender) + } + + void track(InfoHash infoHash) { + Swarm swarm = swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function) + if (swarm.needsQuery()) { + UUID uuid = UUID.randomUUID() + def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me) + byte [] payload = infoHash.getRoot() + boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop + + Signature sig = DSAEngine.getInstance().sign(payload, core.spk) + long timestamp = System.currentTimeMillis() + core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop, + replyTo: core.me.destination, receivedOn: core.me.destination, + originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk))) + } + } +} diff --git a/tracker/src/main/groovy/com/muwire/tracker/Tracker.groovy b/tracker/src/main/groovy/com/muwire/tracker/Tracker.groovy index 2fdc50f8..2ec3c2a7 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/Tracker.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/Tracker.groovy @@ -122,6 +122,11 @@ class Tracker { } + @Bean + Core core() { + core + } + @Bean WebServerFactoryCustomizer wsCustomizer() { wsCustomizer diff --git a/tracker/src/main/groovy/com/muwire/tracker/TrackerService.java b/tracker/src/main/groovy/com/muwire/tracker/TrackerService.java index aaae77c2..4ace8c46 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/TrackerService.java +++ b/tracker/src/main/groovy/com/muwire/tracker/TrackerService.java @@ -2,4 +2,5 @@ package com.muwire.tracker; public interface TrackerService { public TrackerStatus status(); + public void track(String infoHash); } diff --git a/tracker/src/main/groovy/com/muwire/tracker/TrackerServiceImpl.groovy b/tracker/src/main/groovy/com/muwire/tracker/TrackerServiceImpl.groovy index 573ad1c7..2790c146 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/TrackerServiceImpl.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/TrackerServiceImpl.groovy @@ -1,13 +1,23 @@ package com.muwire.tracker +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component + import com.muwire.core.Core +import com.muwire.core.InfoHash import com.muwire.core.UILoadedEvent +import net.i2p.data.Base64 + +@Component class TrackerServiceImpl implements TrackerService { private final TrackerStatus status = new TrackerStatus() private final Core core + @Autowired + private SwarmManager swarmManager + TrackerServiceImpl(Core core) { this.core = core status.status = "Starting" @@ -22,4 +32,10 @@ class TrackerServiceImpl implements TrackerService { void onUILoadedEvent(UILoadedEvent e) { status.status = "Running" } + + @Override + public void track(String infoHash) { + InfoHash ih = new InfoHash(Base64.decode(infoHash)) + swarmManager.track(ih) + } }