I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit f00b8647 authored by zzz's avatar zzz
Browse files

Streamr: Add expiration timer

Log tweaks
parent 90bc0043
No related branches found
No related tags found
No related merge requests found
...@@ -3,8 +3,10 @@ package net.i2p.i2ptunnel.streamr; ...@@ -3,8 +3,10 @@ package net.i2p.i2ptunnel.streamr;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List; import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*; import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/** /**
* Sends to many Sinks * Sends to many Sinks
...@@ -12,6 +14,7 @@ import net.i2p.i2ptunnel.udp.*; ...@@ -12,6 +14,7 @@ import net.i2p.i2ptunnel.udp.*;
* @author zzz modded for I2PTunnel * @author zzz modded for I2PTunnel
*/ */
public class MultiSource implements Source, Sink { public class MultiSource implements Source, Sink {
private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
public MultiSource() { public MultiSource() {
this.sinks = new CopyOnWriteArrayList<Destination>(); this.sinks = new CopyOnWriteArrayList<Destination>();
...@@ -32,6 +35,14 @@ public class MultiSource implements Source, Sink { ...@@ -32,6 +35,14 @@ public class MultiSource implements Source, Sink {
* @throws RuntimeException * @throws RuntimeException
*/ */
public void send(Destination ignored_from, byte[] data) { public void send(Destination ignored_from, byte[] data) {
if (sinks.isEmpty()) {
if (log.shouldDebug())
log.debug("No subscribers to send " + data.length + " bytes to");
return;
}
if (log.shouldDebug())
log.debug("Sending " + data.length + " bytes to " + sinks.size() + " subscribers");
for(Destination dest : this.sinks) { for(Destination dest : this.sinks) {
this.sink.send(dest, data); this.sink.send(dest, data);
} }
......
...@@ -10,6 +10,7 @@ import net.i2p.util.Log; ...@@ -10,6 +10,7 @@ import net.i2p.util.Log;
* @author welterde/zzz * @author welterde/zzz
*/ */
public class Pinger implements Source, Runnable { public class Pinger implements Source, Runnable {
private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
public Pinger() { public Pinger() {
this.thread = new I2PAppThread(this); this.thread = new I2PAppThread(this);
...@@ -35,6 +36,8 @@ public class Pinger implements Source, Runnable { ...@@ -35,6 +36,8 @@ public class Pinger implements Source, Runnable {
data[0] = 1; data[0] = 1;
try { try {
this.sink.send(null, data); this.sink.send(null, data);
if (log.shouldDebug())
log.debug("Sent unsubscribe");
} catch (RuntimeException re) {} } catch (RuntimeException re) {}
} }
...@@ -47,8 +50,9 @@ public class Pinger implements Source, Runnable { ...@@ -47,8 +50,9 @@ public class Pinger implements Source, Runnable {
//System.out.print("p"); //System.out.print("p");
try { try {
this.sink.send(null, data); this.sink.send(null, data);
if (log.shouldDebug())
log.debug("Sent subscribe");
} catch (RuntimeException re) { } catch (RuntimeException re) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn()) if (log.shouldWarn())
log.warn("error sending", re); log.warn("error sending", re);
break; break;
......
package net.i2p.i2ptunnel.streamr; package net.i2p.i2ptunnel.streamr;
import java.util.Set; import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*; import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/** /**
* server-mode * server-mode
...@@ -15,10 +18,21 @@ import net.i2p.util.Log; ...@@ -15,10 +18,21 @@ import net.i2p.util.Log;
*/ */
public class Subscriber implements Sink { public class Subscriber implements Sink {
private final I2PAppContext ctx = I2PAppContext.getGlobalContext();
private final Log log = ctx.logManager().getLog(getClass());
private final Map<Destination, Long> subscriptions;
private final MultiSource multi;
private final SimpleTimer2.TimedEvent timer;
private volatile boolean timerRunning;
private static final int MAX_SUBSCRIPTIONS = 10;
private static final long EXPIRATION = 60*1000;
public Subscriber(MultiSource multi) { public Subscriber(MultiSource multi) {
this.multi = multi; this.multi = multi;
// subscriptions // subscriptions
this.subscriptions = new ConcurrentHashSet<Destination>(); this.subscriptions = new ConcurrentHashMap<Destination, Long>();
timer = new Expire();
} }
/** /**
...@@ -30,33 +44,74 @@ public class Subscriber implements Sink { ...@@ -30,33 +44,74 @@ public class Subscriber implements Sink {
public void send(Destination dest, byte[] data) { public void send(Destination dest, byte[] data) {
if(dest == null || data.length < 1) { if(dest == null || data.length < 1) {
// invalid packet // invalid packet
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn()) if (log.shouldWarn())
log.warn("bad subscription from " + dest); log.warn("bad subscription from " + dest);
} else { } else {
byte ctrl = data[0]; byte ctrl = data[0];
if(ctrl == 0) { if(ctrl == 0) {
if (!this.subscriptions.contains(dest)) { if (this.subscriptions.put(dest, Long.valueOf(ctx.clock().now())) == null) {
if (subscriptions.size() > MAX_SUBSCRIPTIONS) {
subscriptions.remove(dest);
if (log.shouldWarn())
log.warn("Too many subscriptions, denying: " + dest.toBase32());
return;
}
// subscribe // subscribe
System.out.println("Add subscription: " + dest.toBase64().substring(0,4)); if (log.shouldWarn())
this.subscriptions.add(dest); log.warn("Add subscription: " + dest.toBase32());
this.multi.add(dest); this.multi.add(dest);
} // else already subscribed if (!timerRunning) {
timer.reschedule(EXPIRATION);
timerRunning = true;
}
} else {
if (log.shouldInfo())
log.info("Continue subscription: " + dest.toBase32());
}
} else if(ctrl == 1) { } else if(ctrl == 1) {
// unsubscribe // unsubscribe
System.out.println("Remove subscription: " + dest.toBase64().substring(0,4)); if (log.shouldWarn())
boolean removed = this.subscriptions.remove(dest); log.warn("Remove subscription: " + dest.toBase32());
if(removed) if (subscriptions.remove(dest) != null)
multi.remove(dest); multi.remove(dest);
} else { } else {
// invalid packet // invalid packet
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn()) if (log.shouldWarn())
log.warn("bad subscription from " + dest); log.warn("bad subscription from " + dest);
} }
} }
} }
private final Set<Destination> subscriptions; /** @since 0.9.46 */
private final MultiSource multi; private class Expire extends SimpleTimer2.TimedEvent {
public Expire() {
super(ctx.simpleTimer2());
}
public void timeReached() {
if (subscriptions.isEmpty()) {
timerRunning = false;
return;
}
long exp = ctx.clock().now() - EXPIRATION;
for (Iterator<Map.Entry<Destination, Long>> iter = subscriptions.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Destination, Long> e = iter.next();
long then = e.getValue().longValue();
if (then < exp) {
Destination dest = e.getKey();
iter.remove();
multi.remove(dest);
if (log.shouldWarn())
log.warn("Expired subscription: " + dest.toBase32());
}
}
if (!subscriptions.isEmpty()) {
schedule(EXPIRATION);
timerRunning = true;
} else {
timerRunning = false;
}
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment