diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java index bc2af15143733bcc7856a0444e2e19ec398d1f71..bbe785cda72ae5dc653c3eda2aeb34a1968a534c 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java @@ -8,12 +8,17 @@ import net.i2p.i2ptunnel.udp.*; import net.i2p.util.Log; /** - * Sends to one of many Sinks + * Sends to one of many Sinks based on the toPort + * * @author zzz modded from streamr/MultiSource */ public class MultiSink<S extends Sink> implements Source, Sink { + private final Map<Integer, S> cache; - public MultiSink(Map<Destination, S> cache) { + /** + * @param cache map of toPort to Sink + */ + public MultiSink(Map<Integer, S> cache) { this.cache = cache; } @@ -23,18 +28,32 @@ public class MultiSink<S extends Sink> implements Source, Sink { public void start() {} /** + * Send to a single sink looked up by toPort + * * May throw RuntimeException from underlying sinks + * + * @param from passed along + * @param fromPort passed along + * @param toPort passed along + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException */ - public void send(Destination from, byte[] data) { - Sink s = this.cache.get(from); + public void send(Destination from, int fromPort, int toPort, byte[] data) { + Sink s = cache.get(toPort); + if (s == null && toPort == 0 && cache.size() == 1) { + // for now, do the server a favor if the toPort isn't specified + for (Sink ss : cache.values()) { + s = ss; + break; + } + } if (s == null) { Log log = I2PAppContext.getGlobalContext().logManager().getLog(MultiSink.class); - log.error("No where to go for " + from.calculateHash().toBase64().substring(0, 6)); + String frm = (from != null) ? from.toBase32() : "raw"; + if (log.shouldWarn()) + log.warn("No where to go for " + frm + " port " + fromPort + " to port " + toPort); return; } - s.send(from, data); + s.send(from, fromPort, toPort, data); } - - private Map<Destination, S> cache; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java deleted file mode 100644 index 7e99efa420face51e7344eb95e45c3a07232dee5..0000000000000000000000000000000000000000 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java +++ /dev/null @@ -1,38 +0,0 @@ -package net.i2p.i2ptunnel.socks; - -import java.util.Map; - -import net.i2p.data.Destination; -import net.i2p.i2ptunnel.udp.*; -import net.i2p.util.Log; - -/** - * Track who the reply goes to - * @author zzz - */ -public class ReplyTracker<S extends Sink> implements Source, Sink { - - public ReplyTracker(S reply, Map<Destination, S> cache) { - this.reply = reply; - this.cache = cache; - } - - public void setSink(Sink sink) { - this.sink = sink; - } - - public void start() {} - - /** - * May throw RuntimeException from underlying sink - * @throws RuntimeException - */ - public void send(Destination to, byte[] data) { - this.cache.put(to, this.reply); - this.sink.send(to, data); - } - - private S reply; - private Map<Destination, S> cache; - private Sink sink; -} diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java index 25baa879cabcebdab39d5b6912f64bd95d8c5df2..4e7eada952b2dea24a4e56f1356da9ad74c86c73 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java @@ -1,8 +1,11 @@ package net.i2p.i2ptunnel.socks; +import java.util.Arrays; + import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.data.Destination; +import net.i2p.util.Addresses; /** * Save the SOCKS header from a datagram @@ -11,9 +14,12 @@ import net.i2p.data.Destination; * @author zzz */ public class SOCKSHeader { + private byte[] header; + private static final byte[] beg = {0,0,0,3,60}; /** * @param data the whole packet + * @throws IllegalArgumentException on bad socks format */ public SOCKSHeader(byte[] data) { if (data.length <= 8) @@ -25,13 +31,11 @@ public class SOCKSHeader { int headerlen = 0; int addressType = data[3]; if (addressType == 1) { - // this will fail in getDestination() headerlen = 6 + 4; } else if (addressType == 3) { headerlen = 6 + 1 + (data[4] & 0xff); } else if (addressType == 4) { - // this will fail in getDestination() - // but future garlicat partial hash lookup possible? + // future garlicat partial hash lookup possible? headerlen = 6 + 16; } else { throw new IllegalArgumentException("Unknown address type: " + addressType); @@ -42,33 +46,62 @@ public class SOCKSHeader { this.header = new byte[headerlen]; System.arraycopy(data, 0, this.header, 0, headerlen); } - - private static final byte[] beg = {0,0,0,3,60}; - private static final byte[] end = {0,0}; /** * Make a dummy header from a dest, * for those cases where we want to receive unsolicited datagrams. * Unused for now. + * + * @param port I2CP port 0-65535 + * @since 0.9.53 add port param */ - public SOCKSHeader(Destination dest) { - this.header = new byte[beg.length + 60 + end.length]; + public SOCKSHeader(Destination dest, int port) { + this.header = new byte[beg.length + 60 + 2]; System.arraycopy(beg, 0, this.header, 0, beg.length); String b32 = dest.toBase32(); System.arraycopy(DataHelper.getASCII(b32), 0, this.header, beg.length, 60); - System.arraycopy(end, 0, this.header, beg.length + 60, end.length); + DataHelper.toLong(header, beg.length + 60, 2, port); } + /** + * As of 0.9.53, returns IP address as a string for address types 1 and 4. + * + * @return hostname or null for unknown address type + */ public String getHost() { int addressType = this.header[3]; - if (addressType != 3) - return null; - int namelen = (this.header[4] & 0xff); - byte[] nameBytes = new byte[namelen]; - System.arraycopy(this.header, 5, nameBytes, 0, namelen); - return DataHelper.getUTF8(nameBytes); + if (addressType == 3) { + int namelen = (this.header[4] & 0xff); + return DataHelper.getUTF8(header, 5, namelen); + } + if (addressType == 1) + return Addresses.toString(Arrays.copyOfRange(header, 4, 4)); + if (addressType == 4) + return Addresses.toString(Arrays.copyOfRange(header, 4, 16)); + return null; + } + + /** + * @return 0 - 65535 + * @since 0.9.53 + */ + public int getPort() { + int namelen; + int addressType = header[3]; + if (addressType == 3) + namelen = 1 + (header[4] & 0xff); + else if (addressType == 1) + namelen = 4; + else if (addressType == 4) + namelen = 16; + else + return 0; + return (int) DataHelper.fromLong(header, 4 + namelen, 2); } + /** + * @return destination or null + */ public Destination getDestination() { String name = getHost(); if (name == null) @@ -80,6 +113,4 @@ public class SOCKSHeader { public byte[] getBytes() { return header; } - - private byte[] header; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java index 773a4c04b7a55dfe8f0e1738bb9be405121b13fc..2b33275f27e65b286e871899776d8375b6552651 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.Map; +import net.i2p.client.streaming.I2PSocketAddress; import net.i2p.data.Destination; import net.i2p.i2ptunnel.udp.*; @@ -14,22 +15,26 @@ import net.i2p.i2ptunnel.udp.*; * ports, it happens outside of here. * * TX: - * UDPSource -> SOCKSUDPUnwrapper -> ReplyTracker ( -> I2PSink in SOCKSUDPTunnel) + * UDPSource -> SOCKSUDPUnwrapper -> (I2PSink in SOCKSUDPTunnel) * * RX: * UDPSink <- SOCKSUDPWrapper ( <- MultiSink <- I2PSource in SOCKSUDPTunnel) * * The Unwrapper passes headers to the Wrapper through a cache. - * The ReplyTracker passes sinks to MultiSink through a cache. + * MultiSink routes packets based on toPort. * * @author zzz */ public class SOCKSUDPPort implements Source, Sink { + private final UDPSink udpsink; + private final UDPSource udpsource; + private final SOCKSUDPWrapper wrapper; + private final SOCKSUDPUnwrapper unwrapper; - public SOCKSUDPPort(InetAddress host, int port, Map<Destination, SOCKSUDPPort> replyMap) { + public SOCKSUDPPort(InetAddress host, int port, Map<Integer, SOCKSUDPPort> replyMap) { // this passes the host and port from UDPUnwrapper to UDPWrapper - Map<Destination, SOCKSHeader> cache = new ConcurrentHashMap<Destination, SOCKSHeader>(4); + Map<I2PSocketAddress, SOCKSHeader> cache = new ConcurrentHashMap<I2PSocketAddress, SOCKSHeader>(4); // rcv from I2P and send to a port this.wrapper = new SOCKSUDPWrapper(cache); @@ -41,8 +46,6 @@ public class SOCKSUDPPort implements Source, Sink { this.udpsource = new UDPSource(sock); this.unwrapper = new SOCKSUDPUnwrapper(cache); this.udpsource.setSink(this.unwrapper); - this.udptracker = new ReplyTracker<SOCKSUDPPort>(this, replyMap); - this.unwrapper.setSink(this.udptracker); } /** Socks passes this back to the client on the TCP connection */ @@ -51,7 +54,7 @@ public class SOCKSUDPPort implements Source, Sink { } public void setSink(Sink sink) { - this.udptracker.setSink(sink); + this.unwrapper.setSink(sink); } public void start() { @@ -66,16 +69,14 @@ public class SOCKSUDPPort implements Source, Sink { /** * May throw RuntimeException from underlying sink + + * @param from will be passed along + * @param fromPort will be passed along + * @param toPort will be passed along + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException */ - public void send(Destination from, byte[] data) { - this.wrapper.send(from, data); + public void send(Destination from, int fromPort, int toPort, byte[] data) { + this.wrapper.send(from, fromPort, toPort, data); } - - - private UDPSink udpsink; - private UDPSource udpsource; - private SOCKSUDPWrapper wrapper; - private SOCKSUDPUnwrapper unwrapper; - private ReplyTracker<SOCKSUDPPort> udptracker; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java index 23e7090d743980c8a2321bb382228e1bcd4b8006..1c40f1b5880a8e93bbf7b296ea779531e3592e54 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java @@ -4,6 +4,7 @@ import java.net.InetAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.Iterator; import java.util.Map; + import net.i2p.data.Destination; import net.i2p.i2ptunnel.I2PTunnel; import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase; @@ -12,18 +13,24 @@ import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase; * A Datagram Tunnel that can have multiple bidirectional ports on the UDP side. * * TX: - * (ReplyTracker in multiple SOCKSUDPPorts -> ) I2PSink + * (multiple SOCKSUDPPorts -> ) I2PSink * * RX: * (SOCKSUDPWrapper in multiple SOCKSUDPPorts <- ) MultiSink <- I2PSource * - * The reply from a dest goes to the last SOCKSUDPPort that sent to that dest. - * If multiple ports are talking to a dest at the same time, this isn't - * going to work very well. + * The replies must be to the same I2CP toPort as the outbound fromPort. + * If the server does not honor that, the replies will be dropped. + * + * The replies must be repliable. Raw datagrams are not supported, and would + * require a unique source port for each target. + * + * Preliminary, untested, possibly incomplete. * * @author zzz modded from streamr/StreamrConsumer */ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase { + private final Map<Integer, SOCKSUDPPort> ports; + private final MultiSink<SOCKSUDPPort> demuxer; /** * Set up a tunnel with no UDP side yet. @@ -33,15 +40,14 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase { super(null, tunnel, tunnel, tunnel); this.ports = new ConcurrentHashMap<Integer, SOCKSUDPPort>(1); - this.cache = new ConcurrentHashMap<Destination, SOCKSUDPPort>(1); - this.demuxer = new MultiSink<SOCKSUDPPort>(this.cache); + this.demuxer = new MultiSink<SOCKSUDPPort>(ports); setSink(this.demuxer); } /** @return the UDP port number */ public int add(InetAddress host, int port) { - SOCKSUDPPort sup = new SOCKSUDPPort(host, port, this.cache); + SOCKSUDPPort sup = new SOCKSUDPPort(host, port, ports); this.ports.put(Integer.valueOf(sup.getPort()), sup); sup.setSink(this); sup.start(); @@ -52,11 +58,6 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase { SOCKSUDPPort sup = this.ports.remove(port); if (sup != null) sup.stop(); - for (Iterator<Map.Entry<Destination, SOCKSUDPPort>> iter = cache.entrySet().iterator(); iter.hasNext();) { - Map.Entry<Destination, SOCKSUDPPort> e = iter.next(); - if (e.getValue() == sup) - iter.remove(); - } } @Override @@ -81,12 +82,5 @@ public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase { sup.stop(); } this.ports.clear(); - this.cache.clear(); } - - - - private Map<Integer, SOCKSUDPPort> ports; - private Map<Destination, SOCKSUDPPort> cache; - private MultiSink<SOCKSUDPPort> demuxer; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java index 89a27cde1a7d463abde1cc4e43de27a75d3fa153..dbdcdca741ee412ad3fdaf46855c3a059d45e972 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java @@ -5,20 +5,23 @@ import java.util.Map; import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.i2ptunnel.udp.*; +import net.i2p.client.streaming.I2PSocketAddress; import net.i2p.util.Log; /** - * Strip a SOCKS header off a datagram, convert it to a Destination + * Strip a SOCKS header off a datagram, convert it to a Destination and port * Ref: RFC 1928 * * @author zzz */ public class SOCKSUDPUnwrapper implements Source, Sink { + private Sink sink; + private final Map<I2PSocketAddress, SOCKSHeader> cache; /** * @param cache put headers here to pass to SOCKSUDPWrapper */ - public SOCKSUDPUnwrapper(Map<Destination, SOCKSHeader> cache) { + public SOCKSUDPUnwrapper(Map<I2PSocketAddress, SOCKSHeader> cache) { this.cache = cache; } @@ -31,9 +34,13 @@ public class SOCKSUDPUnwrapper implements Source, Sink { /** * * May throw RuntimeException from underlying sink + * @param ignored_from ignored + * @param fromPort will be passed along + * @param toPort ignored + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException */ - public void send(Destination ignored_from, byte[] data) { + public void send(Destination ignored_from, int fromPort, int toPort, byte[] data) { SOCKSHeader h; try { h = new SOCKSHeader(data); @@ -50,14 +57,14 @@ public class SOCKSUDPUnwrapper implements Source, Sink { return; } - cache.put(dest, h); + cache.put(new I2PSocketAddress(dest, toPort), h); int headerlen = h.getBytes().length; byte unwrapped[] = new byte[data.length - headerlen]; System.arraycopy(data, headerlen, unwrapped, 0, unwrapped.length); - this.sink.send(dest, unwrapped); + // We pass the local DatagramSocket's port through as the I2CP from port, + // so that it will come back as the toPort in the reply, + // and MultiSink will send it to the right SOCKSUDPWrapper/SOCKSUDPPort + this.sink.send(dest, fromPort, h.getPort(), unwrapped); } - - private Sink sink; - private Map<Destination, SOCKSHeader> cache; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java index daad929f4af98ccce905cf1eb7fcd97f6ee9ace8..942fb87a11e48c44ea7fce671d6173893903ffba 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java @@ -2,8 +2,11 @@ package net.i2p.i2ptunnel.socks; import java.util.Map; +import net.i2p.I2PAppContext; +import net.i2p.client.streaming.I2PSocketAddress; import net.i2p.data.Destination; import net.i2p.i2ptunnel.udp.*; +import net.i2p.util.Log; /** * Put a SOCKS header on a datagram @@ -12,7 +15,10 @@ import net.i2p.i2ptunnel.udp.*; * @author zzz */ public class SOCKSUDPWrapper implements Source, Sink { - public SOCKSUDPWrapper(Map<Destination, SOCKSHeader> cache) { + private Sink sink; + private final Map<I2PSocketAddress, SOCKSHeader> cache; + + public SOCKSUDPWrapper(Map<I2PSocketAddress, SOCKSHeader> cache) { this.cache = cache; } @@ -26,13 +32,23 @@ public class SOCKSUDPWrapper implements Source, Sink { * Use the cached header, which should have the host string and port * * May throw RuntimeException from underlying sink + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException */ - public void send(Destination from, byte[] data) { + public void send(Destination from, int fromPort, int toPort, byte[] data) { if (this.sink == null) return; + if (from == null) { + // TODO to handle raw replies, SOCKSUDPWrapper would have to use a unique + // fromPort for every target or request, and we would lookup the + // destination by toPort + Log log = I2PAppContext.getGlobalContext().logManager().getLog(SOCKSUDPWrapper.class); + if (log.shouldWarn()) + log.warn("No support for raw datagrams, from port " + fromPort + " to port " + toPort); + return; + } - SOCKSHeader h = cache.get(from); + SOCKSHeader h = cache.get(new I2PSocketAddress(from, fromPort)); if (h == null) { // RFC 1928 says drop // h = new SOCKSHeader(from); @@ -43,9 +59,6 @@ public class SOCKSUDPWrapper implements Source, Sink { byte wrapped[] = new byte[header.length + data.length]; System.arraycopy(header, 0, wrapped, 0, header.length); System.arraycopy(data, 0, wrapped, header.length, data.length); - this.sink.send(from, wrapped); + this.sink.send(from, fromPort, toPort, wrapped); } - - private Sink sink; - private Map<Destination, SOCKSHeader> cache; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SocketWrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SocketWrapper.java index 6e4ecc565705216a15f527abf5f4ccfcb82e0920..072058708b502614ba627644d58aca618cd94a71 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SocketWrapper.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SocketWrapper.java @@ -8,6 +8,7 @@ import java.net.Socket; import java.nio.channels.SelectableChannel; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketAddress; import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java index 0708345abc8964bc2603a4ca82b708f587e48025..1a0033d3ff5a7e5f0ed78a92a48cda9cbb2e7f0f 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java @@ -14,10 +14,12 @@ import net.i2p.util.Log; * @author zzz modded for I2PTunnel */ public class MultiSource implements Source, Sink { + private Sink sink; + private final List<MSink> sinks; private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); public MultiSource() { - this.sinks = new CopyOnWriteArrayList<Destination>(); + this.sinks = new CopyOnWriteArrayList<MSink>(); } public void setSink(Sink sink) { @@ -32,9 +34,10 @@ public class MultiSource implements Source, Sink { /** * May throw RuntimeException from underlying sinks + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException */ - public void send(Destination ignored_from, byte[] data) { + public void send(Destination ignored_from, int ignored_fromPort, int ignored_toPort, byte[] data) { if (sinks.isEmpty()) { if (log.shouldDebug()) log.debug("No subscribers to send " + data.length + " bytes to"); @@ -43,19 +46,53 @@ public class MultiSource implements Source, Sink { if (log.shouldDebug()) log.debug("Sending " + data.length + " bytes to " + sinks.size() + " subscribers"); - for(Destination dest : this.sinks) { - this.sink.send(dest, data); + for(MSink ms : this.sinks) { + this.sink.send(ms.dest, ms.fromPort, ms.toPort, data); } } - public void add(Destination sink) { - this.sinks.add(sink); + /** + * @since 0.9.53 changed to MSink parameter + */ + public void add(MSink ms) { + sinks.add(ms); } - public void remove(Destination sink) { - this.sinks.remove(sink); + /** + * @since 0.9.53 changed to MSink parameter + */ + public void remove(MSink ms) { + sinks.remove(ms); + } + + /** + * @since 0.9.53 + */ + static class MSink { + public final Destination dest; + public final int fromPort, toPort; + + public MSink(Destination dest, int fromPort, int toPort) { + this.dest = dest; this.fromPort = fromPort; this.toPort = toPort; + } + + @Override + public int hashCode() { + return dest.hashCode() | fromPort | (toPort << 16); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MSink)) + return false; + MSink s = (MSink) o; + return dest.equals(s.dest) && fromPort == s.fromPort && toPort == s.toPort; + } + + @Override + public String toString() { + return "from port " + fromPort + " to " + dest.toBase32() + ':' + toPort; + + } } - - private Sink sink; - private final List<Destination> sinks; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java index 5a5590089588db9e97d53bd4fa2615b85f5e9d50..9efe1bd30ea28789ca982d72e8bc6084e68033c6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java @@ -10,10 +10,21 @@ import net.i2p.util.Log; * @author welterde/zzz */ public class Pinger implements Source, Runnable { - private final Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + protected Sink sink; + protected final Thread thread; + private final Object waitlock = new Object(); + protected volatile boolean running; + private final Log log; + private final int fromPort; - public Pinger() { + /** + * @param fromPort the I2CP from port + * @since 0.9.53 added ctx and fromPort params + */ + public Pinger(I2PAppContext ctx, int fromPort) { this.thread = new I2PAppThread(this); + log = ctx.logManager().getLog(getClass()); + this.fromPort = fromPort; } public void setSink(Sink sink) { @@ -22,7 +33,6 @@ public class Pinger implements Source, Runnable { public void start() { this.running = true; - //this.waitlock = new Object(); this.thread.start(); } @@ -35,9 +45,9 @@ public class Pinger implements Source, Runnable { byte[] data = new byte[1]; data[0] = 1; try { - this.sink.send(null, data); + this.sink.send(null, fromPort, 0, data); if (log.shouldDebug()) - log.debug("Sent unsubscribe"); + log.debug("Sent unsubscribe from port " + fromPort); } catch (RuntimeException re) {} } @@ -47,11 +57,10 @@ public class Pinger implements Source, Runnable { data[0] = 0; int i = 0; while(this.running) { - //System.out.print("p"); try { - this.sink.send(null, data); + this.sink.send(null, fromPort, 0, data); if (log.shouldDebug()) - log.debug("Sent subscribe"); + log.debug("Sent subscribe from port " + fromPort); } catch (RuntimeException re) { if (log.shouldWarn()) log.warn("error sending", re); @@ -71,9 +80,4 @@ public class Pinger implements Source, Runnable { } } } - - protected Sink sink; - protected final Thread thread; - private final Object waitlock = new Object(); - protected volatile boolean running; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java index 49b314b3300d52c2cd9f61f831a37a2114ed212b..f3f8e38f128d5479762df48e5b789702f7ef7e27 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java @@ -24,11 +24,13 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase { super(destination, l, notifyThis, tunnel); // create udp-destination - this.sink = new UDPSink(host, port); + UDPSink udps = new UDPSink(host, port); + int localPort = udps.getPort(); + this.sink = udps; setSink(this.sink); // create pinger - this.pinger = new Pinger(); + this.pinger = new Pinger(_context, localPort); this.pinger.setSink(this); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java index cef5bf1d13b3a0c70d4f6951b5001ba165de3b78..4d8568c6cf3576a58a5c2654fdfae2db0e3879c5 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java @@ -21,8 +21,7 @@ public class StreamrProducer extends I2PTunnelUDPServerBase { public StreamrProducer(int port, File privkey, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { - // verify subscription requests - super(true, privkey, privkeyname, l, notifyThis, tunnel); + super(privkey, privkeyname, l, notifyThis, tunnel); // The broadcaster this.multi = new MultiSource(); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java index 69bcb008f2d13290b882638368da0200a401a5fe..3402c23aa0372000d071cebf684eb51db1ba72c0 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java @@ -20,7 +20,7 @@ public class Subscriber implements Sink { private final I2PAppContext ctx = I2PAppContext.getGlobalContext(); private final Log log = ctx.logManager().getLog(getClass()); - private final Map<Destination, Long> subscriptions; + private final Map<MultiSource.MSink, Long> subscriptions; private final MultiSource multi; private final SimpleTimer2.TimedEvent timer; private volatile boolean timerRunning; @@ -31,7 +31,7 @@ public class Subscriber implements Sink { public Subscriber(MultiSource multi) { this.multi = multi; // subscriptions - this.subscriptions = new ConcurrentHashMap<Destination, Long>(); + this.subscriptions = new ConcurrentHashMap<MultiSource.MSink, Long>(); timer = new Expire(); } @@ -40,44 +40,47 @@ public class Subscriber implements Sink { * * @param dest to subscribe or unsubscribe * @param data must be a single byte, 0 to subscribe, 1 to unsubscribe + * @since 0.9.53 added fromPort and toPort parameters */ - public void send(Destination dest, byte[] data) { + public void send(Destination dest, int fromPort, int toPort, byte[] data) { if(dest == null || data.length < 1) { // invalid packet if (log.shouldWarn()) - log.warn("bad subscription from " + dest); + log.warn("bad subscription from " + dest.toBase32() + ':' + fromPort); } else { - byte ctrl = data[0]; + // swap fromPort and toPort for the replies + MultiSource.MSink ms = new MultiSource.MSink(dest, toPort, fromPort); + int ctrl = data[0] & 0xff; if(ctrl == 0) { - if (this.subscriptions.put(dest, Long.valueOf(ctx.clock().now())) == null) { + if (this.subscriptions.put(ms, Long.valueOf(ctx.clock().now())) == null) { if (subscriptions.size() > MAX_SUBSCRIPTIONS) { subscriptions.remove(dest); if (log.shouldWarn()) - log.warn("Too many subscriptions, denying: " + dest.toBase32()); + log.warn("Too many subscriptions, denying: " + ms); return; } // subscribe if (log.shouldWarn()) - log.warn("Add subscription: " + dest.toBase32()); - this.multi.add(dest); + log.warn("Add subscription: " + ms); + this.multi.add(ms); if (!timerRunning) { timer.reschedule(EXPIRATION); timerRunning = true; } } else { if (log.shouldInfo()) - log.info("Continue subscription: " + dest.toBase32()); + log.info("Continue subscription: " + ms); } } else if(ctrl == 1) { // unsubscribe if (log.shouldWarn()) - log.warn("Remove subscription: " + dest.toBase32()); - if (subscriptions.remove(dest) != null) - multi.remove(dest); + log.warn("Remove subscription: " + ms); + if (subscriptions.remove(ms) != null) + multi.remove(ms); } else { // invalid packet if (log.shouldWarn()) - log.warn("bad subscription from " + dest); + log.warn("bad subscription flag " + ctrl + " from " + ms); } } } @@ -95,15 +98,15 @@ public class Subscriber implements Sink { return; } long exp = ctx.clock().now() - EXPIRATION; - for (Iterator<Map.Entry<Destination, Long>> iter = subscriptions.entrySet().iterator(); iter.hasNext(); ) { - Map.Entry<Destination, Long> e = iter.next(); + for (Iterator<Map.Entry<MultiSource.MSink, Long>> iter = subscriptions.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry<MultiSource.MSink, Long> e = iter.next(); long then = e.getValue().longValue(); if (then < exp) { - Destination dest = e.getKey(); + MultiSource.MSink ms = e.getKey(); iter.remove(); - multi.remove(dest); + multi.remove(ms); if (log.shouldWarn()) - log.warn("Expired subscription: " + dest.toBase32()); + log.warn("Expired subscription: " + ms); } } if (!subscriptions.isEmpty()) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java index 7052009269c7cc81953e9fc433e5c007c3becdf1..e7c648c4bfd24e49909e81ccae23a2b4f82d5e22 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java @@ -14,14 +14,40 @@ import net.i2p.client.datagram.I2PDatagramMaker; */ public class I2PSink implements Sink { + protected final boolean raw; + protected final I2PSession sess; + protected final Destination dest; + protected final I2PDatagramMaker maker; + /** + * @since 0.9.53 + */ + protected final int toPort; + + /** + * repliable (not raw) + */ public I2PSink(I2PSession sess, Destination dest) { this(sess, dest, false); } + /** + * @param raw false for repliable + */ public I2PSink(I2PSession sess, Destination dest, boolean raw) { + this(sess, dest, raw, I2PSession.PORT_UNSPECIFIED); + } + + /** + * @param raw false for repliable + * @param fromPort I2CP source port, 0-65535 + * @param toPort I2CP destination port, 0-65535 + * @since 0.9.53 + */ + public I2PSink(I2PSession sess, Destination dest, boolean raw, int toPort) { this.sess = sess; this.dest = dest; this.raw = raw; + this.toPort = toPort; // create maker if (raw) { @@ -34,31 +60,30 @@ public class I2PSink implements Sink { /** * @param src ignored + * @param fromPort I2CP port + * @param ign_toPort ignored + * @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry * @throws RuntimeException if session is closed */ - public synchronized void send(Destination src, byte[] data) { + public synchronized void send(Destination src, int fromPort, int ign_toPort, byte[] data) { //System.out.print("w"); // create payload byte[] payload; - if(!this.raw) { + if (!this.raw) { synchronized(this.maker) { payload = this.maker.makeI2PDatagram(data); } - } else + } else { payload = data; + } // send message try { this.sess.sendMessage(this.dest, payload, (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), - I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + fromPort, toPort); } catch (I2PSessionException ise) { throw new RuntimeException("failed to send data", ise); } } - - protected final boolean raw; - protected final I2PSession sess; - protected final Destination dest; - protected final I2PDatagramMaker maker; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java index 9308bd379e3f691364f48a927d03b9b297902475..a97874bb16c14289e39fcfd31c3494537c569f15 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java @@ -14,6 +14,10 @@ import net.i2p.client.datagram.I2PDatagramMaker; */ public class I2PSinkAnywhere implements Sink { + protected final boolean raw; + protected final I2PSession sess; + protected final I2PDatagramMaker maker; + public I2PSinkAnywhere(I2PSession sess) { this(sess, false); } @@ -35,27 +39,35 @@ public class I2PSinkAnywhere implements Sink { * @param to - where it's going * @throws RuntimeException if session is closed */ - public synchronized void send(Destination to, byte[] data) { + public void send(Destination to, byte[] data) { + send(to, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED, data); + } + + /** + * @param to - where it's going + * @param fromPort I2CP port 0 - 65535 + * @param toPort I2CP port 0 - 65535 + * @since 0.9.53 + * @throws RuntimeException if session is closed + */ + public synchronized void send(Destination to, int fromPort, int toPort, byte[] data) { // create payload byte[] payload; if(!this.raw) { synchronized(this.maker) { payload = this.maker.makeI2PDatagram(data); } - } else + } else { payload = data; + } // send message try { this.sess.sendMessage(to, payload, (this.raw ? I2PSession.PROTO_DATAGRAM_RAW : I2PSession.PROTO_DATAGRAM), - I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + fromPort, toPort); } catch (I2PSessionException ise) { throw new RuntimeException("failed to send data", ise); } } - - protected final boolean raw; - protected final I2PSession sess; - protected final I2PDatagramMaker maker; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java index e50e1a051fc8ad6b332f121e3d1a197eb9755253..92d0dcebca62975e3aa339806bc0cfce1641098b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java @@ -1,94 +1,105 @@ package net.i2p.i2ptunnel.udp; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; -import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionMuxedListener; import net.i2p.client.datagram.I2PDatagramDissector; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; /** + * Refactored in 0.9.53 to support I2CP protocols and ports * * @author welterde */ -public class I2PSource implements Source, Runnable { +public class I2PSource implements Source { + + protected final I2PSession sess; + protected Sink sink; + private final Protocol protocol; + private final int port; + private final I2PDatagramDissector diss; + private final Log log; + /** + * @since 0.9.53 + */ + public enum Protocol { REPLIABLE, RAW, BOTH } + + /** + * Handles both REPLIABLE and RAW on any port + */ public I2PSource(I2PSession sess) { - this(sess, true, false); + this(sess, Protocol.BOTH); } - public I2PSource(I2PSession sess, boolean verify) { - this(sess, verify, false); + /** + * Listen on all I2CP ports. + * No support for arbitrary protocol numbers. + * + * @param protocol REPLIABLE, RAW, or BOTH + * @since 0.9.53 + */ + public I2PSource(I2PSession sess, Protocol protocol) { + this(sess, protocol, I2PSession.PORT_ANY); } - public I2PSource(I2PSession sess, boolean verify, boolean raw) { + /** + * @param port I2CP port or I2PSession.PORT_ANY + * @param portocol REPLIABLE, RAW, or BOTH + * @since 0.9.53 + */ + public I2PSource(I2PSession sess, Protocol protocol, int port) { this.sess = sess; - this.verify = verify; - this.raw = raw; - - // create queue - this.queue = new ArrayBlockingQueue<Integer>(256); - - // create listener - this.sess.setSessionListener(new Listener()); - - // create thread - this.thread = new I2PAppThread(this); + this.protocol = protocol; + this.port = port; + diss = (protocol != Protocol.RAW) ? new I2PDatagramDissector() : null; + log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); } public void setSink(Sink sink) { this.sink = sink; } - + public void start() { - this.thread.start(); + // create listener + Listener l = new Listener(); + if (protocol != Protocol.RAW) + sess.addMuxedSessionListener(l, I2PSession.PROTO_DATAGRAM, port); + if (protocol != Protocol.REPLIABLE) + sess.addMuxedSessionListener(l, I2PSession.PROTO_DATAGRAM_RAW, port); } - public void run() { - // create dissector - I2PDatagramDissector diss = new I2PDatagramDissector(); - _running = true; - while (_running) { + protected class Listener implements I2PSessionMuxedListener { + + public void messageAvailable(I2PSession sess, int id, long size) { + throw new IllegalStateException("muxed"); + } + + /** + * @since 0.9.53 + */ + public void messageAvailable(I2PSession session, int id, long size, int proto, int fromPort, int toPort) { + if (log.shouldDebug()) + log.debug("Got " + size + " bytes, proto: " + proto + " from port: " + fromPort + " to port: " + toPort); try { - // get id - int id = this.queue.take(); - // receive message - byte[] msg = this.sess.receiveMessage(id); - - if(!this.raw) { + byte[] msg = session.receiveMessage(id); + if (proto == I2PSession.PROTO_DATAGRAM) { // load datagram into it diss.loadI2PDatagram(msg); - // now call sink - if(this.verify) - this.sink.send(diss.getSender(), diss.getPayload()); - else - this.sink.send(diss.extractSender(), diss.extractPayload()); + sink.send(diss.getSender(), fromPort, toPort, diss.getPayload()); + } else if (proto == I2PSession.PROTO_DATAGRAM_RAW) { + sink.send(null, fromPort, toPort, msg); } else { - // verify is ignored - this.sink.send(null, msg); + if (log.shouldWarn()) + log.warn("dropping message with unknown protocol " + proto); } //System.out.print("r"); } catch(Exception e) { - Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); if (log.shouldWarn()) - log.warn("error sending", e); - break; - } - } - } - - protected class Listener implements I2PSessionListener { - - public void messageAvailable(I2PSession sess, int id, long size) { - try { - queue.put(id); - } catch(Exception e) { - // ignore + log.warn("error receiving datagram", e); } } @@ -97,24 +108,11 @@ public class I2PSource implements Source, Runnable { } public void disconnected(I2PSession arg0) { - _running = false; - thread.interrupt(); } public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) { - Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); log.error(arg1, arg2); - _running = false; - thread.interrupt(); } } - - protected final I2PSession sess; - protected final BlockingQueue<Integer> queue; - protected Sink sink; - protected final Thread thread; - protected final boolean verify; - protected final boolean raw; - private volatile boolean _running; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java index b125391605682f8217ada247efc08536f174345c..1319263b6db5ffb2a5efc777696eb238a2e15842 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java @@ -8,8 +8,11 @@ import net.i2p.data.Destination; */ public interface Sink { /** - * @param src some implementations may ignore + * @param fromPort I2CP source port, 0-65535 + * @param toPort I2CP destination port, 0-65535 + * @param src some implementations may ignore, may be null in some implementations + * @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry * @throws RuntimeException in some implementations */ - public void send(Destination src, byte[] data); + public void send(Destination src, int fromPort, int toPort, byte[] data); } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java index 2766b1e0776f3f551d5148a2a8cb671b99237544..ab56396aa4bb965c7b898ebdd99e814bede9f906 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java @@ -13,7 +13,13 @@ import net.i2p.data.Destination; */ public class UDPSink implements Sink { + protected final DatagramSocket sock; + protected final InetAddress remoteHost; + protected final int remotePort; + /** + * @param host where to send + * @param port where to send * @throws IllegalArgumentException on DatagramSocket IOException */ public UDPSink(InetAddress host, int port) { @@ -23,18 +29,31 @@ public class UDPSink implements Sink { } catch (IOException e) { throw new IllegalArgumentException("failed to open udp-socket", e); } - this.remoteHost = host; + this.remotePort = port; + } - // remote port + + /** + * @param socket existing socket + * @param host where to send + * @param port where to send + * @since 0.9.53 + */ + public UDPSink(DatagramSocket socket, InetAddress host, int port) { + sock = socket; + this.remoteHost = host; this.remotePort = port; } /** * @param src ignored + * @param fromPort ignored + * @param toPort ignored + * @since 0.9.53 added fromPort and toPort parameters, breaking change, sorry * @throws RuntimeException on DatagramSocket IOException */ - public void send(Destination src, byte[] data) { + public void send(Destination src, int fromPort, int toPort, byte[] data) { // if data.length > this.sock.getSendBufferSize() ... // create packet @@ -48,6 +67,9 @@ public class UDPSink implements Sink { } } + /** + * @return the local port of the DatagramSocket we are sending from + */ public int getPort() { return this.sock.getLocalPort(); } @@ -60,9 +82,4 @@ public class UDPSink implements Sink { public void stop() { this.sock.close(); } - - protected final DatagramSocket sock; - protected final InetAddress remoteHost; - protected final int remotePort; - } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java index e54394107615624c1970f5ead20cb5ec608b6aa3..8bdb3af44a2bc002e6ee08fd82487be274b3fc2f 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java @@ -13,6 +13,10 @@ import net.i2p.util.Log; * @author welterde */ public class UDPSource implements Source, Runnable { + protected final DatagramSocket sock; + protected Sink sink; + protected final Thread thread; + private final int port; public static final int MAX_SIZE = 15360; /** @@ -25,7 +29,7 @@ public class UDPSource implements Source, Runnable { } catch (IOException e) { throw new RuntimeException("failed to listen...", e); } - + this.port = port; // create thread this.thread = new I2PAppThread(this); } @@ -33,6 +37,7 @@ public class UDPSource implements Source, Runnable { /** use socket from UDPSink */ public UDPSource(DatagramSocket sock) { this.sock = sock; + port = sock.getLocalPort(); this.thread = new I2PAppThread(this); } @@ -60,7 +65,7 @@ public class UDPSource implements Source, Runnable { System.arraycopy(pack.getData(), 0, nbuf, 0, nbuf.length); // transfer to sink - this.sink.send(null, nbuf); + this.sink.send(null, port, 0, nbuf); //System.out.print("i"); } catch(Exception e) { Log log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); @@ -71,11 +76,15 @@ public class UDPSource implements Source, Runnable { } } + /** + * @return the local port of the DatagramSocket we are receiving on + * @since 0.9.53 + */ + public int getPort() { + return port; + } + public void stop() { this.sock.close(); } - - protected final DatagramSocket sock; - protected Sink sink; - protected final Thread thread; } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java index 20445aceffad40fb107366ff44330b3ea95fe723..fdd1ea120d5e72b94652e3bdb63691d8f631224b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java @@ -12,6 +12,7 @@ import net.i2p.client.I2PClient; import net.i2p.client.I2PClientFactory; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; +import net.i2p.client.streaming.I2PSocketAddress; import net.i2p.crypto.SigType; import net.i2p.data.Destination; import net.i2p.i2ptunnel.I2PTunnel; @@ -57,7 +58,6 @@ import net.i2p.util.EventDispatcher; private final I2PSession _session; private final Source _i2pSource; private final Sink _i2pSink; - private final Destination _otherDest; /** * @throws IllegalArgumentException if the I2CP configuration is b0rked so @@ -103,19 +103,19 @@ import net.i2p.util.EventDispatcher; throw new RuntimeException("failed to create session", exc); } - // Setup the source. Always expect raw unverified datagrams. - _i2pSource = new I2PSource(_session, false, true); + // Setup the source. Handle both repliable and raw datagrams, on all ports. + _i2pSource = new I2PSource(_session, I2PSource.Protocol.BOTH); // Setup the sink. Always send repliable datagrams. if (destination != null && destination.length() > 0) { - _otherDest = _context.namingService().lookup(destination); - if (_otherDest == null) { + I2PSocketAddress addr = new I2PSocketAddress(destination); + if (addr.isUnresolved()) { + // unlike in I2PTunnelClient, we don't defer and retry resolution later l.log("Could not resolve " + destination); throw new RuntimeException("failed to create session - could not resolve " + destination); - } - _i2pSink = new I2PSink(_session, _otherDest, false); + } + _i2pSink = new I2PSink(_session, addr.getAddress(), false, addr.getPort()); } else { - _otherDest = null; _i2pSink = new I2PSinkAnywhere(_session, false); } } @@ -178,10 +178,11 @@ import net.i2p.util.EventDispatcher; * Sink Methods * * @param to - ignored if configured for a single destination - * (we use the dest specified in the constructor) + * (we use the dest specified in the constructor) + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException if session is closed */ - public void send(Destination to, byte[] data) { - _i2pSink.send(to, data); + public void send(Destination to, int fromPort, int toPort, byte[] data) { + _i2pSink.send(to, fromPort, toPort, data); } } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java index ade6e3058bbfddd1fdf2e9e3c9ea4d7dc3f276d5..adedc95c1850db38d3fb7009034e057ededc25dd 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java @@ -67,15 +67,14 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin * badly that we cant create a socketManager * */ - - public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l, + public I2PTunnelUDPServerBase(File privkey, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super("UDPServer <- " + privkeyname, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(I2PTunnelUDPServerBase.class); FileInputStream fis = null; try { fis = new FileInputStream(privkey); - init(verify, fis, privkeyname, l); + init(fis, privkeyname, l); } catch (IOException ioe) { _log.error("Error starting server", ioe); notifyEvent("openServerResult", "error"); @@ -85,7 +84,7 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin } } - private void init(boolean verify, InputStream privData, String privkeyname, Logging l) { + private void init(InputStream privData, String privkeyname, Logging l) { this.l = l; // create i2pclient @@ -99,8 +98,8 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin throw new RuntimeException("failed to create session", exc); } - // Setup the source. Always expect repliable datagrams, optionally verify - _i2pSource = new I2PSource(_session, verify, false); + // Setup the source. Always expect repliable datagrams, listen on all ports. + _i2pSource = new I2PSource(_session, I2PSource.Protocol.REPLIABLE); // Setup the sink. Always send raw datagrams. _i2pSink = new I2PSinkAnywhere(_session, true); @@ -188,11 +187,12 @@ public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sin * Sink Methods * * @param to + * @since 0.9.53 added fromPort and toPort parameters * @throws RuntimeException if session is closed * */ - public void send(Destination to, byte[] data) { - _i2pSink.send(to, data); + public void send(Destination to, int fromPort, int toPort, byte[] data) { + _i2pSink.send(to, fromPort, toPort, data); } }