From ca91ad3188e970990fc71e6c7c8b86918611586a Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sat, 8 Sep 2012 12:34:34 +0000
Subject: [PATCH]   * SSU: Move MessageReceiver queue to CoDel

---
 .../transport/udp/InboundMessageState.java    | 28 ++++++++++++++++++-
 .../router/transport/udp/MessageReceiver.java |  4 +--
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
index c2e6e4124d..e4f8435ae4 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
@@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
 import net.i2p.data.ByteArray;
 import net.i2p.data.Hash;
 import net.i2p.router.RouterContext;
+import net.i2p.router.util.CDQEntry;
 import net.i2p.util.ByteCache;
 import net.i2p.util.Log;
 
@@ -12,7 +13,7 @@ import net.i2p.util.Log;
  * Warning - there is no synchronization in this class, take care in
  * InboundMessageFragments to avoid use-after-release, etc.
  */
-class InboundMessageState {
+class InboundMessageState implements CDQEntry {
     private final RouterContext _context;
     private final Log _log;
     private final long _messageId;
@@ -29,6 +30,7 @@ class InboundMessageState {
      */
     private int _lastFragment;
     private final long _receiveBegin;
+    private long _enqueueTime;
     private int _completeSize;
     private boolean _released;
     
@@ -138,6 +140,30 @@ class InboundMessageState {
         return _context.clock().now() - _receiveBegin;
     }
 
+    /**
+     *  For CDQ
+     *  @since 0.9.3
+     */
+    public void setEnqueueTime(long now) {
+        _enqueueTime = now;
+    }
+
+    /**
+     *  For CDQ
+     *  @since 0.9.3
+     */
+    public long getEnqueueTime() {
+        return _enqueueTime;
+    }
+
+    /**
+     *  For CDQ
+     *  @since 0.9.3
+     */
+    public void drop() {
+        releaseResources();
+    }
+
     public Hash getFrom() { return _from; }
 
     public long getMessageId() { return _messageId; }
diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
index 112ea24980..61061e7cb6 100644
--- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
+++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
@@ -1,7 +1,6 @@
 package net.i2p.router.transport.udp;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import net.i2p.data.Base64;
 import net.i2p.data.ByteArray;
@@ -10,6 +9,7 @@ import net.i2p.data.i2np.I2NPMessageException;
 import net.i2p.data.i2np.I2NPMessageHandler;
 import net.i2p.data.i2np.I2NPMessageImpl;
 import net.i2p.router.RouterContext;
+import net.i2p.router.util.CoDelBlockingQueue;
 //import net.i2p.util.ByteCache;
 import net.i2p.util.HexDump;
 import net.i2p.util.I2PThread;
@@ -55,7 +55,7 @@ class MessageReceiver {
             _threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
             qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
         }
-        _completeMessages = new LinkedBlockingQueue(qsize);
+        _completeMessages = new CoDelBlockingQueue(ctx, "UDP-MessageReceiver", qsize);
 
         // the runners run forever, no need to have a cache
         //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
-- 
GitLab