From e375ffe8f1fcec4518e35c5e1ed565b701b45def Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 2 Jan 2013 13:19:40 +0000 Subject: [PATCH] * I2CP: - Fix leak if nonce = 0 but reliability != none - More work on failure codes (ticket #788) - Fix race with _finished indication in OCMOSJ --- .../client/MessageStatusMessageHandler.java | 36 +++++++++---------- .../router/client/ClientConnectionRunner.java | 9 ++++- .../OutboundClientMessageOneShotJob.java | 31 ++++++++-------- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java index cbd49b9b0e..f967ad9568 100644 --- a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java +++ b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java @@ -30,10 +30,12 @@ class MessageStatusMessageHandler extends HandlerImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Handle message " + message); MessageStatusMessage msg = (MessageStatusMessage) message; - switch (msg.getStatus()) { + int status = msg.getStatus(); + long id = msg.getMessageId(); + switch (status) { case MessageStatusMessage.STATUS_AVAILABLE: ReceiveMessageBeginMessage m = new ReceiveMessageBeginMessage(); - m.setMessageId(msg.getMessageId()); + m.setMessageId(id); m.setSessionId(msg.getSessionId()); try { session.sendMessage(m); @@ -41,27 +43,23 @@ class MessageStatusMessageHandler extends HandlerImpl { _log.error("Error asking for the message", ise); } return; + case MessageStatusMessage.STATUS_SEND_ACCEPTED: - session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); + session.receiveStatus((int)id, msg.getNonce(), status); // noop return; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: - case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: - if (_log.shouldLog(Log.INFO)) - _log.info("Message delivery succeeded for message " + msg.getMessageId()); - //if (!skipStatus) - session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); - return; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: - case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: - if (_log.shouldLog(Log.INFO)) - _log.info("Message delivery FAILED for message " + msg.getMessageId()); + + default: + if (msg.isSuccessful()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Message delivery succeeded for message " + id); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Message delivery FAILED (" + status + ") for message " + id); + } //if (!skipStatus) - session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); + session.receiveStatus((int)id, msg.getNonce(), status); return; - default: - if (_log.shouldLog(Log.ERROR)) - _log.error("Invalid message delivery status received: " + msg.getStatus()); } } -} \ No newline at end of file +} diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 10bd79d4b6..bce06ab811 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -270,6 +270,8 @@ class ClientConnectionRunner { * Note that this sends the Guaranteed status codes, even though we only support best effort. * Doesn't do anything if i2cp.messageReliability = "none" * + * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. + * * @param status see I2CP MessageStatusMessage for success/failure codes */ void updateMessageDeliveryStatus(MessageId id, int status) { @@ -357,7 +359,7 @@ class ClientConnectionRunner { expiration = msg.getExpirationTime(); flags = msg.getFlags(); } - if (!_dontSendMSM) + if (message.getNonce() != 0 && !_dontSendMSM) _acceptedPending.add(id); if (_log.shouldLog(Log.DEBUG)) @@ -382,6 +384,9 @@ class ClientConnectionRunner { * for delivery (but not necessarily delivered) * Doesn't do anything if i2cp.messageReliability = "none" * or if the nonce is 0. + * + * @param id OUR id for the message + * @param nonce HIS id for the message */ void ackSendMessage(MessageId id, long nonce) { if (_dontSendMSM || nonce == 0) @@ -630,6 +635,8 @@ class ClientConnectionRunner { private long _lastTried; /** + * Do not use for status = STATUS_SEND_ACCEPTED; use ackSendMessage() for that. + * * @param status see I2CP MessageStatusMessage for success/failure codes */ public MessageDeliveryStatusUpdate(MessageId id, int status) { diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 18b7daca06..e62dcf9d2e 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -6,6 +6,7 @@ import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.client.SendMessageOptions; import net.i2p.crypto.SessionKeyManager; @@ -59,7 +60,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { private PayloadGarlicConfig _clove; private long _cloveId; private final long _start; - private boolean _finished; + private final AtomicBoolean _finished = new AtomicBoolean(); private long _leaseSetLookupBegin; private TunnelInfo _outTunnel; private TunnelInfo _inTunnel; @@ -199,7 +200,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public void runJob() { long now = getContext().clock().now(); if (now >= _overallExpiration) { - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED); return; } //if (_log.shouldLog(Log.DEBUG)) @@ -271,7 +272,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // shouldn't happen if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET); } } } @@ -403,12 +404,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().statManager().addRateData("client.leaseSetFailedRemoteTime", lookupTime, lookupTime); } - if (!_finished) { + if (!_finished.get()) { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send to " + _toString + " because we couldn't find their leaseSet"); } - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET); } } @@ -421,10 +422,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { * */ private void send() { - if (_finished) return; + if (_finished.get()) return; long now = getContext().clock().now(); if (now >= _overallExpiration) { - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED); return; } @@ -480,7 +481,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { boolean ok = buildClove(); if (!ok) { - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION); return; } //if (_log.shouldLog(Log.DEBUG)) @@ -501,7 +502,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Unable to create the garlic message (no tunnels left or too lagged) to " + _toString); getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS); return; } @@ -545,7 +546,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (_log.shouldLog(Log.WARN)) _log.warn(getJobId() + ": Could not find any outbound tunnels to send the payload through... this might take a while"); getContext().statManager().addRateData("client.dispatchNoTunnels", now - _start, 0); - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS); } _clove = null; getContext().statManager().addRateData("client.dispatchPrepareTime", now - _start, 0); @@ -681,8 +682,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } private void dieFatal(int status) { - if (_finished) return; - _finished = true; + if (_finished.getAndSet(true)) + return; long sendTime = getContext().clock().now() - _start; if (_log.shouldLog(Log.WARN)) @@ -811,8 +812,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public void runJob() { // do we leak tags here? - if (_finished) return; - _finished = true; + if (_finished.getAndSet(true)) + return; long sendTime = getContext().clock().now() - _start; if (_log.shouldLog(Log.INFO)) _log.info(OutboundClientMessageOneShotJob.this.getJobId() @@ -883,7 +884,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { if (skm != null) skm.failTags(_leaseSet.getEncryptionKey(), _key, _tags); } - dieFatal(); + dieFatal(MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE); } } } -- GitLab