From b8949eafe22f7d2ec9f2c00ce0e2039a40e469a3 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 5 Sep 2012 15:50:11 +0000 Subject: [PATCH] Non-codel version of priority blocking queue, so we can implement priority queues without necessarily committing to codel. --- .../src/net/i2p/router/util/CDPQEntry.java | 16 +---- .../util/CoDelPriorityBlockingQueue.java | 44 ++---------- .../java/src/net/i2p/router/util/PQEntry.java | 23 +++++++ .../net/i2p/router/util/PriBlockingQueue.java | 67 +++++++++++++++++++ 4 files changed, 95 insertions(+), 55 deletions(-) create mode 100644 router/java/src/net/i2p/router/util/PQEntry.java create mode 100644 router/java/src/net/i2p/router/util/PriBlockingQueue.java diff --git a/router/java/src/net/i2p/router/util/CDPQEntry.java b/router/java/src/net/i2p/router/util/CDPQEntry.java index 87106e340d..a04e7f522b 100644 --- a/router/java/src/net/i2p/router/util/CDPQEntry.java +++ b/router/java/src/net/i2p/router/util/CDPQEntry.java @@ -4,20 +4,6 @@ package net.i2p.router.util; * For CoDelPriorityQueue * @since 0.9.3 */ -public interface CDPQEntry extends CDQEntry { +public interface CDPQEntry extends CDQEntry, PQEntry { - /** - * Higher is higher priority - */ - public int getPriority(); - - /** - * To be set by the queue - */ - public void setSeqNum(long num); - - /** - * Needed to ensure FIFO ordering within a single priority - */ - public long getSeqNum(); } diff --git a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java index 483967891c..45ea0a9dc2 100644 --- a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java +++ b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java @@ -25,7 +25,7 @@ import net.i2p.util.Log; * * @since 0.9.3 */ -public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlockingQueue<E> { +public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlockingQueue<E> { private final I2PAppContext _context; private final Log _log; @@ -81,7 +81,7 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo * @param name for stats */ public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) { - super(initialCapacity, new PriorityComparator()); + super(initialCapacity); _context = ctx; _log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class); _name = name; @@ -94,30 +94,6 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo _id = __id.incrementAndGet(); } - @Override - public boolean add(E o) { - timestamp(o); - return super.add(o); - } - - @Override - public boolean offer(E o) { - timestamp(o); - return super.offer(o); - } - - @Override - public boolean offer(E o, long timeout, TimeUnit unit) { - timestamp(o); - return super.offer(o, timeout, unit); - } - - @Override - public void put(E o) { - timestamp(o); - super.put(o); - } - @Override public void clear() { super.clear(); @@ -180,7 +156,8 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo /////// private below here - private void timestamp(E o) { + @Override + protected void timestamp(E o) { o.setSeqNum(_seqNum.incrementAndGet()); o.setEnqueueTime(_context.clock().now()); if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN)) @@ -317,17 +294,4 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriorityBlo private void control_law(long t) { _drop_next = t + (long) (INTERVAL / Math.sqrt(_count)); } - - /** - * highest priority first, then lowest sequence number first - */ - private static class PriorityComparator<E extends CDPQEntry> implements Comparator<E> { - public int compare(E l, E r) { - int d = r.getPriority() - l.getPriority(); - if (d != 0) - return d; - long ld = l.getSeqNum() - r.getSeqNum(); - return ld > 0 ? 1 : -1; - } - } } diff --git a/router/java/src/net/i2p/router/util/PQEntry.java b/router/java/src/net/i2p/router/util/PQEntry.java new file mode 100644 index 0000000000..e92d552dfa --- /dev/null +++ b/router/java/src/net/i2p/router/util/PQEntry.java @@ -0,0 +1,23 @@ +package net.i2p.router.util; + +/** + * For PriBlockingQueue + * @since 0.9.3 + */ +public interface PQEntry { + + /** + * Higher is higher priority + */ + public int getPriority(); + + /** + * To be set by the queue + */ + public void setSeqNum(long num); + + /** + * Needed to ensure FIFO ordering within a single priority + */ + public long getSeqNum(); +} diff --git a/router/java/src/net/i2p/router/util/PriBlockingQueue.java b/router/java/src/net/i2p/router/util/PriBlockingQueue.java new file mode 100644 index 0000000000..f3c1e5215c --- /dev/null +++ b/router/java/src/net/i2p/router/util/PriBlockingQueue.java @@ -0,0 +1,67 @@ +package net.i2p.router.util; + +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Priority Blocking Queue using methods in the entries, + * as definied in PQEntry, to store priority and sequence number, + * ensuring FIFO order within a priority. + * + * Input: add(), offer(), and put() are overridden to add a sequence number. + * + * @since 0.9.3 + */ +public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E> { + + private final AtomicLong _seqNum = new AtomicLong(); + + public PriBlockingQueue(int initialCapacity) { + super(initialCapacity, new PriorityComparator()); + } + + @Override + public boolean add(E o) { + timestamp(o); + return super.add(o); + } + + @Override + public boolean offer(E o) { + timestamp(o); + return super.offer(o); + } + + @Override + public boolean offer(E o, long timeout, TimeUnit unit) { + timestamp(o); + return super.offer(o, timeout, unit); + } + + @Override + public void put(E o) { + timestamp(o); + super.put(o); + } + + /////// private below here + + protected void timestamp(E o) { + o.setSeqNum(_seqNum.incrementAndGet()); + } + + /** + * highest priority first, then lowest sequence number first + */ + private static class PriorityComparator<E extends PQEntry> implements Comparator<E> { + public int compare(E l, E r) { + int d = r.getPriority() - l.getPriority(); + if (d != 0) + return d; + long ld = l.getSeqNum() - r.getSeqNum(); + return ld > 0 ? 1 : -1; + } + } +} -- GitLab