From d9e6c06b22ae630bbeeca58df207f5938208b862 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 8 Sep 2012 20:45:11 +0000 Subject: [PATCH] * I2CP: Buffer output streams * ClientConnectionRunner: More cleanups and edge cases --- .../net/i2p/client/ClientWriterRunner.java | 3 +- .../router/client/ClientConnectionRunner.java | 91 ++++++++++--------- .../net/i2p/router/client/ClientManager.java | 11 ++- .../i2p/router/client/ClientWriterRunner.java | 11 ++- 4 files changed, 65 insertions(+), 51 deletions(-) diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java index cae086710a..a1d55ca5ce 100644 --- a/core/java/src/net/i2p/client/ClientWriterRunner.java +++ b/core/java/src/net/i2p/client/ClientWriterRunner.java @@ -1,5 +1,6 @@ package net.i2p.client; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.BlockingQueue; @@ -29,7 +30,7 @@ class ClientWriterRunner implements Runnable { /** starts the thread too */ public ClientWriterRunner(OutputStream out, I2PSessionImpl session) { - _out = out; + _out = new BufferedOutputStream(out); _session = session; _messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE); Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 03958f3785..15e4dbd71e 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -9,6 +9,7 @@ package net.i2p.router.client; */ import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; @@ -118,7 +119,7 @@ class ClientConnectionRunner { _messageId = new AtomicInteger(_context.random().nextInt()); } - private static volatile int __id = 0; + private static final AtomicInteger __id = new AtomicInteger(); /** * Actually run the connection - listen for I2CP messages and respond. This @@ -126,25 +127,25 @@ class ClientConnectionRunner { * {@link net.i2p.data.i2cp.I2CPMessageReader I2CPMessageReader} * */ - public synchronized void startRunning() { - try { + public synchronized void startRunning() throws IOException { + if (_dead || _reader != null) + throw new IllegalStateException(); _reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE), new ClientMessageEventListener(_context, this, true)); _writer = new ClientWriterRunner(_context, this); I2PThread t = new I2PThread(_writer); - t.setName("I2CP Writer " + ++__id); + t.setName("I2CP Writer " + __id.incrementAndGet()); t.setDaemon(true); t.setPriority(I2PThread.MAX_PRIORITY); t.start(); - _out = _socket.getOutputStream(); // FIXME OWCH! needs a better way so it can be final. FIXME + _out = new BufferedOutputStream(_socket.getOutputStream()); _reader.startReading(); // TODO need a cleaner for unclaimed items in _messages, but we have no timestamps... - } catch (IOException ioe) { - _log.error("Error starting up the runner", ioe); - } } - /** die a horrible death */ + /** + * Die a horrible death. Cannot be restarted. + */ public synchronized void stopRunning() { if (_dead) return; if (_context.router().isAlive() && _log.shouldLog(Log.WARN)) @@ -156,6 +157,7 @@ class ClientConnectionRunner { if (_writer != null) _writer.stopWriting(); if (_socket != null) try { _socket.close(); } catch (IOException ioe) { } _messages.clear(); + _acceptedPending.clear(); if (_sessionKeyManager != null) _sessionKeyManager.shutdown(); _manager.unregisterConnection(this); @@ -499,17 +501,22 @@ class ClientConnectionRunner { //// boolean getIsDead() { return _dead; } + /** + * Not thread-safe. Blocking. Only used for external sockets. + * ClientWriterRunner thread is the only caller. + * Others must use doSend(). + */ void writeMessage(I2CPMessage msg) { - long before = _context.clock().now(); + //long before = _context.clock().now(); try { - // We don't still need synchronization here? isn't ClientWriterRunner the only writer? - synchronized (_out) { + // We don't need synchronization here, ClientWriterRunner is the only writer. + //synchronized (_out) { msg.writeMessage(_out); _out.flush(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("after writeMessage("+ msg.getClass().getName() + "): " - + (_context.clock().now()-before) + "ms"); + //} + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("after writeMessage("+ msg.getClass().getName() + "): " + // + (_context.clock().now()-before) + "ms"); } catch (I2CPMessageException ime) { _log.error("Error sending I2CP message to client", ime); stopRunning(); @@ -525,14 +532,14 @@ class ClientConnectionRunner { } catch (Throwable t) { _log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t); stopRunning(); - } finally { - long after = _context.clock().now(); - long lag = after - before; - if (lag > 300) { - if (_log.shouldLog(Log.WARN)) - _log.warn("synchronization on the i2cp message send took too long (" + lag - + "ms): " + msg); - } + //} finally { + // long after = _context.clock().now(); + // long lag = after - before; + // if (lag > 300) { + // if (_log.shouldLog(Log.WARN)) + // _log.warn("synchronization on the i2cp message send took too long (" + lag + // + "ms): " + msg); + // } } } @@ -543,25 +550,25 @@ class ClientConnectionRunner { void doSend(I2CPMessage msg) throws I2CPMessageException { if (_out == null) throw new I2CPMessageException("Output stream is not initialized"); if (msg == null) throw new I2CPMessageException("Null message?!"); - if (_log.shouldLog(Log.DEBUG)) { - if ( (_config == null) || (_config.getDestination() == null) ) - _log.debug("before doSend of a "+ msg.getClass().getName() - + " message on for establishing i2cp con"); - else - _log.debug("before doSend of a "+ msg.getClass().getName() - + " message on for " - + _config.getDestination().calculateHash().toBase64()); - } + //if (_log.shouldLog(Log.DEBUG)) { + // if ( (_config == null) || (_config.getDestination() == null) ) + // _log.debug("before doSend of a "+ msg.getClass().getName() + // + " message on for establishing i2cp con"); + // else + // _log.debug("before doSend of a "+ msg.getClass().getName() + // + " message on for " + // + _config.getDestination().calculateHash().toBase64()); + //} _writer.addMessage(msg); - if (_log.shouldLog(Log.DEBUG)) { - if ( (_config == null) || (_config.getDestination() == null) ) - _log.debug("after doSend of a "+ msg.getClass().getName() - + " message on for establishing i2cp con"); - else - _log.debug("after doSend of a "+ msg.getClass().getName() - + " message on for " - + _config.getDestination().calculateHash().toBase64()); - } + //if (_log.shouldLog(Log.DEBUG)) { + // if ( (_config == null) || (_config.getDestination() == null) ) + // _log.debug("after doSend of a "+ msg.getClass().getName() + // + " message on for establishing i2cp con"); + // else + // _log.debug("after doSend of a "+ msg.getClass().getName() + // + " message on for " + // + _config.getDestination().calculateHash().toBase64()); + //} } public int getNextMessageId() { diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 0b868aea6d..3abd06c531 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -141,10 +141,15 @@ class ClientManager { } public void registerConnection(ClientConnectionRunner runner) { - synchronized (_pendingRunners) { - _pendingRunners.add(runner); + try { + runner.startRunning(); + synchronized (_pendingRunners) { + _pendingRunners.add(runner); + } + } catch (IOException ioe) { + _log.error("Error starting up the runner", ioe); + runner.stopRunning(); } - runner.startRunning(); } public void unregisterConnection(ClientConnectionRunner runner) { diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java index eba3e96e03..8bdeeded45 100644 --- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java +++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java @@ -15,21 +15,22 @@ import net.i2p.util.Log; * the client reads from their i2cp socket, causing all sorts of bad things to * happen) * + * For external I2CP connections only. */ class ClientWriterRunner implements Runnable { private final BlockingQueue<I2CPMessage> _messagesToWrite; private final ClientConnectionRunner _runner; - private final Log _log; - private final long _id; - private static long __id = 0; + //private final Log _log; + //private final long _id; + //private static long __id = 0; private static final int QUEUE_SIZE = 256; public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) { - _log = context.logManager().getLog(ClientWriterRunner.class); + //_log = context.logManager().getLog(ClientWriterRunner.class); _messagesToWrite = new LinkedBlockingQueue(QUEUE_SIZE); _runner = runner; - _id = ++__id; + //_id = ++__id; } /** -- GitLab