From 4b989eb0928f4e0d6e8248b6300e1421064454ce Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 7 Sep 2011 01:38:30 +0000 Subject: [PATCH] * JobQueue: Change queue from a HashSet to a TreeSet for more efficiency --- history.txt | 7 +++ router/java/src/net/i2p/router/JobImpl.java | 7 ++- router/java/src/net/i2p/router/JobQueue.java | 49 ++++++++++++++++--- .../src/net/i2p/router/RouterVersion.java | 2 +- 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/history.txt b/history.txt index 479ff3eb0..63faabcd8 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,10 @@ +2011-09-07 zzz + * Console: Limit max displayed participating tunnels + * JobQueue: Change queue from a Set to a TreeSet for more efficiency + * TunnelDispatcher: Change participant expire List to a Queue for + efficiency and to remove global lock. Also remove separate + time List for space savings. + 2011-09-06 zzz * Console: Move configservice.jsp rendering code from the router to the console diff --git a/router/java/src/net/i2p/router/JobImpl.java b/router/java/src/net/i2p/router/JobImpl.java index 1692f2056..b3a63773c 100644 --- a/router/java/src/net/i2p/router/JobImpl.java +++ b/router/java/src/net/i2p/router/JobImpl.java @@ -8,14 +8,17 @@ package net.i2p.router; * */ +import java.util.concurrent.atomic.AtomicLong; + import net.i2p.util.Log; + /** * Base implementation of a Job */ public abstract class JobImpl implements Job { private final RouterContext _context; private final JobTiming _timing; - private static long _idSrc = 0; + private static AtomicLong _idSrc = new AtomicLong(); private final long _id; //private Exception _addedBy; private long _madeReadyOn; @@ -23,7 +26,7 @@ public abstract class JobImpl implements Job { public JobImpl(RouterContext context) { _context = context; _timing = new JobTiming(context); - _id = ++_idSrc; + _id = _idSrc.incrementAndGet(); } public long getJobId() { return _id; } diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index b78f1a53f..622c2350d 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -13,12 +13,13 @@ import java.io.Writer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.TreeSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.BlockingQueue; @@ -45,7 +46,7 @@ public class JobQueue { private volatile static int _runnerId = 0; /** list of jobs that are ready to run ASAP */ private final BlockingQueue _readyJobs; - /** list of jobs that are scheduled for running in the future */ + /** SortedSet of jobs that are scheduled for running in the future, earliest first */ private final Set _timedJobs; /** job name to JobStat for that job */ private final Map _jobStats; @@ -137,11 +138,10 @@ public class JobQueue { _alive = true; _readyJobs = new LinkedBlockingQueue(); - _timedJobs = new HashSet(64); + _timedJobs = new TreeSet(new JobComparator()); _jobLock = new Object(); _queueRunners = new ConcurrentHashMap(RUNNERS); _jobStats = new ConcurrentHashMap(); - _allowParallelOperation = false; _pumper = new QueuePumper(); I2PThread pumperThread = new I2PThread(_pumper, "Job Queue Pumper", true); //pumperThread.setPriority(I2PThread.NORM_PRIORITY+1); @@ -168,8 +168,11 @@ public class JobQueue { alreadyExists = true; numReady = _readyJobs.size(); if (!alreadyExists) { - if (_timedJobs.contains(job)) - alreadyExists = true; + //if (_timedJobs.contains(job)) + // alreadyExists = true; + // Always remove and re-add, since it needs to be + // re-sorted in the TreeSet. + _timedJobs.remove(job); } if (shouldDrop(job, numReady)) { @@ -185,10 +188,11 @@ public class JobQueue { _readyJobs.offer(job); } else { _timedJobs.add(job); + // only notify for _timedJobs, as _readyJobs does not use that lock + _jobLock.notifyAll(); } } } - _jobLock.notifyAll(); } _context.statManager().addRateData("jobQueue.readyJobs", numReady, 0); @@ -468,8 +472,11 @@ public class JobQueue { toAdd.add(j); iter.remove(); } else { - if ( (timeToWait <= 0) || (timeLeft < timeToWait) ) + //if ( (timeToWait <= 0) || (timeLeft < timeToWait) ) + // _timedJobs is now a TreeSet, so once we hit one that is + // not ready yet, we can break timeToWait = timeLeft; + break; } } @@ -609,6 +616,32 @@ public class JobQueue { public void dropped() {} } + /** + * Comparator for the _timedJobs TreeSet. + * Ensure different jobs with the same timing are different so they aren't removed. + * @since 0.8.9 + */ + private static class JobComparator implements Comparator { + public int compare(Job l, Job r) { + // equals first, Jobs generally don't override so this should be fast + if (l.equals(r)) + return 0; + // This is for _timedJobs, which always have a JobTiming. + // PoisonJob only goes in _readyJobs. + long ld = l.getTiming().getStartAfter() - r.getTiming().getStartAfter(); + if (ld < 0) + return -1; + if (ld > 0) + return 1; + ld = l.getJobId() - r.getJobId(); + if (ld < 0) + return -1; + if (ld > 0) + return 1; + return l.hashCode() - r.hashCode(); + } + } + /** * Dump the current state. * For the router console jobs status page. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e36cd3b14..7f4edb975 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 12; + public final static long BUILD = 13; /** for example "-test" */ public final static String EXTRA = "";