diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 985065110e4b65782e1c2184442d16145b01a0c5..aaa91e685ac58f46291fbf0f9795310dba15de99 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -250,6 +250,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable { case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS: // probably took a long time to open the tunnel, allow retx case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED: + // overflow in router-side I2CP queue, sent as of 0.9.29, will be retried + case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL: if (_log.shouldLog(Log.WARN)) _log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con); _messageStatusMap.remove(id); @@ -269,7 +271,6 @@ class PacketQueue implements SendMessageStatusListener, Closeable { break; - case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL: case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER: case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK: case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION: diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 0e00b6af7843d6d423b848980f2fae12ba7a4ece..bd09aa2494ad51de7ce54a1c56e8eb15131c2b2c 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -720,40 +720,44 @@ class ClientConnectionRunner { } /** - * Asynchronously deliver the message to the current runner + * Synchronously deliver the message to the current runner * - * Note that no failure indication is available. - * Fails silently on e.g. queue overflow to client, client dead, etc. + * Failure indication is available as of 0.9.29. + * Fails on e.g. queue overflow to client, client dead, etc. * * @param toDest non-null * @param fromDest generally null when from remote, non-null if from local + * @return success */ - void receiveMessage(Destination toDest, Destination fromDest, Payload payload) { - if (_dead) return; + boolean receiveMessage(Destination toDest, Destination fromDest, Payload payload) { + if (_dead) + return false; MessageReceivedJob j = new MessageReceivedJob(_context, this, toDest, fromDest, payload, _dontSendMSMOnReceive); // This is fast and non-blocking, run in-line //_context.jobQueue().addJob(j); - j.runJob(); + //j.runJob(); + return j.receiveMessage(); } /** - * Asynchronously deliver the message to the current runner + * Synchronously deliver the message to the current runner * - * Note that no failure indication is available. - * Fails silently on e.g. queue overflow to client, client dead, etc. + * Failure indication is available as of 0.9.29. + * Fails on e.g. queue overflow to client, client dead, etc. * * @param toHash non-null * @param fromDest generally null when from remote, non-null if from local + * @return success * @since 0.9.21 */ - void receiveMessage(Hash toHash, Destination fromDest, Payload payload) { + boolean receiveMessage(Hash toHash, Destination fromDest, Payload payload) { SessionParams sp = _sessions.get(toHash); if (sp == null) { if (_log.shouldLog(Log.WARN)) _log.warn("No session found for receiveMessage()"); - return; + return false; } - receiveMessage(sp.dest, fromDest, payload); + return receiveMessage(sp.dest, fromDest, payload); } /** diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index fbea7a7f21abf08132599e57f8883491e8915a0b..521b15ccf3f8cc2bb8d8d0b37a66dc890d4d87c3 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -417,11 +417,10 @@ class ClientManager { public String getName() { return "Distribute local message"; } public void runJob() { - _to.receiveMessage(_toDest, _fromDest, _payload); - // note that receiveMessage() does not indicate a failure, - // so a queue overflow is not recognized. we always return success. + boolean ok = _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { - _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL); + int rc = ok ? MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL : MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL; + _from.updateMessageDeliveryStatus(_fromDest, _msgId, _messageNonce, rc); } } } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 02ecc8dd625f5d123f01380981e0520c0e40af0b..1fd9651b0fc4a8c333f776d79132ef3b7125b6b8 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -48,7 +48,17 @@ class MessageReceivedJob extends JobImpl { public String getName() { return "Deliver New Message"; } public void runJob() { - if (_runner.isDead()) return; + receiveMessage(); + } + + /** + * Same as runJob() but with a return value + * @return success + * @since 0.9.29 + */ + public boolean receiveMessage() { + if (_runner.isDead()) + return false; MessageId id = null; try { long nextID = _runner.getNextMessageId(); @@ -59,6 +69,7 @@ class MessageReceivedJob extends JobImpl { _runner.setPayload(id, _payload); messageAvailable(id, _payload.getSize()); } + return true; } catch (I2CPMessageException ime) { String msg = "Error sending data to client " + _runner.getDestHash(); if (_log.shouldWarn()) @@ -67,6 +78,7 @@ class MessageReceivedJob extends JobImpl { _log.logAlways(Log.WARN, msg); if (id != null && !_sendDirect) _runner.removePayload(id); + return false; } }