From 3d75b3dc315c2fd89dbef224f881b27e4fa59c16 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 27 Oct 2019 12:24:08 +0000 Subject: [PATCH] OCMOSJ: Keep bundling LS until acked log tweaks --- .../net/i2p/router/message/OutboundCache.java | 2 +- .../OutboundClientMessageOneShotJob.java | 44 ++++++++++++++----- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/router/java/src/net/i2p/router/message/OutboundCache.java b/router/java/src/net/i2p/router/message/OutboundCache.java index 6a4856b978..6763f8cdba 100644 --- a/router/java/src/net/i2p/router/message/OutboundCache.java +++ b/router/java/src/net/i2p/router/message/OutboundCache.java @@ -67,7 +67,7 @@ public class OutboundCache { * * Concurrent. */ - final Map<HashPair, LeaseSet> leaseSetCache = new ConcurrentHashMap<HashPair, LeaseSet>(64); + final ConcurrentHashMap<HashPair, LeaseSet> leaseSetCache = new ConcurrentHashMap<HashPair, LeaseSet>(64); /** * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously, diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 046c0078d1..7fbafc4957 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -326,22 +326,26 @@ public class OutboundClientMessageOneShotJob extends JobImpl { return null; // punt // If the last leaseSet we sent him is still good, don't bother sending again - LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS); + // As of 0.9.44, we do not put it in the cache here, we wait until it is acked + // and do it in SendSuccessJob. + if (!force) { + LeaseSet ls = _cache.leaseSetCache.get(_hashPair); if (ls != null) { - if (ls.equals(newLS)) { + if (ls.getDate() >= newLS.getDate()) { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Found in cache - NOT including reply leaseset for " + _toString); + _log.info(getJobId() + ": LS already acked - NOT sending reply LS to " + _toString); return null; } else { if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Expired from cache - reply leaseset for " + _toString); + _log.info(getJobId() + ": Expired from cache - sending reply LS to " + _toString); } + } else { + if (_log.shouldInfo()) + _log.info(getJobId() + ": Not acked - sending reply LS to " + _toString); } } - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Added to cache - reply leaseset for " + _toString); return newLS; } @@ -367,9 +371,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (rc == 0) { send(); } else { - // shouldn't happen + // shouldn't happen unless unsupported encryption if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); + _log.warn("Got the lease but can't send to it, failure code " + rc + " (to=" + _toString + ")"); dieFatal(rc); } } @@ -478,7 +482,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // randomize the ordering (so leases with equal # of failures per next // sort are randomly ordered) - Collections.shuffle(leases, getContext().random()); + if (leases.size() > 1) + Collections.shuffle(leases, getContext().random()); /**** if (false) { @@ -680,7 +685,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (skm != null) tsh = skm.tagsDelivered(_encryptionKey, sessKey, tags); } - onReply = new SendSuccessJob(getContext(), sessKey, tsh); + onReply = new SendSuccessJob(getContext(), sessKey, tsh, replyLeaseSet); onFail = new SendTimeoutJob(getContext(), sessKey, tsh); long expiration = Math.max(_overallExpiration, _start + REPLY_TIMEOUT_MS_MIN); selector = new ReplySelector(token, expiration); @@ -996,6 +1001,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private class SendSuccessJob extends JobImpl implements ReplyJob { private final SessionKey _key; private final TagSetHandle _tags; + private final LeaseSet _deliveredLS; /** * Create a new success job that will be fired when the message encrypted with @@ -1004,11 +1010,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * * @param key may be null * @param tags may be null + * @param ls the delivered leaseset or null */ - public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags) { + public SendSuccessJob(RouterContext enclosingContext, SessionKey key, TagSetHandle tags, LeaseSet ls) { super(enclosingContext); _key = key; _tags = tags; + _deliveredLS = ls; } public String getName() { return "Outbound client message send success"; } @@ -1017,6 +1025,20 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * May be run after SendTimeoutJob, will re-add the tags. */ public void runJob() { + if (_deliveredLS != null) { + // note that the delivered LS was acked + LeaseSet oldls = _cache.leaseSetCache.putIfAbsent(_hashPair, _deliveredLS); + if (oldls != null) { + if (_deliveredLS.getDate() > oldls.getDate()) { + _cache.leaseSetCache.put(_hashPair, _deliveredLS); + if (_log.shouldInfo()) + _log.info(getJobId() + ": added to cache - got reply LS from " + _toString); + } + } else { + if (_log.shouldInfo()) + _log.info(getJobId() + ": added to cache - got reply LS from " + _toString); + } + } // do we leak tags here? Result old; // never succeed twice but we can succeed after fail -- GitLab