i2ptunnel: Refactor UDPTunnel, Streamr, and SOCKS UDP for I2CP ports

- Add fromPort and toPort to Sink interface (breaking API change)
- Change cache maps from Destination to I2PSocketAddress to include port
- Accept host:port for destination in Streamr Client, use port
  as toPort in pinger
- Change to muxed listener in I2PSource, only listen for
  specified protocols
- Eliminate thread and queue in I2PSource, process messages inline
  in the listener
- Add support for handling both repliable and raw datagrams in
  a single I2PSource instance
- Remove verify option from I2PSource and I2PTunnelUDPServerBase,
  always verify repliable datagrams
- Add getPort() method to UDPSource
- Add a constructor to UDPSink to pass in an existing DatagramSocket
- Change I2PTunnelUDPClientBase to receive both repliable and raw
- Change SOCKSUDPTunnel reply handling strategy to key on I2CP toPort;
  remove ReplyTracker; the tunnel would not have worked before, because
  it expected raw replies only but MultiSink required a destination
  to look up where to forward the reply.
- Mark SOCKSUDPTunnel as preliminary; note lack of support
  for raw replies; untested
- Change Streamr Client Pinger to support fromPort
- Change Streamr Server to remember fromPort in subscriptions
  and use it as toPort in data stream
- Move fields to top of classes for sanity
- Cleanups and log tweaks
This commit is contained in:
zzz
2021-12-08 13:05:27 -05:00
parent 22ff40bc84
commit b1a4a8517e
21 changed files with 416 additions and 278 deletions

View File

@@ -8,12 +8,17 @@ import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Sends to one of many Sinks
* Sends to one of many Sinks based on the toPort
*
* @author zzz modded from streamr/MultiSource
*/
public class MultiSink<S extends Sink> implements Source, Sink {
private final Map<Integer, S> cache;
public MultiSink(Map<Destination, S> cache) {
/**
* @param cache map of toPort to Sink
*/
public MultiSink(Map<Integer, S> cache) {
this.cache = cache;
}
@@ -23,18 +28,32 @@ public class MultiSink<S extends Sink> implements Source, Sink {
public void start() {}
/**
* Send to a single sink looked up by toPort
*
* May throw RuntimeException from underlying sinks
*
* @param from passed along
* @param fromPort passed along
* @param toPort passed along
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException
*/
public void send(Destination from, byte[] data) {
Sink s = this.cache.get(from);
public void send(Destination from, int fromPort, int toPort, byte[] data) {
Sink s = cache.get(toPort);
if (s == null && toPort == 0 && cache.size() == 1) {
// for now, do the server a favor if the toPort isn't specified
for (Sink ss : cache.values()) {
s = ss;
break;
}
}
if (s == null) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(MultiSink.class);
log.error("No where to go for " + from.calculateHash().toBase64().substring(0, 6));
String frm = (from != null) ? from.toBase32() : "raw";
if (log.shouldWarn())
log.warn("No where to go for " + frm + " port " + fromPort + " to port " + toPort);
return;
}
s.send(from, data);
s.send(from, fromPort, toPort, data);
}
private Map<Destination, S> cache;
}

View File

@@ -1,38 +0,0 @@
package net.i2p.i2ptunnel.socks;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Track who the reply goes to
* @author zzz
*/
public class ReplyTracker<S extends Sink> implements Source, Sink {
public ReplyTracker(S reply, Map<Destination, S> cache) {
this.reply = reply;
this.cache = cache;
}
public void setSink(Sink sink) {
this.sink = sink;
}
public void start() {}
/**
* May throw RuntimeException from underlying sink
* @throws RuntimeException
*/
public void send(Destination to, byte[] data) {
this.cache.put(to, this.reply);
this.sink.send(to, data);
}
private S reply;
private Map<Destination, S> cache;
private Sink sink;
}

View File

@@ -1,8 +1,11 @@
package net.i2p.i2ptunnel.socks;
import java.util.Arrays;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Addresses;
/**
* Save the SOCKS header from a datagram
@@ -11,9 +14,12 @@ import net.i2p.data.Destination;
* @author zzz
*/
public class SOCKSHeader {
private byte[] header;
private static final byte[] beg = {0,0,0,3,60};
/**
* @param data the whole packet
* @throws IllegalArgumentException on bad socks format
*/
public SOCKSHeader(byte[] data) {
if (data.length <= 8)
@@ -25,13 +31,11 @@ public class SOCKSHeader {
int headerlen = 0;
int addressType = data[3];
if (addressType == 1) {
// this will fail in getDestination()
headerlen = 6 + 4;
} else if (addressType == 3) {
headerlen = 6 + 1 + (data[4] & 0xff);
} else if (addressType == 4) {
// this will fail in getDestination()
// but future garlicat partial hash lookup possible?
// future garlicat partial hash lookup possible?
headerlen = 6 + 16;
} else {
throw new IllegalArgumentException("Unknown address type: " + addressType);
@@ -42,33 +46,62 @@ public class SOCKSHeader {
this.header = new byte[headerlen];
System.arraycopy(data, 0, this.header, 0, headerlen);
}
private static final byte[] beg = {0,0,0,3,60};
private static final byte[] end = {0,0};
/**
* Make a dummy header from a dest,
* for those cases where we want to receive unsolicited datagrams.
* Unused for now.
*
* @param port I2CP port 0-65535
* @since 0.9.53 add port param
*/
public SOCKSHeader(Destination dest) {
this.header = new byte[beg.length + 60 + end.length];
public SOCKSHeader(Destination dest, int port) {
this.header = new byte[beg.length + 60 + 2];
System.arraycopy(beg, 0, this.header, 0, beg.length);
String b32 = dest.toBase32();
System.arraycopy(DataHelper.getASCII(b32), 0, this.header, beg.length, 60);
System.arraycopy(end, 0, this.header, beg.length + 60, end.length);
DataHelper.toLong(header, beg.length + 60, 2, port);
}
/**
* As of 0.9.53, returns IP address as a string for address types 1 and 4.
*
* @return hostname or null for unknown address type
*/
public String getHost() {
int addressType = this.header[3];
if (addressType != 3)
return null;
int namelen = (this.header[4] & 0xff);
byte[] nameBytes = new byte[namelen];
System.arraycopy(this.header, 5, nameBytes, 0, namelen);
return DataHelper.getUTF8(nameBytes);
if (addressType == 3) {
int namelen = (this.header[4] & 0xff);
return DataHelper.getUTF8(header, 5, namelen);
}
if (addressType == 1)
return Addresses.toString(Arrays.copyOfRange(header, 4, 4));
if (addressType == 4)
return Addresses.toString(Arrays.copyOfRange(header, 4, 16));
return null;
}
/**
* @return 0 - 65535
* @since 0.9.53
*/
public int getPort() {
int namelen;
int addressType = header[3];
if (addressType == 3)
namelen = 1 + (header[4] & 0xff);
else if (addressType == 1)
namelen = 4;
else if (addressType == 4)
namelen = 16;
else
return 0;
return (int) DataHelper.fromLong(header, 4 + namelen, 2);
}
/**
* @return destination or null
*/
public Destination getDestination() {
String name = getHost();
if (name == null)
@@ -80,6 +113,4 @@ public class SOCKSHeader {
public byte[] getBytes() {
return header;
}
private byte[] header;
}

View File

@@ -5,6 +5,7 @@ import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import net.i2p.client.streaming.I2PSocketAddress;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
@@ -14,22 +15,26 @@ import net.i2p.i2ptunnel.udp.*;
* ports, it happens outside of here.
*
* TX:
* UDPSource -&gt; SOCKSUDPUnwrapper -&gt; ReplyTracker ( -&gt; I2PSink in SOCKSUDPTunnel)
* UDPSource -&gt; SOCKSUDPUnwrapper -&gt; (I2PSink in SOCKSUDPTunnel)
*
* RX:
* UDPSink &lt;- SOCKSUDPWrapper ( &lt;- MultiSink &lt;- I2PSource in SOCKSUDPTunnel)
*
* The Unwrapper passes headers to the Wrapper through a cache.
* The ReplyTracker passes sinks to MultiSink through a cache.
* MultiSink routes packets based on toPort.
*
* @author zzz
*/
public class SOCKSUDPPort implements Source, Sink {
private final UDPSink udpsink;
private final UDPSource udpsource;
private final SOCKSUDPWrapper wrapper;
private final SOCKSUDPUnwrapper unwrapper;
public SOCKSUDPPort(InetAddress host, int port, Map<Destination, SOCKSUDPPort> replyMap) {
public SOCKSUDPPort(InetAddress host, int port, Map<Integer, SOCKSUDPPort> replyMap) {
// this passes the host and port from UDPUnwrapper to UDPWrapper
Map<Destination, SOCKSHeader> cache = new ConcurrentHashMap<Destination, SOCKSHeader>(4);
Map<I2PSocketAddress, SOCKSHeader> cache = new ConcurrentHashMap<I2PSocketAddress, SOCKSHeader>(4);
// rcv from I2P and send to a port
this.wrapper = new SOCKSUDPWrapper(cache);
@@ -41,8 +46,6 @@ public class SOCKSUDPPort implements Source, Sink {
this.udpsource = new UDPSource(sock);
this.unwrapper = new SOCKSUDPUnwrapper(cache);
this.udpsource.setSink(this.unwrapper);
this.udptracker = new ReplyTracker<SOCKSUDPPort>(this, replyMap);
this.unwrapper.setSink(this.udptracker);
}
/** Socks passes this back to the client on the TCP connection */
@@ -51,7 +54,7 @@ public class SOCKSUDPPort implements Source, Sink {
}
public void setSink(Sink sink) {
this.udptracker.setSink(sink);
this.unwrapper.setSink(sink);
}
public void start() {
@@ -66,16 +69,14 @@ public class SOCKSUDPPort implements Source, Sink {
/**
* May throw RuntimeException from underlying sink
* @param from will be passed along
* @param fromPort will be passed along
* @param toPort will be passed along
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException
*/
public void send(Destination from, byte[] data) {
this.wrapper.send(from, data);
public void send(Destination from, int fromPort, int toPort, byte[] data) {
this.wrapper.send(from, fromPort, toPort, data);
}
private UDPSink udpsink;
private UDPSource udpsource;
private SOCKSUDPWrapper wrapper;
private SOCKSUDPUnwrapper unwrapper;
private ReplyTracker<SOCKSUDPPort> udptracker;
}

View File

@@ -4,6 +4,7 @@ import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.I2PTunnel;
import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase;
@@ -12,18 +13,24 @@ import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase;
* A Datagram Tunnel that can have multiple bidirectional ports on the UDP side.
*
* TX:
* (ReplyTracker in multiple SOCKSUDPPorts -&gt; ) I2PSink
* (multiple SOCKSUDPPorts -&gt; ) I2PSink
*
* RX:
* (SOCKSUDPWrapper in multiple SOCKSUDPPorts &lt;- ) MultiSink &lt;- I2PSource
*
* The reply from a dest goes to the last SOCKSUDPPort that sent to that dest.
* If multiple ports are talking to a dest at the same time, this isn't
* going to work very well.
* The replies must be to the same I2CP toPort as the outbound fromPort.
* If the server does not honor that, the replies will be dropped.
*
* The replies must be repliable. Raw datagrams are not supported, and would
* require a unique source port for each target.
*
* Preliminary, untested, possibly incomplete.
*
* @author zzz modded from streamr/StreamrConsumer
*/
public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
private final Map<Integer, SOCKSUDPPort> ports;
private final MultiSink<SOCKSUDPPort> demuxer;
/**
* Set up a tunnel with no UDP side yet.
@@ -33,15 +40,14 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
super(null, tunnel, tunnel, tunnel);
this.ports = new ConcurrentHashMap<Integer, SOCKSUDPPort>(1);
this.cache = new ConcurrentHashMap<Destination, SOCKSUDPPort>(1);
this.demuxer = new MultiSink<SOCKSUDPPort>(this.cache);
this.demuxer = new MultiSink<SOCKSUDPPort>(ports);
setSink(this.demuxer);
}
/** @return the UDP port number */
public int add(InetAddress host, int port) {
SOCKSUDPPort sup = new SOCKSUDPPort(host, port, this.cache);
SOCKSUDPPort sup = new SOCKSUDPPort(host, port, ports);
this.ports.put(Integer.valueOf(sup.getPort()), sup);
sup.setSink(this);
sup.start();
@@ -52,11 +58,6 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
SOCKSUDPPort sup = this.ports.remove(port);
if (sup != null)
sup.stop();
for (Iterator<Map.Entry<Destination, SOCKSUDPPort>> iter = cache.entrySet().iterator(); iter.hasNext();) {
Map.Entry<Destination, SOCKSUDPPort> e = iter.next();
if (e.getValue() == sup)
iter.remove();
}
}
@Override
@@ -81,12 +82,5 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
sup.stop();
}
this.ports.clear();
this.cache.clear();
}
private Map<Integer, SOCKSUDPPort> ports;
private Map<Destination, SOCKSUDPPort> cache;
private MultiSink<SOCKSUDPPort> demuxer;
}

View File

@@ -5,20 +5,23 @@ import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.client.streaming.I2PSocketAddress;
import net.i2p.util.Log;
/**
* Strip a SOCKS header off a datagram, convert it to a Destination
* Strip a SOCKS header off a datagram, convert it to a Destination and port
* Ref: RFC 1928
*
* @author zzz
*/
public class SOCKSUDPUnwrapper implements Source, Sink {
private Sink sink;
private final Map<I2PSocketAddress, SOCKSHeader> cache;
/**
* @param cache put headers here to pass to SOCKSUDPWrapper
*/
public SOCKSUDPUnwrapper(Map<Destination, SOCKSHeader> cache) {
public SOCKSUDPUnwrapper(Map<I2PSocketAddress, SOCKSHeader> cache) {
this.cache = cache;
}
@@ -31,9 +34,13 @@ public class SOCKSUDPUnwrapper implements Source, Sink {
/**
*
* May throw RuntimeException from underlying sink
* @param ignored_from ignored
* @param fromPort will be passed along
* @param toPort ignored
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException
*/
public void send(Destination ignored_from, byte[] data) {
public void send(Destination ignored_from, int fromPort, int toPort, byte[] data) {
SOCKSHeader h;
try {
h = new SOCKSHeader(data);
@@ -50,14 +57,14 @@ public class SOCKSUDPUnwrapper implements Source, Sink {
return;
}
cache.put(dest, h);
cache.put(new I2PSocketAddress(dest, toPort), h);
int headerlen = h.getBytes().length;
byte unwrapped[] = new byte[data.length - headerlen];
System.arraycopy(data, headerlen, unwrapped, 0, unwrapped.length);
this.sink.send(dest, unwrapped);
// We pass the local DatagramSocket's port through as the I2CP from port,
// so that it will come back as the toPort in the reply,
// and MultiSink will send it to the right SOCKSUDPWrapper/SOCKSUDPPort
this.sink.send(dest, fromPort, h.getPort(), unwrapped);
}
private Sink sink;
private Map<Destination, SOCKSHeader> cache;
}

View File

@@ -2,8 +2,11 @@ package net.i2p.i2ptunnel.socks;
import java.util.Map;
import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocketAddress;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.udp.*;
import net.i2p.util.Log;
/**
* Put a SOCKS header on a datagram
@@ -12,7 +15,10 @@ import net.i2p.i2ptunnel.udp.*;
* @author zzz
*/
public class SOCKSUDPWrapper implements Source, Sink {
public SOCKSUDPWrapper(Map<Destination, SOCKSHeader> cache) {
private Sink sink;
private final Map<I2PSocketAddress, SOCKSHeader> cache;
public SOCKSUDPWrapper(Map<I2PSocketAddress, SOCKSHeader> cache) {
this.cache = cache;
}
@@ -26,13 +32,23 @@ public class SOCKSUDPWrapper implements Source, Sink {
* Use the cached header, which should have the host string and port
*
* May throw RuntimeException from underlying sink
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException
*/
public void send(Destination from, byte[] data) {
public void send(Destination from, int fromPort, int toPort, byte[] data) {
if (this.sink == null)
return;
if (from == null) {
// TODO to handle raw replies, SOCKSUDPWrapper would have to use a unique
// fromPort for every target or request, and we would lookup the
// destination by toPort
Log log = I2PAppContext.getGlobalContext().logManager().getLog(SOCKSUDPWrapper.class);
if (log.shouldWarn())
log.warn("No support for raw datagrams, from port " + fromPort + " to port " + toPort);
return;
}
SOCKSHeader h = cache.get(from);
SOCKSHeader h = cache.get(new I2PSocketAddress(from, fromPort));
if (h == null) {
// RFC 1928 says drop
// h = new SOCKSHeader(from);
@@ -43,9 +59,6 @@ public class SOCKSUDPWrapper implements Source, Sink {
byte wrapped[] = new byte[header.length + data.length];
System.arraycopy(header, 0, wrapped, 0, header.length);
System.arraycopy(data, 0, wrapped, header.length, data.length);
this.sink.send(from, wrapped);
this.sink.send(from, fromPort, toPort, wrapped);
}
private Sink sink;
private Map<Destination, SOCKSHeader> cache;
}

View File

@@ -8,6 +8,7 @@ import java.net.Socket;
import java.nio.channels.SelectableChannel;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketAddress;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;

View File

@@ -14,10 +14,12 @@ import net.i2p.util.Log;
* @author zzz modded for I2PTunnel
*/
public class MultiSource implements Source, Sink {
private Sink sink;
private final List<MSink> sinks;
private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
public MultiSource() {
this.sinks = new CopyOnWriteArrayList<Destination>();
this.sinks = new CopyOnWriteArrayList<MSink>();
}
public void setSink(Sink sink) {
@@ -32,9 +34,10 @@ public class MultiSource implements Source, Sink {
/**
* May throw RuntimeException from underlying sinks
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException
*/
public void send(Destination ignored_from, byte[] data) {
public void send(Destination ignored_from, int ignored_fromPort, int ignored_toPort, byte[] data) {
if (sinks.isEmpty()) {
if (log.shouldDebug())
log.debug("No subscribers to send " + data.length + " bytes to");
@@ -43,19 +46,53 @@ public class MultiSource implements Source, Sink {
if (log.shouldDebug())
log.debug("Sending " + data.length + " bytes to " + sinks.size() + " subscribers");
for(Destination dest : this.sinks) {
this.sink.send(dest, data);
for(MSink ms : this.sinks) {
this.sink.send(ms.dest, ms.fromPort, ms.toPort, data);
}
}
public void add(Destination sink) {
this.sinks.add(sink);
/**
* @since 0.9.53 changed to MSink parameter
*/
public void add(MSink ms) {
sinks.add(ms);
}
public void remove(Destination sink) {
this.sinks.remove(sink);
/**
* @since 0.9.53 changed to MSink parameter
*/
public void remove(MSink ms) {
sinks.remove(ms);
}
/**
* @since 0.9.53
*/
static class MSink {
public final Destination dest;
public final int fromPort, toPort;
public MSink(Destination dest, int fromPort, int toPort) {
this.dest = dest; this.fromPort = fromPort; this.toPort = toPort;
}
@Override
public int hashCode() {
return dest.hashCode() | fromPort | (toPort << 16);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MSink))
return false;
MSink s = (MSink) o;
return dest.equals(s.dest) && fromPort == s.fromPort && toPort == s.toPort;
}
@Override
public String toString() {
return "from port " + fromPort + " to " + dest.toBase32() + ':' + toPort;
}
}
private Sink sink;
private final List<Destination> sinks;
}

View File

@@ -10,10 +10,21 @@ import net.i2p.util.Log;
* @author welterde/zzz
*/
public class Pinger implements Source, Runnable {
private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
protected Sink sink;
protected final Thread thread;
private final Object waitlock = new Object();
protected volatile boolean running;
private final Log log;
private final int fromPort;
public Pinger() {
/**
* @param fromPort the I2CP from port
* @since 0.9.53 added ctx and fromPort params
*/
public Pinger(I2PAppContext ctx, int fromPort) {
this.thread = new I2PAppThread(this);
log = ctx.logManager().getLog(getClass());
this.fromPort = fromPort;
}
public void setSink(Sink sink) {
@@ -22,7 +33,6 @@ public class Pinger implements Source, Runnable {
public void start() {
this.running = true;
//this.waitlock = new Object();
this.thread.start();
}
@@ -35,9 +45,9 @@ public class Pinger implements Source, Runnable {
byte[] data = new byte[1];
data[0] = 1;
try {
this.sink.send(null, data);
this.sink.send(null, fromPort, 0, data);
if (log.shouldDebug())
log.debug("Sent unsubscribe");
log.debug("Sent unsubscribe from port " + fromPort);
} catch (RuntimeException re) {}
}
@@ -47,11 +57,10 @@ public class Pinger implements Source, Runnable {
data[0] = 0;
int i = 0;
while(this.running) {
//System.out.print("p");
try {
this.sink.send(null, data);
this.sink.send(null, fromPort, 0, data);
if (log.shouldDebug())
log.debug("Sent subscribe");
log.debug("Sent subscribe from port " + fromPort);
} catch (RuntimeException re) {
if (log.shouldWarn())
log.warn("error sending", re);
@@ -71,9 +80,4 @@ public class Pinger implements Source, Runnable {
}
}
}
protected Sink sink;
protected final Thread thread;
private final Object waitlock = new Object();
protected volatile boolean running;
}

View File

@@ -24,11 +24,13 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase {
super(destination, l, notifyThis, tunnel);
// create udp-destination
this.sink = new UDPSink(host, port);
UDPSink udps = new UDPSink(host, port);
int localPort = udps.getPort();
this.sink = udps;
setSink(this.sink);
// create pinger
this.pinger = new Pinger();
this.pinger = new Pinger(_context, localPort);
this.pinger.setSink(this);
}

View File

@@ -21,8 +21,7 @@ public class StreamrProducer extends I2PTunnelUDPServerBase {
public StreamrProducer(int port,
File privkey, String privkeyname, Logging l,
EventDispatcher notifyThis, I2PTunnel tunnel) {
// verify subscription requests
super(true, privkey, privkeyname, l, notifyThis, tunnel);
super(privkey, privkeyname, l, notifyThis, tunnel);
// The broadcaster
this.multi = new MultiSource();

View File

@@ -20,7 +20,7 @@ public class Subscriber implements Sink {
private final I2PAppContext ctx = I2PAppContext.getGlobalContext();
private final Log log = ctx.logManager().getLog(getClass());
private final Map<Destination, Long> subscriptions;
private final Map<MultiSource.MSink, Long> subscriptions;
private final MultiSource multi;
private final SimpleTimer2.TimedEvent timer;
private volatile boolean timerRunning;
@@ -31,7 +31,7 @@ public class Subscriber implements Sink {
public Subscriber(MultiSource multi) {
this.multi = multi;
// subscriptions
this.subscriptions = new ConcurrentHashMap<Destination, Long>();
this.subscriptions = new ConcurrentHashMap<MultiSource.MSink, Long>();
timer = new Expire();
}
@@ -40,44 +40,47 @@ public class Subscriber implements Sink {
*
* @param dest to subscribe or unsubscribe
* @param data must be a single byte, 0 to subscribe, 1 to unsubscribe
* @since 0.9.53 added fromPort and toPort parameters
*/
public void send(Destination dest, byte[] data) {
public void send(Destination dest, int fromPort, int toPort, byte[] data) {
if(dest == null || data.length < 1) {
// invalid packet
if (log.shouldWarn())
log.warn("bad subscription from " + dest);
log.warn("bad subscription from " + dest.toBase32() + ':' + fromPort);
} else {
byte ctrl = data[0];
// swap fromPort and toPort for the replies
MultiSource.MSink ms = new MultiSource.MSink(dest, toPort, fromPort);
int ctrl = data[0] & 0xff;
if(ctrl == 0) {
if (this.subscriptions.put(dest, Long.valueOf(ctx.clock().now())) == null) {
if (this.subscriptions.put(ms, Long.valueOf(ctx.clock().now())) == null) {
if (subscriptions.size() > MAX_SUBSCRIPTIONS) {
subscriptions.remove(dest);
if (log.shouldWarn())
log.warn("Too many subscriptions, denying: " + dest.toBase32());
log.warn("Too many subscriptions, denying: " + ms);
return;
}
// subscribe
if (log.shouldWarn())
log.warn("Add subscription: " + dest.toBase32());
this.multi.add(dest);
log.warn("Add subscription: " + ms);
this.multi.add(ms);
if (!timerRunning) {
timer.reschedule(EXPIRATION);
timerRunning = true;
}
} else {
if (log.shouldInfo())
log.info("Continue subscription: " + dest.toBase32());
log.info("Continue subscription: " + ms);
}
} else if(ctrl == 1) {
// unsubscribe
if (log.shouldWarn())
log.warn("Remove subscription: " + dest.toBase32());
if (subscriptions.remove(dest) != null)
multi.remove(dest);
log.warn("Remove subscription: " + ms);
if (subscriptions.remove(ms) != null)
multi.remove(ms);
} else {
// invalid packet
if (log.shouldWarn())
log.warn("bad subscription from " + dest);
log.warn("bad subscription flag " + ctrl + " from " + ms);
}
}
}
@@ -95,15 +98,15 @@ public class Subscriber implements Sink {
return;
}
long exp = ctx.clock().now() - EXPIRATION;
for (Iterator<Map.Entry<Destination, Long>> iter = subscriptions.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Destination, Long> e = iter.next();
for (Iterator<Map.Entry<MultiSource.MSink, Long>> iter = subscriptions.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<MultiSource.MSink, Long> e = iter.next();
long then = e.getValue().longValue();
if (then < exp) {
Destination dest = e.getKey();
MultiSource.MSink ms = e.getKey();
iter.remove();
multi.remove(dest);
multi.remove(ms);
if (log.shouldWarn())
log.warn("Expired subscription: " + dest.toBase32());
log.warn("Expired subscription: " + ms);
}
}
if (!subscriptions.isEmpty()) {

View File

@@ -14,14 +14,40 @@ import net.i2p.client.datagram.I2PDatagramMaker;
*/
public class I2PSink implements Sink {
protected final boolean raw;
protected final I2PSession sess;
protected final Destination dest;
protected final I2PDatagramMaker maker;
/**
* @since 0.9.53
*/
protected final int toPort;
/**
* repliable (not raw)
*/
public I2PSink(I2PSession sess, Destination dest) {
this(sess, dest, false);
}
/**
* @param raw false for repliable
*/
public I2PSink(I2PSession sess, Destination dest, boolean raw) {
this(sess, dest, raw, I2PSession.PORT_UNSPECIFIED);
}
/**
* @param raw false for repliable
* @param fromPort I2CP source port, 0-65535
* @param toPort I2CP destination port, 0-65535
* @since 0.9.53
*/
public I2PSink(I2PSession sess, Destination dest, boolean raw, int toPort) {
this.sess = sess;
this.dest = dest;
this.raw = raw;
this.toPort = toPort;
// create maker
if (raw) {
@@ -34,31 +60,30 @@ public class I2PSink implements Sink {
/**
* @param src ignored
* @param fromPort I2CP port
* @param ign_toPort ignored
* @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry
* @throws RuntimeException if session is closed
*/
public synchronized void send(Destination src, byte[] data) {
public synchronized void send(Destination src, int fromPort, int ign_toPort, byte[] data) {
//System.out.print("w");
// create payload
byte[] payload;
if(!this.raw) {
if (!this.raw) {
synchronized(this.maker) {
payload = this.maker.makeI2PDatagram(data);
}
} else
} else {
payload = data;
}
// send message
try {
this.sess.sendMessage(this.dest, payload,
(this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM),
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
fromPort, toPort);
} catch (I2PSessionException ise) {
throw new RuntimeException("failed to send data", ise);
}
}
protected final boolean raw;
protected final I2PSession sess;
protected final Destination dest;
protected final I2PDatagramMaker maker;
}

View File

@@ -14,6 +14,10 @@ import net.i2p.client.datagram.I2PDatagramMaker;
*/
public class I2PSinkAnywhere implements Sink {
protected final boolean raw;
protected final I2PSession sess;
protected final I2PDatagramMaker maker;
public I2PSinkAnywhere(I2PSession sess) {
this(sess, false);
}
@@ -35,27 +39,35 @@ public class I2PSinkAnywhere implements Sink {
* @param to - where it's going
* @throws RuntimeException if session is closed
*/
public synchronized void send(Destination to, byte[] data) {
public void send(Destination to, byte[] data) {
send(to, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED, data);
}
/**
* @param to - where it's going
* @param fromPort I2CP port 0 - 65535
* @param toPort I2CP port 0 - 65535
* @since 0.9.53
* @throws RuntimeException if session is closed
*/
public synchronized void send(Destination to, int fromPort, int toPort, byte[] data) {
// create payload
byte[] payload;
if(!this.raw) {
synchronized(this.maker) {
payload = this.maker.makeI2PDatagram(data);
}
} else
} else {
payload = data;
}
// send message
try {
this.sess.sendMessage(to, payload,
(this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM),
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
fromPort, toPort);
} catch (I2PSessionException ise) {
throw new RuntimeException("failed to send data", ise);
}
}
protected final boolean raw;
protected final I2PSession sess;
protected final I2PDatagramMaker maker;
}

View File

@@ -1,94 +1,105 @@
package net.i2p.i2ptunnel.udp;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionListener;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.client.datagram.I2PDatagramDissector;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
/**
* Refactored in 0.9.53 to support I2CP protocols and ports
*
* @author welterde
*/
public class I2PSource implements Source, Runnable {
public class I2PSource implements Source {
protected final I2PSession sess;
protected Sink sink;
private final Protocol protocol;
private final int port;
private final I2PDatagramDissector diss;
private final Log log;
/**
* @since 0.9.53
*/
public enum Protocol { REPLIABLE, RAW, BOTH }
/**
* Handles both REPLIABLE and RAW on any port
*/
public I2PSource(I2PSession sess) {
this(sess, true, false);
this(sess, Protocol.BOTH);
}
public I2PSource(I2PSession sess, boolean verify) {
this(sess, verify, false);
/**
* Listen on all I2CP ports.
* No support for arbitrary protocol numbers.
*
* @param protocol REPLIABLE, RAW, or BOTH
* @since 0.9.53
*/
public I2PSource(I2PSession sess, Protocol protocol) {
this(sess, protocol, I2PSession.PORT_ANY);
}
public I2PSource(I2PSession sess, boolean verify, boolean raw) {
/**
* @param port I2CP port or I2PSession.PORT_ANY
* @param portocol REPLIABLE, RAW, or BOTH
* @since 0.9.53
*/
public I2PSource(I2PSession sess, Protocol protocol, int port) {
this.sess = sess;
this.verify = verify;
this.raw = raw;
// create queue
this.queue = new ArrayBlockingQueue<Integer>(256);
// create listener
this.sess.setSessionListener(new Listener());
// create thread
this.thread = new I2PAppThread(this);
this.protocol = protocol;
this.port = port;
diss = (protocol != Protocol.RAW) ? new I2PDatagramDissector() : null;
log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
}
public void setSink(Sink sink) {
this.sink = sink;
}
public void start() {
this.thread.start();
// create listener
Listener l = new Listener();
if (protocol != Protocol.RAW)
sess.addMuxedSessionListener(l, I2PSession.PROTO_DATAGRAM, port);
if (protocol != Protocol.REPLIABLE)
sess.addMuxedSessionListener(l, I2PSession.PROTO_DATAGRAM_RAW, port);
}
public void run() {
// create dissector
I2PDatagramDissector diss = new I2PDatagramDissector();
_running = true;
while (_running) {
protected class Listener implements I2PSessionMuxedListener {
public void messageAvailable(I2PSession sess, int id, long size) {
throw new IllegalStateException("muxed");
}
/**
* @since 0.9.53
*/
public void messageAvailable(I2PSession session, int id, long size, int proto, int fromPort, int toPort) {
if (log.shouldDebug())
log.debug("Got " + size + " bytes, proto: " + proto + " from port: " + fromPort + " to port: " + toPort);
try {
// get id
int id = this.queue.take();
// receive message
byte[] msg = this.sess.receiveMessage(id);
if(!this.raw) {
byte[] msg = session.receiveMessage(id);
if (proto == I2PSession.PROTO_DATAGRAM) {
// load datagram into it
diss.loadI2PDatagram(msg);
// now call sink
if(this.verify)
this.sink.send(diss.getSender(), diss.getPayload());
else
this.sink.send(diss.extractSender(), diss.extractPayload());
sink.send(diss.getSender(), fromPort, toPort, diss.getPayload());
} else if (proto == I2PSession.PROTO_DATAGRAM_RAW) {
sink.send(null, fromPort, toPort, msg);
} else {
// verify is ignored
this.sink.send(null, msg);
if (log.shouldWarn())
log.warn("dropping message with unknown protocol " + proto);
}
//System.out.print("r");
} catch(Exception e) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (log.shouldWarn())
log.warn("error sending", e);
break;
}
}
}
protected class Listener implements I2PSessionListener {
public void messageAvailable(I2PSession sess, int id, long size) {
try {
queue.put(id);
} catch(Exception e) {
// ignore
log.warn("error receiving datagram", e);
}
}
@@ -97,24 +108,11 @@ public class I2PSource implements Source, Runnable {
}
public void disconnected(I2PSession arg0) {
_running = false;
thread.interrupt();
}
public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
log.error(arg1, arg2);
_running = false;
thread.interrupt();
}
}
protected final I2PSession sess;
protected final BlockingQueue<Integer> queue;
protected Sink sink;
protected final Thread thread;
protected final boolean verify;
protected final boolean raw;
private volatile boolean _running;
}

View File

@@ -8,8 +8,11 @@ import net.i2p.data.Destination;
*/
public interface Sink {
/**
* @param src some implementations may ignore
* @param fromPort I2CP source port, 0-65535
* @param toPort I2CP destination port, 0-65535
* @param src some implementations may ignore, may be null in some implementations
* @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry
* @throws RuntimeException in some implementations
*/
public void send(Destination src, byte[] data);
public void send(Destination src, int fromPort, int toPort, byte[] data);
}

View File

@@ -13,7 +13,13 @@ import net.i2p.data.Destination;
*/
public class UDPSink implements Sink {
protected final DatagramSocket sock;
protected final InetAddress remoteHost;
protected final int remotePort;
/**
* @param host where to send
* @param port where to send
* @throws IllegalArgumentException on DatagramSocket IOException
*/
public UDPSink(InetAddress host, int port) {
@@ -23,18 +29,31 @@ public class UDPSink implements Sink {
} catch (IOException e) {
throw new IllegalArgumentException("failed to open udp-socket", e);
}
this.remoteHost = host;
this.remotePort = port;
}
// remote port
/**
* @param socket existing socket
* @param host where to send
* @param port where to send
* @since 0.9.53
*/
public UDPSink(DatagramSocket socket, InetAddress host, int port) {
sock = socket;
this.remoteHost = host;
this.remotePort = port;
}
/**
* @param src ignored
* @param fromPort ignored
* @param toPort ignored
* @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry
* @throws RuntimeException on DatagramSocket IOException
*/
public void send(Destination src, byte[] data) {
public void send(Destination src, int fromPort, int toPort, byte[] data) {
// if data.length > this.sock.getSendBufferSize() ...
// create packet
@@ -48,6 +67,9 @@ public class UDPSink implements Sink {
}
}
/**
* @return the local port of the DatagramSocket we are sending from
*/
public int getPort() {
return this.sock.getLocalPort();
}
@@ -60,9 +82,4 @@ public class UDPSink implements Sink {
public void stop() {
this.sock.close();
}
protected final DatagramSocket sock;
protected final InetAddress remoteHost;
protected final int remotePort;
}

View File

@@ -13,6 +13,10 @@ import net.i2p.util.Log;
* @author welterde
*/
public class UDPSource implements Source, Runnable {
protected final DatagramSocket sock;
protected Sink sink;
protected final Thread thread;
private final int port;
public static final int MAX_SIZE = 15360;
/**
@@ -25,7 +29,7 @@ public class UDPSource implements Source, Runnable {
} catch (IOException e) {
throw new RuntimeException("failed to listen...", e);
}
this.port = port;
// create thread
this.thread = new I2PAppThread(this);
}
@@ -33,6 +37,7 @@ public class UDPSource implements Source, Runnable {
/** use socket from UDPSink */
public UDPSource(DatagramSocket sock) {
this.sock = sock;
port = sock.getLocalPort();
this.thread = new I2PAppThread(this);
}
@@ -60,7 +65,7 @@ public class UDPSource implements Source, Runnable {
System.arraycopy(pack.getData(), 0, nbuf, 0, nbuf.length);
// transfer to sink
this.sink.send(null, nbuf);
this.sink.send(null, port, 0, nbuf);
//System.out.print("i");
} catch(Exception e) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
@@ -71,11 +76,15 @@ public class UDPSource implements Source, Runnable {
}
}
/**
* @return the local port of the DatagramSocket we are receiving on
* @since 0.9.53
*/
public int getPort() {
return port;
}
public void stop() {
this.sock.close();
}
protected final DatagramSocket sock;
protected Sink sink;
protected final Thread thread;
}

View File

@@ -12,6 +12,7 @@ import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.streaming.I2PSocketAddress;
import net.i2p.crypto.SigType;
import net.i2p.data.Destination;
import net.i2p.i2ptunnel.I2PTunnel;
@@ -57,7 +58,6 @@ import net.i2p.util.EventDispatcher;
private final I2PSession _session;
private final Source _i2pSource;
private final Sink _i2pSink;
private final Destination _otherDest;
/**
* @throws IllegalArgumentException if the I2CP configuration is b0rked so
@@ -103,19 +103,19 @@ import net.i2p.util.EventDispatcher;
throw new RuntimeException("failed to create session", exc);
}
// Setup the source. Always expect raw unverified datagrams.
_i2pSource = new I2PSource(_session, false, true);
// Setup the source. Handle both repliable and raw datagrams, on all ports.
_i2pSource = new I2PSource(_session, I2PSource.Protocol.BOTH);
// Setup the sink. Always send repliable datagrams.
if (destination != null && destination.length() > 0) {
_otherDest = _context.namingService().lookup(destination);
if (_otherDest == null) {
I2PSocketAddress addr = new I2PSocketAddress(destination);
if (addr.isUnresolved()) {
// unlike in I2PTunnelClient, we don't defer and retry resolution later
l.log("Could not resolve " + destination);
throw new RuntimeException("failed to create session - could not resolve " + destination);
}
_i2pSink = new I2PSink(_session, _otherDest, false);
}
_i2pSink = new I2PSink(_session, addr.getAddress(), false, addr.getPort());
} else {
_otherDest = null;
_i2pSink = new I2PSinkAnywhere(_session, false);
}
}
@@ -178,10 +178,11 @@ import net.i2p.util.EventDispatcher;
* Sink Methods
*
* @param to - ignored if configured for a single destination
* (we use the dest specified in the constructor)
* (we use the dest specified in the constructor)
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException if session is closed
*/
public void send(Destination to, byte[] data) {
_i2pSink.send(to, data);
public void send(Destination to, int fromPort, int toPort, byte[] data) {
_i2pSink.send(to, fromPort, toPort, data);
}
}

View File

@@ -67,15 +67,14 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
* badly that we cant create a socketManager
*
*/
public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l,
public I2PTunnelUDPServerBase(File privkey, String privkeyname, Logging l,
EventDispatcher notifyThis, I2PTunnel tunnel) {
super("UDPServer <- " + privkeyname, notifyThis, tunnel);
_log = tunnel.getContext().logManager().getLog(I2PTunnelUDPServerBase.class);
FileInputStream fis = null;
try {
fis = new FileInputStream(privkey);
init(verify, fis, privkeyname, l);
init(fis, privkeyname, l);
} catch (IOException ioe) {
_log.error("Error starting server", ioe);
notifyEvent("openServerResult", "error");
@@ -85,7 +84,7 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
}
}
private void init(boolean verify, InputStream privData, String privkeyname, Logging l) {
private void init(InputStream privData, String privkeyname, Logging l) {
this.l = l;
// create i2pclient
@@ -99,8 +98,8 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
throw new RuntimeException("failed to create session", exc);
}
// Setup the source. Always expect repliable datagrams, optionally verify
_i2pSource = new I2PSource(_session, verify, false);
// Setup the source. Always expect repliable datagrams, listen on all ports.
_i2pSource = new I2PSource(_session, I2PSource.Protocol.REPLIABLE);
// Setup the sink. Always send raw datagrams.
_i2pSink = new I2PSinkAnywhere(_session, true);
@@ -188,11 +187,12 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin
* Sink Methods
*
* @param to
* @since 0.9.53 added fromPort and toPort parameters
* @throws RuntimeException if session is closed
*
*/
public void send(Destination to, byte[] data) {
_i2pSink.send(to, data);
public void send(Destination to, int fromPort, int toPort, byte[] data) {
_i2pSink.send(to, fromPort, toPort, data);
}
}