* I2CP: Limit router/client queue sizes and queue wait times

This commit is contained in:
zzz
2012-09-07 22:49:24 +00:00
parent 489f43529c
commit aef021dcd1
5 changed files with 52 additions and 14 deletions

View File

@@ -4,6 +4,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException;
@@ -22,24 +23,31 @@ class ClientWriterRunner implements Runnable {
private I2PSessionImpl _session;
private BlockingQueue<I2CPMessage> _messagesToWrite;
private static volatile long __Id = 0;
private static final int MAX_QUEUE_SIZE = 32;
private static final long MAX_SEND_WAIT = 10*1000;
/** starts the thread too */
public ClientWriterRunner(OutputStream out, I2PSessionImpl session) {
_out = out;
_session = session;
_messagesToWrite = new LinkedBlockingQueue();
_messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true);
t.start();
}
/**
* Add this message to the writer's queue
*
* Add this message to the writer's queue.
* Blocking if queue is full.
* @throws I2PSessionException if we wait too long or are interrupted
*/
public void addMessage(I2CPMessage msg) {
public void addMessage(I2CPMessage msg) throws I2PSessionException {
try {
_messagesToWrite.put(msg);
} catch (InterruptedException ie) {}
if (!_messagesToWrite.offer(msg, MAX_SEND_WAIT, TimeUnit.MILLISECONDS))
throw new I2PSessionException("Timed out waiting while write queue was full");
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted while write queue was full", ie);
}
}
/**

View File

@@ -147,6 +147,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private static final long VERIFY_USAGE_TIME = 60*1000;
private static final long MAX_SEND_WAIT = 10*1000;
void dateUpdated() {
_dateReceived = true;
synchronized (_dateReceivedLock) {
@@ -643,18 +645,26 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/**
* Deliver an I2CP message to the router
* As of 0.9.3, may block for several seconds if the write queue to the router is full
*
* @throws I2PSessionException if the message is malformed or there is an error writing it out
*/
void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed())
if (isClosed()) {
throw new I2PSessionException("Already closed");
else if (_queue != null)
_queue.offer(message); // internal
else if (_writer == null)
} else if (_queue != null) {
// internal
try {
if (!_queue.offer(message, MAX_SEND_WAIT))
throw new I2PSessionException("Timed out waiting while write queue was full");
} catch (InterruptedException ie) {
throw new I2PSessionException("Interrupted while write queue was full", ie);
}
} else if (_writer == null) {
throw new I2PSessionException("Already closed");
else
} else {
_writer.addMessage(message);
}
}
/**

View File

@@ -23,6 +23,14 @@ public abstract class I2CPMessageQueue {
*/
public abstract boolean offer(I2CPMessage msg);
/**
* Send a message, blocking.
* @param timeout how long to wait for space (ms)
* @return success (false if no space available or if timed out)
* @since 0.9.3
*/
public abstract boolean offer(I2CPMessage msg, long timeout) throws InterruptedException;
/**
* Receive a message, nonblocking.
* Unused for now.

View File

@@ -52,6 +52,8 @@ class ClientManager {
/** SSL interface (only) @since 0.8.3 */
private static final String PROP_ENABLE_SSL = "i2cp.SSL";
private static final int INTERNAL_QUEUE_SIZE = 256;
public ClientManager(RouterContext context, int port) {
_ctx = context;
_log = context.logManager().getLog(ClientManager.class);
@@ -125,9 +127,8 @@ class ClientManager {
public I2CPMessageQueue internalConnect() throws I2PSessionException {
if (!_isStarted)
throw new I2PSessionException("Router client manager is shut down");
// for now we make these unlimited size
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue);

View File

@@ -1,6 +1,7 @@
package net.i2p.router.client;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.internal.I2CPMessageQueue;
@@ -32,6 +33,16 @@ class I2CPMessageQueueImpl extends I2CPMessageQueue {
return _out.offer(msg);
}
/**
* Send a message, blocking.
* @param timeout how long to wait for space (ms)
* @return success (false if no space available or if timed out)
* @since 0.9.3
*/
public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException {
return _out.offer(msg, timeout, TimeUnit.MILLISECONDS);
}
/**
* Receive a message, nonblocking
* @return message or null if none available