From 7ec29b0c5a34513905b65d8b5c3b00f2a4394e86 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Mon, 2 Feb 2009 18:03:16 +0000
Subject: [PATCH] use concurrent

---
 .../router/client/ClientConnectionRunner.java | 55 +++--------
 .../i2p/router/client/ClientWriterRunner.java | 94 ++++++++-----------
 2 files changed, 50 insertions(+), 99 deletions(-)

diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
index 189568eadd..af30a5d3da 100644
--- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java
@@ -11,6 +11,7 @@ package net.i2p.router.client;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -59,7 +60,7 @@ public class ClientConnectionRunner {
     /** user's config */
     private SessionConfig _config;
     /** static mapping of MessageId to Payload, storing messages for retrieval */
-    private Map _messages; 
+    private Map<MessageId, Payload> _messages; 
     /** lease set request state, or null if there is no request pending on at the moment */
     private LeaseRequestState _leaseRequest;
     /** currently allocated leaseSet, or null if none is allocated */
@@ -88,7 +89,7 @@ public class ClientConnectionRunner {
         _manager = manager;
         _socket = socket;
         _config = null;
-        _messages = new HashMap();
+        _messages = new ConcurrentHashMap();
         _alreadyProcessed = new ArrayList();
         _acceptedPending = new HashSet();
         _dead = false;
@@ -106,7 +107,7 @@ public class ClientConnectionRunner {
             _reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this));
             _writer = new ClientWriterRunner(_context, this);
             I2PThread t = new I2PThread(_writer);
-            t.setName("Writer " + ++__id);
+            t.setName("I2CP Writer " + ++__id);
             t.setDaemon(true);
             t.setPriority(I2PThread.MAX_PRIORITY);
             t.start();
@@ -128,9 +129,7 @@ public class ClientConnectionRunner {
         if (_reader != null) _reader.stopReading();
         if (_writer != null) _writer.stopWriting();
         if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
-        synchronized (_messages) {
-            _messages.clear();
-        }
+        _messages.clear();
         if (_manager != null)
             _manager.unregisterConnection(this);
         if (_currentLeaseSet != null)
@@ -164,50 +163,18 @@ public class ClientConnectionRunner {
     }
     /** already closed? */
     boolean isDead() { return _dead; }
+
     /** message body */
     Payload getPayload(MessageId id) { 
-        Payload rv = null;
-        long beforeLock = _context.clock().now();
-        long inLock = 0;
-        synchronized (_messages) { 
-            inLock = _context.clock().now();
-            rv = (Payload)_messages.get(id); 
-        } 
-        long afterLock = _context.clock().now();
-        
-        if (afterLock - beforeLock > 50) {
-            _log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
-                      + " overall, synchronized took " + (inLock - beforeLock));
-        }
-        return rv;
+        return _messages.get(id); 
     }
+
     void setPayload(MessageId id, Payload payload) { 
-        long beforeLock = _context.clock().now();
-        long inLock = 0;
-        synchronized (_messages) { 
-            inLock = _context.clock().now();
-            _messages.put(id, payload); 
-        } 
-        long afterLock = _context.clock().now();
-        
-        if (afterLock - beforeLock > 50) {
-            _log.warn("setPayload.locking took too long: " + (afterLock-beforeLock)
-                      + " overall, synchronized took " + (inLock - beforeLock));
-        }
+        _messages.put(id, payload); 
     }
+
     void removePayload(MessageId id) { 
-        long beforeLock = _context.clock().now();
-        long inLock = 0;
-        synchronized (_messages) { 
-            inLock = _context.clock().now();
-            _messages.remove(id); 
-        } 
-        long afterLock = _context.clock().now();
-        
-        if (afterLock - beforeLock > 50) {
-            _log.warn("removePayload.locking took too long: " + (afterLock-beforeLock)
-                      + " overall, synchronized took " + (inLock - beforeLock));
-        }
+        _messages.remove(id); 
     }
     
     void sessionEstablished(SessionConfig config) {
diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
index bf13648772..49fcddcc20 100644
--- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java
+++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java
@@ -1,9 +1,13 @@
 package net.i2p.router.client;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import net.i2p.data.i2cp.I2CPMessage;
+import net.i2p.data.i2cp.I2CPMessageImpl;
+import net.i2p.data.i2cp.I2CPMessageException;
 import net.i2p.router.RouterContext;
 import net.i2p.util.Log;
 
@@ -13,26 +17,18 @@ import net.i2p.util.Log;
  * the client reads from their i2cp socket, causing all sorts of bad shit to
  * happen)
  *
+ * @author zzz modded to use concurrent
  */
 class ClientWriterRunner implements Runnable {
-    private List _messagesToWrite;
-    private List _messagesToWriteTimes;
+    private BlockingQueue<I2CPMessage> _messagesToWrite;
     private ClientConnectionRunner _runner;
-    private RouterContext _context;
     private Log _log;
     private long _id;
     private static long __id = 0;
     
-    private static final long MAX_WAIT = 5*1000;
-    
-    /** lock on this when updating the class level data structs */
-    private Object _dataLock = new Object();
-    
     public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
-        _context = context;
         _log = context.logManager().getLog(ClientWriterRunner.class);
-        _messagesToWrite = new ArrayList(4);
-        _messagesToWriteTimes = new ArrayList(4);
+        _messagesToWrite = new LinkedBlockingQueue();
         _runner = runner;
         _id = ++__id;
     }
@@ -42,11 +38,9 @@ class ClientWriterRunner implements Runnable {
      *
      */
     public void addMessage(I2CPMessage msg) {
-        synchronized (_dataLock) {
-            _messagesToWrite.add(msg);
-            _messagesToWriteTimes.add(new Long(_context.clock().now()));
-            _dataLock.notifyAll();
-        }
+        try {
+            _messagesToWrite.put(msg);
+        } catch (InterruptedException ie) {}
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName());
     }
@@ -56,47 +50,37 @@ class ClientWriterRunner implements Runnable {
      *
      */
     public void stopWriting() {
-        synchronized (_dataLock) {
-            _dataLock.notifyAll();
-        }
+        _messagesToWrite.clear();
+        try {
+            _messagesToWrite.put(new PoisonMessage());
+        } catch (InterruptedException ie) {}
     }
+
     public void run() {
-        List messages = new ArrayList(64); 
-        List messageTimes = new ArrayList(64);
-        List switchList = null;
-        
+        I2CPMessage msg;
         while (!_runner.getIsDead()) {
-            synchronized (_dataLock) {
-                if (_messagesToWrite.size() <= 0) 
-                    try { _dataLock.wait(); } catch (InterruptedException ie) {}
-                
-                if (_messagesToWrite.size() > 0) {
-                    switchList = _messagesToWrite;
-                    _messagesToWrite = messages;
-                    messages = switchList;
-                    
-                    switchList = _messagesToWriteTimes;
-                    _messagesToWriteTimes = messageTimes;
-                    messageTimes = switchList;
-                } 
-            }
-            
-            if (messages.size() > 0) {
-                for (int i = 0; i < messages.size(); i++) {
-                    I2CPMessage msg = (I2CPMessage)messages.get(i);
-                    Long when = (Long)messageTimes.get(i);
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("["+_id+"] writeMessage before writing " 
-                                   + msg.getClass().getName());
-                    _runner.writeMessage(msg);
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("["+_id+"] writeMessage time since addMessage(): " 
-                                   + (_context.clock().now()-when.longValue()) + " for " 
-                                   + msg.getClass().getName());
-                }
+            try {
+                msg = _messagesToWrite.take();
+            } catch (InterruptedException ie) {
+                continue;
             }
-            messages.clear();
-            messageTimes.clear();
+            if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
+                break;
+            _runner.writeMessage(msg);
+        }
+    }
+
+    /**
+     * End-of-stream msg used to stop the concurrent queue
+     * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
+     *
+     */
+    private static class PoisonMessage extends I2CPMessageImpl {
+        public static final int MESSAGE_TYPE = 999999;
+        public int getType() {
+            return MESSAGE_TYPE;
         }
+        public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
+        public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
     }
 }
-- 
GitLab