From aef021dcd1daeee5ef9f930a93551494267b6c05 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 7 Sep 2012 22:49:24 +0000 Subject: [PATCH] * I2CP: Limit router/client queue sizes and queue wait times --- .../net/i2p/client/ClientWriterRunner.java | 20 +++++++++++++------ .../src/net/i2p/client/I2PSessionImpl.java | 20 ++++++++++++++----- .../net/i2p/internal/I2CPMessageQueue.java | 8 ++++++++ .../net/i2p/router/client/ClientManager.java | 7 ++++--- .../router/client/I2CPMessageQueueImpl.java | 11 ++++++++++ 5 files changed, 52 insertions(+), 14 deletions(-) diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java index 931af519c..cae086710 100644 --- a/core/java/src/net/i2p/client/ClientWriterRunner.java +++ b/core/java/src/net/i2p/client/ClientWriterRunner.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; @@ -22,24 +23,31 @@ class ClientWriterRunner implements Runnable { private I2PSessionImpl _session; private BlockingQueue _messagesToWrite; private static volatile long __Id = 0; + + private static final int MAX_QUEUE_SIZE = 32; + private static final long MAX_SEND_WAIT = 10*1000; /** starts the thread too */ public ClientWriterRunner(OutputStream out, I2PSessionImpl session) { _out = out; _session = session; - _messagesToWrite = new LinkedBlockingQueue(); + _messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE); Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true); t.start(); } /** - * Add this message to the writer's queue - * + * Add this message to the writer's queue. + * Blocking if queue is full. + * @throws I2PSessionException if we wait too long or are interrupted */ - public void addMessage(I2CPMessage msg) { + public void addMessage(I2CPMessage msg) throws I2PSessionException { try { - _messagesToWrite.put(msg); - } catch (InterruptedException ie) {} + if (!_messagesToWrite.offer(msg, MAX_SEND_WAIT, TimeUnit.MILLISECONDS)) + throw new I2PSessionException("Timed out waiting while write queue was full"); + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted while write queue was full", ie); + } } /** diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index eb1c6845c..c3b993e4a 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -147,6 +147,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private static final long VERIFY_USAGE_TIME = 60*1000; + private static final long MAX_SEND_WAIT = 10*1000; + void dateUpdated() { _dateReceived = true; synchronized (_dateReceivedLock) { @@ -643,18 +645,26 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * Deliver an I2CP message to the router + * As of 0.9.3, may block for several seconds if the write queue to the router is full * * @throws I2PSessionException if the message is malformed or there is an error writing it out */ void sendMessage(I2CPMessage message) throws I2PSessionException { - if (isClosed()) + if (isClosed()) { throw new I2PSessionException("Already closed"); - else if (_queue != null) - _queue.offer(message); // internal - else if (_writer == null) + } else if (_queue != null) { + // internal + try { + if (!_queue.offer(message, MAX_SEND_WAIT)) + throw new I2PSessionException("Timed out waiting while write queue was full"); + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted while write queue was full", ie); + } + } else if (_writer == null) { throw new I2PSessionException("Already closed"); - else + } else { _writer.addMessage(message); + } } /** diff --git a/core/java/src/net/i2p/internal/I2CPMessageQueue.java b/core/java/src/net/i2p/internal/I2CPMessageQueue.java index 93bea3a3f..eda2d8f95 100644 --- a/core/java/src/net/i2p/internal/I2CPMessageQueue.java +++ b/core/java/src/net/i2p/internal/I2CPMessageQueue.java @@ -23,6 +23,14 @@ public abstract class I2CPMessageQueue { */ public abstract boolean offer(I2CPMessage msg); + /** + * Send a message, blocking. + * @param timeout how long to wait for space (ms) + * @return success (false if no space available or if timed out) + * @since 0.9.3 + */ + public abstract boolean offer(I2CPMessage msg, long timeout) throws InterruptedException; + /** * Receive a message, nonblocking. * Unused for now. diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index da55abe27..e4a5bc64d 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -52,6 +52,8 @@ class ClientManager { /** SSL interface (only) @since 0.8.3 */ private static final String PROP_ENABLE_SSL = "i2cp.SSL"; + private static final int INTERNAL_QUEUE_SIZE = 256; + public ClientManager(RouterContext context, int port) { _ctx = context; _log = context.logManager().getLog(ClientManager.class); @@ -125,9 +127,8 @@ class ClientManager { public I2CPMessageQueue internalConnect() throws I2PSessionException { if (!_isStarted) throw new I2PSessionException("Router client manager is shut down"); - // for now we make these unlimited size - LinkedBlockingQueue in = new LinkedBlockingQueue(); - LinkedBlockingQueue out = new LinkedBlockingQueue(); + LinkedBlockingQueue in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE); + LinkedBlockingQueue out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE); I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out); I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in); ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue); diff --git a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java index f65b06176..a783b3b99 100644 --- a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java +++ b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java @@ -1,6 +1,7 @@ package net.i2p.router.client; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.internal.I2CPMessageQueue; @@ -32,6 +33,16 @@ class I2CPMessageQueueImpl extends I2CPMessageQueue { return _out.offer(msg); } + /** + * Send a message, blocking. + * @param timeout how long to wait for space (ms) + * @return success (false if no space available or if timed out) + * @since 0.9.3 + */ + public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException { + return _out.offer(msg, timeout, TimeUnit.MILLISECONDS); + } + /** * Receive a message, nonblocking * @return message or null if none available