From f1170b948f6b4f8a07500245aa989d90be3d8d84 Mon Sep 17 00:00:00 2001
From: zzz <zzz@i2pmail.org>
Date: Tue, 28 Dec 2021 09:57:42 -0500
Subject: [PATCH] NetDB: StoreJob reliability improvements

- Always use a lease as the reply tunnel when publishing LS through a
  client tunnel. This ensures we're not using about-to-expire tunnels
  for the reply, and ensures the ff is able to pick an alternate
- Don't count skipped peers as attempted in FloodfillVerify
- Pass failed and skipped peers to FloodfillVerify job to be skipped there also
- Pass failed and skipped peers from FloodfillVerify job to the next
  StoreJob on failure
- Consolidate common reply token generation code in StoreJob
- Ensure tunnel diversity in StoreJob retries by only using tunnels
  closest to the target for the first request. This increases
  reliability by not reusing the same tunnels for all retries.
- Refactor StoreState to rework inefficient methods and unused data
- Clean up commented-out code in StoreState
- Log tweaks
---
 .../networkdb/kademlia/FloodfillStoreJob.java |   8 +-
 .../kademlia/FloodfillVerifyStoreJob.java     |  23 +-
 .../router/networkdb/kademlia/StoreJob.java   | 207 ++++++++++++------
 .../router/networkdb/kademlia/StoreState.java | 148 ++++++-------
 4 files changed, 231 insertions(+), 155 deletions(-)

diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
index ff731bb13a..2a23b34547 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillStoreJob.java
@@ -8,7 +8,6 @@ package net.i2p.router.networkdb.kademlia;
  *
  */
 
-import java.util.NoSuchElementException;
 import java.util.Set;
 
 import net.i2p.data.DatabaseEntry;
@@ -98,10 +97,7 @@ class FloodfillStoreJob extends StoreJob {
                 published = data.getDate();
             }
             // we should always have exactly one successful entry
-            Hash sentTo = null;
-            try {
-                sentTo = _state.getSuccessful().iterator().next();
-            } catch (NoSuchElementException nsee) {}
+            Hash sentTo = _state.getSuccessful();
             Hash client;
             if (type == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) {
                 // get the real client hash
@@ -111,7 +107,7 @@ class FloodfillStoreJob extends StoreJob {
             }
 
             Job fvsj = new FloodfillVerifyStoreJob(ctx, key, client, published, type,
-                                                   sentTo, _facade);
+                                                   sentTo, _state.getAttempted(), _facade);
             if (shouldLog)
                 _log.info(getJobId() + ": Succeeded sending key " + key +
                           ", queueing verify job " + fvsj.getJobId());
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
index f9e024a396..239ccac352 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillVerifyStoreJob.java
@@ -40,6 +40,7 @@ class FloodfillVerifyStoreJob extends JobImpl {
     private final FloodfillNetworkDatabaseFacade _facade;
     private long _expiration;
     private long _sendTime;
+    private int _attempted;
     private final long _published;
     private final int _type;
     private final boolean _isRouterInfo;
@@ -59,9 +60,11 @@ class FloodfillVerifyStoreJob extends JobImpl {
      * @param client generally the same as key, unless encrypted LS2; non-null
      * @param published getDate() for RI or LS1, getPublished() for LS2
      * @param sentTo who to give the credit or blame to, can be null
+     * @param toSkip don't query any of these peers, may be null
+     * @since 0.9.53
added toSkip param */ public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, Hash client, long published, int type, - Hash sentTo, FloodfillNetworkDatabaseFacade facade) { + Hash sentTo, Set<Hash> toSkip, FloodfillNetworkDatabaseFacade facade) { super(ctx); facade.verifyStarted(key); _key = key; @@ -73,7 +76,12 @@ class FloodfillVerifyStoreJob extends JobImpl { _log = ctx.logManager().getLog(getClass()); _sentTo = sentTo; _facade = facade; - _ignore = new HashSet<Hash>(MAX_PEERS_TO_TRY); + _ignore = new HashSet<Hash>(8); + if (toSkip != null) { + synchronized(toSkip) { + _ignore.addAll(toSkip); + } + } if (sentTo != null) { _ipSet = new MaskedIPSet(ctx, sentTo, IP_CLOSE_BYTES); _ignore.add(_sentTo); @@ -192,7 +200,7 @@ class FloodfillVerifyStoreJob extends JobImpl { // We don't have a compatible way to get a reply, // skip it for now. if (_log.shouldWarn()) - _log.warn("Skipping store verify for ECIES client " + _client.toBase32()); + _log.warn("Skipping store verify to " + _target + " for ECIES- or ElG-only client " + _client.toBase32()); _facade.verifyFinished(_key); return; } @@ -242,6 +250,7 @@ class FloodfillVerifyStoreJob extends JobImpl { new VerifyReplyJob(getContext()), new VerifyTimeoutJob(getContext())); ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target); + _attempted++; } /** @@ -442,10 +451,14 @@ class FloodfillVerifyStoreJob extends JobImpl { _log.info(getJobId() + ": Verify failed, but new store already happened for: " + _key); return; } - Set<Hash> toSkip = new HashSet<Hash>(2); + Set<Hash> toSkip = new HashSet<Hash>(8); if (_sentTo != null) toSkip.add(_sentTo); toSkip.add(_target); + // pass over all the ignores for the next attempt + // unless we've had a crazy number of attempts, then start over + if (_ignore.size() < 20) + toSkip.addAll(_ignore); if (_log.shouldWarn()) _log.warn(getJobId() + ": Verify failed, starting new store for: " + _key); _facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, toSkip); @@ -467,7 +480,7 @@ class FloodfillVerifyStoreJob extends JobImpl { getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime); if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Verify timed out for: " + _key); - if (_ignore.size() < MAX_PEERS_TO_TRY) { + if (_attempted < MAX_PEERS_TO_TRY) { // Don't resend, simply rerun FVSJ.this inline and // chose somebody besides _target for verification _ignore.add(_target); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index addb6e6063..9c30922366 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -8,7 +8,10 @@ package net.i2p.router.networkdb.kademlia; * */ +import java.io.Serializable; +import java.util.Arrays; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -19,7 +22,9 @@ import net.i2p.data.Base64; import net.i2p.data.Certificate; import net.i2p.data.DatabaseEntry; import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.data.Lease; import net.i2p.data.LeaseSet; import net.i2p.data.router.RouterIdentity; import net.i2p.data.router.RouterInfo; @@ -128,7 +133,7 @@ abstract class StoreJob extends JobImpl { if (_log.shouldLog(Log.INFO)) 
_log.info(getJobId() + ": Expired: " + _timeoutMs); fail(); - } else if (_state.getAttempted().size() > MAX_PEERS_SENT) { + } else if (_state.getAttemptedCount() > MAX_PEERS_SENT) { _state.complete(true); if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Max sent"); @@ -155,7 +160,7 @@ abstract class StoreJob extends JobImpl { */ private synchronized void continueSending() { if (_state.completed()) return; - int toCheck = getParallelization() - _state.getPending().size(); + int toCheck = getParallelization() - _state.getPendingCount(); if (toCheck <= 0) { // too many already pending if (_log.shouldLog(Log.DEBUG)) @@ -177,7 +182,7 @@ abstract class StoreJob extends JobImpl { // closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); closestHashes = getClosestFloodfillRouters(_state.getTarget(), toCheck, _state.getAttempted()); if ( (closestHashes == null) || (closestHashes.isEmpty()) ) { - if (_state.getPending().isEmpty()) { + if (_state.getPendingCount() <= 0) { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": No more peers left and none pending"); fail(); @@ -187,7 +192,6 @@ abstract class StoreJob extends JobImpl { return; } } else { - //_state.addPending(closestHashes); int queued = 0; int skipped = 0; int type = _state.getData().getType(); @@ -255,14 +259,14 @@ abstract class StoreJob extends JobImpl { // if (!((RouterInfo)ds).isHidden()) { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": Continue sending key " + _state.getTarget() + - " after " + _state.getAttempted().size() + " tries to " + closestHashes); + " after " + _state.getAttemptedCount() + " tries to " + closestHashes); _state.addPending(peer); sendStore((RouterInfo)ds, peerTimeout); queued++; //} } } - if (queued == 0 && _state.getPending().isEmpty()) { + if (queued == 0 && _state.getPendingCount() <= 0) { if (_log.shouldLog(Log.INFO)) _log.info(getJobId() + ": No more peers left after skipping " + skipped + " and none pending"); // queue a job to go around again rather than recursing @@ -322,6 +326,12 @@ abstract class StoreJob extends JobImpl { _log.error("Hash mismatch StoreJob"); return; } + if (router.getIdentity().equals(getContext().router().getRouterInfo().getIdentity())) { + // don't send it to ourselves + _log.error(getJobId() + ": Dont send store to ourselves - why did we try?"); + return; + } + DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext()); int type = _state.getData().getType(); if (type == DatabaseEntry.KEY_TYPE_ROUTERINFO) { @@ -334,16 +344,11 @@ abstract class StoreJob extends JobImpl { long now = getContext().clock().now(); msg.setMessageExpiration(now + _timeoutMs); - if (router.getIdentity().equals(getContext().router().getRouterInfo().getIdentity())) { - // don't send it to ourselves - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Dont send store to ourselves - why did we try?"); - return; - } - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(getJobId() + ": Send store timeout is " + responseTime); + long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE); + msg.setReplyToken(token); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": send(dbStore) w/ token expected " + msg.getReplyToken() + " msg exp. " + _timeoutMs + " resp exp. 
" + responseTime); sendStore(msg, router, now + responseTime); } @@ -381,18 +386,17 @@ abstract class StoreJob extends JobImpl { * */ private void sendDirect(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { - long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE); - msg.setReplyToken(token); msg.setReplyGateway(getContext().routerHash()); - _state.addPending(peer.getIdentity().getHash()); + Hash to = peer.getIdentity().getHash(); + _state.addPending(to); SendSuccessJob onReply = new SendSuccessJob(getContext(), peer); FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now()); - StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration); + StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, msg.getReplyToken(), expiration); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": sending store directly to " + peer.getIdentity().getHash()); + _log.debug(getJobId() + ": sending store directly to " + to); OutNetMessage m = new OutNetMessage(getContext(), msg, expiration, STORE_PRIORITY, peer); m.setOnFailedReplyJob(onFail); m.setOnFailedSendJob(onFail); @@ -410,38 +414,37 @@ abstract class StoreJob extends JobImpl { * @since 0.9.41 renamed from sendStoreThroughGarlic() */ private void sendStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { - long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE); - Hash to = peer.getIdentity().getHash(); - TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(to); + // For all tunnel selections, the first time we pick the tunnel with the far-end closest + // to the target. After that we pick a random tunnel, or else we'd pick the + // same tunnels every time. + TunnelInfo replyTunnel; + if (_state.getAttemptedCount() <= 1) + replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(to); + else + replyTunnel = getContext().tunnelManager().selectInboundTunnel(); if (replyTunnel == null) { _log.warn("No reply inbound tunnels available!"); return; } TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0); - msg.setReplyToken(token); msg.setReplyTunnel(replyTunnelId); msg.setReplyGateway(replyTunnel.getPeer(0)); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send store thru expl. 
tunnel to " + peer.getIdentity().getHash() + " w/ token expected " + token); - _state.addPending(to); - TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(to); + TunnelInfo outTunnel; + if (_state.getAttemptedCount() <= 1) + outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(to); + else + outTunnel = getContext().tunnelManager().selectOutboundTunnel(); if (outTunnel != null) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to " - // + peer.getIdentity().getHash().toBase64()); - //TunnelId targetTunnelId = null; // not needed - //Job onSend = null; // not wanted - SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, msg.getMessageSize()); FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now()); - StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration); + StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, msg.getReplyToken(), expiration); if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg); + _log.debug(getJobId() + ": sending store to " + to + " through " + outTunnel + ": " + msg); getContext().messageRegistry().registerPending(selector, onReply, onFail); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to); } else { @@ -469,9 +472,9 @@ abstract class StoreJob extends JobImpl { */ private void sendStoreThroughClient(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { final RouterContext ctx = getContext(); - long token = 1 + ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE); Hash client; - if (msg.getEntry().getType() == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) { + int dstype = msg.getEntry().getType(); + if (dstype == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) { // get the real client hash client = ((LeaseSet)msg.getEntry()).getDestination().calculateHash(); } else { @@ -480,22 +483,40 @@ abstract class StoreJob extends JobImpl { RouterIdentity ident = peer.getIdentity(); Hash to = ident.getHash(); - TunnelInfo replyTunnel = ctx.tunnelManager().selectInboundTunnel(client, to); - if (replyTunnel == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("No reply inbound tunnels available!"); - fail(); - return; + // see comments in method above + Hash replyGW; + TunnelId replyTunnelId; + if (dstype == DatabaseEntry.KEY_TYPE_LS2 || dstype == DatabaseEntry.KEY_TYPE_LEASESET) { + // always pick the reply tunnel from the LS, they will be the newest, + // probably still connected, + // and it gives the ff flexibility to choose another one + LeaseSet ls = (LeaseSet) msg.getEntry(); + Lease lease = pickReplyTunnel(ls, _state.getAttemptedCount(), to); + replyGW = lease.getGateway(); + replyTunnelId = lease.getTunnelId(); + } else { + TunnelInfo replyTunnel; + if (_state.getAttemptedCount() <= 1) + replyTunnel = ctx.tunnelManager().selectInboundTunnel(client, to); + else + replyTunnel = ctx.tunnelManager().selectInboundTunnel(client); + if (replyTunnel == null) { + if (_log.shouldLog(Log.WARN)) + _log.warn("No reply inbound tunnels available!"); + fail(); + return; + } + replyTunnelId = replyTunnel.getReceiveTunnelId(0); + replyGW = replyTunnel.getPeer(0); } - TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0); - msg.setReplyToken(token); msg.setReplyTunnel(replyTunnelId); - 
msg.setReplyGateway(replyTunnel.getPeer(0)); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send(dbStore) w/ token expected " + token); + msg.setReplyGateway(replyGW); - TunnelInfo outTunnel = ctx.tunnelManager().selectOutboundTunnel(client, to); + TunnelInfo outTunnel; + if (_state.getAttemptedCount() <= 1) + outTunnel = ctx.tunnelManager().selectOutboundTunnel(client, to); + else + outTunnel = ctx.tunnelManager().selectOutboundTunnel(client); if (outTunnel != null) { I2NPMessage sent; LeaseSetKeys lsk = ctx.keyManager().getKeys(client); @@ -531,10 +552,10 @@ abstract class StoreJob extends JobImpl { } SendSuccessJob onReply = new SendSuccessJob(ctx, peer, outTunnel, sent.getMessageSize()); FailedJob onFail = new FailedJob(ctx, peer, ctx.clock().now()); - StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, token, expiration); + StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, msg.getReplyToken(), expiration); if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getJobId() + ": sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent); + _log.debug(getJobId() + ": sending encrypted store to " + to + " through " + outTunnel + ": " + sent + " with reply to " + replyGW + ' ' + replyTunnelId); } ctx.messageRegistry().registerPending(selector, onReply, onFail); ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to); @@ -552,6 +573,60 @@ abstract class StoreJob extends JobImpl { } } + /** + * Pick a reply tunnel out of a LeaseSet + * + * @param to pick closest if attempts == 1 + * @since 0.9.53 + */ + private Lease pickReplyTunnel(LeaseSet ls, int attempts, Hash to) { + int c = ls.getLeaseCount(); + if (c <= 0) + throw new IllegalStateException(); + if (c == 1) + return ls.getLease(0); + if (attempts > 1) + return ls.getLease(getContext().random().nextInt(c)); + // pick closest + Lease[] leases = new Lease[c]; + for (int i = 0; i < c; i++) { + leases[i] = ls.getLease(i); + } + Arrays.sort(leases, new LeaseComparator(to)); + return leases[0]; + } + + /** + * Find the lease that is XOR-closest to a given hash + * + * @since 0.9.53 adapted from TunnelPool.TunnelInfoComparator + */ + private static class LeaseComparator implements Comparator<Lease>, Serializable { + private final byte[] _base; + + /** + * @param target key to compare distances with + */ + public LeaseComparator(Hash target) { + _base = target.getData(); + } + + public int compare(Lease lhs, Lease rhs) { + byte lhsb[] = lhs.getGateway().getData(); + byte rhsb[] = rhs.getGateway().getData(); + for (int i = 0; i < _base.length; i++) { + int ld = (lhsb[i] ^ _base[i]) & 0xff; + int rd = (rhsb[i] ^ _base[i]) & 0xff; + if (ld < rd) + return -1; + if (ld > rd) + return 1; + } + // latest-expiring first as a tie-breaker + return (int) (rhs.getEndTime() - lhs.getEndTime()); + } + } + /** * Send a leaseset store message out an exploratory tunnel, * with the reply to come back through a exploratory tunnel. 
@@ -564,9 +639,13 @@ abstract class StoreJob extends JobImpl { */ private void sendWrappedStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { final RouterContext ctx = getContext(); - long token = 1 + ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE); Hash to = peer.getIdentity().getHash(); - TunnelInfo replyTunnel = ctx.tunnelManager().selectInboundExploratoryTunnel(to); + // see comments in method above + TunnelInfo replyTunnel; + if (_state.getAttemptedCount() <= 1) + replyTunnel = ctx.tunnelManager().selectInboundExploratoryTunnel(to); + else + replyTunnel = ctx.tunnelManager().selectInboundTunnel(); if (replyTunnel == null) { if (_log.shouldLog(Log.WARN)) _log.warn("No inbound expl. tunnels for reply - delaying..."); @@ -580,14 +659,14 @@ abstract class StoreJob extends JobImpl { return; } TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0); - msg.setReplyToken(token); msg.setReplyTunnel(replyTunnelId); msg.setReplyGateway(replyTunnel.getPeer(0)); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send(dbStore) w/ token expected " + token); - - TunnelInfo outTunnel = ctx.tunnelManager().selectOutboundExploratoryTunnel(to); + TunnelInfo outTunnel; + if (_state.getAttemptedCount() <= 1) + outTunnel = ctx.tunnelManager().selectOutboundExploratoryTunnel(to); + else + outTunnel = ctx.tunnelManager().selectOutboundTunnel(); if (outTunnel != null) { I2NPMessage sent; // garlic encrypt using router SKM @@ -609,10 +688,10 @@ abstract class StoreJob extends JobImpl { } SendSuccessJob onReply = new SendSuccessJob(ctx, peer, outTunnel, sent.getMessageSize()); FailedJob onFail = new FailedJob(ctx, peer, ctx.clock().now()); - StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, token, expiration); + StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, msg.getReplyToken(), expiration); if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getJobId() + ": sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent); + _log.debug(getJobId() + ": sending encrypted store to " + to + " through " + outTunnel + ": " + sent); } ctx.messageRegistry().registerPending(selector, onReply, onFail); ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to); @@ -804,7 +883,7 @@ abstract class StoreJob extends JobImpl { if (_onSuccess != null) getContext().jobQueue().addJob(_onSuccess); _state.complete(true); - getContext().statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted()); + getContext().statManager().addRateData("netDb.storePeers", _state.getAttemptedCount()); } /** @@ -819,6 +898,6 @@ abstract class StoreJob extends JobImpl { if (_onFailure != null) getContext().jobQueue().addJob(_onFailure); _state.complete(true); - getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted()); + getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttemptedCount()); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java index 8404d996d1..14cde571d9 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreState.java @@ -5,6 +5,7 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; import 
java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -23,17 +24,21 @@ class StoreState {
     private final HashSet<Hash> _pendingPeers;
     private final Map<Hash, Long> _pendingPeerTimes;
     private final Map<Hash, MessageWrapper.WrappedMessage> _pendingMessages;
-    private final HashSet<Hash> _successfulPeers;
-    //private final HashSet<Hash> _successfulExploratoryPeers;
-    private final HashSet<Hash> _failedPeers;
-    private final HashSet<Hash> _attemptedPeers;
+    private final Set<Hash> _successfulPeers;
+    private final Set<Hash> _attemptedPeers;
     private int _completeCount;
+    private int _attempted;
     private volatile long _completed;
     private volatile long _started;
 
     public StoreState(RouterContext ctx, Hash key, DatabaseEntry data) {
         this(ctx, key, data, null);
     }
+
+    /**
+     * @param key the DatabaseEntry hash
+     * @param toSkip may be null; if non-null, all attempted and skipped targets will be added to it (as of 0.9.53)
+     */
     public StoreState(RouterContext ctx, Hash key, DatabaseEntry data, Set<Hash> toSkip) {
         _context = ctx;
         _key = key;
@@ -41,50 +46,72 @@ class StoreState {
         _pendingPeers = new HashSet<Hash>(4);
         _pendingPeerTimes = new HashMap<Hash, Long>(4);
         _pendingMessages = new ConcurrentHashMap<Hash, WrappedMessage>(4);
-        _attemptedPeers = new HashSet<Hash>(8);
         if (toSkip != null) {
-            _attemptedPeers.addAll(toSkip);
-            _completeCount = toSkip.size();
+            _attemptedPeers = toSkip;
+        } else {
+            _attemptedPeers = new HashSet<Hash>(8);
         }
-        _failedPeers = new HashSet<Hash>(8);
         _successfulPeers = new HashSet<Hash>(4);
-        //_successfulExploratoryPeers = new HashSet(16);
         _completed = -1;
         _started = _context.clock().now();
     }
 
     public Hash getTarget() { return _key; }
     public DatabaseEntry getData() { return _data; }
-    public Set<Hash> getPending() {
+
+    /**
+     * The number of peers pending.
+     *
+     * @since 0.9.53 replaces getPending()
+     */
+    public int getPendingCount() {
         synchronized (_pendingPeers) {
-            return new HashSet<Hash>(_pendingPeers);
+            return _pendingPeers.size();
         }
     }
+
+    /**
+     * The peers attempted OR skipped.
+     * DOES include skipped peers.
+     * Use getAttemptedCount for the number of attempts.
+     */
     public Set<Hash> getAttempted() {
         synchronized (_attemptedPeers) {
             return new HashSet<Hash>(_attemptedPeers);
         }
     }
-    public Set<Hash> getSuccessful() {
-        synchronized (_successfulPeers) {
-            return new HashSet<Hash>(_successfulPeers);
-        }
-    }
-    /** unused */
-/****
-    public Set<Hash> getSuccessfulExploratory() {
-        synchronized (_successfulExploratoryPeers) {
-            return (Set<Hash>)_successfulExploratoryPeers.clone();
+
+    /**
+     * The number of peers attempted.
+     * Does not include skipped peers.
+     * Do not use getAttempted().size() as that does include skipped peers.
+     *
+     * @since 0.9.53
+     */
+    public int getAttemptedCount() {
+        synchronized (_attemptedPeers) {
+            return _attempted;
         }
     }
-****/
 
-    /** getFailed */
-    public Set<Hash> getFailed() {
-        synchronized (_failedPeers) {
-            return new HashSet<Hash>(_failedPeers);
+    /**
+     * Return a successful peer (an arbitrary one if more than one was successful)
+     * or null.
+ * + * @since 0.9.53 formerly returned a copy of the Set + */ + public Hash getSuccessful() { + synchronized (_successfulPeers) { + if (_successfulPeers.isEmpty()) + return null; + try { + return _successfulPeers.iterator().next(); + } catch (NoSuchElementException nsee) { + return null; + } } } + public boolean completed() { return _completed != -1; } public void complete(boolean completed) { if (completed && _completed <= 0) @@ -112,25 +139,21 @@ class StoreState { return _pendingMessages.remove(peer); } + /** + * Increments attempted count + */ public void addPending(Hash peer) { + Long now = Long.valueOf(_context.clock().now()); synchronized (_pendingPeers) { _pendingPeers.add(peer); - _pendingPeerTimes.put(peer, Long.valueOf(_context.clock().now())); - } - synchronized (_attemptedPeers) { - _attemptedPeers.add(peer); - } - } - public void addPending(Collection<Hash> pending) { - synchronized (_pendingPeers) { - _pendingPeers.addAll(pending); - for (Hash peer : pending) - _pendingPeerTimes.put(peer, Long.valueOf(_context.clock().now())); + _pendingPeerTimes.put(peer, now); } synchronized (_attemptedPeers) { - _attemptedPeers.addAll(pending); + if (_attemptedPeers.add(peer)) + _attempted++; } } + /** we aren't even going to try to contact this peer */ public void addSkipped(Hash peer) { synchronized (_attemptedPeers) { @@ -153,79 +176,44 @@ class StoreState { return rv; } - /** unused */ -/**** - public long confirmedExploratory(Hash peer) { - long rv = -1; - synchronized (_pendingPeers) { - _pendingPeers.remove(peer); - Long when = _pendingPeerTimes.remove(peer); - if (when != null) - rv = _context.clock().now() - when.longValue(); - } - synchronized (_successfulExploratoryPeers) { - _successfulExploratoryPeers.add(peer); - } - return rv; - } -****/ - public void replyTimeout(Hash peer) { synchronized (_pendingPeers) { _pendingPeers.remove(peer); } - synchronized (_failedPeers) { - _failedPeers.add(peer); - } } @Override public String toString() { StringBuilder buf = new StringBuilder(256); buf.append("Storing ").append(_key); - buf.append(" "); + buf.append(' '); if (_completed <= 0) buf.append(" completed? 
false "); else buf.append(" completed on ").append(new Date(_completed)); - buf.append(" Attempted: "); + buf.append(" Attempted: ") + .append(_attempted) + .append(" Attempted+Skipped: "); synchronized (_attemptedPeers) { buf.append(_attemptedPeers.size()).append(' '); for (Hash peer : _attemptedPeers) { - buf.append(peer.toBase64()).append(" "); + buf.append(peer.toBase64()).append(' '); } } buf.append(" Pending: "); synchronized (_pendingPeers) { buf.append(_pendingPeers.size()).append(' '); for (Hash peer : _pendingPeers) { - buf.append(peer.toBase64()).append(" "); - } - } - buf.append(" Failed: "); - synchronized (_failedPeers) { - buf.append(_failedPeers.size()).append(' '); - for (Hash peer : _failedPeers) { - buf.append(peer.toBase64()).append(" "); + buf.append(peer.toBase64()).append(' '); } } buf.append(" Successful: "); synchronized (_successfulPeers) { buf.append(_successfulPeers.size()).append(' '); for (Hash peer : _successfulPeers) { - buf.append(peer.toBase64()).append(" "); - } - } -/**** - buf.append(" Successful Exploratory: "); - synchronized (_successfulExploratoryPeers) { - buf.append(_successfulExploratoryPeers.size()).append(' '); - for (Iterator<Hash> iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) { - Hash peer = iter.next(); - buf.append(peer.toBase64()).append(" "); + buf.append(peer.toBase64()).append(' '); } } -****/ return buf.toString(); } } -- GitLab
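
The LeaseComparator added above orders a LeaseSet's lease gateways by XOR
distance from the target floodfill, the same closeness metric the kademlia
netdb uses for lookups. A standalone sketch of that ordering, with plain byte
arrays standing in for net.i2p.data.Hash and made-up demo values:

    import java.util.Arrays;
    import java.util.Comparator;

    public class XorOrderDemo {

        /** Orders byte arrays by ascending XOR distance from a fixed target. */
        static Comparator<byte[]> byXorDistanceFrom(final byte[] target) {
            return new Comparator<byte[]>() {
                public int compare(byte[] a, byte[] b) {
                    for (int i = 0; i < target.length; i++) {
                        int da = (a[i] ^ target[i]) & 0xff;
                        int db = (b[i] ^ target[i]) & 0xff;
                        if (da != db)
                            return da < db ? -1 : 1;
                    }
                    // equal distance; the patch's LeaseComparator breaks
                    // this tie by putting the latest-expiring lease first
                    return 0;
                }
            };
        }

        public static void main(String[] args) {
            byte[] target = { 0x10, 0x00 };
            byte[][] gateways = { { 0x7f, 0x01 }, { 0x11, 0x00 }, { 0x10, 0x05 } };
            Arrays.sort(gateways, byXorDistanceFrom(target));
            // prints 1005 (distance 0005), then 1100 (0100), then 7f01 (6f01)
            for (byte[] g : gateways)
                System.out.printf("%02x%02x%n", g[0], g[1]);
        }
    }

Sorting the leases this way means the first request hands the floodfill the
reply gateway it is most likely to already know and be connected to.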
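Each tunneled send path now applies the same selection rule: the first attempt
uses the tunnel whose far end is closest to the target
(selectInboundExploratoryTunnel(to) and its outbound/client equivalents), and
retries fall back to a random tunnel so repeated attempts stop reusing the
same pair of tunnels. A compact sketch of that rule; the Tunnel type and the
pool are hypothetical stand-ins, not the router's TunnelManagerFacade API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class RetryDiversityDemo {

        /** toy stand-in for a tunnel; farEnd models the far-end peer hash */
        static class Tunnel {
            final int farEnd;
            Tunnel(int farEnd) { this.farEnd = farEnd; }
        }

        static final Random RANDOM = new Random();

        /**
         * @param attempted peers already tried, i.e. what
         *                  StoreState.getAttemptedCount() reports at selection time
         */
        static Tunnel pick(List<Tunnel> pool, int target, int attempted) {
            if (attempted <= 1) {
                // first request: tunnel whose far end is XOR-closest to the target
                Tunnel best = pool.get(0);
                for (Tunnel t : pool) {
                    if ((t.farEnd ^ target) < (best.farEnd ^ target))
                        best = t;
                }
                return best;
            }
            // retries: random pick, so we don't reuse the same tunnels every time
            return pool.get(RANDOM.nextInt(pool.size()));
        }

        public static void main(String[] args) {
            List<Tunnel> pool = new ArrayList<Tunnel>();
            for (int i = 0; i < 4; i++)
                pool.add(new Tunnel(RANDOM.nextInt(1 << 20)));
            System.out.println("first: " + pick(pool, 0x12345, 1).farEnd);
            System.out.println("retry: " + pick(pool, 0x12345, 2).farEnd);
        }
    }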
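The consolidated token code in sendStore() draws
token = 1 + random.nextLong(I2NPMessage.MAX_ID_VALUE), so the token is never
zero; per the I2NP spec, a reply token greater than zero asks the floodfill to
answer with a DeliveryStatusMessage. A sketch of the same guarantee; the
constant here is an assumed stand-in for I2NPMessage.MAX_ID_VALUE:

    import java.util.concurrent.ThreadLocalRandom;

    public class ReplyTokenDemo {
        // assumed stand-in: reply tokens are 4-byte values on the wire,
        // and 0 means "no reply requested"
        static final long MAX_ID_VALUE = (1L << 32) - 1;

        /** never returns 0, so the receiving floodfill will acknowledge the store */
        static long newReplyToken() {
            return ThreadLocalRandom.current().nextLong(1, MAX_ID_VALUE + 1);
        }

        public static void main(String[] args) {
            System.out.println(newReplyToken());
        }
    }

Generating the token once, before the send path is chosen, is what lets the
direct, exploratory, client, and wrapped paths all reuse msg.getReplyToken()
instead of each rolling their own.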
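StoreState now keeps a single set of attempted-or-skipped peers (the set
handed to FloodfillVerifyStoreJob and, on failure, to the next StoreJob) plus
a plain counter of real attempts, so skipped peers no longer count against
MAX_PEERS_SENT. Its bookkeeping reduces to roughly the following sketch
(String keys instead of Hash; pending and successful tracking omitted):

    import java.util.HashSet;
    import java.util.Set;

    public class StoreStateSketch {
        private final Set<String> attemptedOrSkipped = new HashSet<String>();
        private int attempted;

        /** we sent (or are sending) the store to this peer */
        public synchronized void addPending(String peer) {
            if (attemptedOrSkipped.add(peer))
                attempted++;              // counted as a real attempt
        }

        /** we decided not to contact this peer at all */
        public synchronized void addSkipped(String peer) {
            attemptedOrSkipped.add(peer); // excluded from retries, not counted
        }

        /** drives the MAX_PEERS_SENT / MAX_PEERS_TO_TRY checks */
        public synchronized int getAttemptedCount() {
            return attempted;
        }

        /** handed to FloodfillVerifyStoreJob / the next StoreJob as toSkip */
        public synchronized Set<String> getAttempted() {
            return new HashSet<String>(attemptedOrSkipped);
        }
    }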
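On a failed verify, the job above seeds the next store's toSkip set with the
floodfill it stored to, the floodfill it verified against, and, while the
ignore set is still small, everything it already ruled out; past 20 entries it
starts fresh so the exclusion list cannot grow until no floodfills are left to
try. In outline (a sketch; names are illustrative, not the job's fields):

    import java.util.HashSet;
    import java.util.Set;

    public class VerifyFailureHandoff {
        private static final int MAX_CARRYOVER = 20;

        static Set<String> buildToSkip(String sentTo, String verifyTarget,
                                       Set<String> ignored) {
            Set<String> toSkip = new HashSet<String>(8);
            if (sentTo != null)
                toSkip.add(sentTo);       // the ff that took the store but failed verify
            toSkip.add(verifyTarget);     // the ff that returned a bad or stale reply
            if (ignored.size() < MAX_CARRYOVER)
                toSkip.addAll(ignored);   // carry the rest forward unless it got huge
            return toSkip;
        }
    }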