NetDB: Add aggressive LS expire

This commit is contained in:
zzz
2024-12-18 17:57:53 +00:00
parent 6ffdb70e4c
commit 492056e99e
4 changed files with 101 additions and 28 deletions

View File

@@ -185,13 +185,13 @@ public class LeaseSet extends DatabaseEntry {
} }
/** /**
* Also sets receivedAsReply to true * As of 0.9.65, no longer sets receivedAsReply to true
* @param localClient may be null * @param localClient may be null
* @since 0.9.47 * @since 0.9.47
*/ */
public void setReceivedBy(Hash localClient) { public void setReceivedBy(Hash localClient) {
super.setReceivedBy(localClient); super.setReceivedBy(localClient);
super.setReceivedAsReply(); //setReceivedAsReply();
} }
/** /**

View File

@@ -35,11 +35,13 @@ import net.i2p.router.ClientMessage;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.LeaseSetKeys; import net.i2p.router.LeaseSetKeys;
import net.i2p.router.MessageSelector; import net.i2p.router.MessageSelector;
import net.i2p.router.NetworkDatabaseFacade;
import net.i2p.router.ReplyJob; import net.i2p.router.ReplyJob;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo; import net.i2p.router.TunnelInfo;
import net.i2p.router.crypto.ratchet.ReplyCallback; import net.i2p.router.crypto.ratchet.ReplyCallback;
import net.i2p.router.networkdb.kademlia.KademliaNetworkDatabaseFacade;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@@ -295,9 +297,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
} }
SendJob success = new SendJob(getContext()); SendJob success = new SendJob(getContext());
KademliaNetworkDatabaseFacade kndf = (KademliaNetworkDatabaseFacade) getContext().clientNetDb(_from.calculateHash());
// set in constructor // set in constructor
if (_leaseSet != null) { if (_leaseSet != null) {
if (!_leaseSet.getReceivedAsReply()) { if (!kndf.isClientDb() && !_leaseSet.getReceivedAsReply()) {
boolean shouldFetch = true; boolean shouldFetch = true;
if (_leaseSet.getType() != DatabaseEntry.KEY_TYPE_LEASESET) { if (_leaseSet.getType() != DatabaseEntry.KEY_TYPE_LEASESET) {
LeaseSet2 ls2 = (LeaseSet2) _leaseSet; LeaseSet2 ls2 = (LeaseSet2) _leaseSet;
@@ -307,7 +310,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
if (_log.shouldInfo()) if (_log.shouldInfo())
_log.info(getJobId() + ": RAP LS, firing search: " + _leaseSet.getHash().toBase32()); _log.info(getJobId() + ": RAP LS, firing search: " + _leaseSet.getHash().toBase32());
LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext()); LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext());
getContext().clientNetDb(_from.calculateHash()).lookupLeaseSetRemotely(_leaseSet.getHash(), success, failed, kndf.lookupLeaseSetRemotely(_leaseSet.getHash(), success, failed,
LS_LOOKUP_TIMEOUT, _from.calculateHash()); LS_LOOKUP_TIMEOUT, _from.calculateHash());
} else { } else {
dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET); dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET);
@@ -330,7 +333,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
long exp = now - _leaseSet.getLatestLeaseDate(); long exp = now - _leaseSet.getLatestLeaseDate();
_log.info(getJobId() + ": leaseSet expired " + DataHelper.formatDuration(exp) + " ago, firing search: " + _leaseSet.getHash().toBase32()); _log.info(getJobId() + ": leaseSet expired " + DataHelper.formatDuration(exp) + " ago, firing search: " + _leaseSet.getHash().toBase32());
} }
getContext().clientNetDb(_from.calculateHash()).lookupLeaseSetRemotely(_leaseSet.getHash(), _from.calculateHash()); kndf.lookupLeaseSetRemotely(_leaseSet.getHash(), _from.calculateHash());
} }
} }
success.runJob(); success.runJob();
@@ -340,7 +343,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
_log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job for " + _toString + " from client " + _from.calculateHash().toBase32()); _log.debug(getJobId() + ": Send outbound client message - sending off leaseSet lookup job for " + _toString + " from client " + _from.calculateHash().toBase32());
LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext()); LookupLeaseSetFailedJob failed = new LookupLeaseSetFailedJob(getContext());
Hash key = _to.calculateHash(); Hash key = _to.calculateHash();
getContext().clientNetDb(_from.calculateHash()).lookupLeaseSet(key, success, failed, LS_LOOKUP_TIMEOUT, _from.calculateHash()); kndf.lookupLeaseSet(key, success, failed, LS_LOOKUP_TIMEOUT, _from.calculateHash());
} }
} }
@@ -422,14 +425,18 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
*/ */
private int getNextLease() { private int getNextLease() {
// set in runJob if found locally // set in runJob if found locally
if (_leaseSet == null || !_leaseSet.getReceivedAsReply()) { KademliaNetworkDatabaseFacade kndf = (KademliaNetworkDatabaseFacade) getContext().clientNetDb(_from.calculateHash());
_leaseSet = getContext().clientNetDb(_from.calculateHash()).lookupLeaseSetLocally(_to.calculateHash()); if (_leaseSet == null || (!kndf.isClientDb() && _leaseSet.getReceivedAsPublished())) {
if (_leaseSet == null) { if (_leaseSet == null) {
// shouldn't happen // shouldn't happen
_leaseSet = kndf.lookupLeaseSetLocally(_to.calculateHash());
if (_leaseSet == null) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString); _log.warn(getJobId() + ": Lookup locally didn't find the leaseSet for " + _toString);
return MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET; return MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET;
} else if (_leaseSet.getReceivedAsPublished()) { }
}
if (!kndf.isClientDb() && _leaseSet.getReceivedAsPublished()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Only have RAP LS for " + _toString); _log.warn(getJobId() + ": Only have RAP LS for " + _toString);
return MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET; return MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET;

View File

@@ -9,16 +9,21 @@ package net.i2p.router.networkdb.kademlia;
*/ */
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import net.i2p.data.DatabaseEntry; import net.i2p.data.DatabaseEntry;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.data.LeaseSet; import net.i2p.data.LeaseSet;
import net.i2p.data.router.RouterKeyGenerator;
import net.i2p.router.JobImpl; import net.i2p.router.JobImpl;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
/** /**
* Periodically search through all leases to find expired ones, failing those * Periodically search through all leases to find expired ones, failing those
@@ -31,6 +36,8 @@ class ExpireLeasesJob extends JobImpl {
private final KademliaNetworkDatabaseFacade _facade; private final KademliaNetworkDatabaseFacade _facade;
private final static long RERUN_DELAY_MS = 1*60*1000; private final static long RERUN_DELAY_MS = 1*60*1000;
private static final int LIMIT_LEASES_FF = 1250;
private static final int LIMIT_LEASES_CLIENT = SystemVersion.isSlow() ? 750 : 300;
public ExpireLeasesJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) { public ExpireLeasesJob(RouterContext ctx, KademliaNetworkDatabaseFacade facade) {
super(ctx); super(ctx);
@@ -42,16 +49,13 @@ class ExpireLeasesJob extends JobImpl {
public void runJob() { public void runJob() {
List<Hash> toExpire = selectKeysToExpire(); List<Hash> toExpire = selectKeysToExpire();
if (!toExpire.isEmpty()) {
for (Hash key : toExpire) { for (Hash key : toExpire) {
_facade.fail(key); _facade.fail(key);
//_log.info("Lease " + key + " is expiring, so lets look for it again", new Exception("Expire and search"));
//_facade.lookupLeaseSet(key, null, null, RERUN_DELAY_MS);
} }
if (_log.shouldInfo()) if (_log.shouldInfo())
_log.info("(dbid: " + _facade _log.info(_facade + " Leases expired: " + toExpire.size());
+ "; db size: " + _facade.getKnownLeaseSets() }
+ ") Leases expired: " + toExpire);
//_facade.queueForExploration(toExpire); // don't do explicit searches, just explore passively
requeue(RERUN_DELAY_MS); requeue(RERUN_DELAY_MS);
} }
@@ -62,19 +66,80 @@ class ExpireLeasesJob extends JobImpl {
*/ */
private List<Hash> selectKeysToExpire() { private List<Hash> selectKeysToExpire() {
RouterContext ctx = getContext(); RouterContext ctx = getContext();
List<Hash> toExpire = new ArrayList<Hash>(128); boolean isClient = _facade.isClientDb();
for (Map.Entry<Hash, DatabaseEntry> entry : _facade.getDataStore().getMapEntries()) { boolean isFFDB = _facade.floodfillEnabled() && !isClient;
Set<Map.Entry<Hash, DatabaseEntry>> entries = _facade.getDataStore().getMapEntries();
// clientdb only has leasesets
List<LeaseSet> current = new ArrayList<LeaseSet>(isFFDB ? 512 : (isClient ? entries.size() : 128));
List<Hash> toExpire = new ArrayList<Hash>(Math.min(entries.size(), 128));
int sz = 0;
for (Map.Entry<Hash, DatabaseEntry> entry : entries) {
DatabaseEntry obj = entry.getValue(); DatabaseEntry obj = entry.getValue();
if (obj.isLeaseSet()) { if (obj.isLeaseSet()) {
LeaseSet ls = (LeaseSet)obj; LeaseSet ls = (LeaseSet)obj;
if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
Hash h = entry.getKey(); Hash h = entry.getKey();
boolean isLocal = ctx.clientManager().isLocal(h);
if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR)) {
toExpire.add(h); toExpire.add(h);
if (ctx.clientManager().isLocal(h)) if (isLocal)
_log.logAlways(Log.WARN, "Expired local leaseset " + h.toBase32()); _log.logAlways(Log.WARN, "Expired local leaseset " + h.toBase32());
} else if (!isLocal) {
// do not aggressive expire RAR LS but still count them
sz++;
if (!ls.getReceivedAsReply())
current.add(ls);
} }
} }
} }
int origsz = sz;
int limit = isFFDB ? LIMIT_LEASES_FF : LIMIT_LEASES_CLIENT;
if (sz > limit) {
// aggressive drop strategy
if (isFFDB) {
RouterKeyGenerator gen = ctx.routerKeyGenerator();
byte[] ourRKey = ctx.routerHash().getData();
for (LeaseSet ls : current) {
Hash h = ls.getHash();
// don't drop very close to us
byte[] rkey = gen.getRoutingKey(h).getData();
int distance = (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) |
((rkey[1] ^ ourRKey[1]) & 0xff);
// they have to be within 1/256 of the keyspace
if (distance >= 256) {
toExpire.add(h);
if (--sz <= limit)
break;
}
}
} else {
Collections.sort(current, new LeaseSetComparator());
for (LeaseSet ls : current) {
toExpire.add(ls.getHash());
//if (_log.shouldInfo())
// _log.info("Aggressive LS expire for " + _facade + '\n' + ls);
if (--sz <= limit)
break;
}
}
int exp = origsz - sz;
if (exp > 0 && _log.shouldWarn())
_log.warn("Aggressive LS expire for " + _facade + " removed " + exp +
" leasesets, limit " + limit + ", size now " + sz);
}
return toExpire; return toExpire;
} }
/**
* Oldest first
* @since 0.9.65
*/
private static class LeaseSetComparator implements Comparator<LeaseSet> {
public int compare(LeaseSet l, LeaseSet r) {
long dl = l.getLatestLeaseDate();
long dr = r.getLatestLeaseDate();
if (dl < dr) return -1;
if (dl > dr) return 1;
return 0;
}
}
} }

View File

@@ -122,9 +122,10 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
// for it. This flag must NOT get set on entries that we // for it. This flag must NOT get set on entries that we
// receive in response to our own lookups. // receive in response to our own lookups.
// See ../HDLMJ for more info // See ../HDLMJ for more info
if (!_facade.isClientDb()) {
if (!ls.getReceivedAsReply()) if (!ls.getReceivedAsReply())
ls.setReceivedAsPublished(); ls.setReceivedAsPublished();
if (_facade.isClientDb()) { } else {
// This is where we deal with what happens if a client subDB tries to store // This is where we deal with what happens if a client subDB tries to store
// a leaseSet which it is the owner/publisher of. // a leaseSet which it is the owner/publisher of.
// Look up a ls hash in the netDbSegmentor, and compare it to the _facade that we have. // Look up a ls hash in the netDbSegmentor, and compare it to the _facade that we have.