I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit ca91ad31 authored by zzz's avatar zzz
Browse files

* SSU: Move MessageReceiver queue to CoDel

parent 33de6bea
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ package net.i2p.router.transport.udp; ...@@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.ByteCache; import net.i2p.util.ByteCache;
import net.i2p.util.Log; import net.i2p.util.Log;
...@@ -12,7 +13,7 @@ import net.i2p.util.Log; ...@@ -12,7 +13,7 @@ import net.i2p.util.Log;
* Warning - there is no synchronization in this class, take care in * Warning - there is no synchronization in this class, take care in
* InboundMessageFragments to avoid use-after-release, etc. * InboundMessageFragments to avoid use-after-release, etc.
*/ */
class InboundMessageState { class InboundMessageState implements CDQEntry {
private final RouterContext _context; private final RouterContext _context;
private final Log _log; private final Log _log;
private final long _messageId; private final long _messageId;
...@@ -29,6 +30,7 @@ class InboundMessageState { ...@@ -29,6 +30,7 @@ class InboundMessageState {
*/ */
private int _lastFragment; private int _lastFragment;
private final long _receiveBegin; private final long _receiveBegin;
private long _enqueueTime;
private int _completeSize; private int _completeSize;
private boolean _released; private boolean _released;
...@@ -138,6 +140,30 @@ class InboundMessageState { ...@@ -138,6 +140,30 @@ class InboundMessageState {
return _context.clock().now() - _receiveBegin; return _context.clock().now() - _receiveBegin;
} }
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
releaseResources();
}
public Hash getFrom() { return _from; } public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; } public long getMessageId() { return _messageId; }
......
package net.i2p.router.transport.udp; package net.i2p.router.transport.udp;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64; import net.i2p.data.Base64;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
...@@ -10,6 +9,7 @@ import net.i2p.data.i2np.I2NPMessageException; ...@@ -10,6 +9,7 @@ import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler; import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue;
//import net.i2p.util.ByteCache; //import net.i2p.util.ByteCache;
import net.i2p.util.HexDump; import net.i2p.util.HexDump;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
...@@ -55,7 +55,7 @@ class MessageReceiver { ...@@ -55,7 +55,7 @@ class MessageReceiver {
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20)); _threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
} }
_completeMessages = new LinkedBlockingQueue(qsize); _completeMessages = new CoDelBlockingQueue(ctx, "UDP-MessageReceiver", qsize);
// the runners run forever, no need to have a cache // the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment