forked from I2P_Developers/i2p.i2p
add isBacklogged()
This commit is contained in:
@@ -29,6 +29,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
|
||||
private final I2PAppContext _context;
|
||||
private final Log _log;
|
||||
private final String _name;
|
||||
private final int _capacity;
|
||||
|
||||
// following 4 are state variables defined by sample code, locked by this
|
||||
/** Time when we'll declare we're above target (0 if below) */
|
||||
@@ -68,6 +69,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
|
||||
private final String STAT_DROP;
|
||||
private final String STAT_DELAY;
|
||||
private static final long[] RATES = {5*60*1000, 60*60*1000};
|
||||
private static final long BACKLOG_TIME = 2*1000;
|
||||
|
||||
/**
|
||||
* @param name for stats
|
||||
@@ -77,6 +79,7 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(CoDelBlockingQueue.class);
|
||||
_name = name;
|
||||
_capacity = capacity;
|
||||
STAT_DROP = "codel." + name + ".drop";
|
||||
STAT_DELAY = "codel." + name + ".delay";
|
||||
ctx.statManager().createRequiredRateStat(STAT_DROP, "queue delay of dropped items", "Router", RATES);
|
||||
@@ -168,6 +171,18 @@ public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<
|
||||
return super.drainTo(c);
|
||||
}
|
||||
|
||||
/**
|
||||
* Has the head of the queue been waiting too long,
|
||||
* or is the queue almost full?
|
||||
*/
|
||||
public boolean isBacklogged() {
|
||||
E e = peek();
|
||||
if (e == null)
|
||||
return false;
|
||||
return _context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
|
||||
remainingCapacity() < _capacity / 4;
|
||||
}
|
||||
|
||||
/////// private below here
|
||||
|
||||
/**
|
||||
|
||||
@@ -76,6 +76,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
|
||||
private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500};
|
||||
/** if priority is >= this, never drop */
|
||||
public static final int DONT_DROP_PRIORITY = 1000;
|
||||
private static final long BACKLOG_TIME = 2*1000;
|
||||
|
||||
/**
|
||||
* @param name for stats
|
||||
@@ -154,6 +155,19 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
|
||||
return super.drainTo(c);
|
||||
}
|
||||
|
||||
/**
|
||||
* Has the head of the queue been waiting too long,
|
||||
* or is the queue too big?
|
||||
*/
|
||||
@Override
|
||||
public boolean isBacklogged() {
|
||||
E e = peek();
|
||||
if (e == null)
|
||||
return false;
|
||||
return _context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
|
||||
size() >= BACKLOG_SIZE;
|
||||
}
|
||||
|
||||
/////// private below here
|
||||
|
||||
@Override
|
||||
|
||||
@@ -18,6 +18,8 @@ public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E
|
||||
|
||||
private final AtomicLong _seqNum = new AtomicLong();
|
||||
|
||||
protected static final int BACKLOG_SIZE = 256;
|
||||
|
||||
public PriBlockingQueue(int initialCapacity) {
|
||||
super(initialCapacity, new PriorityComparator());
|
||||
}
|
||||
@@ -46,6 +48,13 @@ public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E
|
||||
super.put(o);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the queue too big?
|
||||
*/
|
||||
public boolean isBacklogged() {
|
||||
return size() >= BACKLOG_SIZE;
|
||||
}
|
||||
|
||||
/////// private below here
|
||||
|
||||
protected void timestamp(E o) {
|
||||
|
||||
Reference in New Issue
Block a user