From 6484005569a98cfa2bbb5ec4d3e869b5e51f2e06 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Tue, 24 Feb 2009 23:28:53 +0000
Subject: [PATCH] I2PTunnel: First cut at SOCKS UDP (untested); also some
 streamr and UDP tweaks

---
 .../net/i2p/i2ptunnel/socks/MultiSink.java    | 35 +++++++
 .../net/i2p/i2ptunnel/socks/ReplyTracker.java | 36 +++++++
 .../net/i2p/i2ptunnel/socks/SOCKS5Server.java | 84 +++++++++++++++--
 .../net/i2p/i2ptunnel/socks/SOCKSHeader.java  | 89 ++++++++++++++++++
 .../net/i2p/i2ptunnel/socks/SOCKSUDPPort.java | 77 +++++++++++++++
 .../i2p/i2ptunnel/socks/SOCKSUDPTunnel.java   | 94 +++++++++++++++++++
 .../i2ptunnel/socks/SOCKSUDPUnwrapper.java    | 59 ++++++++++++
 .../i2p/i2ptunnel/socks/SOCKSUDPWrapper.java  | 49 ++++++++++
 .../i2p/i2ptunnel/streamr/MultiSource.java    |  4 +
 .../i2ptunnel/streamr/StreamrConsumer.java    |  3 +-
 .../i2ptunnel/streamr/StreamrProducer.java    |  5 +-
 .../src/net/i2p/i2ptunnel/udp/UDPSink.java    | 21 +++--
 .../src/net/i2p/i2ptunnel/udp/UDPSource.java  | 14 ++-
 13 files changed, 549 insertions(+), 21 deletions(-)
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java

diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java
new file mode 100644
index 000000000..3c63758c1
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/MultiSink.java
@@ -0,0 +1,35 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.util.Log;
+
+/**
+ * Sends to one of many Sinks
+ * @author zzz modded from streamr/MultiSource
+ */
+public class MultiSink implements Source, Sink {
+    private static final Log _log = new Log(MultiSink.class);
+
+    public MultiSink(Map cache) {
+        this.cache = cache;
+    }
+    
+    /** Don't use this - put sinks in the cache */
+    public void setSink(Sink sink) {}
+
+    public void start() {}
+
+    public void send(Destination from, byte[] data) {
+        Sink s = this.cache.get(from);
+        if (s == null) {
+            _log.error("No where to go for " + from.calculateHash().toBase64().substring(0, 6));
+            return;
+        }
+        s.send(from, data);
+    }
+    
+    private Map<Destination, Sink> cache;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java
new file mode 100644
index 000000000..f6a124c95
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/ReplyTracker.java
@@ -0,0 +1,36 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.util.Log;
+
+/**
+ * Track who the reply goes to
+ * @author zzz
+ */
+public class ReplyTracker implements Source, Sink {
+    private static final Log _log = new Log(MultiSink.class);
+
+    public ReplyTracker(Sink reply, Map cache) {
+        this.reply = reply;
+        this.cache = cache;
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+
+    public void start() {}
+
+    public void send(Destination to, byte[] data) {
+        this.cache.put(to, this.reply);
+        this.sink.send(to, data);
+    }
+    
+    private Sink reply;
+    private Map<Destination, Sink> cache;
+    private Sink sink;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java
index 38c50f266..5e5292607 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKS5Server.java
@@ -13,12 +13,15 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.List;
 
 import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
 import net.i2p.client.streaming.I2PSocket;
 import net.i2p.data.DataFormatException;
+import net.i2p.data.Destination;
 import net.i2p.i2ptunnel.I2PTunnel;
 import net.i2p.util.HexDump;
 import net.i2p.util.Log;
@@ -67,7 +70,8 @@ public class SOCKS5Server extends SOCKSServer {
             out = new DataOutputStream(clientSock.getOutputStream());
 
             init(in, out);
-            manageRequest(in, out);
+            if (manageRequest(in, out) == Command.UDP_ASSOCIATE)
+                handleUDP(in, out);
         } catch (IOException e) {
             throw new SOCKSException("Connection error (" + e.getMessage() + ")");
         }
@@ -111,7 +115,7 @@ public class SOCKS5Server extends SOCKSServer {
      * initialization, integrity/confidentiality encapsulations, etc)
      * has been stripped out of the input/output streams.
      */
-    private void manageRequest(DataInputStream in, DataOutputStream out) throws IOException, SOCKSException {
+    private int manageRequest(DataInputStream in, DataOutputStream out) throws IOException, SOCKSException {
         int socksVer = in.readByte() & 0xff;
         if (socksVer != SOCKS_VERSION_5) {
             _log.debug("error in SOCKS5 request (protocol != 5? wtf?)");
@@ -127,9 +131,12 @@ public class SOCKS5Server extends SOCKSServer {
             sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
             throw new SOCKSException("BIND command not supported");
         case Command.UDP_ASSOCIATE:
+          /*** if(!Boolean.valueOf(tunnel.getOptions().getProperty("i2ptunnel.socks.allowUDP")).booleanValue()) {
             _log.debug("UDP ASSOCIATE command is not supported!");
             sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
             throw new SOCKSException("UDP ASSOCIATE command not supported");
+           ***/
+            break;
         default:
             _log.debug("unknown command in request (" + Integer.toHexString(command) + ")");
             sendRequestReply(Reply.COMMAND_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
@@ -152,7 +159,8 @@ public class SOCKS5Server extends SOCKSServer {
                     connHostName += ".";
                 }
             }
-            _log.warn("IPV4 address type in request: " + connHostName + ". Is your client secure?");
+            if (command != Command.UDP_ASSOCIATE)
+                _log.warn("IPV4 address type in request: " + connHostName + ". Is your client secure?");
             break;
         case AddressType.DOMAINNAME:
             {
@@ -168,9 +176,12 @@ public class SOCKS5Server extends SOCKSServer {
             _log.debug("DOMAINNAME address type in request: " + connHostName);
             break;
         case AddressType.IPV6:
-            _log.warn("IP V6 address type in request! Is your client secure?" + " (IPv6 is not supported, anyway :-)");
-            sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
-            throw new SOCKSException("IPV6 addresses not supported");
+            if (command != Command.UDP_ASSOCIATE) {
+                _log.warn("IP V6 address type in request! Is your client secure?" + " (IPv6 is not supported, anyway :-)");
+                sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
+                throw new SOCKSException("IPV6 addresses not supported");
+            }
+            break;
         default:
             _log.debug("unknown address type in request (" + Integer.toHexString(command) + ")");
             sendRequestReply(Reply.ADDRESS_TYPE_NOT_SUPPORTED, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
@@ -183,6 +194,7 @@ public class SOCKS5Server extends SOCKSServer {
             sendRequestReply(Reply.CONNECTION_NOT_ALLOWED_BY_RULESET, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
             throw new SOCKSException("Invalid port number in request");
         }
+        return command;
     }
 
     protected void confirmConnection() throws SOCKSException {
@@ -293,6 +305,13 @@ public class SOCKS5Server extends SOCKSServer {
                 // Let's not due a new Dest for every request, huh?
                 //I2PSocketManager sm = I2PSocketManagerFactory.createManager();
                 //destSock = sm.connect(I2PTunnel.destFromName(connHostName), null);
+                Destination dest = I2PTunnel.destFromName(connHostName);
+                if (dest == null) {
+                    try {
+                        sendRequestReply(Reply.HOST_UNREACHABLE, AddressType.DOMAINNAME, null, "0.0.0.0", 0, out);
+                    } catch (IOException ioe) {}
+                    throw new SOCKSException("Host not found");
+                }
                 destSock = t.createI2PSocket(I2PTunnel.destFromName(connHostName));
             } else if ("localhost".equals(connHostName) || "127.0.0.1".equals(connHostName)) {
                 String err = "No localhost accesses allowed through the Socks Proxy";
@@ -358,6 +377,59 @@ public class SOCKS5Server extends SOCKSServer {
         return destSock;
     }
 
+    // This isn't really the right place for this, we can't stop the tunnel once it starts.
+    static SOCKSUDPTunnel _tunnel;
+    static Object _startLock = new Object();
+    static byte[] dummyIP = new byte[4];
+    /**
+     * We got a UDP associate command.
+     * Loop here looking for more, never return normally,
+     * or else I2PSocksTunnel will create a streaming lib connection.
+     *
+     * Do UDP Socks clients actually send more than one Associate request?
+     * RFC 1928 isn't clear... maybe not.
+     */
+    private void handleUDP(DataInputStream in, DataOutputStream out) throws SOCKSException {
+        List<Integer> ports = new ArrayList(1);
+        synchronized (_startLock) {
+            if (_tunnel == null) {
+                // tunnel options?
+                _tunnel = new SOCKSUDPTunnel(new I2PTunnel());
+                _tunnel.startRunning();
+            }
+        }
+        while (true) {
+            // Set it up. connHostName and connPort are the client's info.
+            InetAddress ia = null;
+            try {
+                ia = InetAddress.getByAddress(connHostName, dummyIP);
+            } catch (UnknownHostException uhe) {} // won't happen, no resolving done here
+            int myPort = _tunnel.add(ia, connPort);
+            ports.add(Integer.valueOf(myPort));
+            try {
+                sendRequestReply(Reply.SUCCEEDED, AddressType.IPV4, InetAddress.getByName("127.0.0.1"), null, myPort, out);
+            } catch (IOException ioe) { break; }
+
+            // wait for more ???
+            try {
+                int command = manageRequest(in, out);
+                // don't do this...
+                if (command != Command.UDP_ASSOCIATE)
+                    break;
+            } catch (IOException ioe) { break; }
+            catch (SOCKSException ioe) { break; }
+        }
+
+        for (Integer i : ports)
+            _tunnel.remove(i);
+
+        // Prevent I2PSocksTunnel from calling getDestinationI2PSocket() above
+        // to create a streaming lib connection...
+        // This isn't very elegant...
+        //
+        throw new SOCKSException("End of UDP Processing");
+    }
+
     /*
      * Some namespaces to enclose SOCKS protocol codes
      */
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java
new file mode 100644
index 000000000..763b9aa10
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSHeader.java
@@ -0,0 +1,89 @@
+package net.i2p.i2ptunnel.socks;
+
+import net.i2p.data.Base32;
+import net.i2p.data.DataFormatException;
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+
+/**
+ * Save the SOCKS header from a datagram
+ * Ref: RFC 1928
+ *
+ * @author zzz
+ */
+public class SOCKSHeader {
+
+    /**
+     * @param data the whole packet
+     */
+    public SOCKSHeader(byte[] data) {
+        if (data.length <= 8)
+            throw new IllegalArgumentException("Header too short: " + data.length);
+        if (data[0] != 0 || data[1] != 0)
+            throw new IllegalArgumentException("Not a SOCKS datagram?");
+        if (data[2] != 0)
+            throw new IllegalArgumentException("We can't handle fragments!");
+        int headerlen = 0;
+        int addressType = data[3];
+        if (addressType == 1) {
+            // this will fail in getDestination()
+            headerlen = 6 + 4;
+        } else if (addressType == 3) {
+            headerlen = 6 + 1 + (data[4] & 0xff);
+        } else if (addressType == 4) {
+            // this will fail in getDestination()
+            // but future garlicat partial hash lookup possible?
+            headerlen = 6 + 16;
+        } else {
+            throw new IllegalArgumentException("Unknown address type: " + addressType);
+        }
+        if (data.length < headerlen)
+            throw new IllegalArgumentException("Header too short: " + data.length);
+
+        this.header = new byte[headerlen];
+        System.arraycopy(this.header, 0, data, 0, headerlen);
+    }
+    
+    private static final byte[] beg = {0,0,0,3,60};
+    private static final byte[] end = {'.','b','3','2','.','i','2','p',0,0};
+
+    /**
+     *  Make a dummy header from a dest,
+     *  for those cases where we want to receive unsolicited datagrams.
+     *  Unused for now.
+     */
+    public SOCKSHeader(Destination dest) {
+        this.header = new byte[beg.length + 52 + end.length];
+        System.arraycopy(this.header, 0, beg, 0, beg.length);
+        String b32 = Base32.encode(dest.calculateHash().getData());
+        System.arraycopy(this.header, beg.length, b32.getBytes(), 0, 52);
+        System.arraycopy(this.header, beg.length + 52, end, 0, end.length);
+    }
+    
+    public String getHost() {
+        int addressType = this.header[3];
+        if (addressType != 3)
+            return null;
+        int namelen = (this.header[4] & 0xff);
+        byte[] nameBytes = new byte[namelen];
+        System.arraycopy(nameBytes, 0, this.header, 5, namelen);
+        return new String(nameBytes);
+    }
+
+    public Destination getDestination() {
+        String name = getHost();
+        if (name == null)
+            return null;
+        try {
+            // the naming service does caching (thankfully)
+            return I2PTunnel.destFromName(name);
+        } catch (DataFormatException dfe) {}
+        return null;
+    }
+
+    public byte[] getBytes() {
+        return header;
+    }
+
+    private byte[] header;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java
new file mode 100644
index 000000000..b56c9082f
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPPort.java
@@ -0,0 +1,77 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+
+/**
+ * Implements a UDP port and Socks encapsulation / decapsulation.
+ * This is for a single port. If there is demuxing for multiple
+ * ports, it happens outside of here.
+ *
+ * TX:
+ *   UDPSource -> SOCKSUDPUnwrapper -> ReplyTracker ( -> I2PSink in SOCKSUDPTunnel)
+ *
+ * RX:
+ *   UDPSink <- SOCKSUDPWrapper ( <- MultiSink <- I2PSource in SOCKSUDPTunnel)
+ *
+ * The Unwrapper passes headers to the Wrapper through a cache.
+ * The ReplyTracker passes sinks to MultiSink through a cache.
+ *
+ * @author zzz
+ */
+public class SOCKSUDPPort implements Source, Sink {
+
+    public SOCKSUDPPort(InetAddress host, int port, Map replyMap) {
+
+        // this passes the host and port from UDPUnwrapper to UDPWrapper
+        Map cache = new ConcurrentHashMap(4);
+
+        // rcv from I2P and send to a port
+        this.wrapper = new SOCKSUDPWrapper(cache);
+        this.udpsink = new UDPSink(host, port);
+        this.wrapper.setSink(this.udpsink);
+        
+        // rcv from the same port and send to I2P
+        DatagramSocket sock = this.udpsink.getSocket();
+        this.udpsource = new UDPSource(sock);
+        this.unwrapper = new SOCKSUDPUnwrapper(cache);
+        this.udpsource.setSink(this.unwrapper);
+        this.udptracker = new ReplyTracker(this, replyMap);
+        this.unwrapper.setSink(this.udptracker);
+    }
+
+    /** Socks passes this back to the client on the TCP connection */
+    public int getPort() {
+        return this.udpsink.getPort();
+    }
+
+    public void setSink(Sink sink) {
+        this.udptracker.setSink(sink);
+    }
+
+    public void start() {
+        // the other Sources don't use start
+        this.udpsource.start();
+    }
+
+    public void stop() {
+        this.udpsink.stop();
+        this.udpsource.stop();
+    }
+
+    public void send(Destination from, byte[] data) {
+        this.wrapper.send(from, data);
+    }
+    
+
+    private UDPSink udpsink;
+    private UDPSource udpsource;
+    private SOCKSUDPWrapper wrapper;
+    private SOCKSUDPUnwrapper unwrapper;
+    private ReplyTracker udptracker;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java
new file mode 100644
index 000000000..0adaa1950
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPTunnel.java
@@ -0,0 +1,94 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase;
+import net.i2p.util.EventDispatcher;
+
+/**
+ * A Datagram Tunnel that can have multiple bidirectional ports on the UDP side.
+ *
+ * TX:
+ *   (ReplyTracker in multiple SOCKSUDPPorts -> ) I2PSink
+ *
+ * RX:
+ *   (SOCKSUDPWrapper in multiple SOCKSUDPPorts <- ) MultiSink <- I2PSource
+ *
+ * The reply from a dest goes to the last SOCKSUDPPort that sent to that dest.
+ * If multiple ports are talking to a dest at the same time, this isn't
+ * going to work very well.
+ *
+ * @author zzz modded from streamr/StreamrConsumer
+ */
+public class SOCKSUDPTunnel extends I2PTunnelUDPClientBase {
+
+    /**
+     *  Set up a tunnel with no UDP side yet.
+     *  Use add() for each port.
+     */
+    public SOCKSUDPTunnel(I2PTunnel tunnel) {
+        super(null, tunnel, tunnel, tunnel);
+
+        this.ports = new ConcurrentHashMap(1);
+        this.cache = new ConcurrentHashMap(1);
+        this.demuxer = new MultiSink(this.cache);
+        setSink(this.demuxer);
+    }
+
+
+    /** @return the UDP port number */
+    public int add(InetAddress host, int port) {
+        SOCKSUDPPort sup = new SOCKSUDPPort(host, port, this.cache);
+        this.ports.put(Integer.valueOf(sup.getPort()), sup);
+        sup.setSink(this);
+        sup.start();
+        return sup.getPort();
+    }
+
+    public void remove(Integer port) {
+        SOCKSUDPPort sup = this.ports.remove(port);
+        if (sup != null)
+            sup.stop();
+        for (Iterator iter = cache.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry<Destination, SOCKSUDPPort> e = (Map.Entry) iter.next();
+            if (e.getValue() == sup)
+                iter.remove();
+        }
+    }
+    
+    public final void startRunning() {
+        super.startRunning();
+        // demuxer start() doesn't do anything
+        startall();
+    }
+    
+    public boolean close(boolean forced) {
+        stopall();
+        return super.close(forced);
+    }
+
+    /** you should really add() after startRunning() */
+    private void startall() {
+    }
+
+    private void stopall() {
+         for (SOCKSUDPPort sup : this.ports.values()) {
+              sup.stop();
+         }
+         this.ports.clear();
+         this.cache.clear();
+    }
+
+
+
+    private Map<Integer, SOCKSUDPPort> ports;
+    private Map<Destination, SOCKSUDPPort> cache;
+    private MultiSink demuxer;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java
new file mode 100644
index 000000000..2720b6fd4
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java
@@ -0,0 +1,59 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.util.Log;
+
+/**
+ * Strip a SOCKS header off a datagram, convert it to a Destination
+ * Ref: RFC 1928
+ *
+ * @author zzz
+ */
+public class SOCKSUDPUnwrapper implements Source, Sink {
+    private static final Log _log = new Log(SOCKSUDPUnwrapper.class);
+
+    /**
+     * @param cache put headers here to pass to SOCKSUDPWrapper
+     */
+    public SOCKSUDPUnwrapper(Map<Destination, SOCKSHeader> cache) {
+        this.cache = cache;
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+
+    public void start() {}
+
+    /**
+     *
+     */
+    public void send(Destination ignored_from, byte[] data) {
+        SOCKSHeader h;
+        try {
+            h = new SOCKSHeader(data);
+        } catch (IllegalArgumentException iae) {
+            _log.error(iae.toString());
+            return;
+        }
+        Destination dest = h.getDestination();
+        if (dest == null) {
+            // no, we aren't going to send non-i2p traffic to a UDP outproxy :)
+            _log.error("Destination not found: " + h.getHost());
+            return;
+        }
+
+        cache.put(dest, h);
+
+        int headerlen = h.getBytes().length;
+        byte unwrapped[] = new byte[data.length - headerlen];
+        System.arraycopy(unwrapped, 0, data, headerlen, unwrapped.length);
+        this.sink.send(dest, unwrapped);
+    }
+    
+    private Sink sink;
+    private Map<Destination, SOCKSHeader> cache;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java
new file mode 100644
index 000000000..4ec836157
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java
@@ -0,0 +1,49 @@
+package net.i2p.i2ptunnel.socks;
+
+import java.util.Map;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+
+/**
+ * Put a SOCKS header on a datagram
+ * Ref: RFC 1928
+ *
+ * @author zzz
+ */
+public class SOCKSUDPWrapper implements Source, Sink {
+    public SOCKSUDPWrapper(Map<Destination, SOCKSHeader> cache) {
+        this.cache = cache;
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+
+    public void start() {}
+
+    /**
+     * Use the cached header, which should have the host string and port
+     *
+     */
+    public void send(Destination from, byte[] data) {
+        if (this.sink == null)
+            return;
+
+        SOCKSHeader h = cache.get(from);
+        if (h == null) {
+            // RFC 1928 says drop
+            // h = new SOCKSHeader(from);
+            return;
+        }
+
+        byte[] header = h.getBytes();
+        byte wrapped[] = new byte[header.length + data.length];
+        System.arraycopy(wrapped, 0, header, 0, header.length);
+        System.arraycopy(wrapped, header.length, data, 0, data.length);
+        this.sink.send(from, wrapped);
+    }
+    
+    private Sink sink;
+    private Map<Destination, SOCKSHeader> cache;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
index 13d9b5520..5c5a08027 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
@@ -27,6 +27,10 @@ public class MultiSource implements Source, Sink {
 
     public void start() {}
 
+    public void stop() {
+        this.sinks.clear();
+    }
+
     public void send(Destination ignored_from, byte[] data) {
         for(Destination dest : this.sinks) {
             this.sink.send(dest, data);
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
index 3fc1d881b..02b443443 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
@@ -47,6 +47,7 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase {
     public boolean close(boolean forced) {
         // send unsubscribe-message
         this.pinger.stop();
+        this.sink.stop();
         return super.close(forced);
     }
 
@@ -59,6 +60,6 @@ public class StreamrConsumer extends I2PTunnelUDPClientBase {
 
     
     
-    private Sink sink;
+    private UDPSink sink;
     private Pinger pinger;
 }
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
index d722c5f95..c3963b6a6 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
@@ -51,7 +51,8 @@ public class StreamrProducer extends I2PTunnelUDPServerBase {
     }
     
     public boolean close(boolean forced) {
-        // need some stop() methods in UDPSource and MultiSource
+        this.server.stop();
+        this.multi.stop();
         return super.close(forced);
     }
 
@@ -65,6 +66,6 @@ public class StreamrProducer extends I2PTunnelUDPServerBase {
     
     
     private MultiSource multi;
-    private Source server;
+    private UDPSource server;
     private Sink subscriber;
 }
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
index 15feba615..d2e8e8924 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
@@ -34,6 +34,8 @@ public class UDPSink implements Sink {
     }
     
     public void send(Destination src, byte[] data) {
+        // if data.length > this.sock.getSendBufferSize() ...
+
         // create packet
         DatagramPacket packet = new DatagramPacket(data, data.length, this.remoteHost, this.remotePort);
         
@@ -46,17 +48,18 @@ public class UDPSink implements Sink {
         }
     }
     
+    public int getPort() {    
+        return this.sock.getLocalPort();    
+    }    
     
+    /** to pass to UDPSource constructor */
+    public DatagramSocket getSocket() {    
+        return this.sock;    
+    }    
     
-    
-    
-    
-    
-    
-    
-    
-    
-    
+    public void stop() {    
+        this.sock.close();    
+    }    
     
     
     
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
index c54a984b0..fc1dd5bf2 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
@@ -28,6 +28,13 @@ public class UDPSource implements Source, Runnable {
         // create thread
         this.thread = new Thread(this);
     }
+
+    /** use socket from UDPSink */
+    public UDPSource(DatagramSocket sock) {
+        this.sink = null;
+        this.sock = sock;
+        this.thread = new Thread(this);
+    }
     
     public void setSink(Sink sink) {
         this.sink = sink;
@@ -57,13 +64,14 @@ public class UDPSource implements Source, Runnable {
                 //System.out.print("i");
             } catch(Exception e) {
                 e.printStackTrace();
+                break;
             }
         }
     }
     
-    
-    
-    
+    public void stop() {    
+        this.sock.close();    
+    }    
     
     
     
-- 
GitLab