From cc3165bf729e0fe7954fe70a7fb1f8170e195308 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 15 Feb 2009 05:11:35 +0000 Subject: [PATCH] * Streaming lib: - Move ConEvent from SimpleTimer to SimpleScheduler - Move RetransmissionTimer (ResendPacketEvent) from SimpleTimer to new SimpleTimer2 - Move ActivityTimer and Flusher from SimpleTimer to RetransmissionTimer - SimpleTimer2 allows specifying "fuzz" to reduce timer queue churn further --- apps/BOB/src/net/i2p/BOB/BOB.java | 3 +- apps/BOB/src/net/i2p/BOB/Main.java | 5 +- .../net/i2p/client/streaming/Connection.java | 38 ++- .../client/streaming/MessageOutputStream.java | 13 +- .../net/i2p/client/streaming/PacketLocal.java | 16 +- .../client/streaming/RetransmissionTimer.java | 6 +- .../i2p/client/streaming/SchedulerImpl.java | 4 +- .../net/i2p/client/streaming/TCBShare.java | 14 +- .../src/net/i2p/util/SimpleScheduler.java | 4 +- core/java/src/net/i2p/util/SimpleTimer2.java | 252 ++++++++++++++++++ .../src/net/i2p/router/tunnel/FlushTimer.java | 11 +- 11 files changed, 322 insertions(+), 44 deletions(-) create mode 100644 core/java/src/net/i2p/util/SimpleTimer2.java diff --git a/apps/BOB/src/net/i2p/BOB/BOB.java b/apps/BOB/src/net/i2p/BOB/BOB.java index cd4ccfdcb..cc1428f8c 100644 --- a/apps/BOB/src/net/i2p/BOB/BOB.java +++ b/apps/BOB/src/net/i2p/BOB/BOB.java @@ -34,7 +34,6 @@ import java.util.Properties; import net.i2p.client.I2PClient; import net.i2p.client.streaming.RetransmissionTimer; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; /** * <span style="font-size:8px;font-family:courier;color:#EEEEEE;background-color:#000000"> * ################################################################################<br> @@ -151,7 +150,7 @@ public class BOB { String configLocation = System.getProperty(PROP_CONFIG_LOCATION, "bob.config"); // This is here just to ensure there is no interference with our threadgroups. - SimpleTimer Y = RetransmissionTimer.getInstance(); + RetransmissionTimer Y = RetransmissionTimer.getInstance(); i = Y.hashCode(); { try { diff --git a/apps/BOB/src/net/i2p/BOB/Main.java b/apps/BOB/src/net/i2p/BOB/Main.java index 26823ff39..2d81fb30e 100644 --- a/apps/BOB/src/net/i2p/BOB/Main.java +++ b/apps/BOB/src/net/i2p/BOB/Main.java @@ -24,7 +24,6 @@ package net.i2p.BOB; import net.i2p.client.streaming.RetransmissionTimer; -import net.i2p.util.SimpleTimer; /** * Start from command line @@ -39,8 +38,8 @@ public class Main { */ public static void main(String[] args) { // THINK THINK THINK THINK THINK THINK - SimpleTimer Y = RetransmissionTimer.getInstance(); + RetransmissionTimer Y = RetransmissionTimer.getInstance(); BOB.main(args); - Y.removeSimpleTimer(); + Y.stop(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index a59e12610..503760ed1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -14,6 +14,7 @@ import net.i2p.data.Destination; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * Maintain the state controlling a streaming connection between two @@ -69,6 +70,7 @@ public class Connection { /** how many messages have been resent and not yet ACKed? */ private int _activeResends; private ConEvent _connectionEvent; + private int _randomWait; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -124,6 +126,7 @@ public class Connection { _isInbound = false; _updatedShareOpts = false; _connectionEvent = new ConEvent(); + _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -325,7 +328,8 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by")); - RetransmissionTimer.getInstance().addEvent(new ResendPacketEvent(packet, timeout + _context.clock().now()), timeout); + // schedules itself + ResendPacketEvent rpe = new ResendPacketEvent(packet, timeout); } _context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); @@ -526,7 +530,7 @@ public class Connection { if (_receiver != null) _receiver.destroy(); if (_activityTimer != null) - SimpleTimer.getInstance().removeEvent(_activityTimer); + _activityTimer.cancel(); //_activityTimer = null; if (_inputStream != null) _inputStream.streamErrorOccurred(new IOException("disconnected!")); @@ -822,15 +826,18 @@ public class Connection { return; } long howLong = _options.getInactivityTimeout(); - howLong += _context.random().nextInt(30*1000); // randomize it a bit, so both sides don't do it at once + howLong += _randomWait; // randomize it a bit, so both sides don't do it at once if (_log.shouldLog(Log.DEBUG)) _log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString())); // this will get rescheduled, and rescheduled, and rescheduled... - RetransmissionTimer.getInstance().removeEvent(_activityTimer); - RetransmissionTimer.getInstance().addEvent(_activityTimer, howLong); + _activityTimer.reschedule(howLong, false); // use the later of current and previous timeout } - private class ActivityTimer implements SimpleTimer.TimedEvent { + private class ActivityTimer extends SimpleTimer2.TimedEvent { + public ActivityTimer() { + super(RetransmissionTimer.getInstance()); + setFuzz(5*1000); // sloppy timer, don't reschedule unless at least 5s later + } public void timeReached() { // uh, nothing more to do... if (!_connected) { @@ -841,7 +848,7 @@ public class Connection { long left = getTimeLeft(); if (left > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Inactivity timeout reached, but there is time left (" + left + ")"); - RetransmissionTimer.getInstance().addEvent(ActivityTimer.this, left); + schedule(left); return; } // these are either going to time out or cause further rescheduling @@ -1010,14 +1017,17 @@ public class Connection { /** * Coordinate the resends of a given packet + * */ - public class ResendPacketEvent implements SimpleTimer.TimedEvent { + public class ResendPacketEvent extends SimpleTimer2.TimedEvent { private PacketLocal _packet; private long _nextSendTime; - public ResendPacketEvent(PacketLocal packet, long sendTime) { + public ResendPacketEvent(PacketLocal packet, long delay) { + super(RetransmissionTimer.getInstance()); _packet = packet; - _nextSendTime = sendTime; + _nextSendTime = delay + _context.clock().now(); packet.setResendPacketEvent(ResendPacketEvent.this); + schedule(delay); } public long getNextSendTime() { return _nextSendTime; } @@ -1025,6 +1035,10 @@ public class Connection { /** * Retransmit the packet if we need to. * + * ackImmediately() above calls directly in here, so + * we have to use forceReschedule() instead of schedule() below, + * to prevent duplicates in the timer queue. + * * @param penalize true if this retransmission is caused by a timeout, false if we * are just sending this packet instead of an ACK * @return true if the packet was sent, false if it was not @@ -1057,7 +1071,7 @@ public class Connection { if (_log.shouldLog(Log.INFO)) _log.info("Delaying resend of " + _packet + " as there are " + _activeResends + " active resends already in play"); - RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, 1000); + forceReschedule(1000); _nextSendTime = 1000 + _context.clock().now(); return false; } @@ -1144,7 +1158,7 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling resend in " + timeout + "ms for " + _packet); - RetransmissionTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); + forceReschedule(timeout); } // acked during resending (... or somethin') diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 4a19d5e09..63d1caee7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -8,7 +8,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; import net.i2p.util.ByteCache; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * A stream that we can shove data into that fires off those bytes @@ -200,13 +200,20 @@ public class MessageOutputStream extends OutputStream { * Flush data that has been enqued but not flushed after a certain * period of inactivity */ - private class Flusher implements SimpleTimer.TimedEvent { + private class Flusher extends SimpleTimer2.TimedEvent { private boolean _enqueued; + public Flusher() { + super(RetransmissionTimer.getInstance()); + } public void enqueue() { // no need to be overly worried about duplicates - it would just // push it further out if (!_enqueued) { - RetransmissionTimer.getInstance().addEvent(_flusher, _passiveFlushDelay); + // Maybe we could just use schedule() here - or even SimpleScheduler - not sure... + // To be safe, use forceReschedule() so we don't get lots of duplicates + // We've seen the queue blow up before, maybe it was this before the rewrite... + // So perhaps it IS wise to be "overly worried" ... + forceReschedule(_passiveFlushDelay); if (_log.shouldLog(Log.DEBUG)) _log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out"); } else { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index f0288df5f..cbb89e79e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -6,7 +6,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.data.SessionKey; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * coordinate local attributes about a packet - send time, ack time, number of @@ -27,7 +27,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat private long _cancelledOn; private volatile int _nackCount; private volatile boolean _retransmitted; - private SimpleTimer.TimedEvent _resendEvent; + private SimpleTimer2.TimedEvent _resendEvent; public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); @@ -93,7 +93,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat releasePayload(); notifyAll(); } - SimpleTimer.getInstance().removeEvent(_resendEvent); + _resendEvent.cancel(); } public void cancelled() { synchronized (this) { @@ -101,11 +101,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat releasePayload(); notifyAll(); } - SimpleTimer.getInstance().removeEvent(_resendEvent); + _resendEvent.cancel(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Cancelled! " + toString(), new Exception("cancelled")); } - public SimpleTimer.TimedEvent getResendEvent() { return _resendEvent; } + public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; } /** how long after packet creation was it acked? * @return how long after packet creation the packet was ACKed in ms @@ -122,15 +122,15 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public void incrementNACKs() { int cnt = ++_nackCount; - SimpleTimer.TimedEvent evt = _resendEvent; + SimpleTimer2.TimedEvent evt = _resendEvent; if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) { _retransmitted = true; - RetransmissionTimer.getInstance().addEvent(evt, 0); + evt.reschedule(0); } } public int getNACKs() { return _nackCount; } - public void setResendPacketEvent(SimpleTimer.TimedEvent evt) { _resendEvent = evt; } + public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; } @Override public StringBuffer formatAsString() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index c52c373b1..92c4cf1c2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -1,12 +1,12 @@ package net.i2p.client.streaming; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * */ -public class RetransmissionTimer extends SimpleTimer { +public class RetransmissionTimer extends SimpleTimer2 { private static final RetransmissionTimer _instance = new RetransmissionTimer(); - public static final SimpleTimer getInstance() { return _instance; } + public static final RetransmissionTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index 3d29880f0..e02a1b413 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -2,7 +2,7 @@ package net.i2p.client.streaming; import net.i2p.I2PAppContext; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleScheduler; /** * Base scheduler @@ -17,6 +17,6 @@ abstract class SchedulerImpl implements TaskScheduler { } protected void reschedule(long msToWait, Connection con) { - SimpleTimer.getInstance().addEvent(con.getConnectionEvent(), msToWait); + SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index 1562f948e..7c8df3e3e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -7,7 +7,7 @@ import java.util.concurrent.ConcurrentHashMap; import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.util.Log; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * Share important TCP Control Block parameters across Connections @@ -38,11 +38,11 @@ public class TCBShare { _log = ctx.logManager().getLog(TCBShare.class); _cache = new ConcurrentHashMap(4); _cleaner = new CleanEvent(); - SimpleTimer.getInstance().addEvent(_cleaner, CLEAN_TIME); + _cleaner.schedule(CLEAN_TIME); } public void stop() { - SimpleTimer.getInstance().removeEvent(_cleaner); + _cleaner.cancel(); } public void updateOptsFromShare(Connection con) { @@ -124,14 +124,16 @@ public class TCBShare { } } - private class CleanEvent implements SimpleTimer.TimedEvent { - public CleanEvent() {} + private class CleanEvent extends SimpleTimer2.TimedEvent { + public CleanEvent() { + super(RetransmissionTimer.getInstance()); + } public void timeReached() { for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { if (_cache.get(iter.next()).isExpired()) iter.remove(); } - SimpleTimer.getInstance().addEvent(CleanEvent.this, CLEAN_TIME); + schedule(CLEAN_TIME); } } } diff --git a/core/java/src/net/i2p/util/SimpleScheduler.java b/core/java/src/net/i2p/util/SimpleScheduler.java index 91415102c..becf10099 100644 --- a/core/java/src/net/i2p/util/SimpleScheduler.java +++ b/core/java/src/net/i2p/util/SimpleScheduler.java @@ -13,8 +13,8 @@ import net.i2p.I2PAppContext; * appropriate time. The method that is fired however should NOT block (otherwise * they b0rk the timer). * - * This is like SimpleScheduler but addEvent() for an existing event adds a second - * job. Events cannot be cancelled or rescheduled. + * This is like SimpleTimer but addEvent() for an existing event adds a second + * job. Unlike SimpleTimer, events cannot be cancelled or rescheduled. * * For events that cannot or will not be cancelled or rescheduled - * for example, a call such as: diff --git a/core/java/src/net/i2p/util/SimpleTimer2.java b/core/java/src/net/i2p/util/SimpleTimer2.java new file mode 100644 index 000000000..6239ed42f --- /dev/null +++ b/core/java/src/net/i2p/util/SimpleTimer2.java @@ -0,0 +1,252 @@ +package net.i2p.util; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; +import java.util.Map; + +import net.i2p.I2PAppContext; + +/** + * Simple event scheduler - toss an event on the queue and it gets fired at the + * appropriate time. The method that is fired however should NOT block (otherwise + * they b0rk the timer). + * + * This rewrites the old SimpleTimer to use the java.util.concurrent.ScheduledThreadPoolExecutor. + * SimpleTimer has problems with lock contention; + * this should work a lot better. + * + * This supports cancelling and arbitrary rescheduling. + * If you don't need that, use SimpleScheduler instead. + * + * SimpleTimer is deprecated, use this or SimpleScheduler. + * + * @author zzz + */ +public class SimpleTimer2 { + private static final SimpleTimer2 _instance = new SimpleTimer2(); + public static SimpleTimer2 getInstance() { return _instance; } + private static final int THREADS = 4; + private I2PAppContext _context; + private static Log _log; // static so TimedEvent can use it + private ScheduledThreadPoolExecutor _executor; + private String _name; + private int _count; + + protected SimpleTimer2() { this("SimpleTimer2"); } + protected SimpleTimer2(String name) { + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(SimpleTimer2.class); + _name = name; + _count = 0; + _executor = new CustomScheduledThreadPoolExecutor(THREADS, new CustomThreadFactory()); + } + + /** + * Removes the SimpleTimer. + */ + public void stop() { + _executor.shutdownNow(); + } + + private class CustomScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + public CustomScheduledThreadPoolExecutor(int threads, ThreadFactory factory) { + super(threads, factory); + } + + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + if (t != null) // shoudn't happen, caught in RunnableEvent.run() + _log.log(Log.CRIT, "wtf, event borked: " + r, t); + } + } + + private class CustomThreadFactory implements ThreadFactory { + public Thread newThread(Runnable r) { + Thread rv = Executors.defaultThreadFactory().newThread(r); + rv.setName(_name + ' ' + (++_count) + '/' + THREADS); + rv.setDaemon(true); + return rv; + } + } + + private ScheduledFuture schedule(TimedEvent t, long timeoutMs) { + return _executor.schedule(t, timeoutMs, TimeUnit.MILLISECONDS); + } + + /** + * Similar to SimpleTimer.TimedEvent but users must extend instead of implement, + * and all schedule and cancel methods are through this class rather than SimpleTimer2. + * + * To convert over, change implements SimpleTimer.TimedEvent to extends SimpleTimer2.TimedEvent, + * and be sure to call super(SimpleTimer2.getInstance(), timeoutMs) in the constructor + * (or super(SimpleTimer2.getInstance()); .... schedule(timeoutMs); if there is other stuff + * in your constructor) + * + * Other porting: + * SimpleTimer.getInstance().addEvent(new foo(), timeout) => new foo(SimpleTimer2.getInstance(), timeout) + * SimpleTimer.getInstance().addEvent(this, timeout) => schedule(timeout) + * SimpleTimer.getInstance().addEvent(foo, timeout) => foo.reschedule(timeout) + * SimpleTimer.getInstance().removeEvent(foo) => foo.cancel() + * + * There's no global locking, but for scheduling, we synchronize on this + * to reduce the chance of duplicates on the queue. + * + * schedule(ms) can get create duplicates + * reschedule(ms) and reschedule(ms, true) can lose the timer + * reschedule(ms, false) and forceReschedule(ms) are relatively safe from either + * + */ + public static abstract class TimedEvent implements Runnable { + private SimpleTimer2 _pool; + private int _fuzz; + protected static final int DEFAULT_FUZZ = 3; + private ScheduledFuture _future; // _executor.remove() doesn't work so we have to use this + // ... and I expect cancelling this way is more efficient + + /** must call schedule() later */ + public TimedEvent(SimpleTimer2 pool) { + _pool = pool; + _fuzz = DEFAULT_FUZZ; + } + /** automatically schedules, don't use this one if you have other things to do first */ + public TimedEvent(SimpleTimer2 pool, long timeoutMs) { + this(pool); + schedule(timeoutMs); + } + + /** + * Don't bother rescheduling if +/- this many ms or less. + * Use this to reduce timer queue and object churn for a sloppy timer like + * an inactivity timer. + * Default 3 ms. + */ + public void setFuzz(int fuzz) { + _fuzz = fuzz; + } + + /** + * More efficient than reschedule(). + * Only call this after calling the non-scheduling constructor, + * or from within timeReached(), or you will get duplicates on the queue. + * Otherwise use reschedule(). + */ + public synchronized void schedule(long timeoutMs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Scheduling: " + this + " timeout = " + timeoutMs); + if (timeoutMs <= 0 && _log.shouldLog(Log.WARN)) + timeoutMs = 1; // otherwise we may execute before _future is updated, which is fine + // except it triggers 'early execution' warning logging + _future = _pool.schedule(this, timeoutMs); + } + + /** + * Use the earliest of the new time and the old time + * Do not call from within timeReached() + * + * @param timeoutMs + */ + public void reschedule(long timeoutMs) { + reschedule(timeoutMs, true); + } + + /** + * useEarliestTime must be false if called from within timeReached(), as + * it won't be rescheduled, in favor of the currently running task + * + * @param timeoutMs + * @param useEarliestTime if its already scheduled, use the earlier of the + * two timeouts, else use the later + */ + public synchronized void reschedule(long timeoutMs, boolean useEarliestTime) { + long oldTimeout; + boolean scheduled = _future != null && !_future.isDone(); + if (scheduled) + oldTimeout = _future.getDelay(TimeUnit.MILLISECONDS); + else + oldTimeout = timeoutMs; + // don't bother rescheduling if within _fuzz ms + if ((oldTimeout - _fuzz > timeoutMs && useEarliestTime) || + (oldTimeout + _fuzz < timeoutMs && !useEarliestTime)|| + (!scheduled)) { + if (scheduled) { + if (_log.shouldLog(Log.INFO)) + _log.info("Re-scheduling: " + this + " timeout = " + timeoutMs + " old timeout was " + oldTimeout); + cancel(); + } + schedule(timeoutMs); + } + } + + /** + * Always use the new time - ignores fuzz + * @param timeoutMs + */ + public synchronized void forceReschedule(long timeoutMs) { + cancel(); + schedule(timeoutMs); + } + + /** returns true if cancelled */ + public synchronized boolean cancel() { + if (_future == null) + return false; + return _future.cancel(false); + } + + public void run() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Running: " + this); + long before = System.currentTimeMillis(); + long delay = 0; + if (_future != null) + delay = _future.getDelay(TimeUnit.MILLISECONDS); + else if (_log.shouldLog(Log.WARN)) + _log.warn(_pool + " wtf, no _future " + this); + // This can be an incorrect warning especially after a schedule(0) + if (_log.shouldLog(Log.WARN) && delay > 100) + _log.warn(_pool + " wtf, early execution " + delay + ": " + this); + else if (_log.shouldLog(Log.WARN) && delay < -1000) + _log.warn(" wtf, late execution " + delay + ": " + this + _pool.debug()); + try { + timeReached(); + } catch (Throwable t) { + _log.log(Log.CRIT, _pool + " wtf, event borked: " + this, t); + } + long time = System.currentTimeMillis() - before; + if (time > 500 && _log.shouldLog(Log.WARN)) + _log.warn(_pool + " wtf, event execution took " + time + ": " + this); + long completed = _pool.getCompletedTaskCount(); + if (_log.shouldLog(Log.INFO) && completed % 250 == 0) + _log.info(_pool.debug()); + } + + /** + * Simple interface for events to be queued up and notified on expiration + * the time requested has been reached (this call should NOT block, + * otherwise the whole SimpleTimer gets backed up) + * + */ + public abstract void timeReached(); + } + + public String toString() { + return _name; + } + + private long getCompletedTaskCount() { + return _executor.getCompletedTaskCount(); + } + + private String debug() { + _executor.purge(); // Remove cancelled tasks from the queue so we get a good queue size stat + return + " Pool: " + _name + + " Active: " + _executor.getActiveCount() + '/' + _executor.getPoolSize() + + " Completed: " + _executor.getCompletedTaskCount() + + " Queued: " + _executor.getQueue().size(); + } +} + diff --git a/router/java/src/net/i2p/router/tunnel/FlushTimer.java b/router/java/src/net/i2p/router/tunnel/FlushTimer.java index b18799ac6..b55384b80 100644 --- a/router/java/src/net/i2p/router/tunnel/FlushTimer.java +++ b/router/java/src/net/i2p/router/tunnel/FlushTimer.java @@ -6,7 +6,12 @@ import net.i2p.util.SimpleTimer; * */ class FlushTimer extends SimpleTimer { - private static final FlushTimer _instance = new FlushTimer(); - public static final SimpleTimer getInstance() { return _instance; } - protected FlushTimer() { super("TunnelFlushTimer"); } + /* + Streaming lib has been moved from SimpleTimer to SimpleTimer2, eliminating the congestion. + So there's not much left using SimpleTimer, and FlushTimer doesn't need its own 4 threads any more + (if it ever did?...) + */ + //private static final FlushTimer _instance = new FlushTimer(); + //public static final SimpleTimer getInstance() { return _instance; } + //protected FlushTimer() { super("TunnelFlushTimer"); } } -- GitLab