NetDB: Store handler updates

Drop some unsolicited RI stores when over thresholds
Don't update profile for banned/blocked RI
Don't flood RIs with blocked IP
Log tweaks
This commit is contained in:
zzz
2023-02-12 07:58:24 -05:00
parent 78ee005870
commit 7a75ea4bef
2 changed files with 136 additions and 13 deletions

View File

@@ -204,6 +204,8 @@ public class InNetMessagePool implements Service {
Job dsmjob = dsmbuilder.createJob(messageBody, fromRouter, fromRouterHash);
int sz = origMessages.size();
if (sz > 0) {
DatabaseStoreMessage dbsm = (DatabaseStoreMessage) messageBody;
dbsm.setReceivedAsReply();
// DSM inline, reply jobs on queue
if (dsmjob != null)
dsmjob.runJob();

View File

@@ -20,6 +20,7 @@ import net.i2p.data.TunnelId;
import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.router.RouterInfo;
import net.i2p.data.router.RouterKeyGenerator;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.data.i2np.DeliveryStatusMessage;
import net.i2p.data.i2np.I2NPMessage;
@@ -27,10 +28,12 @@ import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob;
import net.i2p.util.Log;
import net.i2p.util.SystemVersion;
/**
* Receive DatabaseStoreMessage data and store it in the local net db
@@ -44,6 +47,9 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
private final FloodfillNetworkDatabaseFacade _facade;
private final static int REPLY_TIMEOUT = 60*1000;
private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
// must be lower than LIMIT_ROUTERS in StartExplorersJob
// because exploration does not register a reply job
private static final int LIMIT_ROUTERS = SystemVersion.isSlow() ? 1000 : 4000;
/**
* @param receivedMessage must never have reply token set if it came down a tunnel
@@ -87,7 +93,7 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
// throw rather than return, so that we send the ack below (prevent easy attack)
dontBlamePeer = true;
throw new IllegalArgumentException("Peer attempted to store local leaseSet: " +
key.toBase64().substring(0, 4));
key.toBase32());
}
LeaseSet ls = (LeaseSet) entry;
//boolean oldrar = ls.getReceivedAsReply();
@@ -146,44 +152,159 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
} else if (type == DatabaseEntry.KEY_TYPE_ROUTERINFO) {
RouterInfo ri = (RouterInfo) entry;
getContext().statManager().addRateData("netDb.storeRouterInfoHandled", 1);
if (_log.shouldDebug())
_log.debug("Handling dbStore of router " + key + " with publishDate of "
+ DataHelper.formatTime(ri.getPublished()));
if (_fromHash == null && _from != null)
_fromHash = _from.getHash();
boolean isUs = getContext().routerHash().equals(key);
if (!key.equals(_fromHash) && !isUs) {
if (_message.getReceivedAsReply()) {
ri.setReceivedAsReply();
if (_message.getReplyToken() > 0)
ri.setReceivedAsPublished(true);
} else {
ri.setReceivedAsPublished(true);
}
}
if (_log.shouldInfo()) {
String req = ((_message.getReplyToken() > 0) ? " reply req." : "") +
((_fromHash == null && ri.getReceivedAsPublished()) ? " unsolicited" : "");
if (_fromHash == null)
_log.info("Handling dbStore of router " + key.toBase64() + " published " +
DataHelper.formatTime(ri.getPublished()) + req);
else if (_fromHash.equals(key))
_log.info("Handling dbStore of router " + key.toBase64() + " published " +
DataHelper.formatTime(ri.getPublished()) + " from that router" + req);
else
_log.info("Handling dbStore of router " + key.toBase64() + " published " +
DataHelper.formatTime(ri.getPublished()) + " from: " + _fromHash.toBase64() + req);
}
try {
// Never store our RouterInfo received from somebody else.
// This generally happens from a FloodfillVerifyStoreJob.
// If it is valid, it shouldn't be newer than what we have - unless
// somebody has our keys...
if (getContext().routerHash().equals(key)) {
if (isUs) {
//getContext().statManager().addRateData("netDb.storeLocalRouterInfoAttempt", 1, 0);
// This is initiated by PeerTestJob from another peer
// throw rather than return, so that we send the ack below (prevent easy attack)
dontBlamePeer = true;
throw new IllegalArgumentException("Peer attempted to store our RouterInfo");
}
getContext().profileManager().heardAbout(key);
prevNetDb = getContext().netDb().store(key, ri);
wasNew = ((null == prevNetDb) || (prevNetDb.getPublished() < ri.getPublished()));
boolean shouldStore = true;
if (ri.getReceivedAsPublished()) {
// these are often just dup stores from concurrent lookups
prevNetDb = (RouterInfo) _facade.lookupLocallyWithoutValidation(key);
if (prevNetDb == null) {
// actually new
int count = _facade.getDataStore().size();
if (count > LIMIT_ROUTERS) {
if (_facade.floodfillEnabled()) {
// determine if they're "close enough"
// we will still ack and flood by setting wasNew = true even if we don't store locally
// so even just-reseeded new routers will get stored to the right place
RouterKeyGenerator gen = getContext().routerKeyGenerator();
byte[] rkey = gen.getRoutingKey(key).getData();
byte[] ourRKey = getContext().routerHash().getData();
int distance = (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) |
((rkey[1] ^ ourRKey[1]) & 0xff);
// they have to be within 1/256 of the keyspace
if (distance >= 256) {
long until = gen.getTimeTillMidnight();
if (until > FloodfillNetworkDatabaseFacade.NEXT_RKEY_RI_ADVANCE_TIME) {
// appx. 90% max drop rate so even just-reseeded new routers will make it eventually
int pdrop = Math.min(110, (128 * count / LIMIT_ROUTERS) - 128);
if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
pdrop *= 3;
if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
if (_log.shouldWarn())
_log.warn("Dropping new unsolicited dbStore of " + ri.getCapabilities() +
" router " + key.toBase64() + " with distance " + distance +
" drop probability " + (pdrop * 100 / 128));
shouldStore = false;
// still flood if requested
if (_message.getReplyToken() > 0)
wasNew = true;
}
} else {
// almost midnight, recheck with tomorrow's keys
rkey = gen.getNextRoutingKey(key).getData();
ourRKey = gen.getNextRoutingKey(getContext().routerHash()).getData();
distance = (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) |
((rkey[1] ^ ourRKey[1]) & 0xff);
if (distance >= 256) {
int pdrop = Math.min(110, (128 * count / LIMIT_ROUTERS) - 128);
if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
pdrop *= 3;
if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
if (_log.shouldWarn())
_log.warn("Dropping new unsolicited dbStore of router " + key.toBase64() +
" with distance " + distance);
shouldStore = false;
// still flood if requested
if (_message.getReplyToken() > 0)
wasNew = true;
}
}
}
}
if (shouldStore && _log.shouldDebug())
_log.debug("Allowing new unsolicited dbStore of router " + key.toBase64() + " with distance " + distance);
} else {
// non-ff
// up to 100% drop rate
int pdrop = (128 * count / LIMIT_ROUTERS) - 128;
if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
pdrop *= 3;
if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
if (_log.shouldWarn())
_log.warn("Dropping new unsolicited dbStore of router " + key.toBase64() +
" drop probability " + (pdrop * 100 / 128));
shouldStore = false;
// don't bother checking ban/blocklists.
//wasNew = true;
}
}
}
if (shouldStore && _log.shouldWarn())
_log.warn("Handling new unsolicited dbStore of router " + key.toBase64());
} else if (prevNetDb.getPublished() >= ri.getPublished()) {
shouldStore = false;
}
}
if (shouldStore) {
prevNetDb = _facade.store(key, ri);
wasNew = ((null == prevNetDb) || (prevNetDb.getPublished() < ri.getPublished()));
}
// Check new routerinfo address against blocklist
if (wasNew) {
// TODO should we not flood temporarily banned routers either?
boolean forever = getContext().banlist().isBanlistedForever(key);
if (forever)
if (forever) {
wasNew = false; // don't flood
shouldStore = false; // don't call heardAbout()
}
if (prevNetDb == null) {
if (!forever &&
getContext().blocklist().isBlocklisted(ri) &&
_log.shouldLog(Log.WARN))
getContext().blocklist().isBlocklisted(ri)) {
if (_log.shouldWarn())
_log.warn("Blocklisting new peer " + key + ' ' + ri);
wasNew = false; // don't flood
shouldStore = false; // don't call heardAbout()
}
} else if (!forever) {
Collection<RouterAddress> oldAddr = prevNetDb.getAddresses();
Collection<RouterAddress> newAddr = ri.getAddresses();
if ((!newAddr.equals(oldAddr)) &&
getContext().blocklist().isBlocklisted(ri) &&
_log.shouldLog(Log.WARN))
getContext().blocklist().isBlocklisted(ri)) {
if (_log.shouldWarn())
_log.warn("New address received, Blocklisting old peer " + key + ' ' + ri);
wasNew = false; // don't flood
shouldStore = false; // don't call heardAbout()
}
}
}
if (shouldStore)
getContext().profileManager().heardAbout(key);
} catch (UnsupportedCryptoException uce) {
invalidMessage = uce.getMessage();
dontBlamePeer = true;