diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 3305050f2..af032c05e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -89,7 +89,8 @@ public class I2PSocketManagerFactory { if (!opts.containsKey(name)) opts.setProperty(name, System.getProperty(name)); } - if (true) { + boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER)); + if (oldLib) { // for the old streaming lib opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); //opts.setProperty("tunnels.depthInbound", "0"); diff --git a/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java index 058c5b8b3..5d2829218 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/StreamSinkTest.java @@ -1,28 +1,51 @@ package net.i2p.client.streaming; /** - * + * Usage: StreamSinkTest [(old|new) [#hops [#kb]]] */ public class StreamSinkTest { /* private static String HOST1 = "dev.i2p.net"; private static String HOST2 = "dev.i2p.net"; private static String PORT1 = "4101"; private static String PORT2 = "4501"; -*/ /* */ + /* private static String HOST1 = "localhost"; private static String HOST2 = "localhost"; private static String PORT1 = "7654"; private static String PORT2 = "7654"; - /* */ /* + */ private static String HOST1 = "localhost"; private static String HOST2 = "localhost"; private static String PORT1 = "10001"; private static String PORT2 = "11001"; - */ + /* */ public static void main(String args[]) { - System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName()); - //System.setProperty("tunnels.depthInbound", "0"); + boolean old = false; + int hops = 0; + int kb = 32*1024; + if (args.length > 0) { + if ("old".equals(args[0])) + old = true; + } + if (args.length > 1) { + try { + hops = Integer.parseInt(args[1]); + } catch (NumberFormatException nfe) { + hops = 0; + } + } + if (args.length > 2) { + try { + kb = Integer.parseInt(args[2]); + } catch (NumberFormatException nfe) { + kb = 32*1024; + } + } + + if (!old) + System.setProperty(I2PSocketManagerFactory.PROP_MANAGER, I2PSocketManagerFull.class.getName()); + System.setProperty("tunnels.depthInbound", ""+hops); new Thread(new Runnable() { public void run() { @@ -32,10 +55,10 @@ public class StreamSinkTest { try { Thread.sleep(60*1000); } catch (Exception e) {} - //run(256, 10000); + //run(256, 1); //run(256, 1000); - //run(1024, 10); - run(32*1024, 1); + //run(4*1024, 10); + run(kb, 1); //run(1*1024, 1); //run("/home/jrandom/streamSinkTestDir/clientSink36766.dat", 1); //run(512*1024, 1); diff --git a/history.txt b/history.txt index 1cd564fe2..30302500c 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.60 2004/11/01 08:31:31 jrandom Exp $ +$Id: history.txt,v 1.61 2004/11/02 03:27:56 jrandom Exp $ + +2004-11-02 jrandom + * Fix for a long standing synchronization bug in the JobQueue (and added + some kooky flags to make sure it stays dead) + * Update the ministreaming lib to force mode=guaranteed if the default + lib is used, and mode=best_effort for all other libs. 2004-11-02 jrandom * Fixed up the configuration overrides for the streaming socket lib diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java index 602853b03..86150d527 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageReader.java @@ -153,6 +153,8 @@ public class I2NPMessageReader { return 0; } else { boolean shouldLag = _context.random().nextInt(1000) > size; + if (!shouldLag) return 0; + long readLag = getReadLag(); if (readLag > 0) { long lag = _context.random().nextLong(readLag); diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index 3c82164fa..3cd0acc8a 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -132,7 +132,6 @@ public class JobQueue { if (job instanceof JobImpl) ((JobImpl)job).addedToQueue(); - boolean isReady = false; long numReady = 0; boolean alreadyExists = false; synchronized (_readyJobs) { @@ -154,7 +153,9 @@ public class JobQueue { + numReady + ": job = " + job); job.dropped(); _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1); - awaken(1); + synchronized (_readyJobs) { + _readyJobs.notifyAll(); + } return; } @@ -166,7 +167,7 @@ public class JobQueue { ((JobImpl)job).madeReady(); synchronized (_readyJobs) { _readyJobs.add(job); - isReady = true; + _readyJobs.notifyAll(); } } else { synchronized (_timedJobs) { @@ -179,11 +180,6 @@ public class JobQueue { _log.debug("Not adding already enqueued job " + job.getName()); } - if (isReady) { - // wake up at most one runner - awaken(1); - } - return; } @@ -246,11 +242,15 @@ public class JobQueue { } synchronized (_readyJobs) { _readyJobs.clear(); + _readyJobs.notifyAll(); } } void shutdown() { _alive = false; + synchronized (_readyJobs) { + _readyJobs.notifyAll(); + } if (_log.shouldLog(Log.WARN)) { StringBuffer buf = new StringBuffer(1024); buf.append("current jobs: \n"); @@ -339,31 +339,13 @@ public class JobQueue { */ Job getNext() { while (_alive) { - Job rv = null; - int ready = 0; synchronized (_readyJobs) { - ready = _readyJobs.size(); - if (ready > 0) - rv = (Job)_readyJobs.remove(0); - } - if (rv != null) { - // we found one, but there may be more, so wake up enough - // other runners - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Waking up " + (ready-1) + " job runners (and running one)"); - //awaken(ready-1); - return rv; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No jobs pending, waiting"); - } - - try { - synchronized (_runnerLock) { - _runnerLock.wait(); + if (_readyJobs.size() > 0) { + return (Job)_readyJobs.remove(0); + } else { + try { _readyJobs.wait(); } catch (InterruptedException ie) {} } - } catch (InterruptedException ie) {} + } } if (_log.shouldLog(Log.WARN)) _log.warn("No longer alive, returning null"); @@ -414,23 +396,6 @@ public class JobQueue { } void removeRunner(int id) { _queueRunners.remove(new Integer(id)); } - - /** - * Notify a sufficient number of waiting runners, and if necessary, increase - * the number of runners (up to maxRunners) - * - */ - private void awaken(int numMadeReady) { - // notify a sufficient number of waiting runners - //for (int i = 0; i < numMadeReady; i++) { - // synchronized (_runnerLock) { - // _runnerLock.notify(); - // } - //} - synchronized (_runnerLock) { - _runnerLock.notify(); - } - } /** * Responsible for moving jobs from the timed queue to the ready queue, @@ -480,9 +445,8 @@ public class JobQueue { // on some profiling data ;) for (int i = 0; i < toAdd.size(); i++) _readyJobs.add(toAdd.get(i)); + _readyJobs.notifyAll(); } - - awaken(toAdd.size()); } else { if (timeToWait < 100) timeToWait = 100; @@ -609,16 +573,15 @@ public class JobQueue { ArrayList justFinishedJobs = new ArrayList(4); out.write("\n"); out.flush(); - synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } - out.write("\n"); - out.flush(); - synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } - out.write("\n"); - out.flush(); + + int states[] = null; int numRunners = 0; synchronized (_queueRunners) { - for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext();) { + states = new int[_queueRunners.size()]; + int i = 0; + for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) { JobQueueRunner runner = (JobQueueRunner)iter.next(); + states[i] = runner.getState(); Job job = runner.getCurrentJob(); if (job != null) { activeJobs.add(job); @@ -630,13 +593,28 @@ public class JobQueue { numRunners = _queueRunners.size(); } - out.write("\n"); + StringBuffer str = new StringBuffer(128); + str.append("\n"); + out.write(str.toString()); + out.flush(); + + synchronized (_readyJobs) { readyJobs = new ArrayList(_readyJobs); } + out.write("\n"); + out.flush(); + synchronized (_timedJobs) { timedJobs = new ArrayList(_timedJobs); } + out.write("\n"); out.flush(); StringBuffer buf = new StringBuffer(32*1024); buf.append("

JobQueue

"); - buf.append("# runners: ").append(numRunners); - buf.append("
\n"); + buf.append("# runners: ").append(numRunners).append(" [states="); + if (states != null) + for (int i = 0; i < states.length; i++) + buf.append(states[i]).append(" "); + buf.append("]
\n"); long now = _context.clock().now(); diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java index 2c37821e7..95f0fba6c 100644 --- a/router/java/src/net/i2p/router/JobQueueRunner.java +++ b/router/java/src/net/i2p/router/JobQueueRunner.java @@ -13,6 +13,7 @@ class JobQueueRunner implements Runnable { private Job _lastJob; private long _lastBegin; private long _lastEnd; + private int _state; public JobQueueRunner(RouterContext context, int id) { _context = context; @@ -26,8 +27,11 @@ class JobQueueRunner implements Runnable { _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sat on the job queue?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + _state = 1; } + final int getState() { return _state; } + public Job getCurrentJob() { return _currentJob; } public Job getLastJob() { return _lastJob; } public int getRunnerId() { return _id; } @@ -36,12 +40,16 @@ class JobQueueRunner implements Runnable { public long getLastBegin() { return _lastBegin; } public long getLastEnd() { return _lastEnd; } public void run() { + _state = 2; long lastActive = _context.clock().now(); long jobNum = 0; while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { + _state = 3; try { Job job = _context.jobQueue().getNext(); + _state = 4; if (job == null) { + _state = 5; if (_context.router().isAlive()) if (_log.shouldLog(Log.ERROR)) _log.error("getNext returned null - dead?"); @@ -51,11 +59,14 @@ class JobQueueRunner implements Runnable { long enqueuedTime = 0; if (job instanceof JobImpl) { + _state = 6; long when = ((JobImpl)job).getMadeReadyOn(); if (when <= 0) { + _state = 7; _log.error("Job was not made ready?! " + job, new Exception("Not made ready?!")); } else { + _state = 8; enqueuedTime = now - when; } } @@ -63,16 +74,21 @@ class JobQueueRunner implements Runnable { long betweenJobs = now - lastActive; _currentJob = job; _lastJob = null; + _state = 9; if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName()); long origStartAfter = job.getTiming().getStartAfter(); long doStart = _context.clock().now(); + _state = 10; job.getTiming().start(); runCurrentJob(); job.getTiming().end(); + _state = 11; long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart(); long beforeUpdate = _context.clock().now(); + _state = 12; _context.jobQueue().updateStats(job, doStart, origStartAfter, duration); + _state = 13; long diff = _context.clock().now() - beforeUpdate; _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs); @@ -80,6 +96,8 @@ class JobQueueRunner implements Runnable { _context.statManager().addRateData("jobQueue.jobLag", doStart - origStartAfter, 0); _context.statManager().addRateData("jobQueue.jobWait", enqueuedTime, enqueuedTime); + _state = 14; + if (diff > 100) { if (_log.shouldLog(Log.WARN)) _log.warn("Updating statistics for the job took too long [" + diff + "ms]"); @@ -92,6 +110,7 @@ class JobQueueRunner implements Runnable { _currentJob = null; _lastEnd = lastActive; jobNum++; + _state = 15; //if ( (jobNum % 10) == 0) // System.gc(); @@ -100,17 +119,22 @@ class JobQueueRunner implements Runnable { _log.log(Log.CRIT, "WTF, error running?", t); } } + _state = 16; if (_context.router().isAlive()) if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Queue runner " + _id + " exiting"); _context.jobQueue().removeRunner(_id); + _state = 17; } private void runCurrentJob() { try { + _state = 18; _lastBegin = _context.clock().now(); _currentJob.runJob(); + _state = 19; } catch (OutOfMemoryError oom) { + _state = 20; try { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Router ran out of memory, shutting down", oom); @@ -122,6 +146,7 @@ class JobQueueRunner implements Runnable { try { Thread.sleep(1000); } catch (InterruptedException ie) {} System.exit(-1); } catch (Throwable t) { + _state = 21; if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName() + "] on thread " + _id + ": " + t.getMessage(), t); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 2eb774c12..b819cb24b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.66 $ $Date: 2004/11/01 08:31:30 $"; + public final static String ID = "$Revision: 1.67 $ $Date: 2004/11/02 03:27:56 $"; public final static String VERSION = "0.4.1.3"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID);