From 720aa704c4c5b8750e123045131f5ce8a00cdb97 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Mon, 23 Feb 2009 05:09:44 +0000
Subject: [PATCH] port streamr to i2ptunnel

---
 LICENSE.txt                                   |   4 +
 .../java/src/net/i2p/i2ptunnel/I2PTunnel.java |  82 +++++++
 .../net/i2p/i2ptunnel/I2PTunnelIRCServer.java |   9 -
 .../net/i2p/i2ptunnel/TunnelController.java   |  59 +++--
 .../i2p/i2ptunnel/streamr/MultiSource.java    |  60 +++++
 .../src/net/i2p/i2ptunnel/streamr/Pinger.java |  59 +++++
 .../i2ptunnel/streamr/StreamrConsumer.java    |  64 +++++
 .../i2ptunnel/streamr/StreamrProducer.java    |  70 ++++++
 .../net/i2p/i2ptunnel/streamr/Subscriber.java |  75 ++++++
 .../src/net/i2p/i2ptunnel/udp/I2PSink.java    |  70 ++++++
 .../i2p/i2ptunnel/udp/I2PSinkAnywhere.java    |  67 ++++++
 .../src/net/i2p/i2ptunnel/udp/I2PSource.java  | 123 ++++++++++
 .../java/src/net/i2p/i2ptunnel/udp/Sink.java  |  17 ++
 .../src/net/i2p/i2ptunnel/udp/Source.java     |  15 ++
 .../src/net/i2p/i2ptunnel/udp/Stream.java     |  15 ++
 .../src/net/i2p/i2ptunnel/udp/UDPSink.java    |  74 ++++++
 .../src/net/i2p/i2ptunnel/udp/UDPSource.java  |  83 +++++++
 .../udpTunnel/I2PTunnelUDPClientBase.java     | 218 ++++++++++++++++++
 .../udpTunnel/I2PTunnelUDPServerBase.java     | 212 +++++++++++++++++
 .../src/net/i2p/i2ptunnel/web/IndexBean.java  |   8 +-
 apps/i2ptunnel/jsp/editClient.jsp             |  30 ++-
 apps/i2ptunnel/jsp/editServer.jsp             |   8 +
 apps/i2ptunnel/jsp/index.jsp                  |   2 +
 23 files changed, 1375 insertions(+), 49 deletions(-)
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Source.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Stream.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java
 create mode 100644 apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java

diff --git a/LICENSE.txt b/LICENSE.txt
index 324f532c61..e937985488 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -113,6 +113,10 @@ Applications:
    See licenses/LICENSE-I2PTunnel.txt
    See licenses/LICENSE-GPLv2.txt
 
+   I2PTunnel UDP and Streamr:
+   By welterde.
+   See licenses/LICENSE-GPLv2.txt
+
    Jetty 5.1.12:
    Copyright 2000-2004 Mort Bay Consulting Pty. Ltd.
    See licenses/LICENSE-Apache1.1.txt
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java
index 18b517cd2c..dc9dfd2fcc 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java
@@ -62,6 +62,8 @@ import net.i2p.data.DataFormatException;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
 import net.i2p.i2ptunnel.socks.I2PSOCKSTunnel;
+import net.i2p.i2ptunnel.streamr.StreamrConsumer;
+import net.i2p.i2ptunnel.streamr.StreamrProducer;
 import net.i2p.util.EventDispatcher;
 import net.i2p.util.EventDispatcherImpl;
 import net.i2p.util.Log;
@@ -248,6 +250,10 @@ public class I2PTunnel implements Logging, EventDispatcher {
             runSOCKSTunnel(args, l);
         } else if ("connectclient".equals(cmdname)) {
             runConnectClient(args, l);
+        } else if ("streamrclient".equals(cmdname)) {
+            runStreamrClient(args, l);
+        } else if ("streamrserver".equals(cmdname)) {
+            runStreamrServer(args, l);
         } else if ("config".equals(cmdname)) {
             runConfig(args, l);
         } else if ("listen_on".equals(cmdname)) {
@@ -800,6 +806,82 @@ public class I2PTunnel implements Logging, EventDispatcher {
         }
     }
 
+    /**
+     * Streamr client
+     *
+     * @param args {targethost, targetport, destinationString}
+     * @param l logger to receive events and output
+     */
+    public void runStreamrClient(String args[], Logging l) {
+        if (args.length == 3) {
+            InetAddress host;
+            try {
+                host = InetAddress.getByName(args[0]);
+            } catch (UnknownHostException uhe) {
+                l.log("unknown host");
+                _log.error(getPrefix() + "Error resolving " + args[0], uhe);
+                notifyEvent("streamrtunnelTaskId", Integer.valueOf(-1));
+                return;
+            }
+
+            int port = -1;
+            try {
+                port = Integer.parseInt(args[1]);
+            } catch (NumberFormatException nfe) {
+                l.log("invalid port");
+                _log.error(getPrefix() + "Port specified is not valid: " + args[0], nfe);
+                notifyEvent("streamrtunnelTaskId", Integer.valueOf(-1));
+                return;
+            }
+
+            StreamrConsumer task = new StreamrConsumer(host, port, args[2], l, (EventDispatcher) this, this);
+            task.startRunning();
+            addtask(task);
+            notifyEvent("streamrtunnelTaskId", Integer.valueOf(task.getId()));
+        } else {
+            l.log("streamrclient <host> <port> <destination>");
+            l.log("  creates a tunnel that receives streaming data.");
+            notifyEvent("streamrtunnelTaskId", Integer.valueOf(-1));
+        }
+    }
+
+    /**
+     * Streamr server
+     *
+     * @param args {port, privkeyfile}
+     * @param l logger to receive events and output
+     */
+    public void runStreamrServer(String args[], Logging l) {
+        if (args.length == 2) {
+            int port = -1;
+            try {
+                port = Integer.parseInt(args[0]);
+            } catch (NumberFormatException nfe) {
+                l.log("invalid port");
+                _log.error(getPrefix() + "Port specified is not valid: " + args[0], nfe);
+                notifyEvent("streamrtunnelTaskId", Integer.valueOf(-1));
+                return;
+            }
+
+            File privKeyFile = new File(args[1]);
+            if (!privKeyFile.canRead()) {
+                l.log("private key file does not exist");
+                _log.error(getPrefix() + "Private key file does not exist or is not readable: " + args[3]);
+                notifyEvent("serverTaskId", Integer.valueOf(-1));
+                return;
+            }
+
+            StreamrProducer task = new StreamrProducer(port, privKeyFile, args[1], l, (EventDispatcher) this, this);
+            task.startRunning();
+            addtask(task);
+            notifyEvent("streamrtunnelTaskId", Integer.valueOf(task.getId()));
+        } else {
+            l.log("streamrserver <port> <privkeyfile>");
+            l.log("  creates a tunnel that sends streaming data.");
+            notifyEvent("streamrtunnelTaskId", Integer.valueOf(-1));
+        }
+    }
+
     /**
      * Specify the i2cp host and port 
      *
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java
index 970d90ff01..aa95e526c8 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCServer.java
@@ -61,21 +61,12 @@ public class I2PTunnelIRCServer extends I2PTunnelServer implements Runnable {
      * @throws IllegalArgumentException if the I2PTunnel does not contain
      *                                  valid config to contact the router
      */
-    public I2PTunnelIRCServer(InetAddress host, int port, String privData, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) {
-        super(host, port, privData, l, notifyThis, tunnel);
-        initCloak(tunnel);
-    }
 
     public I2PTunnelIRCServer(InetAddress host, int port, File privkey, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) {
         super(host, port, privkey, privkeyname, l, notifyThis, tunnel);
         initCloak(tunnel);
     }
 
-    public I2PTunnelIRCServer(InetAddress host, int port, InputStream privData, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) {
-        super(host, port, privData, privkeyname, l, notifyThis, tunnel);
-        initCloak(tunnel);
-    }
-
     /** generate a random 32 bytes, or the hash of the passphrase */
     private void initCloak(I2PTunnel tunnel) {
         Properties opts = tunnel.getClientOptions();
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
index 82b253985f..6c5fa4eb99 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java
@@ -134,6 +134,8 @@ public class TunnelController implements Logging {
                 _log.warn("Cannot start the tunnel - no type specified");
             return;
         }
+        setI2CPOptions();
+        setSessionOptions();
         if ("httpclient".equals(type)) {
             startHttpClient();
         } else if("ircclient".equals(type)) {
@@ -144,21 +146,26 @@ public class TunnelController implements Logging {
             startConnectClient();
         } else if ("client".equals(type)) {
             startClient();
+        } else if ("streamrclient".equals(type)) {
+            startStreamrClient();
         } else if ("server".equals(type)) {
             startServer();
         } else if ("httpserver".equals(type)) {
             startHttpServer();
         } else if ("ircserver".equals(type)) {
             startIrcServer();
+        } else if ("streamrserver".equals(type)) {
+            startStreamrServer();
         } else {
             if (_log.shouldLog(Log.ERROR))
                 _log.error("Cannot start tunnel - unknown type [" + type + "]");
+            return;
         }
+        acquire();
+        _running = true;
     }
     
     private void startHttpClient() {
-        setI2CPOptions();
-        setSessionOptions();
         setListenOn();
         String listenPort = getListenPort();
         String proxyList = getProxyList();
@@ -167,13 +174,9 @@ public class TunnelController implements Logging {
             _tunnel.runHttpClient(new String[] { listenPort, sharedClient }, this);
         else
             _tunnel.runHttpClient(new String[] { listenPort, sharedClient, proxyList }, this);
-        acquire();
-        _running = true;
     }
     
     private void startConnectClient() {
-        setI2CPOptions();
-        setSessionOptions();
         setListenOn();
         String listenPort = getListenPort();
         String proxyList = getProxyList();
@@ -182,31 +185,39 @@ public class TunnelController implements Logging {
             _tunnel.runConnectClient(new String[] { listenPort, sharedClient }, this);
         else
             _tunnel.runConnectClient(new String[] { listenPort, sharedClient, proxyList }, this);
-        acquire();
-        _running = true;
     }
     
     private void startIrcClient() {
-        setI2CPOptions();
-        setSessionOptions();
         setListenOn();
         String listenPort = getListenPort();
         String dest = getTargetDestination();
         String sharedClient = getSharedClient();
         _tunnel.runIrcClient(new String[] { listenPort, dest, sharedClient }, this);
-        acquire();
-        _running = true;
     }
     
     private void startSocksClient() {
-        setI2CPOptions();
-        setSessionOptions();
         setListenOn();
         String listenPort = getListenPort();
         String sharedClient = getSharedClient();
         _tunnel.runSOCKSTunnel(new String[] { listenPort, sharedClient }, this);
-        acquire();
-        _running = true;
+    }
+    
+    /*
+     *  Streamr client is a UDP server, use the listenPort field for targetPort
+     *  and the listenOnInterface field for the targetHost
+     */
+    private void startStreamrClient() {
+        String targetHost = getListenOnInterface();
+        String targetPort = getListenPort();
+        String dest = getTargetDestination();
+        _tunnel.runStreamrClient(new String[] { targetHost, targetPort, dest }, this);
+    }
+    
+    /** Streamr server is a UDP client, use the targetPort field for listenPort  */
+    private void startStreamrServer() {
+        String listenPort = getTargetPort();
+        String privKeyFile = getPrivKeyFile(); 
+        _tunnel.runStreamrServer(new String[] { listenPort, privKeyFile }, this);
     }
     
     /** 
@@ -242,49 +253,33 @@ public class TunnelController implements Logging {
     }
     
     private void startClient() {
-        setI2CPOptions();
-        setSessionOptions();
         setListenOn();
         String listenPort = getListenPort(); 
         String dest = getTargetDestination();
         String sharedClient = getSharedClient();
         _tunnel.runClient(new String[] { listenPort, dest, sharedClient }, this);
-        acquire();
-        _running = true;
     }
 
     private void startServer() {
-        setI2CPOptions();
-        setSessionOptions();
         String targetHost = getTargetHost(); 
         String targetPort = getTargetPort(); 
         String privKeyFile = getPrivKeyFile(); 
         _tunnel.runServer(new String[] { targetHost, targetPort, privKeyFile }, this);
-        acquire();
-        _running = true;
     }
     
     private void startHttpServer() {
-        setI2CPOptions();
-        setSessionOptions();
         String targetHost = getTargetHost(); 
         String targetPort = getTargetPort(); 
         String spoofedHost = getSpoofedHost(); 
         String privKeyFile = getPrivKeyFile(); 
         _tunnel.runHttpServer(new String[] { targetHost, targetPort, spoofedHost, privKeyFile }, this);
-        acquire();
-        _running = true;
     }
     
     private void startIrcServer() {
-        setI2CPOptions();
-        setSessionOptions();
         String targetHost = getTargetHost(); 
         String targetPort = getTargetPort(); 
         String privKeyFile = getPrivKeyFile(); 
         _tunnel.runIrcServer(new String[] { targetHost, targetPort, privKeyFile }, this);
-        acquire();
-        _running = true;
     }
     
     private void setListenOn() {
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
new file mode 100644
index 0000000000..13d9b55202
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/MultiSource.java
@@ -0,0 +1,60 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.streamr;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.List;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.udp.*;
+
+/**
+ * Sends to many Sinks
+ * @author welterde
+ * @author zzz modded for I2PTunnel
+ */
+public class MultiSource implements Source, Sink {
+    public MultiSource() {
+        this.sinks = new CopyOnWriteArrayList<Destination>();
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+
+    public void start() {}
+
+    public void send(Destination ignored_from, byte[] data) {
+        for(Destination dest : this.sinks) {
+            this.sink.send(dest, data);
+        }
+    }
+    
+    public void add(Destination sink) {
+        this.sinks.add(sink);
+    }
+    
+    public void remove(Destination sink) {
+        this.sinks.remove(sink);
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    private Sink sink;
+    private List<Destination> sinks;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java
new file mode 100644
index 0000000000..a3a7975361
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Pinger.java
@@ -0,0 +1,59 @@
+package net.i2p.i2ptunnel.streamr;
+
+import net.i2p.i2ptunnel.udp.*;
+
+/**
+ *
+ * @author welterde/zzz
+ */
+public class Pinger implements Source, Runnable {
+    public Pinger() {
+        this.thread = new Thread(this);
+    }
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+    
+    public void start() {
+        this.running = true;
+        this.waitlock = new Object();
+        this.thread.start();
+    }
+    
+    public void stop() {
+        this.running = false;
+        synchronized(this.waitlock) {
+            this.waitlock.notifyAll();
+        }
+        // send unsubscribe-message
+        byte[] data = new byte[1];
+        data[0] = 1;
+        this.sink.send(null, data);
+    }
+    
+    public void run() {
+        // send subscribe-message
+        byte[] data = new byte[1];
+        data[0] = 0;
+        int i = 0;
+        while(this.running) {
+            //System.out.print("p");
+            this.sink.send(null, data);
+            synchronized(this.waitlock) {
+                int delay = 10000;
+                if (i < 5) {
+                    i++;
+                    delay = 2000;
+                }
+                try {
+                    this.waitlock.wait(delay);
+                } catch(InterruptedException ie) {}
+            }
+        }
+    }
+
+    protected Sink sink;
+    protected Thread thread;
+    protected Object waitlock;
+    protected boolean running;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
new file mode 100644
index 0000000000..3fc1d881b9
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.streamr;
+
+import java.net.InetAddress;
+
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPClientBase;
+import net.i2p.util.EventDispatcher;
+
+/**
+ * Compared to a standard I2PTunnel,
+ * this acts like a client on the I2P side (no privkey file)
+ * but a server on the UDP side (sends to a configured host/port)
+ *
+ * @author welterde
+ * @author zzz modded for I2PTunnel
+ */
+public class StreamrConsumer extends I2PTunnelUDPClientBase {
+
+    public StreamrConsumer(InetAddress host, int port, String destination,
+                           Logging l, EventDispatcher notifyThis,
+                           I2PTunnel tunnel) {
+        super(destination, l, notifyThis, tunnel);
+
+        // create udp-destination
+        this.sink = new UDPSink(host, port);
+        setSink(this.sink);
+        
+        // create pinger
+        this.pinger = new Pinger();
+        this.pinger.setSink(this);
+    }
+    
+    public final void startRunning() {
+        super.startRunning();
+        // send subscribe-message
+        this.pinger.start();
+    }
+    
+    public boolean close(boolean forced) {
+        // send unsubscribe-message
+        this.pinger.stop();
+        return super.close(forced);
+    }
+
+
+
+
+
+
+
+
+    
+    
+    private Sink sink;
+    private Pinger pinger;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
new file mode 100644
index 0000000000..d722c5f95c
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/StreamrProducer.java
@@ -0,0 +1,70 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.streamr;
+
+// system
+import java.io.File;
+
+// i2p
+import net.i2p.client.I2PSession;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPServerBase;
+import net.i2p.util.EventDispatcher;
+
+/**
+ * Compared to a standard I2PTunnel,
+ * this acts like a server on the I2P side (persistent privkey file)
+ * but a client on the UDP side (receives on a configured port)
+ *
+ * @author welterde
+ * @author zzz modded for I2PTunnel
+ */
+public class StreamrProducer extends I2PTunnelUDPServerBase {
+
+    public StreamrProducer(int port,
+                           File privkey, String privkeyname, Logging l,
+                           EventDispatcher notifyThis, I2PTunnel tunnel) {
+        // verify subscription requests
+        super(true, privkey, privkeyname, l, notifyThis, tunnel);
+        
+        // The broadcaster
+        this.multi = new MultiSource();
+        this.multi.setSink(this);
+
+        // The listener
+        this.subscriber = new Subscriber(this.multi);
+        setSink(this.subscriber);
+
+        // now start udp-server
+        this.server = new UDPSource(port);
+        this.server.setSink(this.multi);
+    }
+    
+    public final void startRunning() {
+        super.startRunning();
+        this.server.start();
+    }
+    
+    public boolean close(boolean forced) {
+        // need some stop() methods in UDPSource and MultiSource
+        return super.close(forced);
+    }
+
+
+    
+    
+    
+    
+    
+    
+    
+    
+    private MultiSource multi;
+    private Source server;
+    private Sink subscriber;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java
new file mode 100644
index 0000000000..97abdb8890
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/streamr/Subscriber.java
@@ -0,0 +1,75 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.streamr;
+
+// system
+import java.io.File;
+import java.util.Set;
+
+// i2p
+import net.i2p.client.I2PSession;
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.i2ptunnel.udpTunnel.I2PTunnelUDPServerBase;
+import net.i2p.util.EventDispatcher;
+import net.i2p.util.ConcurrentHashSet;
+
+/**
+ * server-mode
+ * @author welterde
+ * @author zzz modded from Producer for I2PTunnel
+ */
+public class Subscriber implements Sink {
+
+    public Subscriber(MultiSource multi) {
+        this.multi = multi;
+        // subscriptions
+        this.subscriptions = new ConcurrentHashSet<Destination>();
+    }
+
+    public void send(Destination dest, byte[] data) {
+        if(dest == null || data.length < 1) {
+            // invalid packet
+            // TODO: write to log
+        } else {
+            byte ctrl = data[0];
+            if(ctrl == 0) {
+                if (!this.subscriptions.contains(dest)) {
+                    // subscribe
+                    System.out.println("Add subscription: " + dest.toBase64().substring(0,4));
+                    this.subscriptions.add(dest);
+                    this.multi.add(dest);
+                } // else already subscribed
+            } else if(ctrl == 1) {
+                // unsubscribe
+                System.out.println("Remove subscription: " + dest.toBase64().substring(0,4));
+                boolean removed = this.subscriptions.remove(dest);
+                if(removed)
+                    multi.remove(dest);
+            } else {
+                // invalid packet
+                // TODO: write to log
+            }
+        }
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    private I2PSession sess;
+    private Source listener;
+    private Set<Destination> subscriptions;
+    private MultiSource multi;
+    private Source server;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java
new file mode 100644
index 0000000000..3cbccf139e
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSink.java
@@ -0,0 +1,70 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// i2p
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionException;
+import net.i2p.data.Destination;
+import net.i2p.client.datagram.I2PDatagramMaker;
+
+/**
+ * Producer
+ *
+ * This sends to a fixed destination specified in the constructor
+ *
+ * @author welterde
+ */
+public class I2PSink implements Sink {
+    public I2PSink(I2PSession sess, Destination dest) {
+        this(sess, dest, false);
+    }
+    public I2PSink(I2PSession sess, Destination dest, boolean raw) {
+        this.sess = sess;
+        this.dest = dest;
+        this.raw = raw;
+        
+        // create maker
+        if (!raw)
+            this.maker = new I2PDatagramMaker(this.sess);
+    }
+    
+    /** @param src ignored */
+    public synchronized void send(Destination src, byte[] data) {
+        //System.out.print("w");
+        // create payload
+        byte[] payload;
+        if(!this.raw)
+            payload = this.maker.makeI2PDatagram(data);
+        else
+            payload = data;
+        
+        // send message
+        try {
+            this.sess.sendMessage(this.dest, payload);
+        } catch(I2PSessionException exc) {
+            // TODO: handle better
+            exc.printStackTrace();
+        }
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    protected boolean raw;
+    protected I2PSession sess;
+    protected Destination dest;
+    protected I2PDatagramMaker maker;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java
new file mode 100644
index 0000000000..09385d46fe
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSinkAnywhere.java
@@ -0,0 +1,67 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// i2p
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionException;
+import net.i2p.data.Destination;
+import net.i2p.client.datagram.I2PDatagramMaker;
+
+/**
+ * Producer
+ *
+ * This sends to any destination specified in send()
+ *
+ * @author zzz modded from I2PSink by welterde
+ */
+public class I2PSinkAnywhere implements Sink {
+    public I2PSinkAnywhere(I2PSession sess) {
+        this(sess, false);
+    }
+    public I2PSinkAnywhere(I2PSession sess, boolean raw) {
+        this.sess = sess;
+        this.raw = raw;
+        
+        // create maker
+        this.maker = new I2PDatagramMaker(this.sess);
+    }
+    
+    /** @param to - where it's going */
+    public synchronized void send(Destination to, byte[] data) {
+        // create payload
+        byte[] payload;
+        if(!this.raw)
+            payload = this.maker.makeI2PDatagram(data);
+        else
+            payload = data;
+        
+        // send message
+        try {
+            this.sess.sendMessage(to, payload);
+        } catch(I2PSessionException exc) {
+            // TODO: handle better
+            exc.printStackTrace();
+        }
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    protected boolean raw;
+    protected I2PSession sess;
+    protected Destination dest;
+    protected I2PDatagramMaker maker;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java
new file mode 100644
index 0000000000..0b54747772
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/I2PSource.java
@@ -0,0 +1,123 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// system
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+// i2p
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionListener;
+import net.i2p.client.datagram.I2PDatagramDissector;
+
+/**
+ *
+ * @author welterde
+ */
+public class I2PSource implements Source, Runnable {
+    public I2PSource(I2PSession sess) {
+        this(sess, true, false);
+    }
+    public I2PSource(I2PSession sess, boolean verify) {
+        this(sess, verify, false);
+    }
+    public I2PSource(I2PSession sess, boolean verify, boolean raw) {
+        this.sess = sess;
+        this.sink = null;
+        this.verify = verify;
+        this.raw = raw;
+        
+        // create queue
+        this.queue = new ArrayBlockingQueue(256);
+        
+        // create listener
+        this.sess.setSessionListener(new Listener());
+        
+        // create thread
+        this.thread = new Thread(this);
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+    
+    public void start() {
+        this.thread.start();
+    }
+    
+    public void run() {
+        // create dissector
+        I2PDatagramDissector diss = new I2PDatagramDissector();
+        while(true) {
+            try {
+                // get id
+                int id = this.queue.take();
+                
+                // receive message
+                byte[] msg = this.sess.receiveMessage(id);
+                
+                if(!this.raw) {
+                    // load datagram into it
+                    diss.loadI2PDatagram(msg);
+                    
+                    // now call sink
+                    if(this.verify)
+                        this.sink.send(diss.getSender(), diss.getPayload());
+                    else
+                        this.sink.send(diss.extractSender(), diss.extractPayload());
+                } else {
+                    // verify is ignored
+                    this.sink.send(null, msg);
+                }
+                //System.out.print("r");
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    
+    
+    
+    
+    
+    protected class Listener implements I2PSessionListener {
+
+        public void messageAvailable(I2PSession sess, int id, long size) {
+            try {
+                queue.put(id);
+            } catch(Exception e) {
+                // ignore
+            }
+        }
+
+        public void reportAbuse(I2PSession arg0, int arg1) {
+            // ignore
+        }
+
+        public void disconnected(I2PSession arg0) {
+            // ignore
+        }
+
+        public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
+            // ignore
+        }
+        
+    }
+    
+    
+    
+    
+    
+    
+    protected I2PSession sess;
+    protected BlockingQueue<Integer> queue;
+    protected Sink sink;
+    protected Thread thread;
+    protected boolean verify;
+    protected boolean raw;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java
new file mode 100644
index 0000000000..49e3e47a3e
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Sink.java
@@ -0,0 +1,17 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// i2p
+import net.i2p.data.Destination;
+
+/**
+ *
+ * @author welterde
+ */
+public interface Sink {
+    public void send(Destination src, byte[] data);
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Source.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Source.java
new file mode 100644
index 0000000000..f65d03b196
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Source.java
@@ -0,0 +1,15 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+/**
+ *
+ * @author welterde
+ */
+public interface Source {
+    public void setSink(Sink sink);
+    public void start();
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Stream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Stream.java
new file mode 100644
index 0000000000..b8b57e696c
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/Stream.java
@@ -0,0 +1,15 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+/**
+ *
+ * @author welterde
+ */
+public interface Stream {
+    public void start();
+    public void stop();
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
new file mode 100644
index 0000000000..15feba6156
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSink.java
@@ -0,0 +1,74 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// system
+import java.net.DatagramSocket;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+
+// i2p
+import net.i2p.data.Destination;
+
+/**
+ *
+ * @author welterde
+ */
+public class UDPSink implements Sink {
+    public UDPSink(InetAddress host, int port) {
+        // create socket
+        try {
+            this.sock = new DatagramSocket();
+        } catch(Exception e) {
+            // TODO: fail better
+            throw new RuntimeException("failed to open udp-socket", e);
+        }
+        
+        this.remoteHost = host;
+        
+        // remote port
+        this.remotePort = port;
+    }
+    
+    public void send(Destination src, byte[] data) {
+        // create packet
+        DatagramPacket packet = new DatagramPacket(data, data.length, this.remoteHost, this.remotePort);
+        
+        // send packet
+        try {
+            this.sock.send(packet);
+        } catch(Exception e) {
+            // TODO: fail a bit better
+            e.printStackTrace();
+        }
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    protected DatagramSocket sock;
+    protected InetAddress remoteHost;
+    protected int remotePort;
+
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
new file mode 100644
index 0000000000..c54a984b0a
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udp/UDPSource.java
@@ -0,0 +1,83 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package net.i2p.i2ptunnel.udp;
+
+// system
+import java.net.DatagramSocket;
+import java.net.DatagramPacket;
+
+/**
+ *
+ * @author welterde
+ */
+public class UDPSource implements Source, Runnable {
+    public static final int MAX_SIZE = 15360;
+    public UDPSource(int port) {
+        this.sink = null;
+        
+        // create udp-socket
+        try {
+            this.sock = new DatagramSocket(port);
+        } catch(Exception e) {
+            throw new RuntimeException("failed to listen...", e);
+        }
+        
+        // create thread
+        this.thread = new Thread(this);
+    }
+    
+    public void setSink(Sink sink) {
+        this.sink = sink;
+    }
+    
+    public void start() {
+        this.thread.start();
+    }
+    
+    public void run() {
+        // create packet
+        byte[] buf = new byte[MAX_SIZE];
+        DatagramPacket pack = new DatagramPacket(buf, buf.length);
+        while(true) {
+            try {
+                // receive...
+                this.sock.receive(pack);
+                
+                // create new data array
+                byte[] nbuf = new byte[pack.getLength()];
+                
+                // copy over
+                System.arraycopy(pack.getData(), 0, nbuf, 0, nbuf.length);
+                
+                // transfer to sink
+                this.sink.send(null, nbuf);
+                //System.out.print("i");
+            } catch(Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    
+    protected DatagramSocket sock;
+    protected Sink sink;
+    protected Thread thread;
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java
new file mode 100644
index 0000000000..0123be6eab
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPClientBase.java
@@ -0,0 +1,218 @@
+/* I2PTunnel is GPL'ed (with the exception mentioned in I2PTunnel.java)
+ * (c) 2003 - 2004 mihi
+ */
+package net.i2p.i2ptunnel.udpTunnel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.NoRouteToHostException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import net.i2p.I2PAppContext;
+import net.i2p.I2PException;
+import net.i2p.client.I2PClient;
+import net.i2p.client.I2PClientFactory;
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionException;
+import net.i2p.data.DataFormatException;
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.I2PTunnelTask;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.util.EventDispatcher;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+public abstract class I2PTunnelUDPClientBase extends I2PTunnelTask implements Source, Sink {
+
+    private static final Log _log = new Log(I2PTunnelUDPClientBase.class);
+    protected I2PAppContext _context;
+    protected Logging l;
+
+    static final long DEFAULT_CONNECT_TIMEOUT = 60 * 1000;
+
+    private static volatile long __clientId = 0;
+    protected long _clientId;
+
+    protected Destination dest = null;
+
+    private boolean listenerReady = false;
+
+    private ServerSocket ss;
+
+    private Object startLock = new Object();
+    private boolean startRunning = false;
+
+    private byte[] pubkey;
+
+    private String handlerName;
+
+    private Object conLock = new Object();
+    
+    /** How many connections will we allow to be in the process of being built at once? */
+    private int _numConnectionBuilders;
+    /** How long will we allow sockets to sit in the _waitingSockets map before killing them? */
+    private int _maxWaitTime;
+    
+    private I2PSession _session;
+    private Source _i2pSource;
+    private Sink _i2pSink;
+    private Destination _otherDest;
+
+    /**
+     * Base client class that sets up an I2P Datagram client destination.
+     * The UDP side is not implemented here, as there are at least
+     * two possibilities:
+     *
+     * 1) UDP side is a "server"
+     *    Example: Streamr Consumer
+     *    - Configure a destination host and port
+     *    - External application sends no data
+     *    - Extending class must have a constructor with host and port arguments
+     *
+     * 2) UDP side is a client/server
+     *    Example: SOCKS UDP (DNS requests?)
+     *    - configure an inbound port and a destination host and port
+     *    - External application sends and receives data
+     *    - Extending class must have a constructor with host and 2 port arguments
+     *
+     * So the implementing class must create a UDPSource and/or UDPSink,
+     * and must call setSink().
+     *
+     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
+     *                                  badly that we cant create a socketManager
+     *
+     * @author zzz with portions from welterde's streamr
+     */
+    public I2PTunnelUDPClientBase(String destination, Logging l, EventDispatcher notifyThis,
+                                  I2PTunnel tunnel) throws IllegalArgumentException {
+        super("UDPServer", notifyThis, tunnel);
+        _clientId = ++__clientId;
+        this.l = l;
+
+        _context = tunnel.getContext();
+
+        tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true");
+        
+        // create i2pclient and destination
+        I2PClient client = I2PClientFactory.createClient();
+        Destination dest;
+        byte[] key;
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream(512);
+            dest = client.createDestination(out);
+            key = out.toByteArray();
+        } catch(Exception exc) {
+            throw new RuntimeException("failed to create i2p-destination", exc);
+        }
+
+        // create a session
+        try {
+            ByteArrayInputStream in = new ByteArrayInputStream(key);
+            _session = client.createSession(in, tunnel.getClientOptions());
+        } catch(Exception exc) {
+            throw new RuntimeException("failed to create session", exc);
+        }
+
+        // Setup the source. Always expect raw unverified datagrams.
+        _i2pSource = new I2PSource(_session, false, true);
+
+        // Setup the sink. Always send repliable datagrams.
+        if (destination != null && destination.length() > 0) {
+            try {
+                _otherDest = I2PTunnel.destFromName(destination);
+            } catch (DataFormatException dfe) {}
+            if (_otherDest == null) {
+                l.log("Could not resolve " + destination);
+                throw new RuntimeException("failed to create session - could not resolve " + destination);
+             }
+            _i2pSink = new I2PSink(_session, _otherDest, false);
+        } else {
+            _i2pSink = new I2PSinkAnywhere(_session, false);
+        }   
+
+        //configurePool(tunnel);
+        
+    }
+    
+    /**
+     * Actually start working on outgoing connections.
+     * Classes should override to start UDP side as well.
+     *
+     * Not specified in I2PTunnelTask but used in both
+     * I2PTunnelClientBase and I2PTunnelServer so let's
+     * implement it here too.
+     */
+    public void startRunning() {
+        synchronized (startLock) {
+            try {
+                _session.connect();
+            } catch(I2PSessionException exc) {
+                throw new RuntimeException("failed to connect session", exc);
+            }
+            start();
+            startRunning = true;
+            startLock.notify();
+        }
+
+        if (open && listenerReady) {
+            notifyEvent("openBaseClientResult", "ok");
+        } else {
+            l.log("Error listening - please see the logs!");
+            notifyEvent("openBaseClientResult", "error");
+        }
+    }
+
+    /**
+     * I2PTunnelTask Methods
+     *
+     * Classes should override to close UDP side as well
+     */
+    public boolean close(boolean forced) {
+        if (!open) return true;
+        if (_session != null) {
+            try {
+                _session.destroySession();
+            } catch (I2PSessionException ise) {}
+        }
+        l.log("Closing client " + toString());
+        return true;
+    }
+
+    /**
+     *  Source Methods
+     *
+     *  Sets the receiver of the UDP datagrams from I2P
+     *  Subclass must call this after constructor
+     *  and before start()
+     */
+    public void setSink(Sink s) {
+        _i2pSource.setSink(s);
+    }
+
+    /** start the source */
+    public void start() {
+        _i2pSource.start();
+    }
+
+    /**
+     *  Sink Methods
+     *
+     * @param to - ignored if configured for a single destination
+     * (we use the dest specified in the constructor)
+     */
+    public void send(Destination to, byte[] data) {
+        _i2pSink.send(to, data);
+    }
+}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java
new file mode 100644
index 0000000000..fe129fb131
--- /dev/null
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/udpTunnel/I2PTunnelUDPServerBase.java
@@ -0,0 +1,212 @@
+/* I2PTunnel is GPL'ed (with the exception mentioned in I2PTunnel.java)
+ * (c) 2003 - 2004 mihi
+ */
+package net.i2p.i2ptunnel.udpTunnel;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import net.i2p.I2PAppContext;
+import net.i2p.I2PException;
+import net.i2p.client.I2PClient;
+import net.i2p.client.I2PClientFactory;
+import net.i2p.client.I2PSession;
+import net.i2p.client.I2PSessionException;
+import net.i2p.data.Base64;
+import net.i2p.data.Destination;
+import net.i2p.i2ptunnel.I2PTunnel;
+import net.i2p.i2ptunnel.I2PTunnelTask;
+import net.i2p.i2ptunnel.Logging;
+import net.i2p.i2ptunnel.udp.*;
+import net.i2p.util.EventDispatcher;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+public class I2PTunnelUDPServerBase extends I2PTunnelTask implements Source, Sink {
+
+    private final static Log _log = new Log(I2PTunnelUDPServerBase.class);
+
+    private Object lock = new Object();
+    protected Object slock = new Object();
+
+    private static volatile long __serverId = 0;
+
+    private Logging l;
+
+    private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000;
+    /** default timeout to 3 minutes - override if desired */
+    protected long readTimeout = DEFAULT_READ_TIMEOUT;
+
+    private I2PSession _session;
+    private Source _i2pSource;
+    private Sink _i2pSink;
+
+    /**
+     * Base client class that sets up an I2P Datagram server destination.
+     * The UDP side is not implemented here, as there are at least
+     * two possibilities:
+     *
+     * 1) UDP side is a "client"
+     *    Example: Streamr Producer
+     *    - configure an inbound port
+     *    - External application receives no data
+     *    - Extending class must have a constructor with a port argument
+     *
+     * 2) UDP side is a client/server
+     *    Example: DNS
+     *    - configure an inbound port and a destination host and port
+     *    - External application sends and receives data
+     *    - Extending class must have a constructor with host and 2 port arguments
+     *
+     * So the implementing class must create a UDPSource and/or UDPSink,
+     * and must call setSink().
+     *
+     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
+     *                                  badly that we cant create a socketManager
+     *
+     * @author zzz with portions from welterde's streamr
+     */
+
+    public I2PTunnelUDPServerBase(boolean verify, File privkey, String privkeyname, Logging l,
+                           EventDispatcher notifyThis, I2PTunnel tunnel) {
+        super("UDPServer <- " + privkeyname, notifyThis, tunnel);
+        FileInputStream fis = null;
+        try {
+            fis = new FileInputStream(privkey);
+            init(verify, fis, privkeyname, l);
+        } catch (IOException ioe) {
+            _log.error("Error starting server", ioe);
+            notifyEvent("openServerResult", "error");
+        } finally {
+            if (fis != null)
+                try { fis.close(); } catch (IOException ioe) {}
+        }
+    }
+
+    private void init(boolean verify, InputStream privData, String privkeyname, Logging l) {
+        this.l = l;
+        int portNum = 7654;
+        if (getTunnel().port != null) {
+            try {
+                portNum = Integer.parseInt(getTunnel().port);
+            } catch (NumberFormatException nfe) {
+                _log.log(Log.CRIT, "Invalid port specified [" + getTunnel().port + "], reverting to " + portNum);
+            }
+        }
+
+        // create i2pclient
+        I2PClient client = I2PClientFactory.createClient();
+
+        try {
+            _session = client.createSession(privData, getTunnel().getClientOptions());
+        } catch(I2PSessionException exc) {
+            throw new RuntimeException("failed to create session", exc);
+        }
+
+        // Setup the source. Always expect repliable datagrams, optionally verify
+        _i2pSource = new I2PSource(_session, verify, false);
+
+        // Setup the sink. Always send raw datagrams.
+        _i2pSink = new I2PSinkAnywhere(_session, true);
+    }
+    
+    /**
+     * Classes should override to start UDP side as well.
+     *
+     * Not specified in I2PTunnelTask but used in both
+     * I2PTunnelClientBase and I2PTunnelServer so let's
+     * implement it here too.
+     */
+    public void startRunning() {
+        //synchronized (startLock) {
+            try {
+                _session.connect();
+            } catch(I2PSessionException exc) {
+                throw new RuntimeException("failed to connect session", exc);
+            }
+            start();
+        //}
+
+        l.log("Ready!");
+        notifyEvent("openServerResult", "ok");
+        open = true;
+    }
+
+    /**
+     * Set the read idle timeout for newly-created connections (in
+     * milliseconds).  After this time expires without data being reached from
+     * the I2P network, the connection itself will be closed.
+     */
+    public void setReadTimeout(long ms) {
+        readTimeout = ms;
+    }
+    
+    /**
+     * Get the read idle timeout for newly-created connections (in
+     * milliseconds).
+     *
+     * @return The read timeout used for connections
+     */
+    public long getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * I2PTunnelTask Methods
+     *
+     * Classes should override to close UDP side as well
+     */
+    public boolean close(boolean forced) {
+        if (!open) return true;
+        synchronized (lock) {
+            l.log("Shutting down server " + toString());
+            try {
+                if (_session != null) {
+                    _session.destroySession();
+                }
+            } catch (I2PException ex) {
+                _log.error("Error destroying the session", ex);
+            }
+            l.log("Server shut down.");
+            open = false;
+            return true;
+        }
+    }
+
+    /**
+     *  Source Methods
+     *
+     *  Sets the receiver of the UDP datagrams from I2P
+     *  Subclass must call this after constructor
+     *  and before start()
+     */
+    public void setSink(Sink s) {
+        _i2pSource.setSink(s);
+    }
+
+    /** start the source */
+    public void start() {
+        _i2pSource.start();
+    }
+
+    /**
+     *  Sink Methods
+     *
+     * @param to
+     *
+     */
+    public void send(Destination to, byte[] data) {
+        _i2pSink.send(to, data);
+    }
+}
+
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java
index 045ea5e583..6fcd9f2fe3 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java
@@ -351,6 +351,7 @@ public class IndexBean {
         		("httpclient".equals(type)) ||
         		("sockstunnel".equals(type)) ||
         		("connectclient".equals(type)) ||
+        		("streamrclient".equals(type)) ||
         		("ircclient".equals(type)));
     }
     
@@ -387,6 +388,8 @@ public class IndexBean {
         else if ("sockstunnel".equals(internalType)) return "SOCKS 4/4a/5 proxy";
         else if ("connectclient".equals(internalType)) return "CONNECT/SSL/HTTPS proxy";
         else if ("ircserver".equals(internalType)) return "IRC server";
+        else if ("streamrclient".equals(internalType)) return "Streamr client";
+        else if ("streamrserver".equals(internalType)) return "Streamr server";
         else return internalType;
     }
     
@@ -434,7 +437,8 @@ public class IndexBean {
         TunnelController tun = getController(tunnel);
         if (tun == null) return "";
         String rv;
-        if ("client".equals(tun.getType())||"ircclient".equals(tun.getType()))
+        if ("client".equals(tun.getType()) || "ircclient".equals(tun.getType()) ||
+            "streamrclient".equals(tun.getType()))
             rv = tun.getTargetDestination();
         else
             rv = tun.getProxyList();
@@ -798,7 +802,7 @@ public class IndexBean {
         if ("httpclient".equals(_type) || "connectclient".equals(_type)) {
             if (_proxyList != null)
                 config.setProperty("proxyList", _proxyList);
-        } else if ("ircclient".equals(_type) || "client".equals(_type)) {
+        } else if ("ircclient".equals(_type) || "client".equals(_type) || "streamrclient".equals(_type)) {
             if (_targetDestination != null)
                 config.setProperty("targetDestination", _targetDestination);
         } else if ("httpserver".equals(_type)) {
diff --git a/apps/i2ptunnel/jsp/editClient.jsp b/apps/i2ptunnel/jsp/editClient.jsp
index 3e4c3ecd80..6a796eb7df 100644
--- a/apps/i2ptunnel/jsp/editClient.jsp
+++ b/apps/i2ptunnel/jsp/editClient.jsp
@@ -75,7 +75,11 @@
             </div>
                  
             <div id="accessField" class="rowItem">
+         <% if ("streamrclient".equals(tunnelType)) { %>
+                <label>Target:</label>
+         <% } else { %>
                 <label>Access Point:</label>
+         <% } %>
             </div>
             <div id="portField" class="rowItem">
                 <label for="port" accesskey="P">
@@ -87,14 +91,17 @@
                 </label>
                 <input type="text" size="6" maxlength="5" id="port" name="port" title="Access Port Number" value="<%=editBean.getClientPort(curTunnel)%>" class="freetext" />               
             </div>
+         <% String otherInterface = "";
+            String clientInterface = editBean.getClientInterface(curTunnel);
+            if ("streamrclient".equals(tunnelType)) {   
+                otherInterface = clientInterface;
+            } else { %>
             <div id="reachField" class="rowItem">
                 <label for="reachableBy" accesskey="r">
                     <span class="accessKey">R</span>eachable by:
                 </label>
                 <select id="reachableBy" name="reachableBy" title="Valid IP for Client Access" class="selectbox">
-                    <% String clientInterface = editBean.getClientInterface(curTunnel);
-                       String otherInterface = "";
-                       if (!("127.0.0.1".equals(clientInterface)) &&
+                  <%   if (!("127.0.0.1".equals(clientInterface)) &&
                            !("0.0.0.0".equals(clientInterface)) &&
                             (clientInterface != null) &&
                             (clientInterface.trim().length() > 0)) {
@@ -105,9 +112,18 @@
                     <option value="other"<%=(!("".equals(otherInterface))    ? " selected=\"selected\"" : "")%>>LAN Hosts (Please specify your LAN address)</option>
                 </select>                
             </div> 
+         <% } // streamrclient %>
             <div id="otherField" class="rowItem">
                 <label for="reachableByOther" accesskey="O">
+         <% if ("streamrclient".equals(tunnelType)) { %>
+                    Host:
+                    <% String vvv = otherInterface;
+                       if (vvv == null || "".equals(vvv.trim()))
+                           out.write(" <font color=\"red\">(required)</font>");
+                     %>
+         <% } else { %>
                     <span class="accessKey">O</span>ther:
+         <% } %>
                 </label>
                 <input type="text" size="20" id="reachableByOther" name="reachableByOther" title="Alternative IP for Client Access" value="<%=otherInterface%>" class="freetext" />                
             </div>
@@ -123,7 +139,7 @@
                 </label>
                 <input type="text" size="30" id="proxyList" name="proxyList" title="List of Outproxy I2P destinations" value="<%=editBean.getClientDestination(curTunnel)%>" class="freetext" />                
             </div>
-            <% } else if ("client".equals(tunnelType) || "ircclient".equals(tunnelType)) {
+            <% } else if ("client".equals(tunnelType) || "ircclient".equals(tunnelType) || "streamrclient".equals(tunnelType)) {
           %><div id="destinationField" class="rowItem">
                 <label for="targetDestination" accesskey="T">
                     <span class="accessKey">T</span>unnel Destination:
@@ -135,8 +151,9 @@
                 <input type="text" size="30" id="targetDestination" name="targetDestination" title="Destination of the Tunnel" value="<%=editBean.getClientDestination(curTunnel)%>" class="freetext" />                
                 <span class="comment">(name or destination)</span>
             </div>
-            <% }
-          %><div id="profileField" class="rowItem">
+         <% } %>
+         <% if (!"streamrclient".equals(tunnelType)) { %>
+            <div id="profileField" class="rowItem">
                 <label for="profile" accesskey="f">
                     Pro<span class="accessKey">f</span>ile:
                 </label>
@@ -160,6 +177,7 @@
                 <input value="true" type="checkbox" id="shared" name="shared" title="Share tunnels with other clients"<%=(editBean.isSharedClient(curTunnel) ? " checked=\"checked\"" : "")%> class="tickbox" />                
                 <span class="comment">(Share tunnels with other clients and irc/httpclients? Change requires restart of client proxy)</span>
             </div>
+         <% } // !streamrclient %>
             <div id="startupField" class="rowItem">
                 <label for="startOnLoad" accesskey="a">
                     <span class="accessKey">A</span>uto Start:
diff --git a/apps/i2ptunnel/jsp/editServer.jsp b/apps/i2ptunnel/jsp/editServer.jsp
index 82ac69dc5c..0e9f9c0caa 100644
--- a/apps/i2ptunnel/jsp/editServer.jsp
+++ b/apps/i2ptunnel/jsp/editServer.jsp
@@ -82,14 +82,20 @@
             </div>
                  
             <div id="targetField" class="rowItem">
+         <% if ("streamrserver".equals(tunnelType)) { %>
+                <label>Access Point:</label>
+         <% } else { %>
                 <label>Target:</label>
+         <% } %>
             </div>
+         <% if (!"streamrserver".equals(tunnelType)) { %>
             <div id="hostField" class="rowItem">
                 <label for="targetHost" accesskey="H">
                     <span class="accessKey">H</span>ost:
                 </label>
                 <input type="text" size="20" id="targetHost" name="targetHost" title="Target Hostname or IP" value="<%=editBean.getTargetHost(curTunnel)%>" class="freetext" />                
             </div>
+         <% } // !streamrserver %>
             <div id="portField" class="rowItem">
                 <label for="targetPort" accesskey="P">
                     <span class="accessKey">P</span>ort:
@@ -124,6 +130,7 @@
                 </label>
                 <input type="text" size="30" id="privKeyFile" name="privKeyFile" title="Path to Private Key File" value="<%=editBean.getPrivateKeyFile(curTunnel)%>" class="freetext" />               
             </div>
+         <% if (!"streamrserver".equals(tunnelType)) { %>
             <div id="profileField" class="rowItem">
                 <label for="profile" accesskey="f">
                     Pro<span class="accessKey">f</span>ile:
@@ -134,6 +141,7 @@
                     <option <%=(interactiveProfile == false ? "selected=\"selected\" " : "")%>value="bulk">bulk connection (downloads/websites/BT) </option>
                 </select>                
             </div> 
+         <% } // !streamrserver %>
             <div id="destinationField" class="rowItem">
                 <label for="localDestination" accesskey="L">
                     <span class="accessKey">L</span>ocal destination:
diff --git a/apps/i2ptunnel/jsp/index.jsp b/apps/i2ptunnel/jsp/index.jsp
index 7787eb1f52..4d9f6c57d9 100644
--- a/apps/i2ptunnel/jsp/index.jsp
+++ b/apps/i2ptunnel/jsp/index.jsp
@@ -150,6 +150,7 @@
                         <option value="ircclient">IRC</option>
                         <option value="sockstunnel">SOCKS 4/4a/5</option>
                         <option value="connectclient">CONNECT</option>
+                        <option value="streamrclient">Streamr</option>
                     </select>
                     <input class="control" type="submit" value="Create" />
                 </div>
@@ -261,6 +262,7 @@
                         <option value="server">Standard</option>
                         <option value="httpserver">HTTP</option>
                         <option value="ircserver">IRC</option>
+                        <option value="streamrserver">Streamr</option>
                     </select>
                     <input class="control" type="submit" value="Create" />
                 </div>
-- 
GitLab