From a11bd7cbe7f3760efd85820dcb07a52e3ba4cc64 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 8 Feb 2017 15:22:41 +0000 Subject: [PATCH] I2CP: Return local delivery failure on queue overflow (ticket #1939) --- .../client/streaming/impl/PacketQueue.java | 3 +- .../router/client/ClientConnectionRunner.java | 28 +++++++++++-------- .../net/i2p/router/client/ClientManager.java | 7 ++--- .../i2p/router/client/MessageReceivedJob.java | 14 +++++++++- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 985065110e..aaa91e685a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -250,6 +250,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable { case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS: // probably took a long time to open the tunnel, allow retx case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED: + // overflow in router-side I2CP queue, sent as of 0.9.29, will be retried + case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL: if (_log.shouldLog(Log.WARN)) _log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con); _messageStatusMap.remove(id); @@ -269,7 +271,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { break; - case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL: case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER: case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK: case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION: diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 0e00b6af78..bd09aa2494 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -720,40 +720,44 @@ class ClientConnectionRunner { } /** - * Asynchronously deliver the message to the current runner + * Synchronously deliver the message to the current runner * - * Note that no failure indication is available. - * Fails silently on e.g. queue overflow to client, client dead, etc. + * Failure indication is available as of 0.9.29. + * Fails on e.g. queue overflow to client, client dead, etc. * * @param toDest non-null * @param fromDest generally null when from remote, non-null if from local + * @return success */ - void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { - if (_dead) return; + boolean receiveMessage(Destination toDest, Destination fromDest, Payload payload) { + if (_dead) + return false; MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive); // This is fast and non-blocking, run in-line //_context.jobQueue().addJob(j); - j.runJob(); + //j.runJob(); + return j.receiveMessage(); } /** - * Asynchronously deliver the message to the current runner + * Synchronously deliver the message to the current runner * - * Note that no failure indication is available. - * Fails silently on e.g. queue overflow to client, client dead, etc. + * Failure indication is available as of 0.9.29. + * Fails on e.g. queue overflow to client, client dead, etc. * * @param toHash non-null * @param fromDest generally null when from remote, non-null if from local + * @return success * @since 0.9.21 */ - void receiveMessage(Hash toHash, Destination fromDest, Payload payload) { + boolean receiveMessage(Hash toHash, Destination fromDest, Payload payload) { SessionParams sp = _sessions.get(toHash); if (sp == null) { if (_log.shouldLog(Log.WARN)) _log.warn("No session found for receiveMessage()"); - return; + return false; } - receiveMessage(sp.dest, fromDest, payload); + return receiveMessage(sp.dest, fromDest, payload); } /** diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index fbea7a7f21..521b15ccf3 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -417,11 +417,10 @@ class ClientManager { public String getName() { return "Distribute local message"; } public void runJob() { - _to.receiveMessage(_toDest, _fromDest, _payload); - // note that receiveMessage() does not indicate a failure, - // so a queue overflow is not recognized. we always return success. + boolean ok = _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { - _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + int rc = ok ? MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL : MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL; + _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, rc); } } } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 02ecc8dd62..1fd9651b0f 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -48,7 +48,17 @@ class MessageReceivedJob extends JobImpl { public String getName() { return "Deliver New Message"; } public void runJob() { - if (_runner.isDead()) return; + receiveMessage(); + } + + /** + * Same as runJob() but with a return value + * @return success + * @since 0.9.29 + */ + public boolean receiveMessage() { + if (_runner.isDead()) + return false; MessageId id = null; try { long nextID = _runner.getNextMessageId(); @@ -59,6 +69,7 @@ class MessageReceivedJob extends JobImpl { _runner.setPayload(id, _payload); messageAvailable(id, _payload.getSize()); } + return true; } catch (I2CPMessageException ime) { String msg = "Error sending data to client " + _runner.getDestHash(); if (_log.shouldWarn()) @@ -67,6 +78,7 @@ class MessageReceivedJob extends JobImpl { _log.logAlways(Log.WARN, msg); if (id != null && !_sendDirect) _runner.removePayload(id); + return false; } } -- GitLab