diff --git a/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy b/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy index 44073fe1..7e2e1393 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/Pinger.groovy @@ -1,14 +1,113 @@ package com.muwire.tracker +import java.util.concurrent.ConcurrentHashMap +import java.util.logging.Level + +import javax.annotation.PostConstruct + +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component +import com.muwire.core.Constants import com.muwire.core.Core +import groovy.json.JsonOutput +import groovy.util.logging.Log +import net.i2p.client.I2PSession +import net.i2p.client.I2PSessionMuxedListener +import net.i2p.client.SendMessageOptions +import net.i2p.client.datagram.I2PDatagramMaker +import net.i2p.data.Base64 + @Component +@Log class Pinger { - private final Core core + @Autowired + private Core core - Pinger(Core core) { - this.core = core + @Autowired + private SwarmManager swarmManager + + @Autowired + private TrackerProperties trackerProperties + + private final Map inFlight = new ConcurrentHashMap<>() + private final Timer expiryTimer = new Timer("pinger-timer",true) + + @PostConstruct + private void registerListener() { + core.getI2pSession().addMuxedSessionListener(new Listener(), I2PSession.PROTO_DATAGRAM, Constants.TRACKER_PORT) + expiryTimer.schedule({expirePings()} as TimerTask, 1000, 1000) } + + private void expirePings() { + final long now = System.currentTimeMillis() + for(Iterator iter = inFlight.keySet().iterator(); iter.hasNext();) { + UUID uuid = iter.next() + PingInProgress ping = inFlight.get(uuid) + if (now - ping.pingTime > trackerProperties.getSwarmParameters().getPingTimeout() * 1000L) { + iter.remove() + swarmManager.fail(ping.target) + } + } + } + + void ping(SwarmManager.HostAndIH target, long now) { + UUID uuid = UUID.randomUUID() + def ping = new PingInProgress(target, now) + inFlight.put(uuid, ping) + + def message = [:] + message.type = "TrackerPing" + message.version = 1 + message.infoHash = Base64.encode(target.getInfoHash().getRoot()) + message.uuid = uuid.toString() + + message = JsonOutput.toJson(message) + def maker = new I2PDatagramMaker(core.getI2pSession()) + message = maker.makeI2PDatagram(message.bytes) + def options = new SendMessageOptions() + options.setSendLeaseSet(true) + core.getI2pSession().sendMessage(target.getHost().getPersona().getDestination(), message, 0, message.length, I2PSession.PROTO_DATAGRAM, + Constants.TRACKER_PORT, Constants.TRACKER_PORT, options) + } + + private static class PingInProgress { + private final SwarmManager.HostAndIH target + private final long pingTime + PingInProgress(SwarmManager.HostAndIH target, long pingTime) { + this.target = target + this.pingTime = pingTime + } + } + + private class Listener implements I2PSessionMuxedListener { + + @Override + public void messageAvailable(I2PSession session, int msgId, long size) { + } + + @Override + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromport, int toport) { + // TODO Auto-generated method stub + + } + + @Override + public void reportAbuse(I2PSession session, int severity) { + log.warning("reportabuse $session $severity") + } + + @Override + public void disconnected(I2PSession session) { + log.severe("disconnected") + } + + @Override + public void errorOccurred(I2PSession session, String message, Throwable error) { + log.log(Level.SEVERE,message,error) + } + + } + } diff --git a/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy index 56ae4a1a..4cb00348 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy @@ -90,6 +90,8 @@ class SwarmManager { } log.info("will ping $toPing") + + toPing.each { pinger.ping(it, now) } } private void query(Swarm swarm) { @@ -132,9 +134,13 @@ class SwarmManager { swarms.get(infoHash)?.info() } - private static class HostAndIH { - private final Host host - private final InfoHash infoHash + void fail(HostAndIH target) { + swarms.get(target.infoHash)?.fail(target.host) + } + + public static class HostAndIH { + final Host host + final InfoHash infoHash HostAndIH(Host host, InfoHash infoHash) { this.host = host this.infoHash = infoHash diff --git a/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy b/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy index 994754df..3fcda9ef 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy @@ -25,5 +25,7 @@ class TrackerProperties { int pingInterval = 15 /** how long to wait before declaring a host is dead, in minutes */ int expiry = 60 + /** how long to wait for a host to respond to a ping, in seconds */ + int pingTimeout = 20 } }