diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java
index 65542028243e3d4418284a1f1c8ec4c8ffa8cc67..37da75d7e08eb7d207d3e47b882f1d66de3c214a 100644
--- a/core/java/src/net/i2p/data/DataHelper.java
+++ b/core/java/src/net/i2p/data/DataHelper.java
@@ -722,6 +722,7 @@ public class DataHelper {
         return true;
     }
 
+    /** treat bytes as unsigned */
    public final static int compareTo(byte lhs[], byte rhs[]) {
        if ((rhs == null) && (lhs == null)) return 0;
        if (lhs == null) return -1;
@@ -729,9 +730,9 @@ public class DataHelper {
        if (rhs.length < lhs.length) return 1;
        if (rhs.length > lhs.length) return -1;
        for (int i = 0; i < rhs.length; i++) {
-            if (rhs[i] > lhs[i])
+            if ((rhs[i] & 0xff) > (lhs[i] & 0xff))
                return -1;
-            else if (rhs[i] < lhs[i]) return 1;
+            else if ((rhs[i] & 0xff) < (lhs[i] & 0xff)) return 1;
        }
        return 0;
    }
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
index d9d2eaa144e65b1c69fff6c896369b3093aa67a6..1664094c29bf01bcb509471c6584935e8a57b52f 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java
@@ -67,7 +67,11 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
        _context.inNetMessagePool().registerHandlerJobBuilder(DatabaseStoreMessage.MESSAGE_TYPE, new FloodfillDatabaseStoreMessageHandler(_context, this));
    }
 
-    private static final long PUBLISH_TIMEOUT = 30*1000;
+    /**
+     * This could perhaps be shorter than RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT,
+     * because we are sending direct, but unresponsive floodfills may take a while due to timeouts.
+     */
+    static final long PUBLISH_TIMEOUT = 90*1000;
 
    /**
     * @throws IllegalArgumentException if the local router info is invalid
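
The compareTo() change above matters because Java bytes are signed: without the 0xff masks, any byte in the 0x80-0xff range compares as negative and sorts before 0x00-0x7f, which corrupts the unsigned big-endian ordering that XOR-distance sorting of SHA-256 hashes depends on. A minimal standalone demonstration, not part of the patch:

    public class UnsignedCompareDemo {
        public static void main(String[] args) {
            byte high = (byte) 0x80;  // -128 as a signed Java byte
            byte low  = (byte) 0x7f;  //  127 as a signed Java byte

            // Signed comparison: 0x80 sorts BEFORE 0x7f -- wrong for hash distances
            System.out.println("signed:   0x80 > 0x7f ? " + (high > low));                    // false

            // Unsigned comparison via masking, as in the patched compareTo()
            System.out.println("unsigned: 0x80 > 0x7f ? " + ((high & 0xff) > (low & 0xff))); // true
        }
    }
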
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
index eec1908a8f042eed8dfccfdac9ff203faec98989..e1d2fe4527fe8697ae99f024ee550d9f846a8fb9 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillPeerSelector.java
@@ -8,13 +8,12 @@ package net.i2p.router.networkdb.kademlia;
 *
 */
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.TreeMap;
+import java.util.TreeSet;
 
 import net.i2p.data.Hash;
 import net.i2p.data.RouterInfo;
@@ -75,8 +74,12 @@ class FloodfillPeerSelector extends PeerSelector {
     * List is not sorted and not shuffled.
     */
    public List<Hash> selectFloodfillParticipants(KBucketSet kbuckets) {
+        return selectFloodfillParticipants(null, kbuckets);
+    }
+
+    public List<Hash> selectFloodfillParticipants(Set<Hash> toIgnore, KBucketSet kbuckets) {
        if (kbuckets == null) return new ArrayList();
-        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, null, 0);
+        FloodfillSelectionCollector matches = new FloodfillSelectionCollector(null, toIgnore, 0);
        kbuckets.getAll(matches);
        return matches.getFloodfillParticipants();
    }
@@ -86,26 +89,77 @@ class FloodfillPeerSelector extends PeerSelector {
     * @param key the routing key
     * @param maxNumRouters max to return
     * Sorted by closest to the key if > maxNumRouters, otherwise not
+     * The list is in 3 groups - sorted by routing key within each group.
+     * Group 1: No store or lookup failure within the GOOD thresholds (60 minutes for stores, 15 for lookups)
+     * Group 2: No store or lookup failure within the OK thresholds (30 minutes for stores, 5 for lookups)
+     * Group 3: All others
     */
    public List<Hash> selectFloodfillParticipants(Hash key, int maxNumRouters, KBucketSet kbuckets) {
-        List<Hash> ffs = selectFloodfillParticipants(kbuckets);
-        if (ffs.size() <= maxNumRouters)
-            return ffs; // unsorted
-        TreeMap<BigInteger, Hash> sorted = new TreeMap();
-        for (int i = 0; i < ffs.size(); i++) {
-            Hash h = ffs.get(i);
-            BigInteger diff = getDistance(key, h);
-            sorted.put(diff, h);
+        return selectFloodfillParticipants(key, maxNumRouters, null, kbuckets);
+    }
+
+    /** 1.5 * PublishLocalRouterInfoJob.PUBLISH_DELAY */
+    private static final int NO_FAIL_STORE_OK = 30*60*1000;
+    private static final int NO_FAIL_STORE_GOOD = NO_FAIL_STORE_OK * 2;
+    private static final int NO_FAIL_LOOKUP_OK = 5*60*1000;
+    private static final int NO_FAIL_LOOKUP_GOOD = NO_FAIL_LOOKUP_OK * 3;
+
+    public List<Hash> selectFloodfillParticipants(Hash key, int howMany, Set<Hash> toIgnore, KBucketSet kbuckets) {
+        List<Hash> ffs = selectFloodfillParticipants(toIgnore, kbuckets);
+        TreeSet<Hash> sorted = new TreeSet(new XORComparator(key));
+        sorted.addAll(ffs);
+
+        List<Hash> rv = new ArrayList(howMany);
+        List<Hash> okff = new ArrayList(howMany);
+        List<Hash> badff = new ArrayList(howMany);
+        int found = 0;
+        long now = _context.clock().now();
+
+        // split the sorted list into 3 sorted lists
+        for (int i = 0; found < howMany && i < ffs.size(); i++) {
+            Hash entry = sorted.first();
+            sorted.remove(entry);
+            if (entry == null)
+                break; // shouldn't happen
+            RouterInfo info = _context.netDb().lookupRouterInfoLocally(entry);
+            if (info != null && now - info.getPublished() > 3*60*60*1000) {
+                badff.add(entry);
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Skipping, published a while ago: " + entry);
+            } else {
+                PeerProfile prof = _context.profileOrganizer().getProfile(entry);
+                if (prof != null && prof.getDBHistory() != null
+                    && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_GOOD
+                    && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_GOOD) {
+                    // good
+                    rv.add(entry);
+                    found++;
+                } else if (prof != null && prof.getDBHistory() != null
+                           && now - prof.getDBHistory().getLastStoreFailed() > NO_FAIL_STORE_OK
+                           && now - prof.getDBHistory().getLastLookupFailed() > NO_FAIL_LOOKUP_OK) {
+                    okff.add(entry);
+                } else {
+                    badff.add(entry);
+                }
+            }
+        }
+
+        // Put the ok floodfills after the good floodfills
+        for (int i = 0; found < howMany && i < okff.size(); i++) {
+            rv.add(okff.get(i));
+            found++;
        }
-        List<Hash> rv = new ArrayList(maxNumRouters);
-        for (int i = 0; i < maxNumRouters; i++) {
-            rv.add(sorted.remove(sorted.firstKey()));
+        // Put the "bad" floodfills after the ok floodfills
+        for (int i = 0; found < howMany && i < badff.size(); i++) {
+            rv.add(badff.get(i));
+            found++;
        }
+        return rv;
    }
 
    private class FloodfillSelectionCollector implements SelectionCollector {
-        private TreeMap<BigInteger, Hash> _sorted;
+        private TreeSet<Hash> _sorted;
        private List<Hash> _floodfillMatches;
        private Hash _key;
        private Set<Hash> _toIgnore;
@@ -113,7 +167,7 @@ class FloodfillPeerSelector extends PeerSelector {
        private int _wanted;
        public FloodfillSelectionCollector(Hash key, Set<Hash> toIgnore, int wanted) {
            _key = key;
-            _sorted = new TreeMap();
+            _sorted = new TreeSet(new XORComparator(key));
            _floodfillMatches = new ArrayList(8);
            _toIgnore = toIgnore;
            _matches = 0;
@@ -152,8 +206,7 @@ class FloodfillPeerSelector extends PeerSelector {
            // So we keep going for a while. This, together with periodically shuffling the
            // KBucket (see KBucketImpl.add()) makes exploration work well.
            if ( (!SearchJob.onlyQueryFloodfillPeers(_context)) && (_wanted + EXTRA_MATCHES > _matches) && (_key != null) ) {
-                BigInteger diff = getDistance(_key, entry);
-                _sorted.put(diff, entry);
+                _sorted.add(entry);
            } else {
                return;
            }
@@ -216,10 +269,13 @@ class FloodfillPeerSelector extends PeerSelector {
                rv.add(badff.get(i));
                found++;
            }
+            // are we corrupting _sorted here?
            for (int i = rv.size(); i < howMany; i++) {
                if (_sorted.size() <= 0)
                    break;
-                rv.add(_sorted.remove(_sorted.firstKey()));
+                Hash entry = _sorted.first();
+                rv.add(entry);
+                _sorted.remove(entry);
            }
            return rv;
        }
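
The TreeSet ordering above relies on an XORComparator class that this patch does not include. A plausible minimal sketch of what it must do, assuming the existing Hash.getData() accessor and the DataHelper.xor() helper; the null-key guard is added here because FloodfillSelectionCollector can be constructed with a null key:

    import java.util.Comparator;
    import net.i2p.data.DataHelper;
    import net.i2p.data.Hash;

    /**
     * Sketch of an XOR-distance comparator: the peer whose hash XORs to the
     * smaller unsigned big-endian value against the target key sorts first.
     */
    class XORComparator implements Comparator<Hash> {
        private final byte[] _base;

        /** guard against a null key, since the collector above may be built with one */
        public XORComparator(Hash key) {
            _base = (key != null) ? key.getData() : new byte[Hash.HASH_LENGTH];
        }

        public int compare(Hash lhs, Hash rhs) {
            byte[] lhsDelta = DataHelper.xor(lhs.getData(), _base);
            byte[] rhsDelta = DataHelper.xor(rhs.getData(), _base);
            // relies on the unsigned compareTo() fix in DataHelper above
            return DataHelper.compareTo(lhsDelta, rhsDelta);
        }
    }
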
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
index bfc26d902b504926a608d88f8b615dd62cd04e1c..3706606e408c55c61beb0c88373790b2f552de79 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
@@ -176,14 +176,19 @@ public class FloodfillVerifyStoreJob extends JobImpl {
        public void setMessage(I2NPMessage message) { _message = message; }
    }
 
-    /** the netDb store failed to verify, so resend it to a random floodfill peer */
+    /**
+     * the netDb store failed to verify, so resend it to a random floodfill peer
+     * Fixme - this can loop for a long time - do we need a token or counter
+     * so we don't have multiple verify jobs?
+     */
    private void resend() {
-        DataStructure ds = null;
-        ds = _facade.lookupLeaseSetLocally(_key);
-        if (ds == null)
+        DataStructure ds;
+        if (_isRouterInfo)
            ds = _facade.lookupRouterInfoLocally(_key);
+        else
+            ds = _facade.lookupLeaseSetLocally(_key);
        if (ds != null)
-            _facade.sendStore(_key, ds, null, null, VERIFY_TIMEOUT, null);
+            _facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, null);
    }
 
    private class VerifyTimeoutJob extends JobImpl {
@@ -197,6 +202,8 @@ public class FloodfillVerifyStoreJob extends JobImpl {
            if (_sentTo != null)
                getContext().profileManager().dbStoreFailed(_sentTo);
            getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime, 0);
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Verify timed out");
            resend();
        }
    }
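
The Fixme above is real: a failed verify triggers resend(), the resent store schedules another verify, and nothing terminates the cycle. One possible shape for the counter it asks about, as a sketch only; MAX_RESENDS and _resendCounts are invented names, a per-job field would not work because each cycle creates a new job, and a real version would also clear the entry on a successful verify:

    // Hypothetical sketch, not part of this patch. Bound the store/verify/resend
    // cycle with a shared per-key attempt counter.
    // Requires java.util.Map and java.util.concurrent.ConcurrentHashMap.
    private static final int MAX_RESENDS = 3;
    private static final Map<Hash, Integer> _resendCounts =
        new ConcurrentHashMap<Hash, Integer>();

    private void resend() {
        Integer prev = _resendCounts.get(_key);
        int count = (prev == null) ? 1 : prev.intValue() + 1;
        if (count > MAX_RESENDS) {
            _resendCounts.remove(_key);
            if (_log.shouldLog(Log.WARN))
                _log.warn("Giving up on " + _key + " after " + MAX_RESENDS + " resends");
            return;
        }
        _resendCounts.put(_key, count);
        DataStructure ds;
        if (_isRouterInfo)
            ds = _facade.lookupRouterInfoLocally(_key);
        else
            ds = _facade.lookupLeaseSetLocally(_key);
        if (ds != null)
            _facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, null);
    }
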
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
index f7ecf82929e99d234d56b3e1548f4d8ebd243be0..85cb962859f24250f862d65b3bbda06f2c92ae86 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java
@@ -513,7 +513,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
        }
        // Don't spam the floodfills. In addition, always delay a few seconds since there may
        // be another leaseset change coming along momentarily.
-        long nextTime = Math.max(j.lastPublished() + j.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
+        long nextTime = Math.max(j.lastPublished() + RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT, _context.clock().now() + PUBLISH_DELAY);
        j.getTiming().setStartAfter(nextTime);
        _context.jobQueue().addJob(j);
    }
@@ -885,20 +885,28 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade {
    }
 
    /** smallest allowed period */
-    private static final int MIN_PER_PEER_TIMEOUT = 5*1000;
-    private static final int MAX_PER_PEER_TIMEOUT = 10*1000;
-
+    private static final int MIN_PER_PEER_TIMEOUT = 2*1000;
+    /**
+     * We want FNDF.PUBLISH_TIMEOUT and RepublishLeaseSetJob.REPUBLISH_LEASESET_TIMEOUT
+     * to be greater than MAX_PER_PEER_TIMEOUT * TIMEOUT_MULTIPLIER by a factor of at least
+     * 3 or 4, to allow at least that many peers to be attempted for a store.
+     */
+    private static final int MAX_PER_PEER_TIMEOUT = 7*1000;
+    private static final int TIMEOUT_MULTIPLIER = 3;
+
+    /** todo: does this need more tuning? */
    public int getPeerTimeout(Hash peer) {
        PeerProfile prof = _context.profileOrganizer().getProfile(peer);
        double responseTime = MAX_PER_PEER_TIMEOUT;
        if (prof != null && prof.getIsExpandedDB()) {
-            responseTime = prof.getDbResponseTime().getLifetimeAverageValue();
-            if (responseTime < MIN_PER_PEER_TIMEOUT)
-                responseTime = MIN_PER_PEER_TIMEOUT;
-            else if (responseTime > MAX_PER_PEER_TIMEOUT)
+            responseTime = prof.getDbResponseTime().getRate(24*60*60*1000l).getAverageValue();
+            // if 0 then there is no data, set to max
+            if (responseTime <= 0 || responseTime > MAX_PER_PEER_TIMEOUT)
                responseTime = MAX_PER_PEER_TIMEOUT;
+            else if (responseTime < MIN_PER_PEER_TIMEOUT)
+                responseTime = MIN_PER_PEER_TIMEOUT;
        }
-        return 4 * (int)responseTime; // give it up to 4x the average response time
+        return TIMEOUT_MULTIPLIER * (int)responseTime; // give it up to 3x the average response time
    }
 
    public void sendStore(Hash key, DataStructure ds, Job onSuccess, Job onFailure, long sendTimeout, Set toIgnore) {
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
index 95af13a4322fa493abc30a702f592a76e946bfd8..9f4dc9a37dfa3341d21782da0522e58c9ac8e4a7 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java
@@ -23,7 +23,7 @@ import net.i2p.util.Log;
 public class RepublishLeaseSetJob extends JobImpl {
    private Log _log;
    private final static long REPUBLISH_LEASESET_DELAY = 5*60*1000;
-    public final /* static */ long REPUBLISH_LEASESET_TIMEOUT = 60*1000;
+    public final static long REPUBLISH_LEASESET_TIMEOUT = 90*1000;
    private Hash _dest;
    private KademliaNetworkDatabaseFacade _facade;
    /** this is actually last attempted publish */
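
The comment in getPeerTimeout() encodes a budget worth spelling out: the worst-case wait per peer is MAX_PER_PEER_TIMEOUT * TIMEOUT_MULTIPLIER = 7s * 3 = 21s, and 90s / 21s is about 4.3, so the 90-second publish and republish timeouts let a store work through at least four peers before expiring. A standalone check of that arithmetic, with constants copied from this patch:

    // Worked check of the timeout budget described above; arithmetic only,
    // these constants are private in their respective classes.
    public class TimeoutBudgetCheck {
        public static void main(String[] args) {
            int maxPerPeer = 7 * 1000;        // MAX_PER_PEER_TIMEOUT (ms)
            int multiplier = 3;               // TIMEOUT_MULTIPLIER
            long publishTimeout = 90 * 1000;  // PUBLISH_TIMEOUT / REPUBLISH_LEASESET_TIMEOUT (ms)

            int worstCasePerPeer = maxPerPeer * multiplier;            // 21,000 ms
            long peersAttemptable = publishTimeout / worstCasePerPeer; // 4
            System.out.println("Worst-case per-peer wait: " + worstCasePerPeer + " ms");
            System.out.println("Peers attemptable per store: " + peersAttemptable);
        }
    }
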
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
index d189068b5d4ee21b69f4c7f97733aaac04f52756..e0ec4dde8ad6fa88b3557d611e4a93b68d6ebf58 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java
@@ -109,9 +109,13 @@ class StoreJob extends JobImpl {
        }
        if (isExpired()) {
            _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Expired: " + _timeoutMs);
            fail();
        } else if (_state.getAttempted().size() > MAX_PEERS_SENT) {
            _state.complete(true);
+            if (_log.shouldLog(Log.INFO))
+                _log.info(getJobId() + ": Max sent");
            fail();
        } else {
            //if (_log.shouldLog(Log.INFO))
@@ -147,10 +151,11 @@
            // the network to scale.
            // Perhaps the ultimate solution is to send RouterInfos through a lease also.
            List<Hash> closestHashes;
-            if (_state.getData() instanceof RouterInfo)
-                closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
-            else
-                closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+            //if (_state.getData() instanceof RouterInfo)
+            //    closestHashes = getMostReliableRouters(_state.getTarget(), toCheck, _state.getAttempted());
+            //else
+            //    closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
+            closestHashes = getClosestFloodfillRouters(_state.getTarget(), toCheck, _state.getAttempted());
            if ( (closestHashes == null) || (closestHashes.size() <= 0) ) {
                if (_state.getPending().size() <= 0) {
                    if (_log.shouldLog(Log.INFO))
@@ -217,6 +222,7 @@
     *
     * @return ordered list of Hash objects
     */
+/*****
    private List<Hash> getClosestRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
        Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
        //if (_log.shouldLog(Log.DEBUG))
@@ -226,13 +232,27 @@ class StoreJob extends JobImpl {
        if (ks == null) return new ArrayList();
        return _peerSelector.selectNearestExplicit(rkey, numClosest, alreadyChecked, ks);
    }
+*****/
 
+    /** used for routerinfo stores, prefers those already connected */
+/*****
    private List<Hash> getMostReliableRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
        Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
        KBucketSet ks = _facade.getKBuckets();
        if (ks == null) return new ArrayList();
        return _peerSelector.selectMostReliablePeers(rkey, numClosest, alreadyChecked, ks);
    }
+*****/
+
+    private List<Hash> getClosestFloodfillRouters(Hash key, int numClosest, Set<Hash> alreadyChecked) {
+        Hash rkey = getContext().routingKeyGenerator().getRoutingKey(key);
+        KBucketSet ks = _facade.getKBuckets();
+        if (ks == null) return new ArrayList();
+        return ((FloodfillPeerSelector)_peerSelector).selectFloodfillParticipants(rkey, numClosest, alreadyChecked, ks);
+    }
+
+    /** limit expiration for direct sends */
+    private static final int MAX_DIRECT_EXPIRATION = 15*1000;
 
    /**
     * Send a store to the given peer through a garlic route, including a reply
@@ -242,9 +262,11 @@
    private void sendStore(RouterInfo router, int responseTime) {
        DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
        msg.setKey(_state.getTarget());
-        if (_state.getData() instanceof RouterInfo)
+        if (_state.getData() instanceof RouterInfo) {
            msg.setRouterInfo((RouterInfo)_state.getData());
-        else if (_state.getData() instanceof LeaseSet)
+            if (responseTime > MAX_DIRECT_EXPIRATION)
+                responseTime = MAX_DIRECT_EXPIRATION;
+        } else if (_state.getData() instanceof LeaseSet)
            msg.setLeaseSet((LeaseSet)_state.getData());
        else
            throw new IllegalArgumentException("Storing an unknown data type! " + _state.getData());
@@ -255,11 +277,11 @@
            if (_log.shouldLog(Log.ERROR))
                _log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
            return;
-        } else {
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64());
        }
 
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getJobId() + ": Send store timeout is " + responseTime);
+
        sendStore(msg, router, getContext().clock().now() + responseTime);
    }
 
@@ -269,7 +291,6 @@
            sendStoreThroughGarlic(msg, peer, expiration);
        } else {
            getContext().statManager().addRateData("netDb.storeRouterInfoSent", 1, 0);
-            // todo - shorter expiration for direct?
            sendDirect(msg, peer, expiration);
        }
    }