working injection and query kickoff through json-rpc, wip on swarm monitoring

This commit is contained in:
Zlatin Balevsky
2020-04-28 23:35:29 +01:00
parent 7860aa2b1c
commit 71789d96d2
7 changed files with 260 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
package com.muwire.tracker
import com.muwire.core.Persona
/**
* A participant in a swarm. The same persona can be a member of multiple
* swarms, but in that case it would have multiple Host objects
*/
class Host {
final Persona persona
long lastPinged
long lastResponded
volatile String xHave
Host(Persona persona) {
this.persona = persona
}
boolean isExpired(long cutoff) {
lastPinged > lastResponded && lastResponded <= cutoff
}
@Override
public String toString() {
"Host:[${persona.getHumanReadableName()} lastPinged:$lastPinged lastResponded:$lastResponded xHave:$xHave]"
}
}

View File

@@ -0,0 +1,14 @@
package com.muwire.tracker
import org.springframework.stereotype.Component
import com.muwire.core.Core
@Component
class Pinger {
private final Core core
Pinger(Core core) {
this.core = core
}
}

View File

@@ -0,0 +1,133 @@
package com.muwire.tracker
import java.util.function.Function
import com.muwire.core.InfoHash
import com.muwire.core.Persona
import groovy.util.logging.Log
/**
* A swarm for a given file
*/
@Log
class Swarm {
private final InfoHash infoHash
/**
* Invariant: these four collections are mutually exclusive.
* A given host can be only in one of them at the same time.
*/
private final Map<Persona,Host> seeds = new HashMap<>()
private final Map<Persona,Host> leeches = new HashMap<>()
private final Map<Persona,Host> unknown = new HashMap<>()
private final Set<Persona> negative = new HashSet<>()
/**
* hosts which are currently being pinged. Hosts can be in here
* and in the collections above, except for negative.
*/
private final Map<Persona, Host> inFlight = new HashMap<>()
Swarm(InfoHash infoHash) {
this.infoHash = infoHash
}
/**
* @param cutoff expire hosts older than this
*/
synchronized void expire(long cutoff) {
doExpire(cutoff, seeds)
doExpire(cutoff, leeches)
doExpire(cutoff, unknown)
}
private static void doExpire(long cutoff, Map<Persona,Host> map) {
for (Iterator<Persona> iter = map.keySet().iterator(); iter.hasNext();) {
Persona p = iter.next()
Host h = map.get(p)
if (h.isExpired(cutoff))
iter.remove()
}
}
synchronized boolean needsQuery() {
seeds.isEmpty() &&
leeches.isEmpty() &&
inFlight.isEmpty() &&
unknown.isEmpty()
}
synchronized boolean isHealthy() {
!seeds.isEmpty()
// TODO add xHave accumulation of leeches
}
synchronized void add(Persona p) {
if (!(seeds.containsKey(p) || leeches.containsKey(p) ||
negative.contains(p) || inFlight.containsKey(p)))
unknown.computeIfAbsent(p, {new Host(it)} as Function)
}
synchronized void handleResponse(Host responder, int code) {
Host h = inFlight.remove(responder.persona)
if (responder != h)
log.warning("received a response mismatch from host $responder vs $h")
responder.lastResponded = System.currentTimeMillis()
switch(code) {
case 200: addSeed(responder); break
case 206 : addLeech(responder); break;
default :
addNegative(responder)
}
}
synchronized void fail(Host failed) {
Host h = inFlight.remove(failed.persona)
if (h != failed)
log.warning("failed a host that wasn't in flight $failed vs $h")
}
private void addSeed(Host h) {
leeches.remove(h.persona)
unknown.remove(h.persona)
seeds.put(h.persona, h)
}
private void addLeech(Host h) {
unknown.remove(h.persona)
seeds.remove(h.persona)
leeches.put(h.persona, h)
}
private void addNegative(Host h) {
unknown.remove(h.persona)
seeds.remove(h.persona)
leeches.remove(h.persona)
negative.add(h.persona)
}
synchronized List<Host> getBatchToPing(int max) {
List<Host> rv = new ArrayList<>()
rv.addAll(unknown.values())
rv.addAll(seeds.values())
rv.addAll(leeches.values())
rv.removeAll(inFlight.values())
Collections.sort(rv, {l, r ->
Long.compare(l.lastPinged, r.lastPinged)
} as Comparator<Host>)
if (rv.size() > max)
rv = rv[0..(max-1)]
final long now = System.currentTimeMillis()
rv.each {
it.lastPinged = now
inFlight.put(it.persona, it)
}
rv
}
}

View File

@@ -0,0 +1,64 @@
package com.muwire.tracker
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
import javax.annotation.PostConstruct
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import com.muwire.core.Core
import com.muwire.core.InfoHash
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.SearchEvent
import com.muwire.core.search.UIResultBatchEvent
import com.muwire.core.util.DataUtil
import groovy.util.logging.Log
import net.i2p.crypto.DSAEngine
import net.i2p.data.Signature
@Component
@Log
class SwarmManager {
@Autowired
private Core core
@Autowired
private Pinger pinger
private final Map<InfoHash, Swarm> swarms = new ConcurrentHashMap<>()
@PostConstruct
public void postConstruct() {
core.eventBus.register(UIResultBatchEvent.class, this)
}
void onUIResultBatchEvent(UIResultBatchEvent e) {
InfoHash ih = e.results[0].infohash
Swarm swarm = swarms.get(ih)
if (swarm == null) {
log.warning("no swarm found for result with infoHash $ih")
return
}
swarm.add(e.results[0].sender)
}
void track(InfoHash infoHash) {
Swarm swarm = swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function)
if (swarm.needsQuery()) {
UUID uuid = UUID.randomUUID()
def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me)
byte [] payload = infoHash.getRoot()
boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop
Signature sig = DSAEngine.getInstance().sign(payload, core.spk)
long timestamp = System.currentTimeMillis()
core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,
replyTo: core.me.destination, receivedOn: core.me.destination,
originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk)))
}
}
}

View File

@@ -122,6 +122,11 @@ class Tracker {
}
@Bean
Core core() {
core
}
@Bean
WebServerFactoryCustomizer<ConfigurableWebServerFactory> wsCustomizer() {
wsCustomizer

View File

@@ -2,4 +2,5 @@ package com.muwire.tracker;
public interface TrackerService {
public TrackerStatus status();
public void track(String infoHash);
}

View File

@@ -1,13 +1,23 @@
package com.muwire.tracker
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import com.muwire.core.Core
import com.muwire.core.InfoHash
import com.muwire.core.UILoadedEvent
import net.i2p.data.Base64
@Component
class TrackerServiceImpl implements TrackerService {
private final TrackerStatus status = new TrackerStatus()
private final Core core
@Autowired
private SwarmManager swarmManager
TrackerServiceImpl(Core core) {
this.core = core
status.status = "Starting"
@@ -22,4 +32,10 @@ class TrackerServiceImpl implements TrackerService {
void onUILoadedEvent(UILoadedEvent e) {
status.status = "Running"
}
@Override
public void track(String infoHash) {
InfoHash ih = new InfoHash(Base64.decode(infoHash))
swarmManager.track(ih)
}
}