Unverified Commit f1170b94 authored by zzz

NetDB: StoreJob reliability improvements

- Always use a lease as the reply tunnel when publishing LS through a client tunnel
  This ensures we're not using about-to-expire tunnels for the reply,
  and ensures the ff is able to pick an alternate
- Don't count skipped peers as attempted in FloodfillVerify
- Pass failed and skipped peers to FloodfillVerify job to be skipped there also
- Pass failed and skipped peers from FloodfillVerify job to the next StoreJob on failure
- Consolidate common reply token generation code in StoreJob
- Ensure tunnel diversity in StoreJob retries by using the tunnels
  closest to the target only for the first request (see the sketch below).
  This increases reliability by not reusing the same tunnels for all retries.
- Refactor StoreState to rework inefficient methods and unused data
- Clean up commented-out code in StoreState
- Log tweaks
parent 59ab4077
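The tunnel-diversity item above is the heart of the retry change: only the first request uses the tunnels whose far ends are closest to the target, and retries fall back to random selection so a failing tunnel is not reused. A minimal illustrative sketch, not part of the diff, with simplified placeholder types (String in place of Hash):

    // Sketch only: TunnelManager/TunnelInfo are stand-ins for the router classes.
    class TunnelDiversitySketch {
        interface TunnelInfo {}
        interface TunnelManager {
            // picks the tunnel whose far end is closest to the given key (first attempt)
            TunnelInfo selectInboundExploratoryTunnel(String closestTo);
            // picks a random tunnel (retries)
            TunnelInfo selectInboundTunnel();
        }

        // attemptedCount <= 1 means this is the first request for the key
        static TunnelInfo pickReplyTunnel(TunnelManager mgr, String target, int attemptedCount) {
            if (attemptedCount <= 1)
                return mgr.selectInboundExploratoryTunnel(target);
            return mgr.selectInboundTunnel();
        }
    }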
@@ -8,7 +8,6 @@ package net.i2p.router.networkdb.kademlia;
*
*/
import java.util.NoSuchElementException;
import java.util.Set;
import net.i2p.data.DatabaseEntry;
@@ -98,10 +97,7 @@ class FloodfillStoreJob extends StoreJob {
published = data.getDate();
}
// we should always have exactly one successful entry
Hash sentTo = null;
try {
sentTo = _state.getSuccessful().iterator().next();
} catch (NoSuchElementException nsee) {}
Hash sentTo = _state.getSuccessful();
Hash client;
if (type == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) {
// get the real client hash
@@ -111,7 +107,7 @@ class FloodfillStoreJob extends StoreJob {
}
Job fvsj = new FloodfillVerifyStoreJob(ctx, key, client,
published, type,
sentTo, _facade);
sentTo, _state.getAttempted(), _facade);
if (shouldLog)
_log.info(getJobId() + ": Succeeded sending key " + key +
", queueing verify job " + fvsj.getJobId());
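The hunk above hands _state.getAttempted() to the verify job; the FloodfillVerifyStoreJob hunks below consume it and, on failure, pass the accumulated ignores back to the next store. A condensed sketch of that hand-off, with placeholder types (String in place of Hash) rather than the real classes:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: condenses logic that is spread across the hunks above and below.
    class PeerHandoffSketch {
        private final String sentTo;                       // peer credited with the original store, may be null
        private final Set<String> ignore = new HashSet<>();

        PeerHandoffSketch(String sentTo, Set<String> toSkip) {
            this.sentTo = sentTo;
            if (toSkip != null)
                ignore.addAll(toSkip);                     // never re-query peers the store tried or skipped
            if (sentTo != null)
                ignore.add(sentTo);
        }

        // On a failed verify: seed the replacement StoreJob with the ignores,
        // unless the list has grown unreasonably large, then keep only the basics.
        Set<String> skipSetForNextStore(String failedTarget) {
            Set<String> toSkip = new HashSet<>();
            if (sentTo != null)
                toSkip.add(sentTo);
            toSkip.add(failedTarget);
            if (ignore.size() < 20)
                toSkip.addAll(ignore);
            return toSkip;
        }
    }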
@@ -40,6 +40,7 @@ class FloodfillVerifyStoreJob extends JobImpl {
private final FloodfillNetworkDatabaseFacade _facade;
private long _expiration;
private long _sendTime;
private int _attempted;
private final long _published;
private final int _type;
private final boolean _isRouterInfo;
@@ -59,9 +60,11 @@ class FloodfillVerifyStoreJob extends JobImpl {
* @param client generally the same as key, unless encrypted LS2; non-null
* @param published getDate() for RI or LS1, getPublished() for LS2
* @param sentTo who to give the credit or blame to, can be null
* @param toSkip don't query any of these peers, may be null
* @since 0.9.53 added toSkip param
*/
public FloodfillVerifyStoreJob(RouterContext ctx, Hash key, Hash client, long published, int type,
Hash sentTo, FloodfillNetworkDatabaseFacade facade) {
Hash sentTo, Set<Hash> toSkip, FloodfillNetworkDatabaseFacade facade) {
super(ctx);
facade.verifyStarted(key);
_key = key;
@@ -73,7 +76,12 @@ class FloodfillVerifyStoreJob extends JobImpl {
_log = ctx.logManager().getLog(getClass());
_sentTo = sentTo;
_facade = facade;
_ignore = new HashSet<Hash>(MAX_PEERS_TO_TRY);
_ignore = new HashSet<Hash>(8);
if (toSkip != null) {
synchronized(toSkip) {
_ignore.addAll(toSkip);
}
}
if (sentTo != null) {
_ipSet = new MaskedIPSet(ctx, sentTo, IP_CLOSE_BYTES);
_ignore.add(_sentTo);
@@ -192,7 +200,7 @@ class FloodfillVerifyStoreJob extends JobImpl {
// We don't have a compatible way to get a reply,
// skip it for now.
if (_log.shouldWarn())
_log.warn("Skipping store verify for ECIES client " + _client.toBase32());
_log.warn("Skipping store verify to " + _target + " for ECIES- or ElG-only client " + _client.toBase32());
_facade.verifyFinished(_key);
return;
}
@@ -242,6 +250,7 @@ class FloodfillVerifyStoreJob extends JobImpl {
new VerifyReplyJob(getContext()),
new VerifyTimeoutJob(getContext()));
ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), _target);
_attempted++;
}
/**
@@ -442,10 +451,14 @@ class FloodfillVerifyStoreJob extends JobImpl {
_log.info(getJobId() + ": Verify failed, but new store already happened for: " + _key);
return;
}
Set<Hash> toSkip = new HashSet<Hash>(2);
Set<Hash> toSkip = new HashSet<Hash>(8);
if (_sentTo != null)
toSkip.add(_sentTo);
toSkip.add(_target);
// pass over all the ignores for the next attempt
// unless we've had a crazy number of attempts, then start over
if (_ignore.size() < 20)
toSkip.addAll(_ignore);
if (_log.shouldWarn())
_log.warn(getJobId() + ": Verify failed, starting new store for: " + _key);
_facade.sendStore(_key, ds, null, null, FloodfillNetworkDatabaseFacade.PUBLISH_TIMEOUT, toSkip);
@@ -467,7 +480,7 @@ class FloodfillVerifyStoreJob extends JobImpl {
getContext().statManager().addRateData("netDb.floodfillVerifyTimeout", getContext().clock().now() - _sendTime);
if (_log.shouldLog(Log.WARN))
_log.warn(getJobId() + ": Verify timed out for: " + _key);
if (_ignore.size() < MAX_PEERS_TO_TRY) {
if (_attempted < MAX_PEERS_TO_TRY) {
// Don't resend, simply rerun FVSJ.this inline and
// choose somebody besides _target for verification
_ignore.add(_target);
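The StoreJob.java hunks below also consolidate reply-token generation into the common send path instead of repeating it in every sendStoreThrough*() variant. A minimal sketch of that step, assuming the usual I2NP convention that a zero reply token means no confirmation is requested, and with MAX_ID_VALUE standing in for I2NPMessage.MAX_ID_VALUE:

    import java.util.Random;

    // Sketch only: java.util.Random stands in for the router's RandomSource.
    class ReplyTokenSketch {
        // assumed bound for 4-byte I2NP message IDs
        static final long MAX_ID_VALUE = 4294967295L;

        // +1 keeps the token nonzero so the floodfill knows a reply is wanted
        static long newReplyToken(Random rnd) {
            return 1 + Math.floorMod(rnd.nextLong(), MAX_ID_VALUE);
        }
    }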
@@ -8,7 +8,10 @@ package net.i2p.router.networkdb.kademlia;
*
*/
import java.io.Serializable;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -19,7 +22,9 @@ import net.i2p.data.Base64;
import net.i2p.data.Certificate;
import net.i2p.data.DatabaseEntry;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.Lease;
import net.i2p.data.LeaseSet;
import net.i2p.data.router.RouterIdentity;
import net.i2p.data.router.RouterInfo;
@@ -128,7 +133,7 @@ abstract class StoreJob extends JobImpl {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Expired: " + _timeoutMs);
fail();
} else if (_state.getAttempted().size() > MAX_PEERS_SENT) {
} else if (_state.getAttemptedCount() > MAX_PEERS_SENT) {
_state.complete(true);
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Max sent");
@@ -155,7 +160,7 @@ abstract class StoreJob extends JobImpl {
*/
private synchronized void continueSending() {
if (_state.completed()) return;
int toCheck = getParallelization() - _state.getPending().size();
int toCheck = getParallelization() - _state.getPendingCount();
if (toCheck <= 0) {
// too many already pending
if (_log.shouldLog(Log.DEBUG))
@@ -177,7 +182,7 @@ abstract class StoreJob extends JobImpl {
// closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted());
closestHashes = getClosestFloodfillRouters(_state.getTarget(), toCheck, _state.getAttempted());
if ( (closestHashes == null) || (closestHashes.isEmpty()) ) {
if (_state.getPending().isEmpty()) {
if (_state.getPendingCount() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": No more peers left and none pending");
fail();
@@ -187,7 +192,6 @@ abstract class StoreJob extends JobImpl {
return;
}
} else {
//_state.addPending(closestHashes);
int queued = 0;
int skipped = 0;
int type = _state.getData().getType();
@@ -255,14 +259,14 @@ abstract class StoreJob extends JobImpl {
// if (!((RouterInfo)ds).isHidden()) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Continue sending key " + _state.getTarget() +
" after " + _state.getAttempted().size() + " tries to " + closestHashes);
" after " + _state.getAttemptedCount() + " tries to " + closestHashes);
_state.addPending(peer);
sendStore((RouterInfo)ds, peerTimeout);
queued++;
//}
}
}
if (queued == 0 && _state.getPending().isEmpty()) {
if (queued == 0 && _state.getPendingCount() <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": No more peers left after skipping " + skipped + " and none pending");
// queue a job to go around again rather than recursing
@@ -322,6 +326,12 @@ abstract class StoreJob extends JobImpl {
_log.error("Hash mismatch StoreJob");
return;
}
if (router.getIdentity().equals(getContext().router().getRouterInfo().getIdentity())) {
// don't send it to ourselves
_log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
return;
}
DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext());
int type = _state.getData().getType();
if (type == DatabaseEntry.KEY_TYPE_ROUTERINFO) {
@@ -334,16 +344,11 @@ abstract class StoreJob extends JobImpl {
long now = getContext().clock().now();
msg.setMessageExpiration(now + _timeoutMs);
if (router.getIdentity().equals(getContext().router().getRouterInfo().getIdentity())) {
// don't send it to ourselves
if (_log.shouldLog(Log.ERROR))
_log.error(getJobId() + ": Dont send store to ourselves - why did we try?");
return;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(getJobId() + ": Send store timeout is " + responseTime);
long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
msg.setReplyToken(token);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send(dbStore) w/ token expected " + msg.getReplyToken() + " msg exp. " + _timeoutMs + " resp exp. " + responseTime);
sendStore(msg, router, now + responseTime);
}
@@ -381,18 +386,17 @@ abstract class StoreJob extends JobImpl {
*
*/
private void sendDirect(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
msg.setReplyToken(token);
msg.setReplyGateway(getContext().routerHash());
_state.addPending(peer.getIdentity().getHash());
Hash to = peer.getIdentity().getHash();
_state.addPending(to);
SendSuccessJob onReply = new SendSuccessJob(getContext(), peer);
FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, msg.getReplyToken(), expiration);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": sending store directly to " + peer.getIdentity().getHash());
_log.debug(getJobId() + ": sending store directly to " + to);
OutNetMessage m = new OutNetMessage(getContext(), msg, expiration, STORE_PRIORITY, peer);
m.setOnFailedReplyJob(onFail);
m.setOnFailedSendJob(onFail);
@@ -410,38 +414,37 @@ abstract class StoreJob extends JobImpl {
* @since 0.9.41 renamed from sendStoreThroughGarlic()
*/
private void sendStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
long token = 1 + getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE);
Hash to = peer.getIdentity().getHash();
TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(to);
// For all tunnel selections, the first time we pick the tunnel with the far-end closest
// to the target. After that we pick a random tunnel, or else we'd pick the
// same tunnels every time.
TunnelInfo replyTunnel;
if (_state.getAttemptedCount() <= 1)
replyTunnel = getContext().tunnelManager().selectInboundExploratoryTunnel(to);
else
replyTunnel = getContext().tunnelManager().selectInboundTunnel();
if (replyTunnel == null) {
_log.warn("No reply inbound tunnels available!");
return;
}
TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
msg.setReplyToken(token);
msg.setReplyTunnel(replyTunnelId);
msg.setReplyGateway(replyTunnel.getPeer(0));
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send store thru expl. tunnel to " + peer.getIdentity().getHash() + " w/ token expected " + token);
_state.addPending(to);
TunnelInfo outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(to);
TunnelInfo outTunnel;
if (_state.getAttemptedCount() <= 1)
outTunnel = getContext().tunnelManager().selectOutboundExploratoryTunnel(to);
else
outTunnel = getContext().tunnelManager().selectOutboundTunnel();
if (outTunnel != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(getJobId() + ": Sending tunnel message out " + outTunnelId + " to "
// + peer.getIdentity().getHash().toBase64());
//TunnelId targetTunnelId = null; // not needed
//Job onSend = null; // not wanted
SendSuccessJob onReply = new SendSuccessJob(getContext(), peer, outTunnel, msg.getMessageSize());
FailedJob onFail = new FailedJob(getContext(), peer, getContext().clock().now());
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, token, expiration);
StoreMessageSelector selector = new StoreMessageSelector(getContext(), getJobId(), peer, msg.getReplyToken(), expiration);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg);
_log.debug(getJobId() + ": sending store to " + to + " through " + outTunnel + ": " + msg);
getContext().messageRegistry().registerPending(selector, onReply, onFail);
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
} else {
@@ -469,9 +472,9 @@ abstract class StoreJob extends JobImpl {
*/
private void sendStoreThroughClient(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
final RouterContext ctx = getContext();
long token = 1 + ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE);
Hash client;
if (msg.getEntry().getType() == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) {
int dstype = msg.getEntry().getType();
if (dstype == DatabaseEntry.KEY_TYPE_ENCRYPTED_LS2) {
// get the real client hash
client = ((LeaseSet)msg.getEntry()).getDestination().calculateHash();
} else {
@@ -480,22 +483,40 @@ abstract class StoreJob extends JobImpl {
RouterIdentity ident = peer.getIdentity();
Hash to = ident.getHash();
TunnelInfo replyTunnel = ctx.tunnelManager().selectInboundTunnel(client, to);
if (replyTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No reply inbound tunnels available!");
fail();
return;
// see comments in method above
Hash replyGW;
TunnelId replyTunnelId;
if (dstype == DatabaseEntry.KEY_TYPE_LS2 || dstype == DatabaseEntry.KEY_TYPE_LEASESET) {
// always pick the reply tunnel from the LS, they will be the newest,
// probably still connected,
// and it gives the ff flexibility to choose another one
LeaseSet ls = (LeaseSet) msg.getEntry();
Lease lease = pickReplyTunnel(ls, _state.getAttemptedCount(), to);
replyGW = lease.getGateway();
replyTunnelId = lease.getTunnelId();
} else {
TunnelInfo replyTunnel;
if (_state.getAttemptedCount() <= 1)
replyTunnel = ctx.tunnelManager().selectInboundTunnel(client, to);
else
replyTunnel = ctx.tunnelManager().selectInboundTunnel(client);
if (replyTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No reply inbound tunnels available!");
fail();
return;
}
replyTunnelId = replyTunnel.getReceiveTunnelId(0);
replyGW = replyTunnel.getPeer(0);
}
TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
msg.setReplyToken(token);
msg.setReplyTunnel(replyTunnelId);
msg.setReplyGateway(replyTunnel.getPeer(0));
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
msg.setReplyGateway(replyGW);
TunnelInfo outTunnel = ctx.tunnelManager().selectOutboundTunnel(client, to);
TunnelInfo outTunnel;
if (_state.getAttemptedCount() <= 1)
outTunnel = ctx.tunnelManager().selectOutboundTunnel(client, to);
else
outTunnel = ctx.tunnelManager().selectOutboundTunnel(client);
if (outTunnel != null) {
I2NPMessage sent;
LeaseSetKeys lsk = ctx.keyManager().getKeys(client);
@@ -531,10 +552,10 @@ abstract class StoreJob extends JobImpl {
}
SendSuccessJob onReply = new SendSuccessJob(ctx, peer, outTunnel, sent.getMessageSize());
FailedJob onFail = new FailedJob(ctx, peer, ctx.clock().now());
StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, token, expiration);
StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, msg.getReplyToken(), expiration);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getJobId() + ": sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
_log.debug(getJobId() + ": sending encrypted store to " + to + " through " + outTunnel + ": " + sent + " with reply to " + replyGW + ' ' + replyTunnelId);
}
ctx.messageRegistry().registerPending(selector, onReply, onFail);
ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
@@ -552,6 +573,60 @@ abstract class StoreJob extends JobImpl {
}
}
/**
* Pick a reply tunnel out of a LeaseSet
*
* @param to pick closest if attempts == 1
* @since 0.9.53
*/
private Lease pickReplyTunnel(LeaseSet ls, int attempts, Hash to) {
int c = ls.getLeaseCount();
if (c <= 0)
throw new IllegalStateException();
if (c == 1)
return ls.getLease(0);
if (attempts > 1)
return ls.getLease(getContext().random().nextInt(c));
// pick closest
Lease[] leases = new Lease[c];
for (int i = 0; i < c; i++) {
leases[i] = ls.getLease(i);
}
Arrays.sort(leases, new LeaseComparator(to));
return leases[0];
}
/**
* Find the lease that is XOR-closest to a given hash
*
* @since 0.9.53 adapted from TunnelPool.TunnelInfoComparator
*/
private static class LeaseComparator implements Comparator<Lease>, Serializable {
private final byte[] _base;
/**
* @param target key to compare distances with
*/
public LeaseComparator(Hash target) {
_base = target.getData();
}
public int compare(Lease lhs, Lease rhs) {
byte lhsb[] = lhs.getGateway().getData();
byte rhsb[] = rhs.getGateway().getData();
for (int i = 0; i < _base.length; i++) {
int ld = (lhsb[i] ^ _base[i]) & 0xff;
int rd = (rhsb[i] ^ _base[i]) & 0xff;
if (ld < rd)
return -1;
if (ld > rd)
return 1;
}
// latest-expiring first as a tie-breaker
return (int) (rhs.getEndTime() - lhs.getEndTime());
}
}
/**
* Send a leaseset store message out an exploratory tunnel,
* with the reply to come back through an exploratory tunnel.
@@ -564,9 +639,13 @@ abstract class StoreJob extends JobImpl {
*/
private void sendWrappedStoreThroughExploratory(DatabaseStoreMessage msg, RouterInfo peer, long expiration) {
final RouterContext ctx = getContext();
long token = 1 + ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE);
Hash to = peer.getIdentity().getHash();
TunnelInfo replyTunnel = ctx.tunnelManager().selectInboundExploratoryTunnel(to);
// see comments in method above
TunnelInfo replyTunnel;
if (_state.getAttemptedCount() <= 1)
replyTunnel = ctx.tunnelManager().selectInboundExploratoryTunnel(to);
else
replyTunnel = ctx.tunnelManager().selectInboundTunnel();
if (replyTunnel == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("No inbound expl. tunnels for reply - delaying...");
@@ -580,14 +659,14 @@ abstract class StoreJob extends JobImpl {
return;
}
TunnelId replyTunnelId = replyTunnel.getReceiveTunnelId(0);
msg.setReplyToken(token);
msg.setReplyTunnel(replyTunnelId);
msg.setReplyGateway(replyTunnel.getPeer(0));
if (_log.shouldLog(Log.DEBUG))
_log.debug(getJobId() + ": send(dbStore) w/ token expected " + token);
TunnelInfo outTunnel = ctx.tunnelManager().selectOutboundExploratoryTunnel(to);
TunnelInfo outTunnel;
if (_state.getAttemptedCount() <= 1)
outTunnel = ctx.tunnelManager().selectOutboundExploratoryTunnel(to);
else
outTunnel = ctx.tunnelManager().selectOutboundTunnel();
if (outTunnel != null) {
I2NPMessage sent;
// garlic encrypt using router SKM
@@ -609,10 +688,10 @@ abstract class StoreJob extends JobImpl {
}
SendSuccessJob onReply = new SendSuccessJob(ctx, peer, outTunnel, sent.getMessageSize());
FailedJob onFail = new FailedJob(ctx, peer, ctx.clock().now());
StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, token, expiration);
StoreMessageSelector selector = new StoreMessageSelector(ctx, getJobId(), peer, msg.getReplyToken(), expiration);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getJobId() + ": sending encrypted store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + sent);
_log.debug(getJobId() + ": sending encrypted store to " + to + " through " + outTunnel + ": " + sent);
}
ctx.messageRegistry().registerPending(selector, onReply, onFail);
ctx.tunnelDispatcher().dispatchOutbound(sent, outTunnel.getSendTunnelId(0), null, to);
@@ -804,7 +883,7 @@ abstract class StoreJob extends JobImpl {
if (_onSuccess != null)
getContext().jobQueue().addJob(_onSuccess);
_state.complete(true);
getContext().statManager().addRateData("netDb.storePeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
getContext().statManager().addRateData("netDb.storePeers", _state.getAttemptedCount());
}
/**
@@ -819,6 +898,6 @@ abstract class StoreJob extends JobImpl {
if (_onFailure != null)
getContext().jobQueue().addJob(_onFailure);
_state.complete(true);
getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted());
getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttemptedCount());
}
}
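The StoreState.java changes below keep a single set of attempted-or-skipped peers (so nothing gets contacted twice) while counting only genuine send attempts; that is why getAttemptedCount() and getAttempted().size() now differ. A simplified stand-in, with placeholder names and String in place of Hash:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only, not the real class: shows the attempted-vs-skipped bookkeeping.
    class StoreStateSketch {
        private final Set<String> attemptedOrSkipped = new HashSet<>();
        private int attempted;

        // Peer we are actually sending to; counts toward the send limit.
        synchronized void addPending(String peer) {
            if (attemptedOrSkipped.add(peer))
                attempted++;
        }

        // Peer we decided not to contact; excluded from retries but not counted.
        synchronized void addSkipped(String peer) {
            attemptedOrSkipped.add(peer);
        }

        synchronized int getAttemptedCount() { return attempted; }

        synchronized Set<String> getAttempted() { return new HashSet<>(attemptedOrSkipped); }
    }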
@@ -5,6 +5,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -23,17 +24,21 @@ class StoreState {
private final HashSet<Hash> _pendingPeers;
private final Map<Hash, Long> _pendingPeerTimes;
private final Map<Hash, MessageWrapper.WrappedMessage> _pendingMessages;
private final HashSet<Hash> _successfulPeers;
//private final HashSet<Hash> _successfulExploratoryPeers;
private final HashSet<Hash> _failedPeers;
private final HashSet<Hash> _attemptedPeers;
private final Set<Hash> _successfulPeers;
private final Set<Hash> _attemptedPeers;
private int _completeCount;
private int _attempted;
private volatile long _completed;
private volatile long _started;
public StoreState(RouterContext ctx, Hash key, DatabaseEntry data) {
this(ctx, key, data, null);
}
/**
* @param key the DatabaseEntry hash
* @param toSkip may be null, if non-null, all attempted and skipped targets will be added as of 0.9.53
*/
public StoreState(RouterContext ctx, Hash key, DatabaseEntry data, Set<Hash> toSkip) {
_context = ctx;
_key = key;
@@ -41,50 +46,72 @@ class StoreState {
_pendingPeers = new HashSet<Hash>(4);
_pendingPeerTimes = new HashMap<Hash, Long>(4);
_pendingMessages = new ConcurrentHashMap<Hash, WrappedMessage>(4);
_attemptedPeers = new HashSet<Hash>(8);
if (toSkip != null) {
_attemptedPeers.addAll(toSkip);
_completeCount = toSkip.size();
_attemptedPeers = toSkip;
} else {
_attemptedPeers = new HashSet<Hash>(8);
}
_failedPeers = new HashSet<Hash>(8);
_successfulPeers = new HashSet<Hash>(4);
//_successfulExploratoryPeers = new HashSet(16);
_completed = -1;
_started = _context.clock().now();
}
public Hash getTarget() { return _key; }
public DatabaseEntry getData() { return _data; }
public Set<Hash> getPending() {
/**
* The number of peers pending.
*
* @since 0.9.53 replaces getPending()
*/
public int getPendingCount() {
synchronized (_pendingPeers) {
return new HashSet<Hash>(_pendingPeers);
return _pendingPeers.size();
}
}
/**
* The peers attempted OR skipped.
* DOES include skipped peers.
* Use getAttemptedCount for the number of attempts.
*/
public Set<Hash> getAttempted() {
synchronized (_attemptedPeers) {
return new HashSet<Hash>(_attemptedPeers);
}
}
public Set<Hash> getSuccessful() {
synchronized (_successfulPeers) {
return new HashSet<Hash>(_successfulPeers);
}
}
/** unused */
/****
public Set<Hash> getSuccessfulExploratory() {
synchronized (_successfulExploratoryPeers) {
return (Set<Hash>)_successfulExploratoryPeers.clone();
/**
* The number of peers attempted.
* Does not include skipped peers.
* Do not use getAttempted().size() as that does include skipped peers.
*
* @since 0.9.53
*/
public int getAttemptedCount() {
synchronized (_attemptedPeers) {
return _attempted;
}
}
****/
/** getFailed */
public Set<Hash> getFailed() {
synchronized (_failedPeers) {
return new HashSet<Hash>(_failedPeers);
/**
* Return a successful peer (a random one if more than one was successful)
* or null.
*
* @since 0.9.53 formerly returned a copy of the Set
*/
public Hash getSuccessful() {
synchronized (_successfulPeers) {
if (_successfulPeers.isEmpty())
return null;
try {
return _successfulPeers.iterator().next();
} catch (NoSuchElementException nsee) {
return null;
}
}
}
public boolean completed() { return _completed != -1; }
public void complete(boolean completed) {
if (completed && _completed <= 0)
@@ -112,25 +139,21 @@ class StoreState {
return _pendingMessages.remove(peer);
}
/**
* Increments attempted count
*/
public void addPending(Hash peer) {
Long now = Long.valueOf(_context.clock().now());
synchronized (_pendingPeers) {
_pendingPeers.add(peer);
_pendingPeerTimes.put(peer, Long.valueOf(_context.clock().now()));
}
synchronized (_attemptedPeers) {
_attemptedPeers.add(peer);
}
}
public void addPending(Collection<Hash> pending) {
synchronized (_pendingPeers) {
_pendingPeers.addAll(pending);
for (Hash peer : pending)
_pendingPeerTimes.put(peer, Long.valueOf(_context.clock().now()));
_pendingPeerTimes.put(peer, now);
}
synchronized (_attemptedPeers) {
_attemptedPeers.addAll(pending);
if (_attemptedPeers.add(peer))
_attempted++;
}
}
/** we aren't even going to try to contact this peer */
public void addSkipped(Hash peer) {
synchronized (_attemptedPeers) {
@@ -153,79 +176,44 @@ class StoreState {
return rv;
}
/** unused */
/****
public long confirmedExploratory(Hash peer) {
long rv = -1;
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
Long when = _pendingPeerTimes.remove(peer);
if (when != null)
rv = _context.clock().now() - when.longValue();
}
synchronized (_successfulExploratoryPeers) {
_successfulExploratoryPeers.add(peer);
}
return rv;
}
****/
public void replyTimeout(Hash peer) {
synchronized (_pendingPeers) {
_pendingPeers.remove(peer);
}
synchronized (_failedPeers) {
_failedPeers.add(peer);
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
buf.append("Storing ").append(_key);
buf.append(" ");
buf.append(' ');
if (_completed <= 0)
buf.append(" completed? false ");
else
buf.append(" completed on ").append(new Date(_completed));
buf.append(" Attempted: ");
buf.append(" Attempted: ")
.append(_attempted)
.append(" Attempted+Skipped: ");
synchronized (_attemptedPeers) {
buf.append(_attemptedPeers.size()).append(' ');
for (Hash peer : _attemptedPeers) {
buf.append(peer.toBase64()).append(" ");
buf.append(peer.toBase64()).append(' ');
}
}
buf.append(" Pending: ");
synchronized (_pendingPeers) {
buf.append(_pendingPeers.size()).append(' ');
for (Hash peer : _pendingPeers) {
buf.append(peer.toBase64()).append(" ");
}
}
buf.append(" Failed: ");
synchronized (_failedPeers) {
buf.append(_failedPeers.size()).append(' ');
for (Hash peer : _failedPeers) {
buf.append(peer.toBase64()).append(" ");
buf.append(peer.toBase64()).append(' ');
}
}
buf.append(" Successful: ");
synchronized (_successfulPeers) {
buf.append(_successfulPeers.size()).append(' ');
for (Hash peer : _successfulPeers) {
buf.append(peer.toBase64()).append(" ");
}
}
/****
buf.append(" Successful Exploratory: ");
synchronized (_successfulExploratoryPeers) {
buf.append(_successfulExploratoryPeers.size()).append(' ');
for (Iterator<Hash> iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) {
Hash peer = iter.next();
buf.append(peer.toBase64()).append(" ");
buf.append(peer.toBase64()).append(' ');
}
}
****/
return buf.toString();
}
}