diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java
index 03fd24285..9eccfea5d 100644
--- a/router/java/src/net/i2p/router/InNetMessagePool.java
+++ b/router/java/src/net/i2p/router/InNetMessagePool.java
@@ -204,6 +204,8 @@ public class InNetMessagePool implements Service {
                 Job dsmjob = dsmbuilder.createJob(messageBody, fromRouter, fromRouterHash);
                 int sz = origMessages.size();
                 if (sz > 0) {
+                    DatabaseStoreMessage dbsm = (DatabaseStoreMessage) messageBody;
+                    dbsm.setReceivedAsReply();
                     // DSM inline, reply jobs on queue
                     if (dsmjob != null)
                         dsmjob.runJob();
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java
index 04f89eb54..16901c711 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java
@@ -20,6 +20,7 @@ import net.i2p.data.TunnelId;
 import net.i2p.data.router.RouterAddress;
 import net.i2p.data.router.RouterIdentity;
 import net.i2p.data.router.RouterInfo;
+import net.i2p.data.router.RouterKeyGenerator;
 import net.i2p.data.i2np.DatabaseStoreMessage;
 import net.i2p.data.i2np.DeliveryStatusMessage;
 import net.i2p.data.i2np.I2NPMessage;
@@ -27,10 +28,12 @@ import net.i2p.data.i2np.TunnelGatewayMessage;
 import net.i2p.router.Job;
 import net.i2p.router.JobImpl;
 import net.i2p.router.OutNetMessage;
+import net.i2p.router.Router;
 import net.i2p.router.RouterContext;
 import net.i2p.router.TunnelInfo;
 import net.i2p.router.message.SendMessageDirectJob;
 import net.i2p.util.Log;
+import net.i2p.util.SystemVersion;
 
 /**
  * Receive DatabaseStoreMessage data and store it in the local net db
@@ -44,6 +47,9 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
     private final FloodfillNetworkDatabaseFacade _facade;
     private final static int REPLY_TIMEOUT = 60*1000;
     private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
+    // must be lower than LIMIT_ROUTERS in StartExplorersJob
+    // because exploration does not register a reply job
+    private static final int LIMIT_ROUTERS = SystemVersion.isSlow() ? 1000 : 4000;
 
     /**
      * @param receivedMessage must never have reply token set if it came down a tunnel
@@ -87,7 +93,7 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
                     // throw rather than return, so that we send the ack below (prevent easy attack)
                     dontBlamePeer = true;
                     throw new IllegalArgumentException("Peer attempted to store local leaseSet: " +
-                                                       key.toBase64().substring(0, 4));
+                                                       key.toBase32());
                 }
                 LeaseSet ls = (LeaseSet) entry;
                 //boolean oldrar = ls.getReceivedAsReply();
@@ -146,44 +152,159 @@ class HandleFloodfillDatabaseStoreMessageJob extends JobImpl {
         } else if (type == DatabaseEntry.KEY_TYPE_ROUTERINFO) {
             RouterInfo ri = (RouterInfo) entry;
             getContext().statManager().addRateData("netDb.storeRouterInfoHandled", 1);
-            if (_log.shouldDebug())
-                _log.debug("Handling dbStore of router " + key + " with publishDate of "
-                           + DataHelper.formatTime(ri.getPublished()));
+            if (_fromHash == null && _from != null)
+                _fromHash = _from.getHash();
+
+            boolean isUs = getContext().routerHash().equals(key);
+            if (!key.equals(_fromHash) && !isUs) {
+                if (_message.getReceivedAsReply()) {
+                    ri.setReceivedAsReply();
+                    if (_message.getReplyToken() > 0)
+                        ri.setReceivedAsPublished(true);
+                } else {
+                    ri.setReceivedAsPublished(true);
+                }
+            }
+            if (_log.shouldInfo()) {
+                String req = ((_message.getReplyToken() > 0) ? " reply req." : "")
+                             + ((_fromHash == null && ri.getReceivedAsPublished()) ? " unsolicited" : "");
+                if (_fromHash == null)
+                    _log.info("Handling dbStore of router " + key.toBase64() + " published "
+                              + DataHelper.formatTime(ri.getPublished()) + req);
+                else if (_fromHash.equals(key))
+                    _log.info("Handling dbStore of router " + key.toBase64() + " published "
+                              + DataHelper.formatTime(ri.getPublished()) + " from that router" + req);
+                else
+                    _log.info("Handling dbStore of router " + key.toBase64() + " published "
+                              + DataHelper.formatTime(ri.getPublished()) + " from: " + _fromHash.toBase64() + req);
+            }
             try {
                 // Never store our RouterInfo received from somebody else.
                 // This generally happens from a FloodfillVerifyStoreJob.
                 // If it is valid, it shouldn't be newer than what we have - unless
                 // somebody has our keys...
-                if (getContext().routerHash().equals(key)) {
+                if (isUs) {
                     //getContext().statManager().addRateData("netDb.storeLocalRouterInfoAttempt", 1, 0);
                     // This is initiated by PeerTestJob from another peer
                     // throw rather than return, so that we send the ack below (prevent easy attack)
                     dontBlamePeer = true;
                     throw new IllegalArgumentException("Peer attempted to store our RouterInfo");
                 }
-                getContext().profileManager().heardAbout(key);
-                prevNetDb = getContext().netDb().store(key, ri);
-                wasNew = ((null == prevNetDb) || (prevNetDb.getPublished() < ri.getPublished()));
+                boolean shouldStore = true;
+                if (ri.getReceivedAsPublished()) {
+                    // these are often just dup stores from concurrent lookups
+                    prevNetDb = (RouterInfo) _facade.lookupLocallyWithoutValidation(key);
+                    if (prevNetDb == null) {
+                        // actually new
+                        int count = _facade.getDataStore().size();
+                        if (count > LIMIT_ROUTERS) {
+                            if (_facade.floodfillEnabled()) {
+                                // determine if they're "close enough"
+                                // we will still ack and flood by setting wasNew = true even if we don't store locally
+                                // so even just-reseeded new routers will get stored to the right place
+                                RouterKeyGenerator gen = getContext().routerKeyGenerator();
+                                byte[] rkey = gen.getRoutingKey(key).getData();
+                                byte[] ourRKey = getContext().routerHash().getData();
+                                int distance = (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) |
+                                               ((rkey[1] ^ ourRKey[1]) & 0xff);
+                                // they have to be within 1/256 of the keyspace
+                                if (distance >= 256) {
+                                    long until = gen.getTimeTillMidnight();
+                                    if (until > FloodfillNetworkDatabaseFacade.NEXT_RKEY_RI_ADVANCE_TIME) {
+                                        // appx. 90% max drop rate so even just-reseeded new routers will make it eventually
+                                        int pdrop = Math.min(110, (128 * count / LIMIT_ROUTERS) - 128);
+                                        if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
+                                            pdrop *= 3;
+                                        if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
+                                            if (_log.shouldWarn())
+                                                _log.warn("Dropping new unsolicited dbStore of " + ri.getCapabilities()
+                                                          + " router " + key.toBase64() + " with distance " + distance
+                                                          + " drop probability " + (pdrop * 100 / 128));
+                                            shouldStore = false;
+                                            // still flood if requested
+                                            if (_message.getReplyToken() > 0)
+                                                wasNew = true;
+                                        }
+                                    } else {
+                                        // almost midnight, recheck with tomorrow's keys
+                                        rkey = gen.getNextRoutingKey(key).getData();
+                                        ourRKey = gen.getNextRoutingKey(getContext().routerHash()).getData();
+                                        distance = (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) |
+                                                   ((rkey[1] ^ ourRKey[1]) & 0xff);
+                                        if (distance >= 256) {
+                                            int pdrop = Math.min(110, (128 * count / LIMIT_ROUTERS) - 128);
+                                            if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
+                                                pdrop *= 3;
+                                            if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
+                                                if (_log.shouldWarn())
+                                                    _log.warn("Dropping new unsolicited dbStore of router " + key.toBase64()
+                                                              + " with distance " + distance);
+                                                shouldStore = false;
+                                                // still flood if requested
+                                                if (_message.getReplyToken() > 0)
+                                                    wasNew = true;
+                                            }
+                                        }
+                                    }
+                                }
+                                if (shouldStore && _log.shouldDebug())
+                                    _log.debug("Allowing new unsolicited dbStore of router " + key.toBase64() + " with distance " + distance);
+                            } else {
+                                // non-ff
+                                // up to 100% drop rate
+                                int pdrop = (128 * count / LIMIT_ROUTERS) - 128;
+                                if (ri.getCapabilities().indexOf(Router.CAPABILITY_UNREACHABLE) >= 0)
+                                    pdrop *= 3;
+                                if (pdrop > 0 && (pdrop >= 128 || getContext().random().nextInt(128) < pdrop)) {
+                                    if (_log.shouldWarn())
+                                        _log.warn("Dropping new unsolicited dbStore of router " + key.toBase64()
+                                                  + " drop probability " + (pdrop * 100 / 128));
+                                    shouldStore = false;
+                                    // don't bother checking ban/blocklists.
+                                    //wasNew = true;
+                                }
+                            }
+                        }
+                        if (shouldStore && _log.shouldWarn())
+                            _log.warn("Handling new unsolicited dbStore of router " + key.toBase64());
+                    } else if (prevNetDb.getPublished() >= ri.getPublished()) {
+                        shouldStore = false;
+                    }
+                }
+                if (shouldStore) {
+                    prevNetDb = _facade.store(key, ri);
+                    wasNew = ((null == prevNetDb) || (prevNetDb.getPublished() < ri.getPublished()));
+                }
                 // Check new routerinfo address against blocklist
                 if (wasNew) {
                     // TODO should we not flood temporarily banned routers either?
                     boolean forever = getContext().banlist().isBanlistedForever(key);
-                    if (forever)
+                    if (forever) {
                         wasNew = false; // don't flood
+                        shouldStore = false; // don't call heardAbout()
+                    }
                     if (prevNetDb == null) {
                         if (!forever &&
-                            getContext().blocklist().isBlocklisted(ri) &&
-                            _log.shouldLog(Log.WARN))
+                            getContext().blocklist().isBlocklisted(ri)) {
+                            if (_log.shouldWarn())
                                 _log.warn("Blocklisting new peer " + key + ' ' + ri);
+                            wasNew = false; // don't flood
+                            shouldStore = false; // don't call heardAbout()
+                        }
                     } else if (!forever) {
                         Collection oldAddr = prevNetDb.getAddresses();
                         Collection newAddr = ri.getAddresses();
                         if ((!newAddr.equals(oldAddr)) &&
-                            getContext().blocklist().isBlocklisted(ri) &&
-                            _log.shouldLog(Log.WARN))
+                            getContext().blocklist().isBlocklisted(ri)) {
+                            if (_log.shouldWarn())
                                 _log.warn("New address received, Blocklisting old peer " + key + ' ' + ri);
+                            wasNew = false; // don't flood
+                            shouldStore = false; // don't call heardAbout()
+                        }
                     }
                 }
+                if (shouldStore)
+                    getContext().profileManager().heardAbout(key);
             } catch (UnsupportedCryptoException uce) {
                 invalidMessage = uce.getMessage();
                 dontBlamePeer = true;
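
Reviewer sketch, not part of the patch: the unsolicited-store logic above keeps every new
RouterInfo while the datastore holds no more than LIMIT_ROUTERS entries; past that, a
floodfill still keeps it when the top byte of its routing key matches ours (XOR prefix
distance < 256, i.e. within 1/256 of the keyspace, with a recheck against tomorrow's keys
near midnight), and otherwise drops it with probability pdrop/128, where pdrop ramps up
linearly with the datastore size, is capped at 110 on the floodfill path, and is tripled
for routers advertising the unreachable capability. The standalone Java below reproduces
that arithmetic with plain byte arrays standing in for RouterKeyGenerator output; the
class and method names are hypothetical and none of this is I2P API.

import java.util.Random;

public class UnsolicitedStoreDropSketch {
    // mirrors LIMIT_ROUTERS in the patch (4000 on fast hardware, 1000 on slow)
    static final int LIMIT_ROUTERS = 4000;

    /** Top 16 bits of the XOR distance between two routing keys (0..65535). */
    static int prefixDistance(byte[] rkey, byte[] ourRKey) {
        return (((rkey[0] ^ ourRKey[0]) & 0xff) << 8) | ((rkey[1] ^ ourRKey[1]) & 0xff);
    }

    /**
     * Drop probability out of 128, mirroring the floodfill branch of the patch:
     * zero until the store exceeds LIMIT_ROUTERS, then rising linearly, capped at
     * 110/128 (about 86%), and tripled for unreachable routers (so it can exceed
     * 128, meaning "always drop"). Clamping at 0 here replaces the pdrop > 0 check.
     */
    static int dropProbability(int count, boolean unreachable) {
        int pdrop = Math.min(110, (128 * count / LIMIT_ROUTERS) - 128);
        if (unreachable)
            pdrop *= 3;
        return Math.max(0, pdrop);
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        // hypothetical two-byte routing-key prefixes; top bytes match, so "close enough"
        byte[] rkey = {(byte) 0x3a, (byte) 0x91};
        byte[] ours = {(byte) 0x3a, (byte) 0x7f};
        int distance = prefixDistance(rkey, ours);
        System.out.println("distance=" + distance + " close=" + (distance < 256));

        int count = 4500;   // pretend the netdb is over the limit
        int pdrop = dropProbability(count, false);
        boolean drop = pdrop > 0 && (pdrop >= 128 || rnd.nextInt(128) < pdrop);
        System.out.println("pdrop=" + (pdrop * 100 / 128) + "% drop=" + drop);
    }
}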