diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 85fd558b73515789053a16ae328a0a7cc07364bc..9663620086e0fb266ee8eb6027ed083b7f3c5fa5 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -67,10 +67,15 @@ class I2PSessionImpl2 extends I2PSessionImpl { } public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) - throws I2PSessionException { + throws I2PSessionException { if (isClosed()) throw new I2PSessionException("Already closed"); if (SHOULD_COMPRESS) payload = DataHelper.compress(payload); - if (isGuaranteed()) { + // we always send as guaranteed (so we get the session keys/tags acked), + // but only block until the appropriate event has been reached (guaranteed + // success or accepted). we may want to break this out into a seperate + // attribute, allowing both nonblocking sends and transparently managed keys, + // as well as the nonblocking sends with application managed keys. Later. + if (isGuaranteed() || true) { return sendGuaranteed(dest, payload, keyUsed, tagsSent); } else { return sendBestEffort(dest, payload, keyUsed, tagsSent); @@ -89,7 +94,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { } private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent) - throws I2PSessionException { + throws I2PSessionException { SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey()); if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey()); SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key); @@ -129,8 +134,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { _sendingStates.add(state); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Adding sending state " + state.getMessageId() + " / " - + state.getNonce()); + _log.debug("Adding sending state " + state.getMessageId() + " / " + + state.getNonce()); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, Clock.getInstance().now() + getTimeout()); synchronized (_sendingStates) { @@ -138,19 +143,18 @@ class I2PSessionImpl2 extends I2PSessionImpl { } boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); if (_log.shouldLog(Log.DEBUG)) - _log.debug("After waitFor sending state " + state.getMessageId().getMessageId() - + " / " + state.getNonce() + " found = " + found); + _log.debug("After waitFor sending state " + state.getMessageId().getMessageId() + + " / " + state.getNonce() + " found = " + found); if (found) { if (_log.shouldLog(Log.INFO)) - _log.info("Message sent after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); + _log.info("Message sent after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); } else { if (_log.shouldLog(Log.INFO)) - _log.info("Message send failed after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); + _log.info("Message send failed after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); if (_log.shouldLog(Log.ERROR)) - _log - .error("Never received *accepted* from the router! dropping and reconnecting"); + _log.error("Never received *accepted* from the router! dropping and reconnecting"); disconnect(); return false; } @@ -158,7 +162,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { } private boolean sendGuaranteed(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent) - throws I2PSessionException { + throws I2PSessionException { SessionKey key = SessionKeyManager.getInstance().getCurrentKey(dest.getPublicKey()); if (key == null) key = SessionKeyManager.getInstance().createSession(dest.getPublicKey()); SessionTag tag = SessionKeyManager.getInstance().consumeNextAvailableTag(dest.getPublicKey(), key); @@ -198,10 +202,13 @@ class I2PSessionImpl2 extends I2PSessionImpl { _sendingStates.add(state); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Adding sending state " + state.getMessageId() + " / " - + state.getNonce()); + _log.debug("Adding sending state " + state.getMessageId() + " / " + + state.getNonce()); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); - state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, Clock.getInstance().now() + SEND_TIMEOUT); + if (isGuaranteed()) + state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, Clock.getInstance().now() + SEND_TIMEOUT); + else + state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, Clock.getInstance().now() + SEND_TIMEOUT); synchronized (_sendingStates) { _sendingStates.remove(state); } @@ -210,28 +217,27 @@ class I2PSessionImpl2 extends I2PSessionImpl { if ((!accepted) || (state.getMessageId() == null)) { if (_log.shouldLog(Log.ERROR)) - _log.error("State with nonce " + state.getNonce() - + " was not accepted? (no messageId!!)"); + _log.error("State with nonce " + state.getNonce() + + " was not accepted? (no messageId!!)"); nackTags(state); if (_log.shouldLog(Log.CRIT)) - _log.log(Log.CRIT, - "Disconnecting/reconnecting because we never were accepted!"); + _log.log(Log.CRIT, "Disconnecting/reconnecting because we never were accepted!"); disconnect(); return false; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("After waitFor sending state " + state.getMessageId().getMessageId() - + " / " + state.getNonce() + " found = " + found); + _log.debug("After waitFor sending state " + state.getMessageId().getMessageId() + + " / " + state.getNonce() + " found = " + found); if (found) { if (_log.shouldLog(Log.INFO)) - _log.info("Message sent after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); + _log.info("Message sent after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); ackTags(state); } else { if (_log.shouldLog(Log.INFO)) - _log.info("Message send failed after " + state.getElapsed() + "ms with " - + payload.length + " bytes"); + _log.info("Message send failed after " + state.getElapsed() + "ms with " + + payload.length + " bytes"); nackTags(state); } return found; @@ -239,23 +245,21 @@ class I2PSessionImpl2 extends I2PSessionImpl { private void ackTags(MessageState state) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("ack tags for msgId " + state.getMessageId() + " / " - + state.getNonce() + " key = " + state.getKey() + ", tags = " - + state.getTags()); + _log.debug("ack tags for msgId " + state.getMessageId() + " / " + + state.getNonce() + " key = " + state.getKey() + ", tags = " + + state.getTags()); if ((state.getTags() != null) && (state.getTags().size() > 0)) { if (state.getNewKey() == null) - SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getKey(), - state.getTags()); + SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getKey(), state.getTags()); else - SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(), - state.getTags()); + SessionKeyManager.getInstance().tagsDelivered(state.getTo().getPublicKey(), state.getNewKey(), state.getTags()); } } private void nackTags(MessageState state) { if (_log.shouldLog(Log.INFO)) - _log.info("nack tags for msgId " + state.getMessageId() + " / " + state.getNonce() - + " key = " + state.getKey()); + _log.info("nack tags for msgId " + state.getMessageId() + " / " + state.getNonce() + + " key = " + state.getKey()); SessionKeyManager.getInstance().failTags(state.getTo().getPublicKey()); } @@ -288,8 +292,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { state.receive(status); } else { if (_log.shouldLog(Log.INFO)) - _log.info("No matching state for messageId " + msgId + " / " + nonce - + " w/ status = " + status); + _log.info("No matching state for messageId " + msgId + " / " + nonce + + " w/ status = " + status); } }