From 3347788712389c7b87c7cdb03a67466664b18e29 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Thu, 6 Sep 2012 19:53:01 +0000
Subject: [PATCH] add isBacklogged()

---
 .../net/i2p/router/util/CoDelBlockingQueue.java   | 15 +++++++++++++++
 .../router/util/CoDelPriorityBlockingQueue.java   | 14 ++++++++++++++
 .../src/net/i2p/router/util/PriBlockingQueue.java |  9 +++++++++
 3 files changed, 38 insertions(+)

diff --git a/router/java/src/net/i2p/router/util/CoDelBlockingQueue.java b/router/java/src/net/i2p/router/util/CoDelBlockingQueue.java
index 33a6fc42d5..5f89784a85 100644
--- a/router/java/src/net/i2p/router/util/CoDelBlockingQueue.java
+++ b/router/java/src/net/i2p/router/util/CoDelBlockingQueue.java
@@ -29,6 +29,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
     private final I2PAppContext _context;
     private final Log _log;
     private final String _name;
+    private final int _capacity;
 
     // following 4 are state variables defined by sample code, locked by this
     /** Time when we'll declare we're above target (0 if below) */
@@ -68,6 +69,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
     private final String STAT_DROP;
     private final String STAT_DELAY;
     private static final long[] RATES = {5*60*1000, 60*60*1000};
+    private static final long BACKLOG_TIME = 2*1000;
 
     /**
      *  @param name for stats
@@ -77,6 +79,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
         _context = ctx;
         _log = ctx.logManager().getLog(CoDelBlockingQueue.class);
         _name = name;
+        _capacity = capacity;
         STAT_DROP = "codel." + name + ".drop";
         STAT_DELAY = "codel." + name + ".delay";
         ctx.statManager().createRequiredRateStat(STAT_DROP, "queue delay of dropped items", "Router", RATES);
@@ -168,6 +171,18 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
         return super.drainTo(c);
     }
 
+    /**
+     *  Has the head of the queue been waiting too long,
+     *  or is the queue almost full?
+     */
+    public boolean isBacklogged() {
+        E e = peek();
+        if (e == null)
+            return false;
+        return _context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
+               remainingCapacity() < _capacity / 4;
+    }
+
     /////// private below here
 
     /**
diff --git a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java
index 45ea0a9dc2..ae4130edfc 100644
--- a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java
+++ b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java
@@ -76,6 +76,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
     private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500};
     /** if priority is >= this, never drop */
     public static final int DONT_DROP_PRIORITY = 1000;
+    private static final long BACKLOG_TIME = 2*1000;
 
     /**
      *  @param name for stats
@@ -154,6 +155,19 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
         return super.drainTo(c);
     }
 
+    /**
+     *  Has the head of the queue been waiting too long,
+     *  or is the queue too big?
+     */
+    @Override
+    public boolean isBacklogged() {
+        E e = peek();
+        if (e == null)
+            return false;
+        return _context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
+               size() >= BACKLOG_SIZE;
+    }
+
     /////// private below here
 
     @Override
diff --git a/router/java/src/net/i2p/router/util/PriBlockingQueue.java b/router/java/src/net/i2p/router/util/PriBlockingQueue.java
index f3c1e5215c..486728bc48 100644
--- a/router/java/src/net/i2p/router/util/PriBlockingQueue.java
+++ b/router/java/src/net/i2p/router/util/PriBlockingQueue.java
@@ -18,6 +18,8 @@ public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E
 
     private final AtomicLong _seqNum = new AtomicLong();
 
+    protected static final int BACKLOG_SIZE = 256;
+
     public PriBlockingQueue(int initialCapacity) {
         super(initialCapacity, new PriorityComparator());
     }
@@ -46,6 +48,13 @@ public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E
         super.put(o);
     }
 
+    /**
+     *  Is the queue too big?
+     */
+    public boolean isBacklogged() {
+        return size() >= BACKLOG_SIZE;
+    }
+
     /////// private below here
 
     protected void timestamp(E o) {
-- 
GitLab