From 8371b8f806fac331ea8015ae7ef0fe9f1b1623da Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Thu, 15 May 2014 20:11:21 +0000 Subject: [PATCH] * I2CP: Client-side prep for asynch status for sent messages (ticket #788) - Clean up and reuse MessageState for asynch notification - New I2PSession sendMessage() method and listener - Move VerifyUsage from SimpleScheduler to SimpleTimer2 for efficiency - Fix up javadocs --- .../net/i2p/client/I2CPMessageProducer.java | 6 +- core/java/src/net/i2p/client/I2PSession.java | 72 ++++ .../src/net/i2p/client/I2PSessionImpl.java | 20 +- .../src/net/i2p/client/I2PSessionImpl2.java | 287 ++++++--------- .../net/i2p/client/I2PSessionMuxedImpl.java | 69 ++-- .../java/src/net/i2p/client/MessageState.java | 342 +++++++----------- .../i2p/client/SendMessageStatusListener.java | 25 ++ 7 files changed, 398 insertions(+), 423 deletions(-) create mode 100644 core/java/src/net/i2p/client/SendMessageStatusListener.java diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java index 47c8bd53fa..c625e78419 100644 --- a/core/java/src/net/i2p/client/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java @@ -301,7 +301,7 @@ class I2CPMessageProducer { * @param key unused - no end-to-end crypto * @param newKey unused - no end-to-end crypto */ - private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set tags, + private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set<SessionTag> tags, SessionKey newKey) throws I2PSessionException { if (dest == null) throw new I2PSessionException("No destination specified"); if (payload == null) throw new I2PSessionException("No payload specified"); @@ -346,8 +346,8 @@ class I2CPMessageProducer { * to the router * */ - public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv, PrivateKey priv) - throws I2PSessionException { + public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv, + PrivateKey priv) throws I2PSessionException { CreateLeaseSetMessage msg = new CreateLeaseSetMessage(); msg.setLeaseSet(leaseSet); msg.setPrivateKey(priv); diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index d70e4041f2..2d535c25af 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -47,10 +47,21 @@ public interface I2PSession { */ public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException; + /** Send a new message to the given destination, containing the specified + * payload, returning true if the router feels confident that the message + * was delivered. + * + * WARNING: It is recommended that you use a method that specifies the protocol and ports. + * + * @param dest location to send the message + * @param payload body of the message to be sent (unencrypted) + * @return success + */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException; /** * See I2PSessionMuxedImpl for proto/port details. + * @return success * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException; @@ -83,6 +94,7 @@ public interface I2PSession { * @param tagsSent UNUSED, IGNORED. Set of tags delivered to the peer and associated with the keyUsed. This is also an output parameter - * the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag * objects that were sent along side the given keyUsed. + * @return success */ public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; @@ -90,6 +102,7 @@ public interface I2PSession { * End-to-End Crypto is disabled, tags and keys are ignored. * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. + * @return success */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException; @@ -98,6 +111,7 @@ public interface I2PSession { * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. * @param expire absolute expiration timestamp, NOT interval from now + * @return success * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException; @@ -107,6 +121,14 @@ public interface I2PSession { * End-to-End Crypto is disabled, tags and keys are ignored. * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @return success * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, @@ -118,6 +140,14 @@ public interface I2PSession { * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. * @param expire absolute expiration timestamp, NOT interval from now + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @return success * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, @@ -129,6 +159,14 @@ public interface I2PSession { * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. * @param expire absolute expiration timestamp, NOT interval from now + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @return success * @since 0.8.4 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, @@ -137,11 +175,45 @@ public interface I2PSession { /** * See I2PSessionMuxedImpl for proto/port details. * See SendMessageOptions for option details. + * + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @param options to be passed to the router + * @return success * @since 0.9.2 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException; + /** + * Send a message and request an asynchronous notification of delivery status. + * Notifications will be delivered at least up to the expiration specified in the options, + * or 60 seconds if not specified. + * + * See I2PSessionMuxedImpl for proto/port details. + * See SendMessageOptions for option details. + * + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @param options to be passed to the router + * @return the message ID to be used for later notification to the listener + * @throws I2PSessionException on all errors + * @since 0.9.14 + */ + public long sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromport, int toport, + SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException; + /** Receive a message that the router has notified the client about, returning * the payload. * This may only be called once for a given msgId (until the counter wraps) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 460ee13f34..20aa43550b 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -51,7 +51,7 @@ import net.i2p.util.I2PSSLSocketFactory; import net.i2p.util.LHMCache; import net.i2p.util.Log; import net.i2p.util.OrderedProperties; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; import net.i2p.util.VersionComparator; /** @@ -618,20 +618,24 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } /** - * Fire up a periodic task to check for unclamed messages + * Fire up a periodic task to check for unclaimed messages * @since 0.9.1 */ - private void startVerifyUsage() { - _context.simpleScheduler().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME); + protected void startVerifyUsage() { + new VerifyUsage(); } /** * Check for unclaimed messages, without wastefully setting a timer for each - * message. Just copy all unclaimed ones and check 30 seconds later. + * message. Just copy all unclaimed ones and check some time later. */ - private class VerifyUsage implements SimpleTimer.TimedEvent { + private class VerifyUsage extends SimpleTimer2.TimedEvent { private final List<Long> toCheck = new ArrayList<Long>(); + public VerifyUsage() { + super(_context.simpleTimer2(), VERIFY_USAGE_TIME); + } + public void timeReached() { if (isClosed()) return; @@ -641,12 +645,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa for (Long msgId : toCheck) { MessagePayloadMessage removed = _availableMessages.remove(msgId); if (removed != null) - _log.error("Message NOT removed! id=" + msgId + ": " + removed); + _log.error(getPrefix() + " Client not responding? Message not processed! id=" + msgId + ": " + removed); } toCheck.clear(); } toCheck.addAll(_availableMessages.keySet()); - _context.simpleScheduler().addEvent(this, VERIFY_USAGE_TIME); + schedule(VERIFY_USAGE_TIME); } } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 27e787e435..9f8a5465d3 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -11,11 +11,13 @@ package net.i2p.client; import java.io.IOException; import java.io.InputStream; -import java.util.HashSet; import java.util.Iterator; import java.util.Locale; +import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; @@ -24,6 +26,7 @@ import net.i2p.data.SessionKey; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessageStatusMessage; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; /** * Thread safe implementation of an I2P session running over TCP. @@ -35,7 +38,8 @@ import net.i2p.util.Log; class I2PSessionImpl2 extends I2PSessionImpl { /** set of MessageState objects, representing all of the messages in the process of being sent */ - private /* FIXME final FIXME */ Set<MessageState> _sendingStates; + protected final Map<Long, MessageState> _sendingStates; + protected final AtomicLong _sendMessageNonce; /** max # seconds to wait for confirmation of the message send */ private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send /** should we gzip each payload prior to sending it? */ @@ -44,12 +48,16 @@ class I2PSessionImpl2 extends I2PSessionImpl { /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */ protected boolean _noEffort; + private static final long REMOVE_EXPIRED_TIME = 63*1000; + /** * for extension by SimpleSession (no dest) */ protected I2PSessionImpl2(I2PAppContext context, Properties options, I2PClientMessageHandlerMap handlerMap) { super(context, options, handlerMap); + _sendingStates = null; + _sendMessageNonce = null; } /** @@ -63,11 +71,12 @@ class I2PSessionImpl2 extends I2PSessionImpl { */ public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { super(ctx, destKeyStream, options); - _sendingStates = new HashSet<MessageState>(32); + _sendingStates = new ConcurrentHashMap<Long, MessageState>(32); + _sendMessageNonce = new AtomicLong(); // default is BestEffort _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); - ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage1", "second part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); @@ -80,11 +89,48 @@ class I2PSessionImpl2 extends I2PSessionImpl { //_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 }); + //_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 }); _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 }); _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); } + /** + * Fire up a periodic task to check for unclaimed messages + * @since 0.9.14 + */ + @Override + protected void startVerifyUsage() { + super.startVerifyUsage(); + new RemoveExpired(); + } + + /** + * Check for expired message states, without wastefully setting a timer for each + * message. + * @since 0.9.14 + */ + private class RemoveExpired extends SimpleTimer2.TimedEvent { + + public RemoveExpired() { + super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME); + } + + public void timeReached() { + if (isClosed()) + return; + if (!_sendingStates.isEmpty()) { + long now = _context.clock().now(); + for (Iterator<MessageState> iter = _sendingStates.values().iterator(); iter.hasNext(); ) { + MessageState state = iter.next(); + if (state.getExpires() < now) + iter.remove(); + } + } + schedule(REMOVE_EXPIRED_TIME); + } + } + + protected long getTimeout() { return SEND_TIMEOUT; } @@ -109,6 +155,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { * Todo: don't compress if destination is local? */ private static final int DONT_COMPRESS_SIZE = 66; + protected boolean shouldCompress(int size) { if (size <= DONT_COMPRESS_SIZE) return false; @@ -118,33 +165,47 @@ class I2PSessionImpl2 extends I2PSessionImpl { return SHOULD_COMPRESS; } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public void addSessionListener(I2PSessionListener lsnr, int proto, int port) { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public void removeListener(int proto, int port) { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, int proto, int fromport, int toport) throws I2PSessionException { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, int proto, int fromport, int toport) throws I2PSessionException { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, int proto, int fromport, int toport, int flags) throws I2PSessionException { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); } + /** @throws UnsupportedOperationException always, use MuxedImpl */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException { - throw new IllegalArgumentException("Use MuxedImpl"); + throw new UnsupportedOperationException("Use MuxedImpl"); + } + /** @throws UnsupportedOperationException always, use MuxedImpl */ + public long sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromport, int toport, + SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException { + throw new UnsupportedOperationException("Use MuxedImpl"); } /** unused, see MuxedImpl override */ @@ -210,8 +271,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { String d = dest.calculateHash().toBase64().substring(0,4); _log.info("sending message to: " + d + " compress? " + sc + " sizeIn=" + size + " sizeOut=" + compressed); } - _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0); - _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); + _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed); + _context.statManager().addRateData("i2cp.tx.msgExpanded", size); if (_noEffort) return sendNoEffort(dest, payload, expires, 0); else @@ -257,142 +318,29 @@ class I2PSessionImpl2 extends I2PSessionImpl { */ protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags) throws I2PSessionException { - //SessionKey key = null; - //SessionKey newKey = null; - //SessionTag tag = null; - //Set sentTags = null; - //int oldTags = 0; - long begin = _context.clock().now(); - /*********** - if (I2CPMessageProducer.END_TO_END_CRYPTO) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort"); - key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey()); - if (_log.shouldLog(Log.DEBUG)) _log.debug("key fetched"); - if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey()); - tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key); - if (_log.shouldLog(Log.DEBUG)) _log.debug("tag consumed"); - sentTags = null; - oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key); - long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key); - - if ( (tagsSent == null) || (tagsSent.isEmpty()) ) { - if (oldTags < NUM_TAGS) { - sentTags = createNewTags(NUM_TAGS); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS + ": " + sentTags); - } else if (availTimeLeft < 2 * 60 * 1000) { - // if we have > 50 tags, but they expire in under 2 minutes, we want more - sentTags = createNewTags(NUM_TAGS); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones: " + sentTags); - //_log.error("** sendBestEffort available time left " + availTimeLeft); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft); - } - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft - + "ms left, " + oldTags + " tags known and " - + (tag == null ? "no tag" : " a valid tag")); - } - - if (false) // rekey - newKey = _context.keyGenerator().generateSessionKey(); - if ( (tagsSent != null) && (!tagsSent.isEmpty()) ) { - if (sentTags == null) - sentTags = new HashSet(); - sentTags.addAll(tagsSent); - } - } else { - // not using end to end crypto, so don't ever bundle any tags - } - **********/ - - //if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce"); - - long nonce = _context.random().nextInt(Integer.MAX_VALUE - 1) + 1; - //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state"); + long nonce = _sendMessageNonce.incrementAndGet(); MessageState state = new MessageState(_context, nonce, getPrefix()); - //state.setKey(key); - //state.setTags(sentTags); - //state.setNewKey(newKey); - state.setTo(dest); - //if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key); - - //if (keyUsed != null) { - //if (I2CPMessageProducer.END_TO_END_CRYPTO) { - // if (newKey != null) - // keyUsed.setData(newKey.getData()); - // else - // keyUsed.setData(key.getData()); - //} else { - // keyUsed.setData(SessionKey.INVALID_KEY.getData()); - //} - //} - //if (tagsSent != null) { - // if (sentTags != null) { - // tagsSent.addAll(sentTags); - // } - //} - //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state"); - long beforeSendingSync = _context.clock().now(); - long inSendingSync = 0; - synchronized (_sendingStates) { - inSendingSync = _context.clock().now(); - _sendingStates.add(state); - } - long afterSendingSync = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce() + " for best effort " - + " sync took " + (inSendingSync-beforeSendingSync) - + " add took " + (afterSendingSync-inSendingSync)); - //_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires); - _producer.sendMessage(this, dest, nonce, payload, expires, flags); - // since this is 'best effort', all we're waiting for is a status update // saying that the router received it - in theory, that should come back // immediately, but in practice can take up to a second (though usually // much quicker). setting this to false will short-circuit that delay boolean actuallyWait = false; // true; - - long beforeWaitFor = _context.clock().now(); if (actuallyWait) - state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, - _context.clock().now() + getTimeout()); - //long afterWaitFor = _context.clock().now(); - //long inRemovingSync = 0; - synchronized (_sendingStates) { - //inRemovingSync = _context.clock().now(); - _sendingStates.remove(state); - } - long afterRemovingSync = _context.clock().now(); - boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId() - + " / " + state.getNonce() + " found = " + found); - - long timeToSend = afterRemovingSync - beforeSendingSync; - if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) { - _log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz")); - } + _sendingStates.put(Long.valueOf(nonce), state); + _producer.sendMessage(this, dest, nonce, payload, expires, flags); - if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) { - _log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, " - + (afterSendingSync-begin) + "ms to prepare, " - + (beforeWaitFor-afterSendingSync) + "ms to send, " - + (afterRemovingSync-beforeWaitFor) + "ms waiting for reply"); + if (actuallyWait) { + try { + state.waitForAccept(_context.clock().now() + getTimeout()); + } catch (InterruptedException ie) { + throw new I2PSessionException("interrupted"); + } finally { + _sendingStates.remove(Long.valueOf(nonce)); + } } - - _context.statManager().addRateData("i2cp.sendBestEffortTotalTime", afterRemovingSync - begin, 0); - //_context.statManager().addRateData("i2cp.sendBestEffortStage0", beforeSendingSync- begin, 0); - //_context.statManager().addRateData("i2cp.sendBestEffortStage1", afterSendingSync- beforeSendingSync, 0); - //_context.statManager().addRateData("i2cp.sendBestEffortStage2", beforeWaitFor- afterSendingSync, 0); - //_context.statManager().addRateData("i2cp.sendBestEffortStage3", afterWaitFor- beforeWaitFor, 0); - //_context.statManager().addRateData("i2cp.sendBestEffortStage4", afterRemovingSync- afterWaitFor, 0); + boolean found = !actuallyWait || state.wasAccepted(); if (found) { if (_log.shouldLog(Log.INFO)) @@ -402,9 +350,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with " + payload.length + " bytes"); - if (_log.shouldLog(Log.ERROR)) - _log.error(getPrefix() + "Never received *accepted* from the router! dropping and reconnecting"); - disconnect(); + //if (_log.shouldLog(Log.ERROR)) + // _log.error(getPrefix() + "Never received *accepted* from the router! dropping and reconnecting"); + //disconnect(); return false; } return found; @@ -432,8 +380,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { * Even when using sendBestEffort(), this is a waste, because the * MessageState is removed from _sendingStates immediately and * so the lookup here fails. - * And iterating through the HashSet instead of having a map - * is bad too. * * This is now pretty much avoided since streaming now sets * i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(), @@ -443,32 +389,24 @@ class I2PSessionImpl2 extends I2PSessionImpl { */ @Override public void receiveStatus(int msgId, long nonce, int status) { - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); + MessageState state = null; - long beforeSync = _context.clock().now(); - long inSync = 0; - synchronized (_sendingStates) { - inSync = _context.clock().now(); - for (Iterator<MessageState> iter = _sendingStates.iterator(); iter.hasNext();) { - state = iter.next(); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce()); - if (state.getNonce() == nonce) { - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state"); - break; - } else if ((state.getMessageId() != null) && (state.getMessageId().getMessageId() == msgId)) { + if ((state = _sendingStates.get(Long.valueOf(nonce))) != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Found a matching state"); + } else if (!_sendingStates.isEmpty()) { + // O(n**2) + // shouldn't happen, router sends good nonce for all statuses as of 0.9.14 + for (MessageState s : _sendingStates.values()) { + if (s.getMessageId() != null && s.getMessageId().getMessageId() == msgId) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state by msgId"); + state = s; break; - } else { - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State does not match"); - state = null; } } } - long afterSync = _context.clock().now(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: " - + (inSync-beforeSync) + "ms, check: " + (afterSync-inSync)); if (state != null) { if (state.getMessageId() == null) { @@ -477,11 +415,13 @@ class I2PSessionImpl2 extends I2PSessionImpl { state.setMessageId(id); } state.receive(status); + if (state.wasSuccessful()) + _sendingStates.remove(Long.valueOf(nonce)); long lifetime = state.getElapsed(); switch (status) { case 1: - _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0); + _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime); break; // best effort codes unused //case 2: @@ -491,10 +431,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { // _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0); // break; case 4: - _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0); + _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime); break; case 5: - _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime, 0); + _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime); break; } @@ -503,7 +443,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { _log.info(getPrefix() + "No matching state for messageId " + msgId + " / " + nonce + " w/ status = " + status); } - _context.statManager().addRateData("i2cp.receiveStatusTime", _context.clock().now() - beforeSync, 0); } /** @@ -522,11 +461,11 @@ class I2PSessionImpl2 extends I2PSessionImpl { private void clearStates() { if (_sendingStates == null) // only null if overridden by I2PSimpleSession return; - synchronized (_sendingStates) { - for (MessageState state : _sendingStates) - state.cancel(); - if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states"); - _sendingStates.clear(); + for (MessageState state : _sendingStates.values()) { + state.cancel(); } + if (_log.shouldLog(Log.INFO)) + _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states"); + _sendingStates.clear(); } } diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 89a9161b0f..48ca6016e8 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -193,21 +193,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { SessionKey keyUsed, Set tagsSent, long expires, int proto, int fromPort, int toPort, int flags) throws I2PSessionException { - if (isClosed()) throw new I2PSessionException("Already closed"); - updateActivity(); - - boolean sc = shouldCompress(size); - if (sc) - payload = DataHelper.compress(payload, offset, size); - else - payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION); - - setProto(payload, proto); - setFromPort(payload, fromPort); - setToPort(payload, toPort); - - _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); - _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); + payload = prepPayload(payload, offset, size, proto, fromPort, toPort); if (_noEffort) return sendNoEffort(dest, payload, expires, flags); else @@ -232,11 +218,48 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { @Override public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException { + payload = prepPayload(payload, offset, size, proto, fromPort, toPort); + //if (_noEffort) { + sendNoEffort(dest, payload, options); + return true; + //} else { + // unimplemented + //return sendBestEffort(dest, payload, options); + //} + } + + /** + * Send a message and request an asynchronous notification of delivery status. + * + * See I2PSessionMuxedImpl for proto/port details. + * See SendMessageOptions for option details. + * + * @return the message ID to be used for later notification to the listener + * @throws I2PSessionException on all errors + * @since 0.9.14 + */ + @Override + public long sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromPort, int toPort, + SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException { + payload = prepPayload(payload, offset, size, proto, fromPort, toPort); + long nonce = _sendMessageNonce.incrementAndGet(); + long expires = Math.max(_context.clock().now() + 60*1000L, options.getTime()); + MessageState state = new MessageState(_context, nonce, this, expires, listener); + _sendingStates.put(Long.valueOf(nonce), state); + _producer.sendMessage(this, dest, nonce, payload, options); + return nonce; + } + + /** + * @return gzip compressed payload, ready to send + * @since 0.9.14 + */ + private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException { if (isClosed()) throw new I2PSessionException("Already closed"); updateActivity(); - boolean sc = shouldCompress(size); - if (sc) + if (shouldCompress(size)) payload = DataHelper.compress(payload, offset, size); else payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION); @@ -245,15 +268,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { setFromPort(payload, fromPort); setToPort(payload, toPort); - _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); - _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); - //if (_noEffort) { - sendNoEffort(dest, payload, options); - return true; - //} else { - // unimplemented - //return sendBestEffort(dest, payload, options); - //} + _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length); + _context.statManager().addRateData("i2cp.tx.msgExpanded", size); + return payload; } /** diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java index 4d56f64859..ea1b4284dc 100644 --- a/core/java/src/net/i2p/client/MessageState.java +++ b/core/java/src/net/i2p/client/MessageState.java @@ -1,12 +1,8 @@ package net.i2p.client; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; -import net.i2p.data.Destination; -import net.i2p.data.SessionKey; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessageStatusMessage; import net.i2p.util.Log; @@ -14,8 +10,10 @@ import net.i2p.util.Log; /** * Contains the state of a payload message being sent to a peer. * - * This is mostly unused. See sendNoEffort vs. sendBestEffort in I2PSessionImpl2. - * TODO delete altogether? This is really bad. + * Originally was a general-purpose waiter. + * Then we got rid of guaranteed delivery. + * Then we stopped waiting for accept in best-effort delivery. + * Brought back to life for asynchronous status delivery to the client. */ class MessageState { private final I2PAppContext _context; @@ -23,32 +21,59 @@ class MessageState { private final long _nonce; private final String _prefix; private MessageId _id; - private final Set<Integer> _receivedStatus; - private SessionKey _key; - private SessionKey _newKey; - private Set _tags; - private Destination _to; - private boolean _cancelled; private final long _created; + private final long _expires; + private final SendMessageStatusListener _listener; + private final I2PSession _session; - private static final AtomicLong __stateId = new AtomicLong(); - private final long _stateId; - + private enum State { INIT, ACCEPTED, PROBABLE_FAIL, FAIL, SUCCESS }; + private State _state = State.INIT; + + /** + * For synchronous waiting for accept with waitForAccept(). + * UNUSED. + */ public MessageState(I2PAppContext ctx, long nonce, String prefix) { - _stateId = __stateId.incrementAndGet(); _context = ctx; _log = ctx.logManager().getLog(MessageState.class); _nonce = nonce; - _prefix = prefix + "[" + _stateId + "]: "; - _receivedStatus = new HashSet<Integer>(); + _prefix = prefix + '[' + _nonce + "]: "; + _created = ctx.clock().now(); + _expires = _created + 60*1000L; + _listener = null; + _session = null; + } + + /** + * For asynchronous notification + * @param expires absolute time (not interval) + * @since 0.9.14 + */ + public MessageState(I2PAppContext ctx, long nonce, I2PSession session, + long expires, SendMessageStatusListener listener) { + _context = ctx; + _log = ctx.logManager().getLog(MessageState.class); + _nonce = nonce; + _prefix = session.toString() + " [" + _nonce + "]: "; _created = ctx.clock().now(); - //ctx.statManager().createRateStat("i2cp.checkStatusTime", "how long it takes to go through the states", "i2cp", new long[] { 60*1000 }); + _expires = expires; + _listener = listener; + _session = session; } public void receive(int status) { - synchronized (_receivedStatus) { - _receivedStatus.add(Integer.valueOf(status)); - _receivedStatus.notifyAll(); + State oldState; + State newState; + synchronized (this) { + oldState = _state; + locked_update(status); + newState = _state; + this.notifyAll(); + } + if (_listener != null) { + // only notify on changing state, and only if we haven't expired + if (oldState != newState && _expires > _context.clock().now()) + _listener.messageStatus(_session, _nonce, status); } } @@ -60,221 +85,114 @@ class MessageState { return _id; } - public long getNonce() { - return _nonce; - } - - /** @deprecated unused */ - public void setKey(SessionKey key) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Setting key [" + _key + "] to [" + key + "]"); - _key = key; - } - - /** @deprecated unused */ - public SessionKey getKey() { - return _key; - } - - /** @deprecated unused */ - public void setNewKey(SessionKey key) { - _newKey = key; - } - - /** @deprecated unused */ - public SessionKey getNewKey() { - return _newKey; - } - - /** @deprecated unused */ - public void setTags(Set tags) { - _tags = tags; - } - - /** @deprecated unused */ - public Set getTags() { - return _tags; - } - - public void setTo(Destination dest) { - _to = dest; - } - - /** @deprecated unused */ - public Destination getTo() { - return _to; - } - public long getElapsed() { return _context.clock().now() - _created; } - public void waitFor(int status, long expiration) { - //long checkTime = -1; - boolean found = false; - while (!found) { - if (_cancelled) return; + /** + * @since 0.9.14 + */ + public long getExpires() { + return _expires; + } + + /** + * For guaranteed/best effort only. Not really used. + */ + public void waitForAccept(long expiration) throws InterruptedException { + while (true) { long timeToWait = expiration - _context.clock().now(); if (timeToWait <= 0) { if (_log.shouldLog(Log.WARN)) - _log.warn(_prefix + "Expired waiting for the status [" + status + "]"); + _log.warn(_prefix + "Expired waiting for the status"); return; } - found = false; - synchronized (_receivedStatus) { - //long beforeCheck = _context.clock().now(); - if (locked_isSuccess(status) || locked_isFailure(status)) { + synchronized (this) { + if (_state != State.INIT) { if (_log.shouldLog(Log.DEBUG)) _log.debug(_prefix + "Received a confirm (one way or the other)"); - found = true; - } - //checkTime = _context.clock().now() - beforeCheck; - if (!found) { - if (timeToWait > 5000) { - timeToWait = 5000; - } - try { - _receivedStatus.wait(timeToWait); - } catch (InterruptedException ie) { // nop - } + return; } + if (timeToWait > 5000) + timeToWait = 5000; + this.wait(timeToWait); } - //if (found) - // _context.statManager().addRateData("i2cp.checkStatusTime", checkTime, 0); } } - private boolean locked_isSuccess(int wantedStatus) { - boolean rv = false; - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus); - for (Integer val : _receivedStatus) { - int recv = val.intValue(); - switch (recv) { - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: - if (_log.shouldLog(Log.WARN)) - _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " - + toString()); - rv = false; - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: - if (_log.shouldLog(Log.WARN)) - _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " - + toString()); - rv = false; - break; - case MessageStatusMessage.STATUS_SEND_ACCEPTED: - if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { - return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it) - } - // ignore accepted, as we want something better - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString()); - continue; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received best effort success after " + getElapsed() - + " from " + toString()); - if (wantedStatus == recv) { - rv = true; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Not guaranteed success, but best effort after " - + getElapsed() + " will do... from " + toString()); - rv = true; - } - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " - + toString()); - // even if we're waiting for best effort success, guaranteed is good enough - rv = true; - break; - case -1: - continue; - default: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received something else [" + recv + "]..."); - } + /** + * Update our flags + * @since 0.9.14 + */ + private void locked_update(int status) { + switch (status) { + case MessageStatusMessage.STATUS_SEND_ACCEPTED: + // only trumps init + if (_state == State.INIT) + _state = State.ACCEPTED; + break; + + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: + case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: + // does not trump failure or success + if (_state != State.FAIL && _state != State.SUCCESS) + _state = State.PROBABLE_FAIL; + break; + + case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL: + case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER: + case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK: + case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION: + case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE: + case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS: + case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW: + case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED: + case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET: + case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS: + case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION: + case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION: + case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET: + case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET: + case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET: + case SendMessageStatusListener.STATUS_CANCELLED: + // does not trump success + if (_state != State.SUCCESS) + _state = State.FAIL; + break; + + case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: + case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: + case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL: + // trumps all + _state = State.SUCCESS; + + default: + break; } - return rv; } - private boolean locked_isFailure(int wantedStatus) { - boolean rv = false; - - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus); - - for (Integer val : _receivedStatus) { - int recv = val.intValue(); - switch (recv) { - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: - if (_log.shouldLog(Log.DEBUG)) - _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from " - + toString()); - rv = true; - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: - if (_log.shouldLog(Log.DEBUG)) - _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from " - + toString()); - rv = true; - break; - case MessageStatusMessage.STATUS_SEND_ACCEPTED: - if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) { - rv = false; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Got accepted, but we're waiting for more from " - + toString()); - continue; - // ignore accepted, as we want something better - } - break; - case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received best effort success after " + getElapsed() - + " from " + toString()); - if (wantedStatus == recv) { - rv = false; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Not guaranteed success, but best effort after " - + getElapsed() + " will do... from " + toString()); - rv = false; - } - break; - case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from " - + toString()); - // even if we're waiting for best effort success, guaranteed is good enough - rv = false; - break; - case -1: - continue; - default: - if (_log.shouldLog(Log.DEBUG)) - _log.debug(_prefix + "Received something else [" + recv + "]..."); - } + /** + * @return true if accepted (fixme and not failed) + * @since 0.9.14 + */ + public boolean wasAccepted() { + synchronized (this) { + return _state != State.INIT && _state != State.FAIL; } - return rv; } - /** #return true if the given status (or an equivalent) was received */ - public boolean received(int status) { - synchronized (_receivedStatus) { - return locked_isSuccess(status); + /** + * @return true if successful + * @since 0.9.14 + */ + public boolean wasSuccessful() { + synchronized (this) { + return _state == State.SUCCESS; } } public void cancel() { - _cancelled = true; - synchronized (_receivedStatus) { - _receivedStatus.notifyAll(); - } + // Inject a fake status + receive(SendMessageStatusListener.STATUS_CANCELLED); } } diff --git a/core/java/src/net/i2p/client/SendMessageStatusListener.java b/core/java/src/net/i2p/client/SendMessageStatusListener.java new file mode 100644 index 0000000000..ed4121371f --- /dev/null +++ b/core/java/src/net/i2p/client/SendMessageStatusListener.java @@ -0,0 +1,25 @@ +package net.i2p.client; + +/** + * Asynchronously notify the client of the status + * of a sent message. + * + * @since 0.9.14 + */ +public interface SendMessageStatusListener { + + /** I2CP status codes are 0 - 255. Start our fake ones at 256. */ + public static final int STATUS_CANCELLED = 256; + + /** + * Tell the client of an update in the send status for a message + * previously sent with I2PSession.sendMessage(). + * Multiple calls for a single message ID are possible. + * + * @param session session notifying + * @param msgId message number returned from a previous sendMessage() call + * @param status of the message, as defined in MessageStatusMessage and this class. + */ + void messageStatus(I2PSession session, long msgId, int status); + +} -- GitLab