From 4f70a7d0fe04f21a5d50eee95de7ea20bb1ea4f3 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sat, 13 Feb 2010 01:20:23 +0000
Subject: [PATCH]     * Clock:       - getFramedAveragePeerClockSkew() now
 returns a long (ms);         was a Long (s)       - Implement NTP-style clock
 slewing so the clock is adjusted         gradually       - Implement clock
 strata so we prefer better clocks       - Implement a timestamper in the
 transport so we will periodically         update the clock even if NTP is not
 working         This allows the router to converge the clock instead of
 simply         hoping the first connected peer is correct.       - Slow down
 NTP attempts after several consecutive failures

---
 .../src/net/i2p/router/web/SummaryHelper.java |  11 +-
 core/java/src/net/i2p/time/NtpClient.java     |  44 +++++-
 core/java/src/net/i2p/time/Timestamper.java   |  32 +++--
 core/java/src/net/i2p/util/Clock.java         |   9 ++
 .../src/net/i2p/router/CommSystemFacade.java  |   2 +-
 .../java/src/net/i2p/router/RouterClock.java  | 134 +++++++++++++++---
 .../transport/CommSystemFacadeImpl.java       |  53 +++++--
 7 files changed, 240 insertions(+), 45 deletions(-)

diff --git a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
index f93bba3d57..e7054ccd99 100644
--- a/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
+++ b/apps/routerconsole/java/src/net/i2p/router/web/SummaryHelper.java
@@ -16,6 +16,7 @@ import net.i2p.data.LeaseSet;
 import net.i2p.data.RouterAddress;
 import net.i2p.router.CommSystemFacade;
 import net.i2p.router.Router;
+import net.i2p.router.RouterClock;
 import net.i2p.router.RouterVersion;
 import net.i2p.router.TunnelPoolSettings;
 import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
@@ -88,6 +89,10 @@ public class SummaryHelper extends HelperBase {
     
     public String getReachability() {
         return reachability(); // + timeSkew();
+        // testing
+        //return reachability() +
+        //       " Offset: " + DataHelper.formatDuration(_context.clock().getOffset()) +
+        //       " Slew: " + DataHelper.formatDuration(((RouterClock)_context.clock()).getDeltaOffset());
     }
 
     private String reachability() {
@@ -97,10 +102,10 @@ public class SummaryHelper extends HelperBase {
         // Warn based on actual skew from peers, not update status, so if we successfully offset
         // the clock, we don't complain.
         //if (!_context.clock().getUpdatedSuccessfully())
-        Long skew = _context.commSystem().getFramedAveragePeerClockSkew(33);
+        long skew = _context.commSystem().getFramedAveragePeerClockSkew(33);
         // Display the actual skew, not the offset
-        if (skew != null && Math.abs(skew.longValue()) > 30)
-            return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew.longValue()) * 1000));
+        if (Math.abs(skew) > 30*1000)
+            return _("ERR-Clock Skew of {0}", DataHelper.formatDuration(Math.abs(skew)));
         if (_context.router().isHidden())
             return _("Hidden");
 
diff --git a/core/java/src/net/i2p/time/NtpClient.java b/core/java/src/net/i2p/time/NtpClient.java
index 5ab2e601f9..dd873850af 100644
--- a/core/java/src/net/i2p/time/NtpClient.java
+++ b/core/java/src/net/i2p/time/NtpClient.java
@@ -76,12 +76,46 @@ public class NtpClient {
         throw new IllegalArgumentException("No reachable NTP servers specified");
     }
     
+    /**
+     * Query the NTP servers, returning the current time from the first one that responds
+     * Hack to return time and stratum
+     * @return time in rv[0] and stratum in rv[1]
+     * @throws IllegalArgumentException if none of the servers are reachable
+     * @since 0.7.12
+     */
+    public static long[] currentTimeAndStratum(String serverNames[]) {
+        if (serverNames == null) 
+            throw new IllegalArgumentException("No NTP servers specified");
+        ArrayList names = new ArrayList(serverNames.length);
+        for (int i = 0; i < serverNames.length; i++)
+            names.add(serverNames[i]);
+        Collections.shuffle(names);
+        for (int i = 0; i < names.size(); i++) {
+            long[] rv = currentTimeAndStratum((String)names.get(i));
+            if (rv != null && rv[0] > 0)
+                return rv;
+        }
+        throw new IllegalArgumentException("No reachable NTP servers specified");
+    }
+    
     /**
      * Query the given NTP server, returning the current internet time
      *
      * @return milliseconds since january 1, 1970 (UTC), or -1 on error
      */
     public static long currentTime(String serverName) {
+         long[] la = currentTimeAndStratum(serverName);
+         if (la != null)
+             return la[0];
+         return -1;
+    }
+
+    /**
+     * Hack to return time and stratum
+     * @return time in rv[0] and stratum in rv[1], or null for error
+     * @since 0.7.12
+     */
+    private static long[] currentTimeAndStratum(String serverName) {
         try {
             // Send request
             DatagramSocket socket = new DatagramSocket();
@@ -104,7 +138,7 @@ public class NtpClient {
                 socket.receive(packet);
             } catch (InterruptedIOException iie) {
                 socket.close();
-                return -1;
+                return null;
             }
 
             // Immediately record the incoming timestamp
@@ -123,15 +157,17 @@ public class NtpClient {
             // Anything else is right out, treat such responses like errors
             if ((msg.stratum < 1) || (msg.stratum > 15)) {
                 //System.out.println("Response from NTP server of unacceptable stratum " + msg.stratum + ", failing.");
-                return(-1);
+                return null;
             }
             
-            long rv = (long)(System.currentTimeMillis() + localClockOffset*1000);
+            long[] rv = new long[2];
+            rv[0] = (long)(System.currentTimeMillis() + localClockOffset*1000);
+            rv[1] = msg.stratum;
             //System.out.println("host: " + address.getHostAddress() + " rtt: " + roundTripDelay + " offset: " + localClockOffset + " seconds");
             return rv;
         } catch (IOException ioe) {
             //ioe.printStackTrace();
-            return -1;
+            return null;
         }
     }
     
diff --git a/core/java/src/net/i2p/time/Timestamper.java b/core/java/src/net/i2p/time/Timestamper.java
index 660e3c7ac8..57b1dc2861 100644
--- a/core/java/src/net/i2p/time/Timestamper.java
+++ b/core/java/src/net/i2p/time/Timestamper.java
@@ -23,6 +23,7 @@ public class Timestamper implements Runnable {
     private final List<UpdateListener> _listeners;
     private int _queryFrequency;
     private int _concurringServers;
+    private int _consecutiveFails;
     private volatile boolean _disabled;
     private boolean _daemon;
     private boolean _initialized;
@@ -34,6 +35,7 @@ public class Timestamper implements Runnable {
     private static final String DEFAULT_DISABLED = "true";
     /** how many times do we have to query if we are changing the clock? */
     private static final int DEFAULT_CONCURRING_SERVERS = 3;
+    private static final int MAX_CONSECUTIVE_FAILS = 10;
     
     public static final String PROP_QUERY_FREQUENCY = "time.queryFrequencyMs";
     public static final String PROP_SERVER_LIST = "time.sntpServerList";
@@ -106,7 +108,7 @@ public class Timestamper implements Runnable {
     }
     public UpdateListener getListener(int index) {
         synchronized (_listeners) {
-            return (UpdateListener)_listeners.get(index);
+            return _listeners.get(index);
         }
     }
     
@@ -171,8 +173,12 @@ public class Timestamper implements Runnable {
                 synchronized (this) { notifyAll(); }
                 long sleepTime;
                 if (lastFailed) {
-                    sleepTime = 30*1000;
+                    if (++_consecutiveFails >= MAX_CONSECUTIVE_FAILS)
+                        sleepTime = 30*60*1000;
+                    else
+                        sleepTime = 30*1000;
                 } else {
+                    _consecutiveFails = 0;
                     sleepTime = _context.random().nextInt(_queryFrequency) + _queryFrequency;
                     if (_wellSynced)
                         sleepTime *= 3;
@@ -191,6 +197,7 @@ public class Timestamper implements Runnable {
     private boolean queryTime(String serverList[]) throws IllegalArgumentException {
         long found[] = new long[_concurringServers];
         long now = -1;
+        int stratum = -1;
         long expectedDelta = 0;
         _wellSynced = false;
         for (int i = 0; i < _concurringServers; i++) {
@@ -198,7 +205,9 @@ public class Timestamper implements Runnable {
                 // this delays startup when net is disconnected or the timeserver list is bad, don't make it too long
                 try { Thread.sleep(2*1000); } catch (InterruptedException ie) {}
             }
-            now = NtpClient.currentTime(serverList);
+            long[] timeAndStratum = NtpClient.currentTimeAndStratum(serverList);
+            now = timeAndStratum[0];
+            stratum = (int) timeAndStratum[1];
             long delta = now - _context.clock().now();
             found[i] = delta;
             if (i == 0) {
@@ -230,7 +239,7 @@ public class Timestamper implements Runnable {
                 }
             }
         }
-        stampTime(now);
+        stampTime(now, stratum);
         if (_log.shouldLog(Log.DEBUG)) {
             StringBuilder buf = new StringBuilder(64);
             buf.append("Deltas: ");
@@ -242,14 +251,14 @@ public class Timestamper implements Runnable {
     }
     
     /**
-     * Send an HTTP request to a given URL specifying the current time 
+     * Notify the listeners
      */
-    private void stampTime(long now) {
+    private void stampTime(long now, int stratum) {
         long before = _context.clock().now();
         synchronized (_listeners) {
             for (int i = 0; i < _listeners.size(); i++) {
-                UpdateListener lsnr = (UpdateListener)_listeners.get(i);
-                lsnr.setNow(now);
+                UpdateListener lsnr = _listeners.get(i);
+                lsnr.setNow(now, stratum);
             }
         }
         if (_log.shouldLog(Log.DEBUG))
@@ -311,13 +320,16 @@ public class Timestamper implements Runnable {
     
     /**
      * Interface to receive update notifications for when we query the time
-     *
+     * Only used by Clock.
+     * stratum parameter added in 0.7.12.
+     * If there were any users outside of the tree, this broke compatibility, sorry.
      */
     public interface UpdateListener {
         /**
          * The time has been queried and we have a current value for 'now'
          *
          */
-        public void setNow(long now);
+        /** @param stratum 1-15, 1 being the best (added in 0.7.12) */
+        public void setNow(long now, int stratum);
     }
 }
diff --git a/core/java/src/net/i2p/util/Clock.java b/core/java/src/net/i2p/util/Clock.java
index 669c9d92af..4fc5eaff07 100644
--- a/core/java/src/net/i2p/util/Clock.java
+++ b/core/java/src/net/i2p/util/Clock.java
@@ -117,6 +117,15 @@ public class Clock implements Timestamper.UpdateListener {
         setOffset(diff);
     }
 
+    /**
+     *  @param stratum ignored
+     *  @since 0.7.12
+     */
+    public void setNow(long realTime, int stratum) {
+        long diff = realTime - System.currentTimeMillis();
+        setOffset(diff);
+    }
+
     /**
      * Retrieve the current time synchronized with whatever reference clock is in
      * use.
diff --git a/router/java/src/net/i2p/router/CommSystemFacade.java b/router/java/src/net/i2p/router/CommSystemFacade.java
index 407cb28b21..36cbefbf12 100644
--- a/router/java/src/net/i2p/router/CommSystemFacade.java
+++ b/router/java/src/net/i2p/router/CommSystemFacade.java
@@ -49,7 +49,7 @@ public abstract class CommSystemFacade implements Service {
      * Return framed average clock skew of connected peers in seconds, or null if we cannot answer.
      * CommSystemFacadeImpl overrides this.
      */
-    public Long getFramedAveragePeerClockSkew(int percentToInclude) { return null; }
+    public long getFramedAveragePeerClockSkew(int percentToInclude) { return 0; }
     
     /**
      * Determine under what conditions we are remotely reachable.
diff --git a/router/java/src/net/i2p/router/RouterClock.java b/router/java/src/net/i2p/router/RouterClock.java
index bb22dfbcb8..e356964ee1 100644
--- a/router/java/src/net/i2p/router/RouterClock.java
+++ b/router/java/src/net/i2p/router/RouterClock.java
@@ -15,11 +15,31 @@ import net.i2p.util.Log;
  */
 public class RouterClock extends Clock {
 
+    /**
+     *  Minimum interval in ms between 1 ms slew steps of the clock
+     *  i.e. ppm = 1000000/MAX_SLEW
+     *  We should be able to slew really fast,
+     *  this is probably a lot faster than what NTP does
+     *  1/50 is 12s in a 10m tunnel lifetime, that should be fine.
+     *  All of this is @since 0.7.12
+     */
+    private static final long MAX_SLEW = 50;
+    private static final int DEFAULT_STRATUM = 8;
+    private static final int WORST_STRATUM = 16;
+    /** the max NTP Timestamper delay is 30m right now, make this longer than that */
+    private static final long MIN_DELAY_FOR_WORSE_STRATUM = 45*60*1000;
+    private volatile long _desiredOffset;
+    private volatile long _lastSlewed;
+    /** use system time for this */
+    private long _lastChanged;
+    private int _lastStratum;
+
     RouterContext _contextRC; // LINT field hides another field
 
     public RouterClock(RouterContext context) {
         super(context);
         _contextRC = context;
+        _lastStratum = WORST_STRATUM;
     }
 
     /**
@@ -29,6 +49,16 @@ public class RouterClock extends Clock {
      */
     @Override
     public void setOffset(long offsetMs, boolean force) {
+         setOffset(offsetMs, force, DEFAULT_STRATUM);
+    }
+
+    /** @since 0.7.12 */
+    private void setOffset(long offsetMs, int stratum) {
+         setOffset(offsetMs, false, stratum);
+    }
+
+    /** @since 0.7.12 */
+    private void setOffset(long offsetMs, boolean force, int stratum) {
         long delta = offsetMs - _offset;
         if (!force) {
             if ((offsetMs > MAX_OFFSET) || (offsetMs < 0 - MAX_OFFSET)) {
@@ -45,59 +75,127 @@ public class RouterClock extends Clock {
                 }
             }
             
-            if ((delta < MIN_OFFSET_CHANGE) && (delta > 0 - MIN_OFFSET_CHANGE)) {
-                getLog().debug("Not changing offset since it is only " + delta + "ms");
+            // let's be perfect
+            if (delta == 0) {
+                getLog().debug("Not changing offset, delta=0");
                 _alreadyChanged = true;
                 return;
             }
 
+            // only listen to a worse stratum if it's been a while
+            if (_alreadyChanged && stratum > _lastStratum &&
+                System.currentTimeMillis() - _lastChanged < MIN_DELAY_FOR_WORSE_STRATUM) {
+                getLog().warn("Ignoring update from a stratum " + stratum +
+                              " clock, we recently had an update from a stratum " + _lastStratum + " clock");
+                return;
+            }
+            
             // If so configured, check sanity of proposed clock offset
             if (Boolean.valueOf(_contextRC.getProperty("router.clockOffsetSanityCheck","true")).booleanValue() &&
                 _alreadyChanged) {
 
                 // Try calculating peer clock skew
-                Long peerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50);
-
-                if (peerClockSkew != null) {
+                long currentPeerClockSkew = _contextRC.commSystem().getFramedAveragePeerClockSkew(50);
 
                     // Predict the effect of applying the proposed clock offset
-                    long currentPeerClockSkew = peerClockSkew.longValue();
-                    long predictedPeerClockSkew = currentPeerClockSkew + (delta / 1000l);
+                    long predictedPeerClockSkew = currentPeerClockSkew + delta;
 
                     // Fail sanity check if applying the offset would increase peer clock skew
-                    if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5)) ||
-                        (Math.abs(predictedPeerClockSkew) > 20)) {
+                    if ((Math.abs(predictedPeerClockSkew) > (Math.abs(currentPeerClockSkew) + 5*1000)) ||
+                        (Math.abs(predictedPeerClockSkew) > 20*1000)) {
 
                         getLog().error("Ignoring clock offset " + offsetMs + "ms (current " + _offset +
                                        "ms) since it would increase peer clock skew from " + currentPeerClockSkew +
-                                       "s to " + predictedPeerClockSkew + "s. Broken server in pool.ntp.org?");
+                                       "ms to " + predictedPeerClockSkew + "ms. Bad time server?");
                         return;
                     } else {
                         getLog().debug("Approving clock offset " + offsetMs + "ms (current " + _offset +
                                        "ms) since it would decrease peer clock skew from " + currentPeerClockSkew +
-                                       "s to " + predictedPeerClockSkew + "s.");
+                                       "ms to " + predictedPeerClockSkew + "ms.");
                     }
-                }
             } // check sanity
         }
 
         if (_alreadyChanged) {
+            // Update the target offset, slewing will take care of the rest
             if (delta > 15*1000)
-                getLog().error("Warning - Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
+                getLog().error("Warning - Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum);
             else if (getLog().shouldLog(Log.INFO))
-                getLog().info("Updating clock offset to " + offsetMs + "ms from " + _offset + "ms");
+                getLog().info("Updating target clock offset to " + offsetMs + "ms from " + _offset + "ms, Stratum " + stratum);
             
             if (!_statCreated) {
                 _contextRC.statManager().createRateStat("clock.skew", "How far is the already adjusted clock being skewed?", "Clock", new long[] { 10*60*1000, 3*60*60*1000, 24*60*60*60 });
                 _statCreated = true;
             }
             _contextRC.statManager().addRateData("clock.skew", delta, 0);
+            _desiredOffset = offsetMs;
         } else {
-            getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms from " + _offset + "ms");
+            getLog().log(Log.INFO, "Initializing clock offset to " + offsetMs + "ms, Stratum " + stratum);
+            _alreadyChanged = true;
+            _offset = offsetMs;
+            _desiredOffset = offsetMs;
+            // this is used by the JobQueue
+            fireOffsetChanged(delta);
+        }
+        _lastChanged = System.currentTimeMillis();
+        _lastStratum = stratum;
+
+    }
+
+    /**
+     *  @param stratum used to determine whether we should ignore
+     *  @since 0.7.12
+     */
+    @Override
+    public void setNow(long realTime, int stratum) {
+        long diff = realTime - System.currentTimeMillis();
+        setOffset(diff, stratum);
+    }
+
+    /**
+     * Retrieve the current time synchronized with whatever reference clock is in use.
+     * Do really simple clock slewing, like NTP but without jitter prevention.
+     * Slew the clock toward the desired offset, but only up to a maximum slew rate,
+     * and never let the clock go backwards because of slewing.
+     * 
+     * Take care to only access the volatile variables once for speed and to
+     * avoid having another thread change them
+     *
+     * This is called about a zillion times a second, so we can do the slewing right
+     * here rather than in some separate thread to keep it simple.
+     * Avoiding backwards clocks when updating in a thread would be hard too.
+     */
+    @Override
+    public long now() {
+        long systemNow = System.currentTimeMillis();
+        // copy the global, so two threads don't both increment or decrement _offset
+        long offset = _offset;
+        if (systemNow >= _lastSlewed + MAX_SLEW) {
+            // copy the global
+            long desiredOffset = _desiredOffset;
+            if (desiredOffset > offset) {
+                // slew forward
+                _offset = ++offset;
+                _lastSlewed = systemNow;
+            } else if (desiredOffset < offset) {
+                // slew backward, but don't let the clock go backward
+                // this should be the first call since systemNow
+                // was at least _lastSlewed + MAX_SLEW, i.e. different
+                // from the last systemNow, thus we won't let the clock go backward,
+                // no need to track when we were last called.
+                _offset = --offset;
+                _lastSlewed = systemNow;
+            }
         }
-        _alreadyChanged = true;
-        _offset = offsetMs;
-        fireOffsetChanged(delta);
+        return offset + systemNow;
     }
 
+    /**
+     *  How far we still have to slew, for diagnostics
+     *  @since 0.7.12
+     */
+    public long getDeltaOffset() {
+        return _desiredOffset - _offset;
+    }
+    
 }
diff --git a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
index a9fbd37f51..2ab19ea275 100644
--- a/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
+++ b/router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java
@@ -51,6 +51,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
         _log.info("Starting up the comm system");
         _manager = new TransportManager(_context);
         _manager.startListening();
+        startTimestamper();
     }
     
     public void shutdown() {
@@ -78,20 +79,20 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
     
     /**
      * @param percentToInclude 1-100
-     * @return Framed average clock skew of connected peers in seconds, or the clock offset if we cannot answer.
+     * @return Framed average clock skew of connected peers in milliseconds, or the clock offset if we cannot answer.
      * Average is calculated over the middle "percentToInclude" peers.
+     * Todo: change the skew Vectors from seconds to milliseconds
      */
     @Override
-    public Long getFramedAveragePeerClockSkew(int percentToInclude) {
+    public long getFramedAveragePeerClockSkew(int percentToInclude) {
         if (_manager == null) {
-            // round toward zero
-            return Long.valueOf(_context.clock().getOffset() / 1000);
+            return _context.clock().getOffset();
         }
         Vector skews = _manager.getClockSkews();
         if (skews == null ||
             skews.size() <= 0 ||
             (skews.size() < 5 && _context.clock().getUpdatedSuccessfully())) {
-            return Long.valueOf(_context.clock().getOffset() / 1000);
+            return _context.clock().getOffset();
         }
 
         // Going to calculate, sort them
@@ -106,12 +107,12 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
         long sum = 0;
         for (int i = first; i <= last; i++) {
             long value = ((Long) (skews.get(i))).longValue();
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Adding clock skew " + i + " valued " + value + " s.");
+            //if (_log.shouldLog(Log.DEBUG))
+            //    _log.debug("Adding clock skew " + i + " valued " + value + " s.");
             sum = sum + value;
         }
-        // Calculate average (round toward zero)
-        return Long.valueOf(sum / frameSize);
+        // Calculate average
+        return sum * 1000 / frameSize;
     }
     
     public List getBids(OutNetMessage msg) {
@@ -481,4 +482,38 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
         buf.append("</tt>");
         return buf.toString();
     }
+
+    /*
+     * Timestamper stuff
+     *
+     * This is used as a backup to NTP over UDP.
+     * @since 0.7.12
+     */
+
+    private static final int TIME_START_DELAY = 5*60*1000;
+    private static final int TIME_REPEAT_DELAY = 10*60*1000;
+    /** @since 0.7.12 */
+    private void startTimestamper() {
+        SimpleScheduler.getInstance().addPeriodicEvent(new Timestamper(), TIME_START_DELAY,  TIME_REPEAT_DELAY);
+    }
+
+    /**
+     * Update the clock offset based on the average of the peers.
+     * This uses the default stratum, which is worse (numerically higher) than any
+     * reasonable NTP source, so it will be ignored unless NTP is broken.
+     * @since 0.7.12
+     */
+    private class Timestamper implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+             // use the same % as in RouterClock so that check will never fail
+             // This is our offset w.r.t. them...
+             long peerOffset = getFramedAveragePeerClockSkew(50);
+             if (peerOffset == 0)
+                 return;
+             long currentOffset = _context.clock().getOffset();
+             // ... so we subtract it to get in sync with them
+             long newOffset = currentOffset - peerOffset;
+             _context.clock().setOffset(newOffset);
+        }
+    }
 }
-- 
GitLab