From b53031685063a034d612a58d0affed481b438be9 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Thu, 17 Dec 2009 01:05:29 +0000
Subject: [PATCH]     * I2CP:       - Move client-side writes to their own
 thread       - Reenable InternalSockets

---
 .../net/i2p/client/ClientWriterRunner.java    | 97 +++++++++++++++++++
 .../src/net/i2p/client/I2PSessionImpl.java    | 42 +++-----
 .../src/net/i2p/client/I2PSimpleSession.java  |  1 +
 .../net/i2p/util/InternalServerSocket.java    |  5 +
 .../java/src/net/i2p/util/InternalSocket.java | 10 +-
 5 files changed, 123 insertions(+), 32 deletions(-)
 create mode 100644 core/java/src/net/i2p/client/ClientWriterRunner.java

diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java
new file mode 100644
index 0000000000..056208fb5a
--- /dev/null
+++ b/core/java/src/net/i2p/client/ClientWriterRunner.java
@@ -0,0 +1,97 @@
+package net.i2p.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.i2p.data.i2cp.I2CPMessage;
+import net.i2p.data.i2cp.I2CPMessageImpl;
+import net.i2p.data.i2cp.I2CPMessageException;
+import net.i2p.util.I2PAppThread;
+
+/**
+ * Copied from net.i2p.router.client
+ * We need a single thread that writes so we don't have issues with
+ * the Piped Streams used in InternalSocket.
+ *
+ * @author zzz from net.i2p.router.client.ClientWriterRunner
+ */
+class ClientWriterRunner implements Runnable {
+    private OutputStream _out;
+    private I2PSessionImpl _session;
+    private BlockingQueue<I2CPMessage> _messagesToWrite;
+    private static volatile long __Id = 0;
+    
+    /** starts the thread too */
+    public ClientWriterRunner(OutputStream out, I2PSessionImpl session) {
+        _out = out;
+        _session = session;
+        _messagesToWrite = new LinkedBlockingQueue();
+        Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true);
+        t.start();
+    }
+
+    /**
+     * Add this message to the writer's queue
+     *
+     */
+    public void addMessage(I2CPMessage msg) {
+        try {
+            _messagesToWrite.put(msg);
+        } catch (InterruptedException ie) {}
+    }
+
+    /**
+     * No more messages - dont even try to send what we have
+     *
+     */
+    public void stopWriting() {
+        _messagesToWrite.clear();
+        try {
+            _messagesToWrite.put(new PoisonMessage());
+        } catch (InterruptedException ie) {}
+    }
+
+    public void run() {
+        I2CPMessage msg;
+        while (!_session.isClosed()) {
+            try {
+                msg = _messagesToWrite.take();
+            } catch (InterruptedException ie) {
+                continue;
+            }
+            if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
+                break;
+            // only thread, we don't need synchronized
+            try {
+                msg.writeMessage(_out);
+                _out.flush();
+            } catch (I2CPMessageException ime) {
+                _session.propogateError("Error writing out the message", ime);
+                _session.disconnect();
+                break;
+            } catch (IOException ioe) {
+                _session.propogateError("Error writing out the message", ioe);
+                _session.disconnect();
+                break;
+            }
+        }
+        _messagesToWrite.clear();
+    }
+
+    /**
+     * End-of-stream msg used to stop the concurrent queue
+     * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
+     *
+     */
+    private static class PoisonMessage extends I2CPMessageImpl {
+        public static final int MESSAGE_TYPE = 999999;
+        public int getType() {
+            return MESSAGE_TYPE;
+        }
+        public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
+        public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
+    }
+}
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java
index c11c71549a..b1fc2b2266 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl.java
@@ -73,6 +73,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
     protected Socket _socket;
     /** reader that always searches for messages */
     protected I2CPMessageReader _reader;
+    /** writer message queue */
+    protected ClientWriterRunner _writer;
     /** where we pipe our messages */
     protected /* FIXME final FIXME */OutputStream _out;
 
@@ -277,11 +279,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
                 _out.write(I2PClient.PROTOCOL_BYTE);
                 _out.flush();
             }
+            _writer = new ClientWriterRunner(_out, this);
             InputStream in = _socket.getInputStream();
             _reader = new I2CPMessageReader(in, this);
             if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading");
             _reader.startReading();
-
             if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate");
             sendMessage(new GetDateMessage());
             if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After getDate / begin waiting for a response");
@@ -543,34 +545,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
      * @throws I2PSessionException if the message is malformed or there is an error writing it out
      */
     void sendMessage(I2CPMessage message) throws I2PSessionException {
-        if (isClosed()) throw new I2PSessionException("Already closed");
-
-        long beforeSync = _context.clock().now();
-        long inSync = 0;
-        if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync to write");
-        try {
-            synchronized (_out) {
-                inSync = _context.clock().now();
-                if (_log.shouldLog(Log.DEBUG)) _log.debug("before writeMessage");
-                message.writeMessage(_out);
-                if (_log.shouldLog(Log.DEBUG)) _log.debug("after writeMessage");
-                _out.flush();
-                if (_log.shouldLog(Log.DEBUG)) _log.debug("after flush");
-            }
-        } catch (I2CPMessageException ime) {
-            throw new I2PSessionException(getPrefix() + "Error writing out the message", ime);
-        } catch (IOException ioe) {
-            throw new I2PSessionException(getPrefix() + "Error writing out the message", ioe);
-        }
-        long afterSync = _context.clock().now();
-        if (_log.shouldLog(Log.DEBUG)) 
-            _log.debug(getPrefix() + "Message written out and flushed w/ " 
-                       + (inSync-beforeSync) + "ms to sync and "
-                       + (afterSync-inSync) + "ms to send");
+        if (isClosed() || _writer == null)
+            throw new I2PSessionException("Already closed");
+        _writer.addMessage(message);
     }
 
     /**
      * Pass off the error to the listener
+     * Misspelled, oh well.
      */
     void propogateError(String msg, Throwable error) {
         if (_log.shouldLog(Log.ERROR)) 
@@ -629,8 +611,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
     private void closeSocket() {
         if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Closing the socket", new Exception("closeSocket"));
         _closed = true;
-        if (_reader != null) _reader.stopReading();
-        _reader = null;
+        if (_reader != null) {
+            _reader.stopReading();
+            _reader = null;
+        }
+        if (_writer != null) {
+            _writer.stopWriting();
+            _writer = null;
+        }
 
         if (_socket != null) {
             try {
diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java
index 933a9578c3..af5ab9a75b 100644
--- a/core/java/src/net/i2p/client/I2PSimpleSession.java
+++ b/core/java/src/net/i2p/client/I2PSimpleSession.java
@@ -79,6 +79,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
                 _out.write(I2PClient.PROTOCOL_BYTE);
                 _out.flush();
             }
+            _writer = new ClientWriterRunner(_out, this);
             InputStream in = _socket.getInputStream();
             _reader = new I2CPMessageReader(in, this);
             _reader.startReading();
diff --git a/core/java/src/net/i2p/util/InternalServerSocket.java b/core/java/src/net/i2p/util/InternalServerSocket.java
index 00d8352ee4..bc9a19ff7a 100644
--- a/core/java/src/net/i2p/util/InternalServerSocket.java
+++ b/core/java/src/net/i2p/util/InternalServerSocket.java
@@ -21,6 +21,11 @@ import net.i2p.I2PAppContext;
  *  A simple in-JVM ServerSocket using Piped Streams.
  *  We use port numbers just like regular sockets.
  *  Can only be connected by InternalSocket.
+ *
+ *  Warning - this uses Piped Streams, which don't like multiple writers from threads
+ *  that may vanish. If you do use multipe writers,
+ *  you may get intermittent 'write end dead' or 'pipe broken' IOExceptions on the reader side.
+ *  See http://techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/
  */
 public class InternalServerSocket extends ServerSocket {
     private static final ConcurrentHashMap<Integer, InternalServerSocket> _sockets = new ConcurrentHashMap(4);
diff --git a/core/java/src/net/i2p/util/InternalSocket.java b/core/java/src/net/i2p/util/InternalSocket.java
index 52b028c117..26280f2545 100644
--- a/core/java/src/net/i2p/util/InternalSocket.java
+++ b/core/java/src/net/i2p/util/InternalSocket.java
@@ -34,12 +34,12 @@ public class InternalSocket extends Socket {
      *  Convenience method to return either a Socket or an InternalSocket
      */
     public static Socket getSocket(String host, int port) throws IOException {
-        //if (System.getProperty("router.version") != null &&
-        //    (host.equals("127.0.0.1") || host.equals("localhost"))) {
-        //    return new InternalSocket(port);
-        //} else {
+        if (System.getProperty("router.version") != null &&
+            (host.equals("127.0.0.1") || host.equals("localhost"))) {
+            return new InternalSocket(port);
+        } else {
             return new Socket(host, port);
-        //}
+        }
     }
 
     @Override
-- 
GitLab