diff --git a/core/java/src/net/i2p/I2PAppContext.java b/core/java/src/net/i2p/I2PAppContext.java
index b497796408cb2b6051d9386c3ed7dd230bf82c31..458aee31e592eb017d7f9522fc84c2e1682526ac 100644
--- a/core/java/src/net/i2p/I2PAppContext.java
+++ b/core/java/src/net/i2p/I2PAppContext.java
@@ -22,6 +22,7 @@ import net.i2p.crypto.SHA256Generator;
 import net.i2p.crypto.SessionKeyManager;
 import net.i2p.crypto.TransientSessionKeyManager;
 import net.i2p.data.RoutingKeyGenerator;
+import net.i2p.internal.InternalClientManager;
 import net.i2p.stat.StatManager;
 import net.i2p.util.Clock;
 import net.i2p.util.ConcurrentHashSet;
@@ -843,4 +844,13 @@ public class I2PAppContext {
     public boolean isRouterContext() {
         return false;
     }
+
+    /**
+     *  Use this to connect to the router in the same JVM.
+     *  @return always null in I2PAppContext, the client manager if in RouterContext
+     *  @since 0.8.3
+     */
+    public InternalClientManager internalClientManager() {
+        return null;
+    }
 }
diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java
index 056208fb5a021470a0766c3205eda71cf8bc8ad6..f69148da3e212210782d41a65e1ec3577b9774bd 100644
--- a/core/java/src/net/i2p/client/ClientWriterRunner.java
+++ b/core/java/src/net/i2p/client/ClientWriterRunner.java
@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import net.i2p.data.i2cp.I2CPMessage;
 import net.i2p.data.i2cp.I2CPMessageImpl;
 import net.i2p.data.i2cp.I2CPMessageException;
+import net.i2p.internal.PoisonI2CPMessage;
 import net.i2p.util.I2PAppThread;
 
 /**
@@ -50,7 +51,7 @@ class ClientWriterRunner implements Runnable {
     public void stopWriting() {
         _messagesToWrite.clear();
         try {
-            _messagesToWrite.put(new PoisonMessage());
+            _messagesToWrite.put(new PoisonI2CPMessage());
         } catch (InterruptedException ie) {}
     }
 
@@ -62,7 +63,7 @@ class ClientWriterRunner implements Runnable {
             } catch (InterruptedException ie) {
                 continue;
             }
-            if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
+            if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
                 break;
             // only thread, we don't need synchronized
             try {
@@ -80,18 +81,4 @@ class ClientWriterRunner implements Runnable {
         }
         _messagesToWrite.clear();
     }
-
-    /**
-     * End-of-stream msg used to stop the concurrent queue
-     * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
-     *
-     */
-    private static class PoisonMessage extends I2CPMessageImpl {
-        public static final int MESSAGE_TYPE = 999999;
-        public int getType() {
-            return MESSAGE_TYPE;
-        }
-        public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
-        public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
-    }
 }
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java
index 72cc8b406b3d13104809426bd6c5e33e3b4e86f9..639ac5e9c67a766f051d4ce6ded881d04b519d52 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl.java
@@ -39,8 +39,10 @@ import net.i2p.data.i2cp.I2CPMessageException;
 import net.i2p.data.i2cp.I2CPMessageReader;
 import net.i2p.data.i2cp.MessagePayloadMessage;
 import net.i2p.data.i2cp.SessionId;
+import net.i2p.internal.I2CPMessageQueue;
+import net.i2p.internal.InternalClientManager;
+import net.i2p.internal.QueuedI2CPMessageReader;
 import net.i2p.util.I2PThread;
-import net.i2p.util.InternalSocket;
 import net.i2p.util.Log;
 import net.i2p.util.SimpleScheduler;
 import net.i2p.util.SimpleTimer;
@@ -79,6 +81,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
     /** where we pipe our messages */
     protected /* FIXME final FIXME */OutputStream _out;
 
+    /**
+     *  Used for internal connections to the router.
+     *  If this is set, _socket, _writer, and _out will be null.
+     *  @since 0.8.3
+     */
+    protected I2CPMessageQueue _queue;
+
     /** who we send events to */
     protected I2PSessionListener _sessionListener;
 
@@ -285,17 +294,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
             
         long startConnect = _context.clock().now();
         try {
-            // If we are in the router JVM, connect using the interal pseudo-socket
-            _socket = InternalSocket.getSocket(_hostname, _portNum);
-            // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
-            _out = _socket.getOutputStream();
-            synchronized (_out) {
-                _out.write(I2PClient.PROTOCOL_BYTE);
-                _out.flush();
+            // If we are in the router JVM, connect using the interal queue
+            if (_context.isRouterContext()) {
+                // _socket, _out, and _writer remain null
+                InternalClientManager mgr = _context.internalClientManager();
+                if (mgr == null)
+                    throw new I2PSessionException("Router is not ready for connections");
+                // the following may throw an I2PSessionException
+                _queue = mgr.connect();
+                _reader = new QueuedI2CPMessageReader(_queue, this);
+            } else {
+                _socket = new Socket(_hostname, _portNum);
+                // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
+                _out = _socket.getOutputStream();
+                synchronized (_out) {
+                    _out.write(I2PClient.PROTOCOL_BYTE);
+                    _out.flush();
+                }
+                _writer = new ClientWriterRunner(_out, this);
+                InputStream in = _socket.getInputStream();
+                _reader = new I2CPMessageReader(in, this);
             }
-            _writer = new ClientWriterRunner(_out, this);
-            InputStream in = _socket.getInputStream();
-            _reader = new I2CPMessageReader(in, this);
             if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading");
             _reader.startReading();
             if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate");
@@ -567,9 +586,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
      * @throws I2PSessionException if the message is malformed or there is an error writing it out
      */
     void sendMessage(I2CPMessage message) throws I2PSessionException {
-        if (isClosed() || _writer == null)
+        if (isClosed())
+            throw new I2PSessionException("Already closed");
+        else if (_queue != null)
+            _queue.offer(message);  // internal
+        else if (_writer == null)
             throw new I2PSessionException("Already closed");
-        _writer.addMessage(message);
+        else
+            _writer.addMessage(message);
     }
 
     /**
@@ -581,8 +605,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
         // Only log as WARN if the router went away
         int level;
         String msgpfx;
-        if ((error instanceof EOFException) ||
-            (error.getMessage() != null && error.getMessage().startsWith("Pipe closed"))) {
+        if (error instanceof EOFException) {
             level = Log.WARN;
             msgpfx = "Router closed connection: ";
         } else {
@@ -647,6 +670,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
             _reader.stopReading();
             _reader = null;
         }
+        if (_queue != null) {
+            // internal
+            _queue.close();
+        }
         if (_writer != null) {
             _writer.stopWriting();
             _writer = null;
diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java
index f4bc3e812ca6d42f8f4c58be9985ec049ffdf56b..c2da90217c65a73823c5c6dfc114855555f8e526 100644
--- a/core/java/src/net/i2p/client/I2PSimpleSession.java
+++ b/core/java/src/net/i2p/client/I2PSimpleSession.java
@@ -19,6 +19,9 @@ import net.i2p.data.i2cp.DestLookupMessage;
 import net.i2p.data.i2cp.DestReplyMessage;
 import net.i2p.data.i2cp.GetBandwidthLimitsMessage;
 import net.i2p.data.i2cp.I2CPMessageReader;
+import net.i2p.internal.I2CPMessageQueue;
+import net.i2p.internal.InternalClientManager;
+import net.i2p.internal.QueuedI2CPMessageReader;
 import net.i2p.util.I2PThread;
 import net.i2p.util.InternalSocket;
 
@@ -72,16 +75,26 @@ class I2PSimpleSession extends I2PSessionImpl2 {
         notifier.start();
         
         try {
-            // If we are in the router JVM, connect using the interal pseudo-socket
-            _socket = InternalSocket.getSocket(_hostname, _portNum);
-            _out = _socket.getOutputStream();
-            synchronized (_out) {
-                _out.write(I2PClient.PROTOCOL_BYTE);
-                _out.flush();
+            // If we are in the router JVM, connect using the interal queue
+            if (_context.isRouterContext()) {
+                // _socket, _out, and _writer remain null
+                InternalClientManager mgr = _context.internalClientManager();
+                if (mgr == null)
+                    throw new I2PSessionException("Router is not ready for connections");
+                // the following may throw an I2PSessionException
+                _queue = mgr.connect();
+                _reader = new QueuedI2CPMessageReader(_queue, this);
+            } else {
+                _socket = new Socket(_hostname, _portNum);
+                _out = _socket.getOutputStream();
+                synchronized (_out) {
+                    _out.write(I2PClient.PROTOCOL_BYTE);
+                    _out.flush();
+                }
+                _writer = new ClientWriterRunner(_out, this);
+                InputStream in = _socket.getInputStream();
+                _reader = new I2CPMessageReader(in, this);
             }
-            _writer = new ClientWriterRunner(_out, this);
-            InputStream in = _socket.getInputStream();
-            _reader = new I2CPMessageReader(in, this);
             _reader.startReading();
 
         } catch (UnknownHostException uhe) {
diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java
index 461c4b08a4f9dbf25de23be53c896e406e818540..4ac902a487cfc14f3df30ba6389cb56c5c627cf8 100644
--- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java
+++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java
@@ -27,11 +27,11 @@ import net.i2p.util.Log;
 public class I2CPMessageReader {
     private final static Log _log = new Log(I2CPMessageReader.class);
     private InputStream _stream;
-    private I2CPMessageEventListener _listener;
-    private I2CPMessageReaderRunner _reader;
-    private Thread _readerThread;
+    protected I2CPMessageEventListener _listener;
+    protected I2CPMessageReaderRunner _reader;
+    protected Thread _readerThread;
     
-    private static volatile long __readerId = 0;
+    protected static volatile long __readerId = 0;
 
     public I2CPMessageReader(InputStream stream, I2CPMessageEventListener lsnr) {
         _stream = stream;
@@ -42,6 +42,14 @@ public class I2CPMessageReader {
         _readerThread.setName("I2CP Reader " + (++__readerId));
     }
 
+    /**
+     * For internal extension only. No stream.
+     * @since 0.8.3
+     */
+    protected I2CPMessageReader(I2CPMessageEventListener lsnr) {
+        setListener(lsnr);
+    }
+
     public void setListener(I2CPMessageEventListener lsnr) {
         _listener = lsnr;
     }
@@ -114,9 +122,9 @@ public class I2CPMessageReader {
         public void disconnected(I2CPMessageReader reader);
     }
 
-    private class I2CPMessageReaderRunner implements Runnable {
-        private volatile boolean _doRun;
-        private volatile boolean _stayAlive;
+    protected class I2CPMessageReaderRunner implements Runnable {
+        protected volatile boolean _doRun;
+        protected volatile boolean _stayAlive;
 
         public I2CPMessageReaderRunner() {
             _doRun = true;
diff --git a/core/java/src/net/i2p/internal/I2CPMessageQueue.java b/core/java/src/net/i2p/internal/I2CPMessageQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..93bea3a3f29b55ece440d048f2fdd31e21975de8
--- /dev/null
+++ b/core/java/src/net/i2p/internal/I2CPMessageQueue.java
@@ -0,0 +1,51 @@
+package net.i2p.internal;
+
+import net.i2p.data.i2cp.I2CPMessage;
+
+/**
+ * Contains the methods to talk to a router or client via I2CP,
+ * when both are in the same JVM.
+ * This interface contains methods to access two queues,
+ * one for transmission and one for receiving.
+ * The methods are identical to those in java.util.concurrent.BlockingQueue.
+ *
+ * Reading may be done in a thread using the QueuedI2CPMessageReader class.
+ * Non-blocking writing may be done directly with offer().
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+public abstract class I2CPMessageQueue {
+
+    /**
+     *  Send a message, nonblocking.
+     *  @return success (false if no space available)
+     */
+    public abstract boolean offer(I2CPMessage msg);
+
+    /**
+     *  Receive a message, nonblocking.
+     *  Unused for now.
+     *  @return message or null if none available
+     */
+    public abstract I2CPMessage poll();
+
+    /**
+     *  Send a message, blocking until space is available.
+     *  Unused for now.
+     */
+    public abstract void put(I2CPMessage msg) throws InterruptedException;
+
+    /**
+     *  Receive a message, blocking until one is available.
+     *  @return message
+     */
+    public abstract I2CPMessage take() throws InterruptedException;
+
+    /**
+     *  == offer(new PoisonI2CPMessage());
+     */
+    public void close() {
+        offer(new PoisonI2CPMessage());
+    }
+}
diff --git a/core/java/src/net/i2p/internal/InternalClientManager.java b/core/java/src/net/i2p/internal/InternalClientManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..a923fb9f706ac0069356f2df2516d65ed11f69c0
--- /dev/null
+++ b/core/java/src/net/i2p/internal/InternalClientManager.java
@@ -0,0 +1,19 @@
+package net.i2p.internal;
+
+import net.i2p.client.I2PSessionException;
+import net.i2p.data.i2cp.I2CPMessage;
+
+/**
+ * A manager for the in-JVM I2CP message interface
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+public interface InternalClientManager {
+
+    /**
+     *  Connect to the router, receiving a message queue to talk to the router with.
+     *  @throws I2PSessionException if the router isn't ready
+     */
+    public I2CPMessageQueue connect() throws I2PSessionException;
+}
diff --git a/core/java/src/net/i2p/internal/PoisonI2CPMessage.java b/core/java/src/net/i2p/internal/PoisonI2CPMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..23b99db9ace32516e82f46cdca7e2138c718eeff
--- /dev/null
+++ b/core/java/src/net/i2p/internal/PoisonI2CPMessage.java
@@ -0,0 +1,58 @@
+package net.i2p.internal;
+
+import java.io.InputStream;
+
+import net.i2p.data.i2cp.I2CPMessageException;
+import net.i2p.data.i2cp.I2CPMessageImpl;
+
+/**
+ * For marking end-of-queues in a standard manner.
+ * Don't actually send it.
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+public class PoisonI2CPMessage extends I2CPMessageImpl {
+    public final static int MESSAGE_TYPE = 999999;
+
+    public PoisonI2CPMessage() {
+        super();
+    }
+
+    /**
+     *  @deprecated don't do this
+     *  @throws I2CPMessageException always
+     */
+    protected void doReadMessage(InputStream in, int size) throws I2CPMessageException {
+        throw new I2CPMessageException("Don't do this");
+    }
+
+    /**
+     *  @deprecated don't do this
+     *  @throws I2CPMessageException always
+     */
+    protected byte[] doWriteMessage() throws I2CPMessageException {
+        throw new I2CPMessageException("Don't do this");
+    }
+
+    public int getType() {
+        return MESSAGE_TYPE;
+    }
+
+    /* FIXME missing hashCode() method FIXME */
+    @Override
+    public boolean equals(Object object) {
+        if ((object != null) && (object instanceof PoisonI2CPMessage)) {
+            return true;
+        }
+        
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("[PoisonMessage]");
+        return buf.toString();
+    }
+}
diff --git a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..da128ceaa22d08c76c42be6e67059fd38d30d2dc
--- /dev/null
+++ b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java
@@ -0,0 +1,55 @@
+package net.i2p.internal;
+
+import net.i2p.data.i2cp.I2CPMessage;
+import net.i2p.data.i2cp.I2CPMessageReader;
+import net.i2p.util.I2PThread;
+
+/**
+ * Get messages off an In-JVM queue, zero-copy
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+public class QueuedI2CPMessageReader extends I2CPMessageReader {
+    private final I2CPMessageQueue in;
+
+    public QueuedI2CPMessageReader(I2CPMessageQueue in, I2CPMessageEventListener lsnr) {
+        super(lsnr);
+        this.in = in;
+        _reader = new QueuedI2CPMessageReaderRunner();
+        _readerThread = new I2PThread(_reader, "I2CP Internal Reader " + (++__readerId), true);
+    }
+
+    protected class QueuedI2CPMessageReaderRunner extends I2CPMessageReaderRunner implements Runnable {
+
+        public QueuedI2CPMessageReaderRunner() {
+            super();
+        }
+
+        @Override
+        public void run() {
+            while (_stayAlive) {
+                while (_doRun) {
+                    // do read
+                    I2CPMessage msg = null;
+                    try {
+                        msg = in.take();
+                        if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
+                            cancelRunner();
+                        else
+                            _listener.messageReceived(QueuedI2CPMessageReader.this, msg);
+                    } catch (InterruptedException ie) {}
+                }
+                if (!_doRun) {
+                    // pause .5 secs when we're paused
+                    try {
+                        Thread.sleep(500);
+                    } catch (InterruptedException ie) {
+                        _listener.disconnected(QueuedI2CPMessageReader.this);
+                        cancelRunner();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/core/java/src/net/i2p/internal/package.html b/core/java/src/net/i2p/internal/package.html
new file mode 100644
index 0000000000000000000000000000000000000000..edac509f01e1887342b677e561225ba20a3de00f
--- /dev/null
+++ b/core/java/src/net/i2p/internal/package.html
@@ -0,0 +1,7 @@
+<html><body>
+<p>
+Interface and classes for a router and client
+within the same JVM to directly pass I2CP messages using Queues
+instead of serialized messages over socket streams.
+</p>
+</body></html>
diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java
index 3d5ed609edb65cb900a9ebc68bad1cf9af8b93ce..cb3c6366251378c0cf50972d197c446bc6d960f3 100644
--- a/router/java/src/net/i2p/router/RouterContext.java
+++ b/router/java/src/net/i2p/router/RouterContext.java
@@ -6,6 +6,7 @@ import java.util.Properties;
 
 import net.i2p.I2PAppContext;
 import net.i2p.data.Hash;
+import net.i2p.internal.InternalClientManager;
 import net.i2p.router.client.ClientManagerFacadeImpl;
 import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
 import net.i2p.router.peermanager.Calculator;
@@ -34,7 +35,7 @@ import net.i2p.util.KeyRing;
  */
 public class RouterContext extends I2PAppContext {
     private Router _router;
-    private ClientManagerFacade _clientManagerFacade;
+    private ClientManagerFacadeImpl _clientManagerFacade;
     private ClientMessagePool _clientMessagePool;
     private JobQueue _jobQueue;
     private InNetMessagePool _inNetMessagePool;
@@ -106,10 +107,12 @@ public class RouterContext extends I2PAppContext {
     }
 
     public void initAll() {
-        if ("false".equals(getProperty("i2p.dummyClientFacade", "false")))
-            _clientManagerFacade = new ClientManagerFacadeImpl(this);
-        else
-            _clientManagerFacade = new DummyClientManagerFacade(this);
+        if (getBooleanProperty("i2p.dummyClientFacade"))
+            System.err.println("i2p.dummpClientFacade currently unsupported");
+        _clientManagerFacade = new ClientManagerFacadeImpl(this);
+        // removed since it doesn't implement InternalClientManager for now
+        //else
+        //    _clientManagerFacade = new DummyClientManagerFacade(this);
         _clientMessagePool = new ClientMessagePool(this);
         _jobQueue = new JobQueue(this);
         _inNetMessagePool = new InNetMessagePool(this);
@@ -395,4 +398,13 @@ public class RouterContext extends I2PAppContext {
     public boolean isRouterContext() {
         return true;
     }
+
+    /**
+     *  Use this to connect to the router in the same JVM.
+     *  @return the client manager
+     *  @since 0.8.3
+     */
+    public InternalClientManager internalClientManager() {
+        return _clientManagerFacade;
+    }
 }
diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
index b3468e4e0be51c575d0998569cdea4de1fa83165..5807aa098346b8ebd76144d79d1de595352a8529 100644
--- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
@@ -52,7 +52,7 @@ import net.i2p.util.SimpleTimer;
  */
 public class ClientConnectionRunner {
     private Log _log;
-    private RouterContext _context;
+    protected final RouterContext _context;
     private ClientManager _manager;
     /** socket for this particular peer connection */
     private Socket _socket;
@@ -71,7 +71,7 @@ public class ClientConnectionRunner {
     /** set of messageIds created but not yet ACCEPTED */
     private Set<MessageId> _acceptedPending;
     /** thingy that does stuff */
-    private I2CPMessageReader _reader;
+    protected I2CPMessageReader _reader;
     /** just for this destination */
     private SessionKeyManager _sessionKeyManager;
     /** 
@@ -469,18 +469,8 @@ public class ClientConnectionRunner {
                 _log.warn("Error sending I2CP message - client went away", eofe);
             stopRunning();
         } catch (IOException ioe) {
-            // only warn if client went away
-            int level;
-            String emsg;
-            if (ioe.getMessage() != null && ioe.getMessage().startsWith("Pipe closed")) {
-                level = Log.WARN;
-                emsg = "Error sending I2CP message - client went away";
-            } else {
-                level = Log.ERROR;
-                emsg = "IO Error sending I2CP message to client";
-            }
-            if (_log.shouldLog(level)) 
-                _log.log(level, emsg, ioe);
+            if (_log.shouldLog(Log.ERROR)) 
+                _log.error("IO Error sending I2CP message to client", ioe);
             stopRunning();
         } catch (Throwable t) {
             _log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t);
diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java
index 7d866ab0b650e961ecd763be04d41434721291b4..948f895bbe7e017984ed956cc7304136d1f46f65 100644
--- a/router/java/src/net/i2p/router/client/ClientManager.java
+++ b/router/java/src/net/i2p/router/client/ClientManager.java
@@ -15,7 +15,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import net.i2p.client.I2PSessionException;
 import net.i2p.crypto.SessionKeyManager;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
@@ -23,8 +25,10 @@ import net.i2p.data.Hash;
 import net.i2p.data.LeaseSet;
 import net.i2p.data.Payload;
 import net.i2p.data.TunnelId;
+import net.i2p.data.i2cp.I2CPMessage;
 import net.i2p.data.i2cp.MessageId;
 import net.i2p.data.i2cp.SessionConfig;
+import net.i2p.internal.I2CPMessageQueue;
 import net.i2p.router.ClientManagerFacade;
 import net.i2p.router.ClientMessage;
 import net.i2p.router.Job;
@@ -42,7 +46,6 @@ import net.i2p.util.Log;
 public class ClientManager {
     private Log _log;
     private ClientListenerRunner _listener;
-    private ClientListenerRunner _internalListener;
     private final HashMap<Destination, ClientConnectionRunner>  _runners;        // Destination --> ClientConnectionRunner
     private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet
     private RouterContext _ctx;
@@ -69,11 +72,6 @@ public class ClientManager {
         t.setName("ClientListener:" + port);
         t.setDaemon(true);
         t.start();
-        _internalListener = new InternalClientListenerRunner(_ctx, this, port);
-        t = new I2PThread(_internalListener);
-        t.setName("ClientListener:" + port + "-i");
-        t.setDaemon(true);
-        t.start();
     }
     
     public void restart() {
@@ -97,7 +95,6 @@ public class ClientManager {
     public void shutdown() {
         _log.info("Shutting down the ClientManager");
         _listener.stopListening();
-        _internalListener.stopListening();
         Set<ClientConnectionRunner> runners = new HashSet();
         synchronized (_runners) {
             for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext();) {
@@ -117,6 +114,23 @@ public class ClientManager {
         }
     }
     
+    /**
+     *  The InternalClientManager interface.
+     *  Connects to the router, receiving a message queue to talk to the router with.
+     *  Might throw I2PSessionException if the router isn't ready, someday.
+     *  @since 0.8.3
+     */
+    public I2CPMessageQueue internalConnect() {
+        // for now we make these unlimited size
+        LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
+        LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
+        I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
+        I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
+        ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue);
+        registerConnection(runner);
+        return hisQueue;
+    }
+
     public boolean isAlive() { return _listener.isListening(); }
 
     public void registerConnection(ClientConnectionRunner runner) {
diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java
index 066d6cc354223c3ee1e53b56e31c78fc4f82e4ab..5fd0bbc28baf0c4a7326efb7539f3fa35448cb1d 100644
--- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java
+++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java
@@ -14,6 +14,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
 
+import net.i2p.client.I2PSessionException;
 import net.i2p.crypto.SessionKeyManager;
 import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
@@ -21,6 +22,8 @@ import net.i2p.data.Hash;
 import net.i2p.data.LeaseSet;
 import net.i2p.data.i2cp.MessageId;
 import net.i2p.data.i2cp.SessionConfig;
+import net.i2p.internal.I2CPMessageQueue;
+import net.i2p.internal.InternalClientManager;
 import net.i2p.router.ClientManagerFacade;
 import net.i2p.router.ClientMessage;
 import net.i2p.router.Job;
@@ -32,7 +35,7 @@ import net.i2p.util.Log;
  *
  * @author jrandom
  */
-public class ClientManagerFacadeImpl extends ClientManagerFacade {
+public class ClientManagerFacadeImpl extends ClientManagerFacade implements InternalClientManager {
     private final static Log _log = new Log(ClientManagerFacadeImpl.class);
     private ClientManager _manager; 
     private RouterContext _context;
@@ -220,4 +223,16 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
         else
             return Collections.EMPTY_SET;
     }
+
+    /**
+     *  The InternalClientManager interface.
+     *  Connect to the router, receiving a message queue to talk to the router with.
+     *  @throws I2PSessionException if the router isn't ready
+     *  @since 0.8.3
+     */
+    public I2CPMessageQueue connect() throws I2PSessionException {
+        if (_manager != null)
+            return _manager.internalConnect();
+        throw new I2PSessionException("No manager yet");
+    }
 }
diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
index 49fcddcc208962b78097acc645c6b1215a5b7f77..b93a4e5f447e2102a66202280ea667b4baa9c135 100644
--- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import net.i2p.data.i2cp.I2CPMessage;
 import net.i2p.data.i2cp.I2CPMessageImpl;
 import net.i2p.data.i2cp.I2CPMessageException;
+import net.i2p.internal.PoisonI2CPMessage;
 import net.i2p.router.RouterContext;
 import net.i2p.util.Log;
 
@@ -52,7 +53,7 @@ class ClientWriterRunner implements Runnable {
     public void stopWriting() {
         _messagesToWrite.clear();
         try {
-            _messagesToWrite.put(new PoisonMessage());
+            _messagesToWrite.put(new PoisonI2CPMessage());
         } catch (InterruptedException ie) {}
     }
 
@@ -64,23 +65,9 @@ class ClientWriterRunner implements Runnable {
             } catch (InterruptedException ie) {
                 continue;
             }
-            if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
+            if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
                 break;
             _runner.writeMessage(msg);
         }
     }
-
-    /**
-     * End-of-stream msg used to stop the concurrent queue
-     * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
-     *
-     */
-    private static class PoisonMessage extends I2CPMessageImpl {
-        public static final int MESSAGE_TYPE = 999999;
-        public int getType() {
-            return MESSAGE_TYPE;
-        }
-        public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
-        public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
-    }
 }
diff --git a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..f65b061766a8bf9982e09601ebf775de0b03cd7e
--- /dev/null
+++ b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java
@@ -0,0 +1,57 @@
+package net.i2p.router.client;
+
+import java.util.concurrent.BlockingQueue;
+
+import net.i2p.data.i2cp.I2CPMessage;
+import net.i2p.internal.I2CPMessageQueue;
+
+/**
+ * Contains the methods to talk to a router or client via I2CP,
+ * when both are in the same JVM.
+ * This interface contains methods to access two queues,
+ * one for transmission and one for receiving.
+ * The methods are identical to those in java.util.concurrent.BlockingQueue
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+class I2CPMessageQueueImpl extends I2CPMessageQueue {
+    private final BlockingQueue<I2CPMessage> _in;
+    private final BlockingQueue<I2CPMessage> _out;
+
+    public I2CPMessageQueueImpl(BlockingQueue<I2CPMessage> in, BlockingQueue<I2CPMessage> out) {
+        _in = in;
+        _out = out;
+    }
+
+    /**
+     *  Send a message, nonblocking
+     *  @return success (false if no space available)
+     */
+    public boolean offer(I2CPMessage msg) {
+        return _out.offer(msg);
+    }
+
+    /**
+     *  Receive a message, nonblocking
+     *  @return message or null if none available
+     */
+    public I2CPMessage poll() {
+        return _in.poll();
+    }
+
+    /**
+     *  Send a message, blocking until space is available
+     */
+    public void put(I2CPMessage msg) throws InterruptedException {
+        _out.put(msg);
+    }
+
+    /**
+     *  Receive a message, blocking until one is available
+     *  @return message
+     */
+    public I2CPMessage take() throws InterruptedException {
+        return _in.take();
+    }
+}
diff --git a/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java b/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java
deleted file mode 100644
index 995c69400f5d17988f7ccd0f9d47008e6968956b..0000000000000000000000000000000000000000
--- a/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package net.i2p.router.client;
-/*
- * free (adj.): unencumbered; not under the control of others
- * Written by jrandom in 2003 and released into the public domain 
- * with no warranty of any kind, either expressed or implied.  
- * It probably won't make your computer catch on fire, or eat 
- * your children, but it might.  Use at your own risk.
- *
- */
-
-import java.io.IOException;
-import java.net.Socket;
-
-import net.i2p.router.RouterContext;
-import net.i2p.util.Log;
-import net.i2p.util.InternalServerSocket;
-
-/**
- * Listen for in-JVM connections on the internal "socket"
- *
- * @author zzz
- * @since 0.7.9
- */
-public class InternalClientListenerRunner extends ClientListenerRunner {
-
-    public InternalClientListenerRunner(RouterContext context, ClientManager manager, int port) {
-        super(context, manager, port);
-        _log = _context.logManager().getLog(InternalClientListenerRunner.class);
-    }
-    
-    /** 
-     * Start up the socket listener, listens for connections, and
-     * fires those connections off via {@link #runConnection runConnection}.  
-     * This only returns if the socket cannot be opened or there is a catastrophic
-     * failure.
-     *
-     */
-    public void runServer() {
-        try {
-            if (_log.shouldLog(Log.INFO))
-                _log.info("Listening on internal port " + _port);
-            _socket = new InternalServerSocket(_port);
-            
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("InternalServerSocket created, before accept: " + _socket);
-            
-            _listening = true;
-            _running = true;
-            while (_running) {
-                try {
-                    Socket socket = _socket.accept();
-                    if (validate(socket)) {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("Internal connection received");
-                        runConnection(socket);
-                    } else {
-                        if (_log.shouldLog(Log.WARN))
-                            _log.warn("Refused connection from " + socket.getInetAddress());
-                        try {
-                            socket.close();
-                        } catch (IOException ioe) {}
-                    }
-                } catch (IOException ioe) {
-                    if (_context.router().isAlive()) 
-                        _log.error("Server error accepting", ioe);
-                } catch (Throwable t) {
-                    if (_context.router().isAlive()) 
-                        _log.error("Fatal error running client listener - killing the thread!", t);
-                    _listening = false;
-                    return;
-                }
-            }
-        } catch (IOException ioe) {
-            if (_context.router().isAlive()) 
-                _log.error("Error listening on internal port " + _port, ioe);
-        }
-        
-        _listening = false;
-        if (_socket != null) {
-            try { _socket.close(); } catch (IOException ioe) {}
-            _socket = null; 
-        }
-        
-
-        if (_context.router().isAlive())
-            _log.error("CANCELING I2CP LISTEN", new Exception("I2CP Listen cancelled!!!"));
-        _running = false;
-    }
-}
diff --git a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java
new file mode 100644
index 0000000000000000000000000000000000000000..ea2cc3d30baa0bf5a44e79be46d694ff4bcacdb4
--- /dev/null
+++ b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java
@@ -0,0 +1,76 @@
+package net.i2p.router.client;
+
+import java.io.IOException;
+
+import net.i2p.data.i2cp.I2CPMessage;
+import net.i2p.data.i2cp.I2CPMessageException;
+import net.i2p.internal.I2CPMessageQueue;
+import net.i2p.internal.QueuedI2CPMessageReader;
+import net.i2p.router.RouterContext;
+import net.i2p.util.Log;
+
+/**
+ * Zero-copy in-JVM.
+ * While super() starts both a reader and a writer thread, we only need a reader thread here.
+ *
+ * @author zzz
+ * @since 0.8.3
+ */
+public class QueuedClientConnectionRunner extends ClientConnectionRunner {
+    private final I2CPMessageQueue queue;
+    
+    /**
+     * Create a new runner with the given queues
+     *
+     */
+    public QueuedClientConnectionRunner(RouterContext context, ClientManager manager, I2CPMessageQueue queue) {
+        super(context, manager, null);
+        this.queue = queue;
+    }
+    
+
+
+    /**
+     * Starts the reader thread. Does not call super().
+     */
+    @Override
+    public void startRunning() {
+        _reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this));
+        _reader.startReading();
+    }
+    
+    /**
+     * Calls super() to stop the reader, and sends a poison message to the client.
+     */
+    @Override
+    void stopRunning() {
+        super.stopRunning();
+        queue.close();
+    }
+    
+    /**
+     *  In super(), doSend queues it to the writer thread and
+     *  the writer thread calls writeMessage() to write to the output stream.
+     *  Since we have no writer thread this shouldn't happen.
+     */
+    @Override
+    void writeMessage(I2CPMessage msg) {
+        throw new RuntimeException("huh?");
+    }
+    
+    /**
+     * Actually send the I2CPMessage to the client.
+     * Nonblocking.
+     */
+    @Override
+    void doSend(I2CPMessage msg) throws I2CPMessageException {
+        // This will never fail, for now, as the router uses unbounded queues
+        // Perhaps in the future we may want to use bounded queues,
+        // with non-blocking writes for the router
+        // and blocking writes for the client?
+        boolean success = queue.offer(msg);
+        if (!success)
+            throw new I2CPMessageException("I2CP write to queue failed");
+    }
+    
+}