diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java deleted file mode 100644 index 307ae5f7954017884f5cc120d295a208284a66a3..0000000000000000000000000000000000000000 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java +++ /dev/null @@ -1,101 +0,0 @@ -package net.i2p.router.networkdb.kademlia; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import net.i2p.data.DataStructure; -import net.i2p.data.Hash; -import net.i2p.data.LeaseSet; -import net.i2p.router.JobImpl; -import net.i2p.router.Router; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -class DataPublisherJob extends JobImpl { - private Log _log; - private KademliaNetworkDatabaseFacade _facade; - private final static long RERUN_DELAY_MS = 120*1000; - private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time - private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data - - public DataPublisherJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) { - super(ctx); - _log = ctx.logManager().getLog(DataPublisherJob.class); - _facade = facade; - getTiming().setStartAfter(ctx.clock().now()+RERUN_DELAY_MS); // not immediate... - } - - public String getName() { return "Data Publisher Job"; } - public void runJob() { - Set toSend = selectKeysToSend(); - if (_log.shouldLog(Log.INFO)) - _log.info("Keys being published in this timeslice: " + toSend); - for (Iterator iter = toSend.iterator(); iter.hasNext(); ) { - Hash key = (Hash)iter.next(); - DataStructure data = _facade.getDataStore().get(key); - if (data == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Trying to send a key we dont have? " + key); - continue; - } - if (data instanceof LeaseSet) { - LeaseSet ls = (LeaseSet)data; - if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Not publishing a lease that isn't current - " + key, - new Exception("Publish expired lease?")); - } - if (!getContext().clientManager().shouldPublishLeaseSet(key)) - continue; - } - _facade.sendStore(key, data, null, null, STORE_TIMEOUT, null); - //StoreJob store = new StoreJob(getContext(), _facade, key, data, null, null, STORE_TIMEOUT); - //getContext().jobQueue().addJob(store); - } - requeue(RERUN_DELAY_MS); - } - - private Set selectKeysToSend() { - Set explicit = _facade.getExplicitSendKeys(); - Set toSend = new HashSet(MAX_SEND_PER_RUN); - - // if there's nothing we *need* to send, only send 10% of the time - if (explicit.size() <= 0) { - if (getContext().random().nextInt(10) > 0) - return toSend; - } - - if (explicit.size() < MAX_SEND_PER_RUN) { - toSend.addAll(explicit); - _facade.removeFromExplicitSend(explicit); - - Set passive = _facade.getPassivelySendKeys(); - Set psend = new HashSet(passive.size()); - for (Iterator iter = passive.iterator(); iter.hasNext(); ) { - if (toSend.size() >= MAX_SEND_PER_RUN) break; - Hash key = (Hash)iter.next(); - toSend.add(key); - psend.add(key); - } - _facade.removeFromPassiveSend(psend); - } else { - for (Iterator iter = explicit.iterator(); iter.hasNext(); ) { - if (toSend.size() >= MAX_SEND_PER_RUN) break; - Hash key = (Hash)iter.next(); - toSend.add(key); - } - _facade.removeFromExplicitSend(toSend); - } - - return toSend; - } -} diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java deleted file mode 100644 index 173522ab072c1b5b093fa10c3b89642e5f7804d8..0000000000000000000000000000000000000000 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataRepublishingSelectorJob.java +++ /dev/null @@ -1,175 +0,0 @@ -package net.i2p.router.networkdb.kademlia; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import java.util.TreeMap; - -import net.i2p.data.Hash; -import net.i2p.data.LeaseSet; -import net.i2p.data.RouterInfo; -import net.i2p.router.JobImpl; -import net.i2p.router.Router; -import net.i2p.router.RouterContext; -import net.i2p.util.Log; - -class DataRepublishingSelectorJob extends JobImpl { - private Log _log; - private KademliaNetworkDatabaseFacade _facade; - - private final static long RERUN_DELAY_MS = 1*60*1000; - public final static int MAX_PASSIVE_POOL_SIZE = 10; // no need to have the pool be too big - - /** - * For every bucket away from us, resend period increases by 5 minutes - so we resend - * our own key every 5 minutes, and keys very far from us every 2.5 hours, increasing - * linearly - */ - public final static long RESEND_BUCKET_FACTOR = 5*60*1000; - - /** - * % chance any peer not specializing in the lease's key will broadcast it on each pass - * of this job /after/ waiting 5 minutes (one RESENT_BUCKET_FACTOR). In other words, - * .5% of routers will broadcast a particular unexpired lease to (say) 5 peers every - * minute. - * - */ - private final static int LEASE_REBROADCAST_PROBABILITY = 5; - /** - * LEASE_REBROADCAST_PROBABILITY out of LEASE_REBROADCAST_PROBABILITY_SCALE chance. - */ - private final static int LEASE_REBROADCAST_PROBABILITY_SCALE = 1000; - - public DataRepublishingSelectorJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) { - super(ctx); - _log = ctx.logManager().getLog(DataRepublishingSelectorJob.class); - _facade = facade; - getTiming().setStartAfter(ctx.clock().now()+RERUN_DELAY_MS); // not immediate... - } - - public String getName() { return "Data Publisher Job"; } - public void runJob() { - Set toSend = selectKeysToSend(); - if (_log.shouldLog(Log.INFO)) - _log.info("Keys being queued up for publishing: " + toSend); - _facade.queueForPublishing(toSend); - requeue(RERUN_DELAY_MS); - } - - /** - * Run through the entire data store, ranking how much we want to send each - * data point, and returning the ones we most want to send so that they can - * be placed in the passive send pool (without making the passive pool greater - * than the limit) - * - */ - private Set selectKeysToSend() { - Set alreadyQueued = new HashSet(128); - alreadyQueued.addAll(_facade.getPassivelySendKeys()); - - int toAdd = MAX_PASSIVE_POOL_SIZE - alreadyQueued.size(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Keys we need to queue up to fill the passive send pool: " + toAdd); - if (toAdd <= 0) return new HashSet(); - - alreadyQueued.addAll(_facade.getExplicitSendKeys()); - - Set keys = _facade.getDataStore().getKeys(); - keys.removeAll(alreadyQueued); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Total number of keys in the datastore: " + keys.size()); - - TreeMap toSend = new TreeMap(); - for (Iterator iter = keys.iterator(); iter.hasNext(); ) { - Hash key = (Hash)iter.next(); - Long lastPublished = _facade.getLastSent(key); - long publishRank = rankPublishNeed(key, lastPublished); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Publish rank for " + key + ": " + publishRank); - if (publishRank > 0) { - while (toSend.containsKey(new Long(publishRank))) - publishRank++; - toSend.put(new Long(publishRank), key); - } - } - Set rv = new HashSet(toAdd); - for (Iterator iter = toSend.values().iterator(); iter.hasNext(); ) { - if (rv.size() > toAdd) break; - Hash key = (Hash)iter.next(); - rv.add(key); - } - return rv; - } - - /** - * Higher values mean we want to publish it more, and values less than or equal to zero - * means we don't want to publish it - * - */ - private long rankPublishNeed(Hash key, Long lastPublished) { - int bucket = _facade.getKBuckets().pickBucket(key); - long sendPeriod = (bucket+1) * RESEND_BUCKET_FACTOR; - long now = getContext().clock().now(); - if (lastPublished.longValue() < now-sendPeriod) { - RouterInfo ri = _facade.lookupRouterInfoLocally(key); - if (ri != null) { - if (ri.isCurrent(2 * ExpireRoutersJob.EXPIRE_DELAY)) { - // last time it was sent was before the last send period - return KBucketSet.NUM_BUCKETS - bucket; - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Not republishing router " + key - + " since it is really old [" - + (now-ri.getPublished()) + "ms]"); - return -2; - } - } else { - LeaseSet ls = _facade.lookupLeaseSetLocally(key); - if (ls != null) { - if (!getContext().clientManager().shouldPublishLeaseSet(ls.getDestination().calculateHash())) - return -3; - if (ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) { - // last time it was sent was before the last send period - return KBucketSet.NUM_BUCKETS - bucket; - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Not republishing leaseSet " + key - + " since it is really old [" - + (now-ls.getEarliestLeaseDate()) + "ms]"); - return -3; - } - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Key " + key + " is not a leaseSet or routerInfo, definitely not publishing it"); - return -5; - } - } - } else { - // its been published since the last period we want to publish it - - if (now - RESEND_BUCKET_FACTOR > lastPublished.longValue()) { - if (_facade.lookupRouterInfoLocally(key) != null) { - // randomize the chance of rebroadcast for leases if we haven't - // sent it within 5 minutes - int val = getContext().random().nextInt(LEASE_REBROADCAST_PROBABILITY_SCALE); - if (val <= LEASE_REBROADCAST_PROBABILITY) { - if (_log.shouldLog(Log.INFO)) - _log.info("Randomized rebroadcast of leases tells us to send " - + key + ": " + val); - return 1; - } - } - } - return -1; - } - } -} diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index 23ef78d86956961261ff24e9013978ee01a43bf4..82e3a7f705a6b078e8ccb9e232b3332d53ab1267 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -53,10 +53,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { private DataStore _ds; // hash to DataStructure mapping, persisted when necessary /** where the data store is pushing the data */ private String _dbDir; - private Set _explicitSendKeys; // set of Hash objects that should be published ASAP - private Set _passiveSendKeys; // set of Hash objects that should be published when there's time private Set _exploreKeys; // set of Hash objects that we should search on (to fill up a bucket, not to get data) - private Map _lastSent; // Hash to Long (date last sent, or <= 0 for never) private boolean _initialized; /** Clock independent time of when we started up */ private long _started; @@ -153,53 +150,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _exploreJob.updateExploreSchedule(); } - public Set getExplicitSendKeys() { - if (!_initialized) return null; - synchronized (_explicitSendKeys) { - return new HashSet(_explicitSendKeys); - } - } - public Set getPassivelySendKeys() { - if (!_initialized) return null; - synchronized (_passiveSendKeys) { - return new HashSet(_passiveSendKeys); - } - } - public void removeFromExplicitSend(Set toRemove) { - if (!_initialized) return; - synchronized (_explicitSendKeys) { - _explicitSendKeys.removeAll(toRemove); - } - } - public void removeFromPassiveSend(Set toRemove) { - if (!_initialized) return; - synchronized (_passiveSendKeys) { - _passiveSendKeys.removeAll(toRemove); - } - } - public void queueForPublishing(Set toSend) { - if (!_initialized) return; - synchronized (_passiveSendKeys) { - _passiveSendKeys.addAll(toSend); - } - } - - public Long getLastSent(Hash key) { - if (!_initialized) return null; - synchronized (_lastSent) { - if (!_lastSent.containsKey(key)) - _lastSent.put(key, new Long(0)); - return (Long)_lastSent.get(key); - } - } - - public void noteKeySent(Hash key) { - if (!_initialized) return; - synchronized (_lastSent) { - _lastSent.put(key, new Long(_context.clock().now())); - } - } - public Set getExploreKeys() { if (!_initialized) return null; synchronized (_exploreKeys) { @@ -226,10 +176,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _initialized = false; _kb = null; _ds = null; - _explicitSendKeys = null; - _passiveSendKeys = null; _exploreKeys = null; - _lastSent = null; } public void restart() { @@ -244,9 +191,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { else _enforceNetId = DEFAULT_ENFORCE_NETID; _ds.restart(); - synchronized (_explicitSendKeys) { _explicitSendKeys.clear(); } synchronized (_exploreKeys) { _exploreKeys.clear(); } - synchronized (_passiveSendKeys) { _passiveSendKeys.clear(); } _initialized = true; @@ -273,10 +218,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _kb = new KBucketSet(_context, ri.getIdentity().getHash()); _ds = new PersistentDataStore(_context, dbDir, this); //_ds = new TransientDataStore(); - _explicitSendKeys = new HashSet(64); - _passiveSendKeys = new HashSet(64); _exploreKeys = new HashSet(64); - _lastSent = new HashMap(1024); _dbDir = dbDir; createHandlers(); @@ -284,9 +226,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _initialized = true; _started = System.currentTimeMillis(); - // read the queues and publish appropriately - if (false) - _context.jobQueue().addJob(new DataPublisherJob(_context, this)); // expire old leases _context.jobQueue().addJob(new ExpireLeasesJob(_context, this)); @@ -298,9 +237,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { ////_context.jobQueue().addJob(new ExpireRoutersJob(_context, this)); if (!_quiet) { - // fill the passive queue periodically - // Is this pointless too??? - _context.jobQueue().addJob(new DataRepublishingSelectorJob(_context, this)); // fill the search queue with random keys in buckets that are too small // Disabled since KBucketImpl.generateRandomKey() is b0rked, // and anyway, we want to search for a completely random key, @@ -532,9 +468,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { if (!_context.clientManager().shouldPublishLeaseSet(h)) return; - synchronized (_explicitSendKeys) { - _explicitSendKeys.add(h); - } RepublishLeaseSetJob j = null; synchronized (_publishingLeaseSets) { j = (RepublishLeaseSetJob)_publishingLeaseSets.get(h); @@ -563,9 +496,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { if (_context.router().isHidden()) return; // DE-nied! Hash h = localRouterInfo.getIdentity().getHash(); store(h, localRouterInfo); - synchronized (_explicitSendKeys) { - _explicitSendKeys.add(h); - } } /** @@ -658,10 +588,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { throw new IllegalArgumentException("Invalid store attempt - " + err); _ds.put(key, leaseSet); - synchronized (_lastSent) { - if (!_lastSent.containsKey(key)) - _lastSent.put(key, new Long(0)); - } // Iterate through the old failure / success count, copying over the old // values (if any tunnels overlap between leaseSets). no need to be @@ -770,10 +696,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _context.peerManager().setCapabilities(key, routerInfo.getCapabilities()); _ds.put(key, routerInfo); - synchronized (_lastSent) { - if (!_lastSent.containsKey(key)) - _lastSent.put(key, new Long(0)); - } if (rv == null) _kb.add(key); return rv; @@ -808,15 +730,6 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _ds.remove(dbEntry); else _ds.removeLease(dbEntry); - synchronized (_lastSent) { - _lastSent.remove(dbEntry); - } - synchronized (_explicitSendKeys) { - _explicitSendKeys.remove(dbEntry); - } - synchronized (_passiveSendKeys) { - _passiveSendKeys.remove(dbEntry); - } } /** don't use directly - see F.N.D.F. override */ @@ -833,30 +746,12 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } _ds.remove(peer); - synchronized (_lastSent) { - _lastSent.remove(peer); - } - synchronized (_explicitSendKeys) { - _explicitSendKeys.remove(peer); - } - synchronized (_passiveSendKeys) { - _passiveSendKeys.remove(peer); - } } public void unpublish(LeaseSet localLeaseSet) { if (!_initialized) return; Hash h = localLeaseSet.getDestination().calculateHash(); DataStructure data = _ds.remove(h); - synchronized (_lastSent) { - _lastSent.remove(h); - } - synchronized (_explicitSendKeys) { - _explicitSendKeys.remove(h); - } - synchronized (_passiveSendKeys) { - _passiveSendKeys.remove(h); - } if (data == null) { if (_log.shouldLog(Log.WARN)) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 18d18f13e2fdd12b39e3bce2257560315f5d41f9..cc4c51329c550e8d4f2be2c98777c91e5206e652 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -437,7 +437,6 @@ class StoreJob extends JobImpl { _log.debug(getJobId() + ": State of successful send: " + _state); if (_onSuccess != null) getContext().jobQueue().addJob(_onSuccess); - _facade.noteKeySent(_state.getTarget()); _state.complete(true); getContext().statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted()); }