diff --git a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java index f93bba3d574ed6a16a333eefa0e90342e547a174..e7054ccd99bdd3b7c8fdf930ff18bb40b4098f73 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java @@ -16,6 +16,7 @@ import net.i2p.data.LeaseSet; import net.i2p.data.RouterAddress; import net.i2p.router.CommSystemFacade; import net.i2p.router.Router; +import net.i2p.router.RouterClock; import net.i2p.router.RouterVersion; import net.i2p.router.TunnelPoolSettings; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; @@ -88,6 +89,10 @@ public class SummaryHelper extends HelperBase { public String getReachability() { return reachability(); // + timeSkew(); + // testing + //return reachability() + + // " Offset: " + DataHelper.formatDuration(_context.clock().getOffset()) + + // " Slew: " + DataHelper.formatDuration(((RouterClock)_context.clock()).getDeltaOffset()); } private String reachability() { @@ -97,10 +102,10 @@ public class SummaryHelper extends HelperBase { // Warn based on actual skew from peers, not update status, so if we successfully offset // the clock, we don't complain. 
//if (!_context.clock().getUpdatedSuccessfully()) - Long skew = _context.commSystem().getFramedAveragePeerClockSkew(33); + long skew = _context.commSystem().getFramedAveragePeerClockSkew(33); // Display the actual skew, not the offset - if (skew != null && Math.abs(skew.longValue()) > 30) - return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew.longValue()) * 1000)); + if (Math.abs(skew) > 30*1000) + return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew))); if (_context.router().isHidden()) return _("Hidden"); diff --git a/core/java/src/net/i2p/time/NtpClient.java b/core/java/src/net/i2p/time/NtpClient.java index 5ab2e601f9f08d3afb9594e16d75371dbab7bf29..dd873850afd28539bd95392210de374beba50da5 100644 --- a/core/java/src/net/i2p/time/NtpClient.java +++ b/core/java/src/net/i2p/time/NtpClient.java @@ -76,12 +76,46 @@ public class NtpClient { throw new IllegalArgumentException("No reachable NTP servers specified"); } + /** + * Query the ntp servers, returning the current time from first one we find + * Hack to return time and stratum + * @return time in rv[0] and stratum in rv[1] + * @throws IllegalArgumentException if none of the servers are reachable + * @since 0.7.12 + */ + public static long[] currentTimeAndStratum(String serverNames[]) { + if (serverNames == null) + throw new IllegalArgumentException("No NTP servers specified"); + ArrayList names = new ArrayList(serverNames.length); + for (int i = 0; i < serverNames.length; i++) + names.add(serverNames[i]); + Collections.shuffle(names); + for (int i = 0; i < names.size(); i++) { + long[] rv = currentTimeAndStratum((String)names.get(i)); + if (rv != null && rv[0] > 0) + return rv; + } + throw new IllegalArgumentException("No reachable NTP servers specified"); + } + /** * Query the given NTP server, returning the current internet time * * @return milliseconds since january 1, 1970 (UTC), or -1 on error */ public static long currentTime(String serverName) { + long[] la = 
currentTimeAndStratum(serverName); + if (la != null) + return la[0]; + return -1; + } + + /** + * Hack to return time and stratum + * @return time in rv[0] and stratum in rv[1], or null for error + * @since 0.7.12 + */ + private static long[] currentTimeAndStratum(String serverName) { try { // Send request DatagramSocket socket = new DatagramSocket(); @@ -104,7 +138,7 @@ public class NtpClient { socket.receive(packet); } catch (InterruptedIOException iie) { socket.close(); - return -1; + return null; } // Immediately record the incoming timestamp @@ -123,15 +157,17 @@ public class NtpClient { // Anything else is right out, treat such responses like errors if ((msg.stratum < 1) || (msg.stratum > 15)) { //System.out.println("Response from NTP server of unacceptable stratum " + msg.stratum + ", failing."); - return(-1); + return null; } - long rv = (long)(System.currentTimeMillis() + localClockOffset*1000); + long[] rv = new long[2]; + rv[0] = (long)(System.currentTimeMillis() + localClockOffset*1000); + rv[1] = msg.stratum; //System.out.println("host: " + address.getHostAddress() + " rtt: " + roundTripDelay + " offset: " + localClockOffset + " seconds"); return rv; } catch (IOException ioe) { //ioe.printStackTrace(); - return -1; + return null; } } diff --git a/core/java/src/net/i2p/time/Timestamper.java b/core/java/src/net/i2p/time/Timestamper.java index 660e3c7ac81f8a959bee60a47787bf3a2c58add2..57b1dc28610510549c6759e0a17dfce801e29f58 100644 --- a/core/java/src/net/i2p/time/Timestamper.java +++ b/core/java/src/net/i2p/time/Timestamper.java @@ -23,6 +23,7 @@ public class Timestamper implements Runnable { private final List<UpdateListener> _listeners; private int _queryFrequency; private int _concurringServers; + private int _consecutiveFails; private volatile boolean _disabled; private boolean _daemon; private boolean _initialized; @@ -34,6 +35,7 @@ public class Timestamper implements Runnable { private static final String DEFAULT_DISABLED = "true"; /** how many 
times do we have to query if we are changing the clock? */ private static final int DEFAULT_CONCURRING_SERVERS = 3; + private static final int MAX_CONSECUTIVE_FAILS = 10; public static final String PROP_QUERY_FREQUENCY = "time.queryFrequencyMs"; public static final String PROP_SERVER_LIST = "time.sntpServerList"; @@ -106,7 +108,7 @@ public class Timestamper implements Runnable { } public UpdateListener getListener(int index) { synchronized (_listeners) { - return (UpdateListener)_listeners.get(index); + return _listeners.get(index); } } @@ -171,8 +173,12 @@ public class Timestamper implements Runnable { synchronized (this) { notifyAll(); } long sleepTime; if (lastFailed) { - sleepTime = 30*1000; + if (++_consecutiveFails >= MAX_CONSECUTIVE_FAILS) + sleepTime = 30*60*1000; + else + sleepTime = 30*1000; } else { + _consecutiveFails = 0; sleepTime = _context.random().nextInt(_queryFrequency) + _queryFrequency; if (_wellSynced) sleepTime *= 3; @@ -191,6 +197,7 @@ public class Timestamper implements Runnable { private boolean queryTime(String serverList[]) throws IllegalArgumentException { long found[] = new long[_concurringServers]; long now = -1; + int stratum = -1; long expectedDelta = 0; _wellSynced = false; for (int i = 0; i < _concurringServers; i++) { @@ -198,7 +205,9 @@ public class Timestamper implements Runnable { // this delays startup when net is disconnected or the timeserver list is bad, don't make it too long try { Thread.sleep(2*1000); } catch (InterruptedException ie) {} } - now = NtpClient.currentTime(serverList); + long[] timeAndStratum = NtpClient.currentTimeAndStratum(serverList); + now = timeAndStratum[0]; + stratum = (int) timeAndStratum[1]; long delta = now - _context.clock().now(); found[i] = delta; if (i == 0) { @@ -230,7 +239,7 @@ public class Timestamper implements Runnable { } } } - stampTime(now); + stampTime(now, stratum); if (_log.shouldLog(Log.DEBUG)) { StringBuilder buf = new StringBuilder(64); buf.append("Deltas: "); @@ -242,14 +251,14 
@@ public class Timestamper implements Runnable { } /** - * Send an HTTP request to a given URL specifying the current time + * Notify the listeners */ - private void stampTime(long now) { + private void stampTime(long now, int stratum) { long before = _context.clock().now(); synchronized (_listeners) { for (int i = 0; i < _listeners.size(); i++) { - UpdateListener lsnr = (UpdateListener)_listeners.get(i); - lsnr.setNow(now); + UpdateListener lsnr = _listeners.get(i); + lsnr.setNow(now, stratum); } } if (_log.shouldLog(Log.DEBUG)) @@ -311,13 +320,16 @@ public class Timestamper implements Runnable { /** * Interface to receive update notifications for when we query the time - * + * Only used by Clock. + * stratum parameter added in 0.7.12. + * If there were any users outside of the tree, this broke compatibility, sorry. */ public interface UpdateListener { /** * The time has been queried and we have a current value for 'now' * */ - public void setNow(long now); + /** @param stratum 1-15, 1 being the best (added in 0.7.12) */ + public void setNow(long now, int stratum); } } diff --git a/core/java/src/net/i2p/util/Clock.java b/core/java/src/net/i2p/util/Clock.java index 669c9d92af539f971949f3dda4bff0736126405c..4fc5eaff075ba2e3d475fb9583f64117425d5ad2 100644 --- a/core/java/src/net/i2p/util/Clock.java +++ b/core/java/src/net/i2p/util/Clock.java @@ -117,6 +117,15 @@ public class Clock implements Timestamper.UpdateListener { setOffset(diff); } + /** + * @param stratum ignored + * @since 0.7.12 + */ + public void setNow(long realTime, int stratum) { + long diff = realTime - System.currentTimeMillis(); + setOffset(diff); + } + /** * Retrieve the current time synchronized with whatever reference clock is in * use. 
diff --git a/router/java/src/net/i2p/router/CommSystemFacade.java b/router/java/src/net/i2p/router/CommSystemFacade.java index 407cb28b21347cfb2c6f04499c8fbfa5f300f655..36cbefbf12127b687b392100a987677cb13bf6d8 100644 --- a/router/java/src/net/i2p/router/CommSystemFacade.java +++ b/router/java/src/net/i2p/router/CommSystemFacade.java @@ -49,7 +49,7 @@ public abstract class CommSystemFacade implements Service { * Return framed average clock skew of connected peers in seconds, or null if we cannot answer. * CommSystemFacadeImpl overrides this. */ - public Long getFramedAveragePeerClockSkew(int percentToInclude) { return null; } + public long getFramedAveragePeerClockSkew(int percentToInclude) { return 0; } /** * Determine under what conditions we are remotely reachable. diff --git a/router/java/src/net/i2p/router/RouterClock.java b/router/java/src/net/i2p/router/RouterClock.java index bb22dfbcb8ea2711663638623dfc3a1a69162587..e356964ee1450fcbf3db5e83386e7b6972957ff3 100644 --- a/router/java/src/net/i2p/router/RouterClock.java +++ b/router/java/src/net/i2p/router/RouterClock.java @@ -15,11 +15,31 @@ import net.i2p.util.Log; */ public class RouterClock extends Clock { + /** + * How often we will slew the clock + * i.e. ppm = 1000000/MAX_SLEW + * We should be able to slew really fast, + * this is probably a lot faster than what NTP does + * 1/50 is 12s in a 10m tunnel lifetime, that should be fine. 
+ * All of this is @since 0.7.12 + */ + private static final long MAX_SLEW = 50; + private static final int DEFAULT_STRATUM = 8; + private static final int WORST_STRATUM = 16; + /** the max NTP Timestamper delay is 30m right now, make this longer than that */ + private static final long MIN_DELAY_FOR_WORSE_STRATUM = 45*60*1000; + private volatile long _desiredOffset; + private volatile long _lastSlewed; + /** use system time for this */ + private long _lastChanged; + private int _lastStratum; + RouterContext _contextRC; // LINT field hides another field public RouterClock(RouterContext context) { super(context); _contextRC = context; + _lastStratum = WORST_STRATUM; } /** @@ -29,6 +49,16 @@ public class RouterClock extends Clock { */ @Override public void setOffset(long offsetMs, boolean force) { + setOffset(offsetMs, force, DEFAULT_STRATUM); + } + + /** @since 0.7.12 */ + private void setOffset(long offsetMs, int stratum) { + setOffset(offsetMs, false, stratum); + } + + /** @since 0.7.12 */ + private void setOffset(long offsetMs, boolean force, int stratum) { long delta = offsetMs - _offset; if (!force) { if ((offsetMs > MAX_OFFSET) || (offsetMs < 0 - MAX_OFFSET)) { @@ -45,59 +75,127 @@ public class RouterClock extends Clock { } } - if ((delta < MIN_OFFSET_CHANGE) && (delta > 0 - MIN_OFFSET_CHANGE)) { - getLog().debug("Not changing offset since it is only " + delta + "ms"); + // let's be perfect + if (delta == 0) { + getLog().debug("Not changing offset, delta=0"); _alreadyChanged = true; return; } + // only listen to a worse stratum if it's been a while + if (_alreadyChanged && stratum > _lastStratum && + System.currentTimeMillis() - _lastChanged < MIN_DELAY_FOR_WORSE_STRATUM) { + getLog().warn("Ignoring update from a stratum " + stratum + + " clock, we recently had an update from a stratum " + _lastStratum + " clock"); + return; + } + // If so configured, check sanity of proposed clock offset if 
(Boolean.valueOf(_contextRC.getProperty("router.clockOffsetSanityCheck","true")).booleanValue() && _alreadyChanged) { // Try calculating peer clock skew - Long peerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50); - - if (peerClockSkew != null) { + long currentPeerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50); // Predict the effect of applying the proposed clock offset - long currentPeerClockSkew = peerClockSkew.longValue(); - long predictedPeerClockSkew = currentPeerClockSkew + (delta / 1000l); + long predictedPeerClockSkew = currentPeerClockSkew + delta; // Fail sanity check if applying the offset would increase peer clock skew - if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5)) || - (Math.abs(predictedPeerClockSkew) > 20)) { + if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5*1000)) || + (Math.abs(predictedPeerClockSkew) > 20*1000)) { getLog().error("Ignoring clock offset " + offsetMs + "ms (current " + _offset + "ms) since it would increase peer clock skew from " + currentPeerClockSkew + - "s to " + predictedPeerClockSkew + "s. Broken server in pool.ntp.org?"); + "ms to " + predictedPeerClockSkew + "ms. 
Bad time server?"); return; } else { getLog().debug("Approving clock offset " + offsetMs + "ms (current " + _offset + "ms) since it would decrease peer clock skew from " + currentPeerClockSkew + - "s to " + predictedPeerClockSkew + "s."); + "ms to " + predictedPeerClockSkew + "ms."); } - } } // check sanity } if (_alreadyChanged) { + // Update the target offset, slewing will take care of the rest if (delta > 15*1000) - getLog().error("Warning - Updating clock offset to " + offsetMs + "ms from " + _offset + "ms"); + getLog().error("Warning - Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum); else if (getLog().shouldLog(Log.INFO)) - getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms"); + getLog().info("Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum); if (!_statCreated) { _contextRC.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 }); _statCreated = true; } _contextRC.statManager().addRateData("clock.skew", delta, 0); + _desiredOffset = offsetMs; } else { - getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms"); + getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum); + _alreadyChanged = true; + _offset = offsetMs; + _desiredOffset = offsetMs; + // this is used by the JobQueue + fireOffsetChanged(delta); + } + _lastChanged = System.currentTimeMillis(); + _lastStratum = stratum; + + } + + /** + * @param stratum used to determine whether we should ignore + * @since 0.7.12 + */ + @Override + public void setNow(long realTime, int stratum) { + long diff = realTime - System.currentTimeMillis(); + setOffset(diff, stratum); + } + + /** + * Retrieve the current time synchronized with whatever reference clock is in use. 
+ * Do really simple clock slewing, like NTP but without jitter prevention. + * Slew the clock toward the desired offset, but only up to a maximum slew rate, + * and never let the clock go backwards because of slewing. + * + * Take care to only access the volatile variables once for speed and to + * avoid having another thread change them + * + * This is called about a zillion times a second, so we can do the slewing right + * here rather than in some separate thread to keep it simple. + * Avoiding backwards clocks when updating in a thread would be hard too. + */ + @Override + public long now() { + long systemNow = System.currentTimeMillis(); + // copy the global, so two threads don't both increment or decrement _offset + long offset = _offset; + if (systemNow >= _lastSlewed + MAX_SLEW) { + // copy the global + long desiredOffset = _desiredOffset; + if (desiredOffset > offset) { + // slew forward + _offset = ++offset; + _lastSlewed = systemNow; + } else if (desiredOffset < offset) { + // slew backward, but don't let the clock go backward + // this should be the first call since systemNow + // was greater than _lastSlewed + MAX_SLEW, i.e. different + // from the last systemNow, thus we won't let the clock go backward, + // no need to track when we were last called.
+ _offset = --offset; + _lastSlewed = systemNow; + } } - _alreadyChanged = true; - _offset = offsetMs; - fireOffsetChanged(delta); + return offset + systemNow; } + /* + * How far we still have to slew, for diagnostics + * @since 0.7.12 + */ + public long getDeltaOffset() { + return _desiredOffset - _offset; + } + } diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java index a9fbd37f51398167bdc40ec5ef34cd82e56c97f3..2ab19ea2758c9ca4168cc93201ed71d6df3126bb 100644 --- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java +++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java @@ -51,6 +51,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade { _log.info("Starting up the comm system"); _manager = new TransportManager(_context); _manager.startListening(); + startTimestamper(); } public void shutdown() { @@ -78,20 +79,20 @@ public class CommSystemFacadeImpl extends CommSystemFacade { /** * @param percentToInclude 1-100 - * @return Framed average clock skew of connected peers in seconds, or the clock offset if we cannot answer. + * @return Framed average clock skew of connected peers in milliseconds, or the clock offset if we cannot answer. * Average is calculated over the middle "percentToInclude" peers. 
+ * Todo: change Vectors to milliseconds */ @Override - public Long getFramedAveragePeerClockSkew(int percentToInclude) { + public long getFramedAveragePeerClockSkew(int percentToInclude) { if (_manager == null) { - // round toward zero - return Long.valueOf(_context.clock().getOffset() / 1000); + return _context.clock().getOffset(); } Vector skews = _manager.getClockSkews(); if (skews == null || skews.size() <= 0 || (skews.size() < 5 && _context.clock().getUpdatedSuccessfully())) { - return Long.valueOf(_context.clock().getOffset() / 1000); + return _context.clock().getOffset(); } // Going to calculate, sort them @@ -106,12 +107,12 @@ public class CommSystemFacadeImpl extends CommSystemFacade { long sum = 0; for (int i = first; i <= last; i++) { long value = ((Long) (skews.get(i))).longValue(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Adding clock skew " + i + " valued " + value + " s."); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Adding clock skew " + i + " valued " + value + " s."); sum = sum + value; } - // Calculate average (round toward zero) - return Long.valueOf(sum / frameSize); + // Calculate average + return sum * 1000 / frameSize; } public List getBids(OutNetMessage msg) { @@ -481,4 +482,38 @@ public class CommSystemFacadeImpl extends CommSystemFacade { buf.append("</tt>"); return buf.toString(); } + + /* + * Timestamper stuff + * + * This is used as a backup to NTP over UDP. + * @since 0.7.12 + */ + + private static final int TIME_START_DELAY = 5*60*1000; + private static final int TIME_REPEAT_DELAY = 10*60*1000; + /** @since 0.7.12 */ + private void startTimestamper() { + SimpleScheduler.getInstance().addPeriodicEvent(new Timestamper(), TIME_START_DELAY, TIME_REPEAT_DELAY); + } + + /** + * Update the clock offset based on the average of the peers. + * This uses the default stratum which is worse (numerically higher) than any reasonable + * NTP source, so it will be ignored unless NTP is broken.
+ * @since 0.7.12 + */ + private class Timestamper implements SimpleTimer.TimedEvent { + public void timeReached() { + // use the same % as in RouterClock so that check will never fail + // This is our offset w.r.t. them... + long peerOffset = getFramedAveragePeerClockSkew(50); + if (peerOffset == 0) + return; + long currentOffset = _context.clock().getOffset(); + // ... so we subtract it to get in sync with them + long newOffset = currentOffset - peerOffset; + _context.clock().setOffset(newOffset); + } + } }