NetDB: Query connected peers for their RI directly

Negative cache router hash when expiring RI
Log tweaks
This commit is contained in:
zzz
2022-09-03 11:59:39 -04:00
parent 00774590b0
commit 49299f3f28
8 changed files with 188 additions and 18 deletions

View File

@@ -34,6 +34,14 @@ public class ObjectCounter<K> implements Serializable {
return 1;
}
/**
* Set a high value
* @since 0.9.56
*/
public void max(K h) {
map.put(h, new AtomicInteger(Integer.MAX_VALUE / 2));
}
/**
* @return current count
*/

View File

@@ -0,0 +1,79 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.Hash;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.data.router.RouterInfo;
import net.i2p.router.Job;
import net.i2p.router.RouterContext;
import net.i2p.router.OutNetMessage;
import net.i2p.util.Log;
/**
* Ask a connected peer for his RI.
* Modified from SingleSearchJob.
*
* Mainly for older routers. As of 0.9.55, transports will
* periodically send their RI.
* Some old routers may not respond or may send DSRM,
* e.g. if hidden (and i2pd?)
*
* @since 0.9.56
*/
class DirectLookupJob extends FloodOnlySearchJob {
private OutNetMessage _onm;
private final RouterInfo _oldRI;
private static final int TIMEOUT = 8*1000;
/**
* @param peer for Router Info only
*/
public DirectLookupJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash peer, RouterInfo oldRI, Job onFind, Job onFail) {
super(ctx, facade, peer, onFind, onFail, TIMEOUT);
_oldRI = oldRI;
}
@Override
public String getName() { return "NetDb direct RI request"; }
@Override
public boolean shouldProcessDSRM() { return false; } // don't loop
@Override
public void runJob() {
RouterContext ctx = getContext();
_onm = ctx.messageRegistry().registerPending(_replySelector, _onReply, _onTimeout);
DatabaseLookupMessage dlm = new DatabaseLookupMessage(ctx, true);
dlm.setFrom(ctx.routerHash());
long exp = ctx.clock().now() + 5*1000;
dlm.setMessageExpiration(exp);
dlm.setSearchKey(_key);
dlm.setSearchType(DatabaseLookupMessage.Type.RI);
OutNetMessage m = new OutNetMessage(ctx, dlm, exp,
OutNetMessage.PRIORITY_MY_NETDB_LOOKUP, _oldRI);
ctx.commSystem().processMessage(m);
_lookupsRemaining.set(1);
}
@Override
void failed() {
RouterContext ctx = getContext();
ctx.messageRegistry().unregisterPending(_onm);
ctx.profileManager().dbLookupFailed(_key);
_facade.complete(_key);
for (Job j : _onFailed) {
ctx.jobQueue().addJob(j);
}
}
@Override
void success() {
// don't give him any credit
//getContext().profileManager().dbLookupSuccessful(_to, System.currentTimeMillis()-_created);
_facade.complete(_key);
RouterContext ctx = getContext();
for (Job j : _onFind) {
ctx.jobQueue().addJob(j);
}
}
}

View File

@@ -0,0 +1,34 @@
package net.i2p.router.networkdb.kademlia;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
/**
* Override to not call failed() in setMessage(),
* as it will be called from runJob()
*
* @since 0.9.56
*/
class DirectLookupMatchJob extends FloodOnlyLookupMatchJob {
public DirectLookupMatchJob(RouterContext ctx, FloodSearchJob job) {
super(ctx, job);
}
@Override
public String getName() { return "Direct lookup match"; }
/**
* Override to not call failed() in setMessage(),
* as it will be called from runJob()
*/
@Override
public void setMessage(I2NPMessage message) {
if (message.getType() != DatabaseStoreMessage.MESSAGE_TYPE)
return;
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
if (dsm.getKey().equals(_search.getKey()))
_success = true;
}
}

View File

@@ -11,9 +11,9 @@ import net.i2p.router.RouterContext;
import net.i2p.util.Log;
class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
private final Log _log;
private final FloodSearchJob _search;
private volatile boolean _success;
protected final Log _log;
protected final FloodSearchJob _search;
protected volatile boolean _success;
public FloodOnlyLookupMatchJob(RouterContext ctx, FloodSearchJob job) {
super(ctx);
@@ -50,8 +50,8 @@ class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
return;
DatabaseStoreMessage dsm = (DatabaseStoreMessage)message;
if (_log.shouldLog(Log.INFO))
_log.info(_search.getJobId() + ": got a DSM for "
if (_log.shouldDebug())
_log.debug(_search.getJobId() + ": got a DSM for "
+ dsm.getKey().toBase64());
// This store will handled by HFDSMJ.
// Just note success here.

View File

@@ -56,9 +56,24 @@ abstract class FloodOnlySearchJob extends FloodSearchJob {
_timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
_expiration = _timeoutMs + ctx.clock().now();
_unheardFrom = new HashSet<Hash>(CONCURRENT_SEARCHES);
_replySelector = new FloodOnlyLookupSelector(getContext(), this);
_onReply = new FloodOnlyLookupMatchJob(getContext(), this);
_onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
_replySelector = new FloodOnlyLookupSelector(ctx, this);
_onReply = new FloodOnlyLookupMatchJob(ctx, this);
_onTimeout = new FloodOnlyLookupTimeoutJob(ctx, this);
}
/**
* For DirectLookupJob extension, RI only, different match job
*
* @since 0.9.56
*/
protected FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs) {
super(ctx, facade, key, onFind, onFailed, timeoutMs, false);
_timeoutMs = timeoutMs;
_expiration = _timeoutMs + ctx.clock().now();
_unheardFrom = new HashSet<Hash>(1);
_replySelector = new FloodOnlyLookupSelector(ctx, this);
_onReply = new DirectLookupMatchJob(ctx, this);
_onTimeout = new FloodOnlyLookupTimeoutJob(ctx, this);
}
public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }

View File

@@ -464,12 +464,12 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
}
if (isNew) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("this is the first search for that key, fire off the FloodSearchJob");
if (_log.shouldDebug())
_log.debug("New ISJ for " + key.toBase64());
_context.jobQueue().addJob(searchJob);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Deferring flood search for " + key.toBase64() + " with " + _activeFloodQueries.size() + " in progress");
if (_log.shouldDebug())
_log.debug("Wait for pending ISJ for " + key.toBase64());
searchJob.addDeferred(onFindJob, onFailedLookupJob, timeoutMs, isLease);
// not necessarily LS
_context.statManager().addRateData("netDb.lookupDeferred", 1, searchJob.getExpiration()-_context.clock().now());
@@ -579,6 +579,32 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
*/
@Override
protected void lookupBeforeDropping(Hash peer, RouterInfo info) {
if (_context.commSystem().isEstablished(peer)) {
// see DirectLookupJob
boolean isNew = false;
FloodSearchJob searchJob;
Job onFindJob = new DropLookupFoundJob(_context, peer, info);
Job onFailedLookupJob = new DropLookupFailedJob(_context, peer, info);
synchronized (_activeFloodQueries) {
searchJob = _activeFloodQueries.get(peer);
if (searchJob == null) {
searchJob = new DirectLookupJob(_context, this, peer, info, onFindJob, onFailedLookupJob);
_activeFloodQueries.put(peer, searchJob);
isNew = true;
}
}
if (isNew) {
if (_log.shouldDebug())
_log.debug("Direct RI lookup for " + peer.toBase64());
_context.jobQueue().addJob(searchJob);
} else {
if (_log.shouldDebug())
_log.debug("Pending Direct RI lookup for " + peer.toBase64());
searchJob.addDeferred(onFindJob, onFailedLookupJob, 10*1000, false);
}
return;
}
// following are some special situations, we don't want to
// drop the peer in these cases
// yikes don't do this - stack overflow // getFloodfillPeers().size() == 0 ||
@@ -609,17 +635,17 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
// entry locally, firing no job if it gets a reply with an updated value (meaning
// we shouldn't drop them but instead use the new data), or if they all time out,
// firing the dropLookupFailedJob, which actually removes out local reference
if (_log.shouldDebug())
_log.debug("ISJ lookup before dropping for " + peer.toBase64() + ' ' + info.getPublished());
search(peer, new DropLookupFoundJob(_context, peer, info), new DropLookupFailedJob(_context, peer, info), 10*1000, false);
}
private class DropLookupFailedJob extends JobImpl {
private final Hash _peer;
private final RouterInfo _info;
public DropLookupFailedJob(RouterContext ctx, Hash peer, RouterInfo info) {
super(ctx);
_peer = peer;
_info = info;
}
public String getName() { return "Lookup on failure of netDb peer timed out"; }
public void runJob() {
@@ -639,10 +665,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
public String getName() { return "Lookup on failure of netDb peer matched"; }
public void runJob() {
RouterInfo updated = lookupRouterInfoLocally(_peer);
if ( (updated != null) && (updated.getPublished() > _info.getPublished()) ) {
// great, a legitimate update
} else {
// they just sent us what we already had. kill 'em both
if (updated == null || updated.getPublished() <= _info.getPublished()) {
// they just sent us what we already had
dropAfterLookupFailed(_peer);
}
}

View File

@@ -1379,6 +1379,7 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
*/
void dropAfterLookupFailed(Hash peer) {
_context.peerManager().removeCapabilities(peer);
_negativeCache.cache(peer);
_kb.remove(peer);
//if (removed) {
// if (_log.shouldLog(Log.INFO))

View File

@@ -36,6 +36,15 @@ class NegativeLookupCache {
this.counter.increment(h);
}
/**
* Negative cache the hash until the next clean time.
*
* @since 0.9.56
*/
public void cache(Hash h) {
this.counter.max(h);
}
public boolean isCached(Hash h) {
if (counter.count(h) >= _maxFails)
return true;