diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java index 931af519cc8a540d1717ebfb81f51ffd44ca4287..cae086710a0035ab6d5e2c8fcc9b20cd6519ebca 100644 --- a/core/java/src/net/i2p/client/ClientWriterRunner.java +++ b/core/java/src/net/i2p/client/ClientWriterRunner.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; @@ -22,24 +23,31 @@ class ClientWriterRunner implements Runnable { private I2PSessionImpl _session; private BlockingQueue<I2CPMessage> _messagesToWrite; private static volatile long __Id = 0; + + private static final int MAX_QUEUE_SIZE = 32; + private static final long MAX_SEND_WAIT = 10*1000; /** starts the thread too */ public ClientWriterRunner(OutputStream out, I2PSessionImpl session) { _out = out; _session = session; - _messagesToWrite = new LinkedBlockingQueue(); + _messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE); Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true); t.start(); } /** - * Add this message to the writer's queue - * + * Add this message to the writer's queue. + * Blocking if queue is full. + * @throws I2PSessionException if we wait too long or are interrupted */ - public void addMessage(I2CPMessage msg) { + public void addMessage(I2CPMessage msg) throws I2PSessionException { try { - _messagesToWrite.put(msg); - } catch (InterruptedException ie) {} + if (!_messagesToWrite.offer(msg, MAX_SEND_WAIT, TimeUnit.MILLISECONDS)) + throw new I2PSessionException("Timed out waiting while write queue was full"); + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted while write queue was full", ie); + } } /** diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index eb1c6845c2fa1123aad53f902bbac83193d9b2c5..c3b993e4af08cf94e65414c1224043d46aee22aa 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -147,6 +147,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private static final long VERIFY_USAGE_TIME = 60*1000; + private static final long MAX_SEND_WAIT = 10*1000; + void dateUpdated() { _dateReceived = true; synchronized (_dateReceivedLock) { @@ -643,18 +645,26 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * Deliver an I2CP message to the router + * As of 0.9.3, may block for several seconds if the write queue to the router is full * * @throws I2PSessionException if the message is malformed or there is an error writing it out */ void sendMessage(I2CPMessage message) throws I2PSessionException { - if (isClosed()) + if (isClosed()) { throw new I2PSessionException("Already closed"); - else if (_queue != null) - _queue.offer(message); // internal - else if (_writer == null) + } else if (_queue != null) { + // internal + try { + if (!_queue.offer(message, MAX_SEND_WAIT)) + throw new I2PSessionException("Timed out waiting while write queue was full"); + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted while write queue was full", ie); + } + } else if (_writer == null) { throw new I2PSessionException("Already closed"); - else + } else { _writer.addMessage(message); + } } /** diff --git a/core/java/src/net/i2p/internal/I2CPMessageQueue.java b/core/java/src/net/i2p/internal/I2CPMessageQueue.java index 93bea3a3f29b55ece440d048f2fdd31e21975de8..eda2d8f9572975f13e702199597478707212f636 100644 --- a/core/java/src/net/i2p/internal/I2CPMessageQueue.java +++ b/core/java/src/net/i2p/internal/I2CPMessageQueue.java @@ -23,6 +23,14 @@ public abstract class I2CPMessageQueue { */ public abstract boolean offer(I2CPMessage msg); + /** + * Send a message, blocking. + * @param timeout how long to wait for space (ms) + * @return success (false if no space available or if timed out) + * @since 0.9.3 + */ + public abstract boolean offer(I2CPMessage msg, long timeout) throws InterruptedException; + /** * Receive a message, nonblocking. * Unused for now. diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index da55abe2713dfa79137b70b797a461754f16be41..e4a5bc64dbd3797a0f83a76273c7dd3087f714b9 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -52,6 +52,8 @@ class ClientManager { /** SSL interface (only) @since 0.8.3 */ private static final String PROP_ENABLE_SSL = "i2cp.SSL"; + private static final int INTERNAL_QUEUE_SIZE = 256; + public ClientManager(RouterContext context, int port) { _ctx = context; _log = context.logManager().getLog(ClientManager.class); @@ -125,9 +127,8 @@ class ClientManager { public I2CPMessageQueue internalConnect() throws I2PSessionException { if (!_isStarted) throw new I2PSessionException("Router client manager is shut down"); - // for now we make these unlimited size - LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(); - LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(); + LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE); + LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE); I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out); I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in); ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue); diff --git a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java index f65b061766a8bf9982e09601ebf775de0b03cd7e..a783b3b9977019b0aa0926935eb74a3cec52e774 100644 --- a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java +++ b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java @@ -1,6 +1,7 @@ package net.i2p.router.client; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.internal.I2CPMessageQueue; @@ -32,6 +33,16 @@ class I2CPMessageQueueImpl extends I2CPMessageQueue { return _out.offer(msg); } + /** + * Send a message, blocking. + * @param timeout how long to wait for space (ms) + * @return success (false if no space available or if timed out) + * @since 0.9.3 + */ + public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException { + return _out.offer(msg, timeout, TimeUnit.MILLISECONDS); + } + /** * Receive a message, nonblocking * @return message or null if none available