diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 41efbcbc4..a81e35261 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -89,7 +89,7 @@ public class ClientConnectionRunner { _dead = false; } - private static int __id = 0; + private static volatile int __id = 0; /** * Actually run the connection - listen for I2CP messages and respond. This * is the main driver for this class, though it gets all its meat from the @@ -99,7 +99,7 @@ public class ClientConnectionRunner { public void startRunning() { try { _reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this)); - _writer = new ClientWriterRunner(); + _writer = new ClientWriterRunner(_context, this); I2PThread t = new I2PThread(_writer); t.setName("Writer " + ++__id); t.setDaemon(true); @@ -352,103 +352,34 @@ public class ClientConnectionRunner { //// //// - - /** - * Async writer class so that if a client app hangs, they wont take down the - * whole router with them (since otherwise the JobQueue would block until - * the client reads from their i2cp socket, causing all sorts of bad shit to - * happen) - * - */ - private class ClientWriterRunner implements Runnable { - private List _messagesToWrite; - private long _lastAdded; - public ClientWriterRunner() { - _messagesToWrite = new ArrayList(4); - } - - /** - * Add this message to the writer's queue - * - */ - public void addMessage(I2CPMessage msg) { - synchronized (_messagesToWrite) { - _messagesToWrite.add(msg); - _lastAdded = _context.clock().now(); - _messagesToWrite.notifyAll(); + boolean getIsDead() { return _dead; } + + void writeMessage(I2CPMessage msg) { + long before = _context.clock().now(); + try { + synchronized (_out) { + msg.writeMessage(_out); + _out.flush(); } - } - - /** - * No more messages - dont even try to send what we have - * - */ - public void stopWriting() { - synchronized (_messagesToWrite) { - _messagesToWrite.notifyAll(); - } - } - public void run() { - while (!_dead) { - I2CPMessage msg = null; - long beforeCheckSync = _context.clock().now(); - long inCheckSync = 0; - synchronized (_messagesToWrite) { - inCheckSync = _context.clock().now(); - if (_messagesToWrite.size() > 0) { - msg = (I2CPMessage)_messagesToWrite.remove(0); - } else { - try { - _messagesToWrite.wait(); - } catch (InterruptedException ie) { - if (_messagesToWrite.size() > 0) - msg = (I2CPMessage)_messagesToWrite.remove(0); - } - } - } - - long afterCheckSync = _context.clock().now(); - - if (msg != null) { - writeMessage(msg); - long afterWriteMessage = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("writeMessage: check sync took " - + (inCheckSync-beforeCheckSync) + "ms, writemessage took " - + (afterWriteMessage-afterCheckSync) - + "ms, time since addMessage(): " + - + (afterCheckSync-_lastAdded)); - } - } - } - - private void writeMessage(I2CPMessage msg) { - long before = _context.clock().now(); - try { - synchronized (_out) { - msg.writeMessage(_out); - _out.flush(); - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("after writeMessage("+ msg.getClass().getName() + "): " - + (_context.clock().now()-before) + "ms");; - } catch (I2CPMessageException ime) { - _log.error("Message exception sending I2CP message", ime); - stopRunning(); - } catch (IOException ioe) { - _log.error("IO exception sending I2CP message", ioe); - stopRunning(); - } catch (Throwable t) { - _log.log(Log.CRIT, "Unhandled exception sending I2CP message", t); - stopRunning(); - } finally { - long after = _context.clock().now(); - long lag = after - before; - if (lag > 300) { - if (_log.shouldLog(Log.WARN)) - _log.warn("synchronization on the i2cp message send took too long (" + lag - + "ms): " + msg); - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("after writeMessage("+ msg.getClass().getName() + "): " + + (_context.clock().now()-before) + "ms");; + } catch (I2CPMessageException ime) { + _log.error("Message exception sending I2CP message", ime); + stopRunning(); + } catch (IOException ioe) { + _log.error("IO exception sending I2CP message", ioe); + stopRunning(); + } catch (Throwable t) { + _log.log(Log.CRIT, "Unhandled exception sending I2CP message", t); + stopRunning(); + } finally { + long after = _context.clock().now(); + long lag = after - before; + if (lag > 300) { + if (_log.shouldLog(Log.WARN)) + _log.warn("synchronization on the i2cp message send took too long (" + lag + + "ms): " + msg); } } } diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java new file mode 100644 index 000000000..db4b77988 --- /dev/null +++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java @@ -0,0 +1,99 @@ +package net.i2p.router.client; + +import java.util.List; +import java.util.ArrayList; +import net.i2p.router.RouterContext; +import net.i2p.util.Log; +import net.i2p.data.i2cp.I2CPMessage; + +/** + * Async writer class so that if a client app hangs, they wont take down the + * whole router with them (since otherwise the JobQueue would block until + * the client reads from their i2cp socket, causing all sorts of bad shit to + * happen) + * + */ +class ClientWriterRunner implements Runnable { + private List _messagesToWrite; + private volatile long _lastAdded; + private ClientConnectionRunner _runner; + private RouterContext _context; + private Log _log; + public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) { + _context = context; + _log = context.logManager().getLog(ClientWriterRunner.class); + _messagesToWrite = new ArrayList(4); + _runner = runner; + } + + /** + * Add this message to the writer's queue + * + */ + public void addMessage(I2CPMessage msg) { + synchronized (_messagesToWrite) { + _messagesToWrite.add(msg); + _lastAdded = _context.clock().now(); + _messagesToWrite.notifyAll(); + } + } + + /** + * No more messages - dont even try to send what we have + * + */ + public void stopWriting() { + synchronized (_messagesToWrite) { + _messagesToWrite.notifyAll(); + } + } + public void run() { + while (!_runner.getIsDead()) { + List messages = null; + long beforeCheckSync = _context.clock().now(); + long inCheckSync = 0; + int remaining = 0; + synchronized (_messagesToWrite) { + inCheckSync = _context.clock().now(); + if (_messagesToWrite.size() > 0) { + messages = new ArrayList(_messagesToWrite.size()); + messages.addAll(_messagesToWrite); + _messagesToWrite.clear(); + } else { + try { + _messagesToWrite.wait(); + } catch (InterruptedException ie) {} + if (_messagesToWrite.size() > 0) { + messages = new ArrayList(_messagesToWrite.size()); + messages.addAll(_messagesToWrite); + _messagesToWrite.clear(); + } + } + remaining = _messagesToWrite.size(); + } + + long afterCheckSync = _context.clock().now(); + + if (messages != null) { + for (int i = 0; i < messages.size(); i++) { + I2CPMessage msg = (I2CPMessage)messages.get(i); + _runner.writeMessage(msg); + long afterWriteMessage = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("writeMessage: check sync took " + + (inCheckSync-beforeCheckSync) + "ms, writemessage took " + + (afterWriteMessage-afterCheckSync) + + "ms, time since addMessage(): " + + (afterCheckSync-_lastAdded) + " for " + + msg.getClass().getName() + " remaining - " + remaining); + } + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("dont writeMessage: check sync took " + + (inCheckSync-beforeCheckSync) + "ms, " + + "time since addMessage(): " + + (afterCheckSync-_lastAdded) + " remaining - " + remaining); + } + } + } +}