From aa7496226333536653291971da443e176cef58e4 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Tue, 10 Nov 2009 18:24:15 +0000
Subject: [PATCH] Floodfill rework part 2 of N

    * DataHelper: Fix broken byte[] compareTo() used by XORComparator,
      which was not doing unsigned comparisons!
    * Floodfill rework part 2 of N: Store closest to the key, subject
      to last-failed lookup and store stats.
    * FloodfillPeerSelector: Use the standard XORComparator instead of
      messing with BigInteger.
    * FloodfillVerifyStoreJob: Set the correct timeout for the requeued
      store job.
    * KNDF: Rework getPeerTimeout() to use 1-day averages, and lower
      the min, max, and multiplication factor.
    * Publish jobs: Lengthen the timeout to 90s (was 30s for
      routerinfos and 60s for leasesets).
    * StoreJob: Limit the max peer timeout to 15s for direct stores.
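
Illustration of the compareTo() bug (a minimal standalone sketch, not part
of the patch): Java bytes are signed, so any byte with the high bit set
compared as negative, inverting the XOR-distance ordering whenever the first
differing byte had the high bit set.

    public class SignedByteBug {
        public static void main(String[] args) {
            byte a = (byte) 0x80;  // 128 unsigned, but -128 as a signed Java byte
            byte b = (byte) 0x01;  // 1 either way
            // Signed comparison says a < b -- wrong for a distance metric:
            System.out.println("signed:   a < b is " + (a < b));                   // true
            // Masking to int first, as the fixed compareTo() does, is correct:
            System.out.println("unsigned: a > b is " + ((a & 0xff) > (b & 0xff))); // true
        }
    }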

---
 core/java/src/net/i2p/data/DataHelper.java    |  5 +-
 .../FloodfillNetworkDatabaseFacade.java       |  6 +-
 .../kademlia/FloodfillPeerSelector.java       | 94 +++++++++++++++----
 .../kademlia/FloodfillVerifyStoreJob.java     | 17 +++-
 .../KademliaNetworkDatabaseFacade.java        | 26 +++--
 .../kademlia/RepublishLeaseSetJob.java        |  2 +-
 .../router/networkdb/kademlia/StoreJob.java   | 41 ++++++--
 7 files changed, 144 insertions(+), 47 deletions(-)

diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java
index 6554202824..37da75d7e0 100644
--- a/core/java/src/net/i2p/data/DataHelper.java
+++ b/core/java/src/net/i2p/data/DataHelper.java
@@ -722,6 +722,7 @@ public class DataHelper {
         return true;
     }
     
+    /** treat bytes as unsigned */
     public final static int compareTo(byte lhs[], byte rhs[]) {
         if ((rhs == null) && (lhs == null)) return 0;
         if (lhs == null) return -1;
@@ -729,9 +730,9 @@ public class DataHelper {
         if (rhs.length < lhs.length) return 1;
         if (rhs.length > lhs.length) return -1;
         for (int i = 0; i < rhs.length; i++) {
-            if (rhs[i] > lhs[i])
+            if ((rhs[i] & 0xff) > (lhs[i] & 0xff))
                 return -1;
-            else if (rhs[i] < lhs[i]) return 1;
+            else if ((rhs[i] & 0xff) < (lhs[i] & 0xff)) return 1;
         }
         return 0;
     }
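
For context on how the fixed compareTo() is consumed: XORComparator orders
hashes by their XOR distance from a reference key. A minimal sketch of that
ordering (illustrative only; the class and method names here are hypothetical,
not the shipped net.i2p implementation):

    import java.util.Comparator;

    class XorDistanceComparator implements Comparator<byte[]> {
        private final byte[] base;  // the routing key distances are measured from

        XorDistanceComparator(byte[] base) { this.base = base; }

        public int compare(byte[] lhs, byte[] rhs) {
            // Unsigned byte-by-byte comparison of (lhs ^ base) vs (rhs ^ base);
            // assumes all arrays are the same length (e.g., 32-byte hashes).
            // With the signed bug above, this ordering was wrong whenever the
            // first differing distance byte had the high bit set.
            for (int i = 0; i < base.length; i++) {
                int l = (lhs[i] ^ base[i]) & 0xff;
                int r = (rhs[i] ^ base[i]) & 0xff;
                if (l != r)
                    return l < r ? -1 : 1;
            }
            return 0;
        }
    }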
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
index d9d2eaa144..1664094c29 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
@@ -67,7 +67,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
         _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new FloodfillDatabaseStoreMessageHandler(_context, this));
     }
     
-    private static final long PUBLISH_TIMEOUT = 30*1000;
+    /**
+     *  This could perhaps be shorter than RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT,
+     *  since we are sending direct, but unresponsive floodfills may take a while due to timeouts.
+     */
+    static final long PUBLISH_TIMEOUT = 90*1000;
     
     /**
      * @throws IllegalArgumentException if the local router info is invalid
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
index eec1908a8f..e1d2fe4527 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
@@ -8,13 +8,12 @@ package net.i2p.router.networkdb.kademlia;
  *
  */
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterInfo;
@@ -75,8 +74,12 @@ class FloodfillPeerSelector extends PeerSelector {
      *  List is not sorted and not shuffled.
      */
     public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) {
+         return selectFloodfillParticipants(null, kbuckets);
+    }
+
+    public List<Hash> selectFloodfillParticipants(Set<Hash> toIgnore, KBucketSet kbuckets) {
         if (kbuckets == null) return new ArrayList();
-        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, null, 0);
+        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, toIgnore, 0);
         kbuckets.getAll(matches);
         return matches.getFloodfillParticipants();
     }
@@ -86,26 +89,77 @@ class FloodfillPeerSelector extends PeerSelector {
      *  @param key the routing key
      *  @param maxNumRouters max to return
      *  Sorted by closest to the key if > maxNumRouters, otherwise not
+     *  The list is in 3 groups - sorted by routing key within each group.
+     *  Group 1: No store failure in last 60 minutes and no lookup failure in last 15 minutes
+     *  Group 2: No store failure in last 30 minutes and no lookup failure in last 5 minutes
+     *  Group 3: All others, including peers whose RouterInfo was published over 3 hours ago
      */
     public List<Hash> selectFloodfillParticipants(Hash key, int maxNumRouters, KBucketSet kbuckets) {
-        List<Hash> ffs = selectFloodfillParticipants(kbuckets);
-        if (ffs.size() <= maxNumRouters)
-            return ffs; // unsorted
-        TreeMap<BigInteger, Hash> sorted = new TreeMap();
-        for (int i = 0; i < ffs.size(); i++) {
-            Hash h = ffs.get(i);
-            BigInteger diff = getDistance(key, h);
-            sorted.put(diff, h);
+         return selectFloodfillParticipants(key, maxNumRouters, null, kbuckets);
+    }
+
+    /** 1.5 * PublishLocalRouterInfoJob.PUBLISH_DELAY */
+    private static final int NO_FAIL_STORE_OK = 30*60*1000;
+    private static final int NO_FAIL_STORE_GOOD = NO_FAIL_STORE_OK * 2;
+    private static final int NO_FAIL_LOOKUP_OK = 5*60*1000;
+    private static final int NO_FAIL_LOOKUP_GOOD = NO_FAIL_LOOKUP_OK * 3;
+
+    public List<Hash> selectFloodfillParticipants(Hash key, int howMany, Set<Hash> toIgnore, KBucketSet kbuckets) {
+        List<Hash> ffs = selectFloodfillParticipants(toIgnore, kbuckets);
+        TreeSet<Hash> sorted = new TreeSet(new XORComparator(key));
+        sorted.addAll(ffs);
+
+        List<Hash> rv = new ArrayList(howMany);
+        List<Hash> okff = new ArrayList(howMany);
+        List<Hash> badff = new ArrayList(howMany);
+        int found = 0;
+        long now = _context.clock().now();
+
+        // split sorted list into 3 sorted lists
+        for (int i = 0; found < howMany && i < ffs.size(); i++) {
+            if (sorted.isEmpty())
+                break;  // can happen if ffs contained duplicates
+            Hash entry = sorted.first();
+            sorted.remove(entry);
+            RouterInfo info = _context.netDb().lookupRouterInfoLocally(entry);
+            if (info != null && now - info.getPublished() > 3*60*60*1000) {
+                badff.add(entry);
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Skipping, published a while ago: " + entry);
+            } else {
+                PeerProfile prof = _context.profileOrganizer().getProfile(entry);
+                if (prof != null && prof.getDBHistory() != null
+                    && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_GOOD
+                    && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_GOOD) {
+                    // good
+                    rv.add(entry);
+                    found++;
+                } else if (prof != null && prof.getDBHistory() != null
+                           && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_OK
+                           && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_OK) {
+                    okff.add(entry);
+                } else {
+                    badff.add(entry);
+                }
+            }
+        }
+
+        // Put the ok floodfills after the good floodfills
+        for (int i = 0; found < howMany && i < okff.size(); i++) {
+            rv.add(okff.get(i));
+            found++;
         }
-        List<Hash> rv = new ArrayList(maxNumRouters);
-        for (int i = 0; i < maxNumRouters; i++) {
-            rv.add(sorted.remove(sorted.firstKey()));
+        // Put the "bad" floodfills after the ok floodfills
+        for (int i = 0; found < howMany && i < badff.size(); i++) {
+            rv.add(badff.get(i));
+            found++;
         }
+
         return rv;
     }
     
     private class FloodfillSelectionCollector implements SelectionCollector {
-        private TreeMap<BigInteger, Hash> _sorted;
+        private TreeSet<Hash> _sorted;
         private List<Hash>  _floodfillMatches;
         private Hash _key;
         private Set<Hash>  _toIgnore;
@@ -113,7 +167,7 @@ class FloodfillPeerSelector extends PeerSelector {
         private int _wanted;
         public FloodfillSelectionCollector(Hash key, Set<Hash> toIgnore, int wanted) {
             _key = key;
-            _sorted = new TreeMap();
+            _sorted = new TreeSet(new XORComparator(key));
             _floodfillMatches = new ArrayList(8);
             _toIgnore = toIgnore;
             _matches = 0;
@@ -152,8 +206,7 @@ class FloodfillPeerSelector extends PeerSelector {
                 // So we keep going for a while. This, together with periodically shuffling the
                 // KBucket (see KBucketImpl.add()) makes exploration work well.
                 if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted + EXTRA_MATCHES > _matches) && (_key != null) ) {
-                    BigInteger diff = getDistance(_key, entry);
-                    _sorted.put(diff, entry);
+                    _sorted.add(entry);
                 } else {
                     return;
                 }
@@ -216,10 +269,13 @@ class FloodfillPeerSelector extends PeerSelector {
                 rv.add(badff.get(i));
                 found++;
             }
+            // are we corrupting _sorted here?
             for (int i = rv.size(); i < howMany; i++) {
                 if (_sorted.size() <= 0)
                     break;
-                rv.add(_sorted.remove(_sorted.firstKey()));
+                Hash entry = _sorted.first();
+                rv.add(entry);
+                _sorted.remove(entry);
             }
             return rv;
         }
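
On the "are we corrupting _sorted here?" comment above: the loop does drain
_sorted, which matters if the collector were ever reused after get(). A
non-mutating alternative (hypothetical sketch, not part of this patch) would
iterate the TreeSet in its natural XOR order instead:

    // Take up to howMany entries in XOR order without emptying _sorted:
    for (Hash entry : _sorted) {
        if (rv.size() >= howMany)
            break;
        rv.add(entry);
    }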
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
index bfc26d902b..3706606e40 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
@@ -176,14 +176,19 @@ public class FloodfillVerifyStoreJob extends JobImpl {
         public void setMessage(I2NPMessage message) { _message = message; }
     }
     
-    /** the netDb store failed to verify, so resend it to a random floodfill peer */
+    /**
+     *  The netDb store failed to verify, so resend it to a random floodfill peer.
+     *  FIXME: this can loop for a long time; do we need a token or counter
+     *  so we don't have multiple verify jobs?
+     */
     private void resend() {
-        DataStructure ds = null;
-        ds = _facade.lookupLeaseSetLocally(_key);
-        if (ds == null)
+        DataStructure ds;
+        if (_isRouterInfo)
             ds = _facade.lookupRouterInfoLocally(_key);
+        else
+            ds = _facade.lookupLeaseSetLocally(_key);
         if (ds != null)
-            _facade.sendStore(_key, ds, null, null, VERIFY_TIMEOUT, null);
+            _facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, null);
     }
     
     private class VerifyTimeoutJob extends JobImpl {
@@ -197,6 +202,8 @@ public class FloodfillVerifyStoreJob extends JobImpl {
             if (_sentTo != null)
                 getContext().profileManager().dbStoreFailed(_sentTo);
             getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Verify timed out");
             resend(); 
         }
     }
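
One possible answer to the FIXME above (a hypothetical sketch; the
_resendCount field and MAX_RESENDS cap are not part of this patch) is to
bound how many times a single verify job may requeue the store:

    private static final int MAX_RESENDS = 3;  // hypothetical cap
    private int _resendCount;                  // hypothetical new field

    private void resend() {
        if (++_resendCount > MAX_RESENDS)
            return;  // give up rather than looping indefinitely
        DataStructure ds;
        if (_isRouterInfo)
            ds = _facade.lookupRouterInfoLocally(_key);
        else
            ds = _facade.lookupLeaseSetLocally(_key);
        if (ds != null)
            _facade.sendStore(_key, ds, null, null,
                              FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, null);
    }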
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index f7ecf82929..85cb962859 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -513,7 +513,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
         }
         // Don't spam the floodfills. In addition, always delay a few seconds since there may
         // be another leaseset change coming along momentarily.
-        long nextTime = Math.max(j.lastPublished() + j.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
+        long nextTime = Math.max(j.lastPublished() + RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
         j.getTiming().setStartAfter(nextTime);
         _context.jobQueue().addJob(j);
     }
@@ -885,20 +885,28 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
     }
 
     /** smallest allowed period */
-    private static final int MIN_PER_PEER_TIMEOUT = 5*1000;
-    private static final int MAX_PER_PEER_TIMEOUT = 10*1000;
-    
+    private static final int MIN_PER_PEER_TIMEOUT = 2*1000;
+    /**
+     *  We want FNDF.PUBLISH_TIMEOUT and RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT
+     *  to be greater than MAX_PER_PEER_TIMEOUT * TIMEOUT_MULTIPLIER by a factor of at least
+     *  3 or 4, to allow at least that many peers to be attempted for a store.
+     */
+    private static final int MAX_PER_PEER_TIMEOUT = 7*1000;
+    private static final int TIMEOUT_MULTIPLIER = 3;
+
+    /** todo: does this need more tuning? */
     public int getPeerTimeout(Hash peer) {
         PeerProfile prof = _context.profileOrganizer().getProfile(peer);
         double responseTime = MAX_PER_PEER_TIMEOUT;
         if (prof != null && prof.getIsExpandedDB()) {
-            responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
-            if (responseTime < MIN_PER_PEER_TIMEOUT)
-                responseTime = MIN_PER_PEER_TIMEOUT;
-            else if (responseTime > MAX_PER_PEER_TIMEOUT)
+            responseTime = prof.getDbResponseTime().getRate(24*60*60*1000l).getAverageValue();
+            // if 0 then there is no data, set to max.
+            if (responseTime <= 0 || responseTime > MAX_PER_PEER_TIMEOUT)
                 responseTime = MAX_PER_PEER_TIMEOUT;
+            else if (responseTime < MIN_PER_PEER_TIMEOUT)
+                responseTime = MIN_PER_PEER_TIMEOUT;
         }
-        return 4 * (int)responseTime;  // give it up to 4x the average response time
+        return TIMEOUT_MULTIPLIER * (int)responseTime;  // give it up to 3x the average response time
     }
 
     public void sendStore(Hash key, DataStructure ds, Job onSuccess, Job onFailure, long sendTimeout, Set toIgnore) {
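
The clamping above, worked through with this patch's constants (a standalone
demo, not part of the patch):

    public class PeerTimeoutDemo {
        static final int MIN_PER_PEER_TIMEOUT = 2*1000;
        static final int MAX_PER_PEER_TIMEOUT = 7*1000;
        static final int TIMEOUT_MULTIPLIER = 3;

        static int peerTimeout(double avg) {
            // avg <= 0 means the 1-day rate has no data yet: assume the worst.
            if (avg <= 0 || avg > MAX_PER_PEER_TIMEOUT)
                avg = MAX_PER_PEER_TIMEOUT;
            else if (avg < MIN_PER_PEER_TIMEOUT)
                avg = MIN_PER_PEER_TIMEOUT;
            return TIMEOUT_MULTIPLIER * (int) avg;
        }

        public static void main(String[] args) {
            System.out.println(peerTimeout(0));      // 21000: no data
            System.out.println(peerTimeout(500));    // 6000:  floor of 2s, x3
            System.out.println(peerTimeout(4000));   // 12000: 4s average, x3
            System.out.println(peerTimeout(30000));  // 21000: ceiling of 7s, x3
            // Worst case is 21s per peer, so the 90s PUBLISH_TIMEOUT leaves
            // room for roughly 4 sequential store attempts, as the comment
            // on MAX_PER_PEER_TIMEOUT requires.
        }
    }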
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
index 95af13a432..9f4dc9a37d 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
@@ -23,7 +23,7 @@ import net.i2p.util.Log;
 public class RepublishLeaseSetJob extends JobImpl {
     private Log _log;
     private final static long REPUBLISH_LEASESET_DELAY = 5*60*1000;
-    public final /* static */ long REPUBLISH_LEASESET_TIMEOUT = 60*1000;
+    public final static long REPUBLISH_LEASESET_TIMEOUT = 90*1000;
     private Hash _dest;
     private KademliaNetworkDatabaseFacade _facade;
     /** this is actually last attempted publish */
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
index d189068b5d..e0ec4dde8a 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
@@ -109,9 +109,13 @@ class StoreJob extends JobImpl {
         }
         if (isExpired()) {
             _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Expired: " + _timeoutMs);
             fail();
         } else if (_state.getAttempted().size() > MAX_PEERS_SENT) {
             _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Max sent");
             fail();
         } else {
             //if (_log.shouldLog(Log.INFO))
@@ -147,10 +151,11 @@ class StoreJob extends JobImpl {
         // the network to scale.
         // Perhaps the ultimate solution is to send RouterInfos through a lease also.
         List<Hash> closestHashes;
-        if (_state.getData() instanceof RouterInfo) 
-            closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
-        else
-            closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        //if (_state.getData() instanceof RouterInfo) 
+        //    closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        //else
+        //    closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+        closestHashes = getClosestFloodfillRouters(_state.getTarget(), toCheck, _state.getAttempted());
         if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
             if (_state.getPending().size() <= 0) {
                 if (_log.shouldLog(Log.INFO))
@@ -217,6 +222,7 @@ class StoreJob extends JobImpl {
      *
      * @return ordered list of Hash objects
      */
+/*****
     private List<Hash> getClosestRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         //if (_log.shouldLog(Log.DEBUG))
@@ -226,13 +232,27 @@ class StoreJob extends JobImpl {
         if (ks == null) return new ArrayList();
         return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, ks);
     }
+*****/
 
+    /** used for routerinfo stores, prefers those already connected */
+/*****
     private List<Hash> getMostReliableRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
         Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
         KBucketSet ks = _facade.getKBuckets();
         if (ks == null) return new ArrayList();
         return _peerSelector.selectMostReliablePeers(rkey, numClosest, alreadyChecked, ks);
     }
+*****/
+
+    private List<Hash> getClosestFloodfillRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
+        Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
+        KBucketSet ks = _facade.getKBuckets();
+        if (ks == null) return new ArrayList();
+        return ((FloodfillPeerSelector)_peerSelector).selectFloodfillParticipants(rkey, numClosest, alreadyChecked, ks);
+    }
+
+    /** limit expiration for direct sends */
+    private static final int MAX_DIRECT_EXPIRATION = 15*1000;
 
     /**
      * Send a store to the given peer through a garlic route, including a reply 
@@ -242,9 +262,11 @@ class StoreJob extends JobImpl {
     private void sendStore(RouterInfo router, int responseTime) {
         DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
         msg.setKey(_state.getTarget());
-        if (_state.getData() instanceof RouterInfo) 
+        if (_state.getData() instanceof RouterInfo) {
             msg.setRouterInfo((RouterInfo)_state.getData());
-        else if (_state.getData() instanceof LeaseSet) 
+            if (responseTime > MAX_DIRECT_EXPIRATION)
+                responseTime = MAX_DIRECT_EXPIRATION;
+        } else if (_state.getData() instanceof LeaseSet) 
             msg.setLeaseSet((LeaseSet)_state.getData());
         else
             throw new IllegalArgumentException("Storing an unknown data type! " + _state.getData());
@@ -255,11 +277,11 @@ class StoreJob extends JobImpl {
             if (_log.shouldLog(Log.ERROR))
                 _log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
             return;
-        } else {
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
         }
 
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getJobId() + ": Send store timeout is " + responseTime);
+
         sendStore(msg, router, getContext().clock().now() + responseTime);
     }
     
@@ -269,7 +291,6 @@ class StoreJob extends JobImpl {
             sendStoreThroughGarlic(msg, peer, expiration);
         } else {
             getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
-            // todo - shorter expiration for direct?
             sendDirect(msg, peer, expiration);
         }
     }
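
Net effect for a direct RouterInfo store, with this patch's constants (a
standalone worked example, not part of the patch):

    public class DirectStoreTimeoutDemo {
        static final int MAX_DIRECT_EXPIRATION = 15*1000;  // from StoreJob above

        public static void main(String[] args) {
            // Worst case from KNDF.getPeerTimeout(): 3 * 7s = 21s.
            int responseTime = 3 * 7*1000;
            boolean isRouterInfo = true;  // RouterInfo stores are sent direct
            if (isRouterInfo && responseTime > MAX_DIRECT_EXPIRATION)
                responseTime = MAX_DIRECT_EXPIRATION;  // no tunnel round-trip to allow for
            System.out.println("direct store timeout: " + responseTime + " ms");  // 15000
        }
    }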
-- 
GitLab