diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java
index 47c8bd53fa37498c7b9c39e4922a92718823aac2..c625e784196f29fb3e1847c5a1f22f4c07f16714 100644
--- a/core/java/src/net/i2p/client/I2CPMessageProducer.java
+++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java
@@ -301,7 +301,7 @@ class I2CPMessageProducer {
      * @param key unused - no end-to-end crypto
      * @param newKey unused - no end-to-end crypto
      */
-    private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set tags,
+    private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set<SessionTag> tags,
                                   SessionKey newKey) throws I2PSessionException {
         if (dest == null) throw new I2PSessionException("No destination specified");
         if (payload == null) throw new I2PSessionException("No payload specified");
@@ -346,8 +346,8 @@ class I2CPMessageProducer {
      * to the router
      * 
      */
-    public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv, PrivateKey priv)
-                                                                                                                         throws I2PSessionException {
+    public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv,
+                               PrivateKey priv) throws I2PSessionException {
         CreateLeaseSetMessage msg = new CreateLeaseSetMessage();
         msg.setLeaseSet(leaseSet);
         msg.setPrivateKey(priv);
diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java
index d70e4041f2da03f6e672991894fabce51eaf3aae..2d535c25af71f15effb077bd251ec9390ad31eb5 100644
--- a/core/java/src/net/i2p/client/I2PSession.java
+++ b/core/java/src/net/i2p/client/I2PSession.java
@@ -47,10 +47,21 @@ public interface I2PSession {
      */
     public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
 
+    /** Send a new message to the given destination, containing the specified
+     * payload, returning true if the router feels confident that the message
+     * was delivered.
+     *
+     * WARNING: It is recommended that you use a method that specifies the protocol and ports.
+     *
+     * @param dest location to send the message
+     * @param payload body of the message to be sent (unencrypted)
+     * @return success
+     */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException;
 
     /**
      * See I2PSessionMuxedImpl for proto/port details.
+     * @return success
      * @since 0.7.1
      */
     public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException;
@@ -83,6 +94,7 @@ public interface I2PSession {
      * @param tagsSent UNUSED, IGNORED. Set of tags delivered to the peer and associated with the keyUsed.  This is also an output parameter -
      *                 the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag 
      *                 objects that were sent along side the given keyUsed.
+     * @return success
      */
     public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
 
@@ -90,6 +102,7 @@ public interface I2PSession {
      * End-to-End Crypto is disabled, tags and keys are ignored.
      * @param keyUsed UNUSED, IGNORED.
      * @param tagsSent UNUSED, IGNORED.
+     * @return success
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
 
@@ -98,6 +111,7 @@ public interface I2PSession {
      * @param keyUsed UNUSED, IGNORED.
      * @param tagsSent UNUSED, IGNORED.
      * @param expire absolute expiration timestamp, NOT interval from now
+     * @return success
      * @since 0.7.1
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
@@ -107,6 +121,14 @@ public interface I2PSession {
      * End-to-End Crypto is disabled, tags and keys are ignored.
      * @param keyUsed UNUSED, IGNORED.
      * @param tagsSent UNUSED, IGNORED.
+     * @param proto 1-254 or 0 for unset; recommended:
+     *         I2PSession.PROTO_UNSPECIFIED
+     *         I2PSession.PROTO_STREAMING
+     *         I2PSession.PROTO_DATAGRAM
+     *         255 disallowed
+     * @param fromPort 1-65535 or 0 for unset
+     * @param toPort 1-65535 or 0 for unset
+     * @return success
      * @since 0.7.1
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
@@ -118,6 +140,14 @@ public interface I2PSession {
      * @param keyUsed UNUSED, IGNORED.
      * @param tagsSent UNUSED, IGNORED.
      * @param expire absolute expiration timestamp, NOT interval from now
+     * @param proto 1-254 or 0 for unset; recommended:
+     *         I2PSession.PROTO_UNSPECIFIED
+     *         I2PSession.PROTO_STREAMING
+     *         I2PSession.PROTO_DATAGRAM
+     *         255 disallowed
+     * @param fromPort 1-65535 or 0 for unset
+     * @param toPort 1-65535 or 0 for unset
+     * @return success
      * @since 0.7.1
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
@@ -129,6 +159,14 @@ public interface I2PSession {
      * @param keyUsed UNUSED, IGNORED.
      * @param tagsSent UNUSED, IGNORED.
      * @param expire absolute expiration timestamp, NOT interval from now
+     * @param proto 1-254 or 0 for unset; recommended:
+     *         I2PSession.PROTO_UNSPECIFIED
+     *         I2PSession.PROTO_STREAMING
+     *         I2PSession.PROTO_DATAGRAM
+     *         255 disallowed
+     * @param fromPort 1-65535 or 0 for unset
+     * @param toPort 1-65535 or 0 for unset
+     * @return success
      * @since 0.8.4
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
@@ -137,11 +175,45 @@ public interface I2PSession {
     /**
      * See I2PSessionMuxedImpl for proto/port details.
      * See SendMessageOptions for option details.
+     *
+     * @param proto 1-254 or 0 for unset; recommended:
+     *         I2PSession.PROTO_UNSPECIFIED
+     *         I2PSession.PROTO_STREAMING
+     *         I2PSession.PROTO_DATAGRAM
+     *         255 disallowed
+     * @param fromPort 1-65535 or 0 for unset
+     * @param toPort 1-65535 or 0 for unset
+     * @param options to be passed to the router
+     * @return success
      * @since 0.9.2
      */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
                                int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException;
 
+    /**
+     * Send a message and request an asynchronous notification of delivery status.
+     * Notifications will be delivered at least up to the expiration specified in the options,
+     * or 60 seconds if not specified.
+     *
+     * See I2PSessionMuxedImpl for proto/port details.
+     * See SendMessageOptions for option details.
+     *
+     * @param proto 1-254 or 0 for unset; recommended:
+     *         I2PSession.PROTO_UNSPECIFIED
+     *         I2PSession.PROTO_STREAMING
+     *         I2PSession.PROTO_DATAGRAM
+     *         255 disallowed
+     * @param fromPort 1-65535 or 0 for unset
+     * @param toPort 1-65535 or 0 for unset
+     * @param options to be passed to the router
+     * @return the message ID to be used for later notification to the listener
+     * @throws I2PSessionException on all errors
+     * @since 0.9.14
+     */
+    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
+                               int proto, int fromport, int toport,
+                               SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException;
+
     /** Receive a message that the router has notified the client about, returning
      * the payload.
      * This may only be called once for a given msgId (until the counter wraps)
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java
index 460ee13f34cb1528d91795f53c465459f023d0d7..20aa43550b4040bed6aaa8993c3b9e7d75678382 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl.java
@@ -51,7 +51,7 @@ import net.i2p.util.I2PSSLSocketFactory;
 import net.i2p.util.LHMCache;
 import net.i2p.util.Log;
 import net.i2p.util.OrderedProperties;
-import net.i2p.util.SimpleTimer;
+import net.i2p.util.SimpleTimer2;
 import net.i2p.util.VersionComparator;
 
 /**
@@ -618,20 +618,24 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
     }
 
     /**
-     *  Fire up a periodic task to check for unclamed messages
+     *  Fire up a periodic task to check for unclaimed messages
      *  @since 0.9.1
      */
-    private void startVerifyUsage() {
-        _context.simpleScheduler().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME);
+    protected void startVerifyUsage() {
+        new VerifyUsage();
     }
 
     /**
      *  Check for unclaimed messages, without wastefully setting a timer for each
-     *  message. Just copy all unclaimed ones and check 30 seconds later.
+     *  message. Just copy all unclaimed ones and check some time later.
      */
-    private class VerifyUsage implements SimpleTimer.TimedEvent {
+    private class VerifyUsage extends SimpleTimer2.TimedEvent {
         private final List<Long> toCheck = new ArrayList<Long>();
         
+        public VerifyUsage() {
+             super(_context.simpleTimer2(), VERIFY_USAGE_TIME);
+        }
+
         public void timeReached() {
             if (isClosed())
                 return;
@@ -641,12 +645,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
                 for (Long msgId : toCheck) {
                     MessagePayloadMessage removed = _availableMessages.remove(msgId);
                     if (removed != null)
-                        _log.error("Message NOT removed!  id=" + msgId + ": " + removed);
+                        _log.error(getPrefix() + " Client not responding? Message not processed! id=" + msgId + ": " + removed);
                 }
                 toCheck.clear();
             }
             toCheck.addAll(_availableMessages.keySet());
-            _context.simpleScheduler().addEvent(this, VERIFY_USAGE_TIME);
+            schedule(VERIFY_USAGE_TIME);
         }
     }
 
diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java
index 27e787e435f5a9b8b889458d55c8aaa7c0877914..9f8a5465d31184b14239f1dc49726688b9b2b39a 100644
--- a/core/java/src/net/i2p/client/I2PSessionImpl2.java
+++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java
@@ -11,11 +11,13 @@ package net.i2p.client;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import net.i2p.I2PAppContext;
 import net.i2p.data.DataHelper;
@@ -24,6 +26,7 @@ import net.i2p.data.SessionKey;
 import net.i2p.data.i2cp.MessageId;
 import net.i2p.data.i2cp.MessageStatusMessage;
 import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer2;
 
 /**
  * Thread safe implementation of an I2P session running over TCP.  
@@ -35,7 +38,8 @@ import net.i2p.util.Log;
 class I2PSessionImpl2 extends I2PSessionImpl {
 
     /** set of MessageState objects, representing all of the messages in the process of being sent */
-    private /* FIXME final FIXME */ Set<MessageState> _sendingStates;
+    protected final Map<Long, MessageState> _sendingStates;
+    protected final AtomicLong _sendMessageNonce;
     /** max # seconds to wait for confirmation of the message send */
     private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send 
     /** should we gzip each payload prior to sending it? */
@@ -44,12 +48,16 @@ class I2PSessionImpl2 extends I2PSessionImpl {
     /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
     protected boolean _noEffort;
 
+    private static final long REMOVE_EXPIRED_TIME = 63*1000;
+
      /**
       * for extension by SimpleSession (no dest)
       */
     protected I2PSessionImpl2(I2PAppContext context, Properties options,
                               I2PClientMessageHandlerMap handlerMap) {
         super(context, options, handlerMap);
+        _sendingStates = null;
+        _sendMessageNonce = null;
     }
 
     /**
@@ -63,11 +71,12 @@ class I2PSessionImpl2 extends I2PSessionImpl {
      */
     public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
         super(ctx, destKeyStream, options);
-        _sendingStates = new HashSet<MessageState>(32);
+        _sendingStates = new ConcurrentHashMap<Long, MessageState>(32);
+        _sendMessageNonce = new AtomicLong();
         // default is BestEffort
         _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
 
-        ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
+        //ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage1", "second part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
         //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
@@ -80,11 +89,48 @@ class I2PSessionImpl2 extends I2PSessionImpl {
         //_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
         _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
         _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
-        _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
+        //_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
         _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
         _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
     }
 
+    /**
+     *  Fire up a periodic task to check for unclaimed messages
+     *  @since 0.9.14
+     */
+    @Override
+    protected void startVerifyUsage() {
+        super.startVerifyUsage();
+        new RemoveExpired();
+    }
+
+    /**
+     *  Check for expired message states, without wastefully setting a timer for each
+     *  message.
+     *  @since 0.9.14
+     */
+    private class RemoveExpired extends SimpleTimer2.TimedEvent {
+        
+        public RemoveExpired() {
+             super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
+        }
+
+        public void timeReached() {
+            if (isClosed())
+                return;
+            if (!_sendingStates.isEmpty()) {
+                long now = _context.clock().now();
+                for (Iterator<MessageState> iter = _sendingStates.values().iterator(); iter.hasNext(); ) {
+                    MessageState state = iter.next();
+                    if (state.getExpires() < now)
+                        iter.remove();
+                }
+            }
+            schedule(REMOVE_EXPIRED_TIME);
+        }
+    }
+
+
     protected long getTimeout() {
         return SEND_TIMEOUT;
     }
@@ -109,6 +155,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
      *  Todo: don't compress if destination is local?
      */
     private static final int DONT_COMPRESS_SIZE = 66;
+
     protected boolean shouldCompress(int size) {
          if (size <= DONT_COMPRESS_SIZE)
              return false;
@@ -118,33 +165,47 @@ class I2PSessionImpl2 extends I2PSessionImpl {
          return SHOULD_COMPRESS;
     }
     
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public void removeListener(int proto, int port) {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
                                int proto, int fromport, int toport) throws I2PSessionException {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
                                int proto, int fromport, int toport) throws I2PSessionException {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
                                int proto, int fromport, int toport, int flags) throws I2PSessionException {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
                                int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException {
-        throw new IllegalArgumentException("Use MuxedImpl");
+        throw new UnsupportedOperationException("Use MuxedImpl");
+    }
+    /** @throws UnsupportedOperationException always, use MuxedImpl */
+    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
+                               int proto, int fromport, int toport,
+                               SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
+        throw new UnsupportedOperationException("Use MuxedImpl");
     }
 
     /** unused, see MuxedImpl override */
@@ -210,8 +271,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
             String d = dest.calculateHash().toBase64().substring(0,4);
             _log.info("sending message to: " + d + " compress? " + sc + " sizeIn=" + size + " sizeOut=" + compressed);
         }
-        _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
-        _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
+        _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed);
+        _context.statManager().addRateData("i2cp.tx.msgExpanded", size);
         if (_noEffort)
             return sendNoEffort(dest, payload, expires, 0);
         else
@@ -257,142 +318,29 @@ class I2PSessionImpl2 extends I2PSessionImpl {
      */
     protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags)
                     throws I2PSessionException {
-        //SessionKey key = null;
-        //SessionKey newKey = null;
-        //SessionTag tag = null;
-        //Set sentTags = null;
-        //int oldTags = 0;
-        long begin = _context.clock().now();
-        /***********
-        if (I2CPMessageProducer.END_TO_END_CRYPTO) {
-            if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort");
-            key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
-            if (_log.shouldLog(Log.DEBUG)) _log.debug("key fetched");
-            if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
-            tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
-            if (_log.shouldLog(Log.DEBUG)) _log.debug("tag consumed");
-            sentTags = null;
-            oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key);
-            long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key);
-        
-            if ( (tagsSent == null) || (tagsSent.isEmpty()) ) {
-                if (oldTags < NUM_TAGS) {
-                    sentTags = createNewTags(NUM_TAGS);
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS + ": " + sentTags);
-                } else if (availTimeLeft < 2 * 60 * 1000) {
-                    // if we have > 50 tags, but they expire in under 2 minutes, we want more
-                    sentTags = createNewTags(NUM_TAGS);
-                    if (_log.shouldLog(Log.DEBUG)) 
-                        _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones: " + sentTags);
-                    //_log.error("** sendBestEffort available time left " + availTimeLeft);
-                } else {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
-                }
-            } else {
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft 
-                               + "ms left, " + oldTags + " tags known and " 
-                               + (tag == null ? "no tag" : " a valid tag"));
-            }
-
-            if (false) // rekey
-                newKey = _context.keyGenerator().generateSessionKey();
         
-            if ( (tagsSent != null) && (!tagsSent.isEmpty()) ) {
-                if (sentTags == null)
-                    sentTags = new HashSet();
-                sentTags.addAll(tagsSent);
-            }
-        } else {
-            // not using end to end crypto, so don't ever bundle any tags
-        }
-        **********/
-        
-        //if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
-        
-        long nonce = _context.random().nextInt(Integer.MAX_VALUE - 1) + 1;
-        //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
+        long nonce = _sendMessageNonce.incrementAndGet();
         MessageState state = new MessageState(_context, nonce, getPrefix());
-        //state.setKey(key);
-        //state.setTags(sentTags);
-        //state.setNewKey(newKey);
-        state.setTo(dest);
-        //if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key);
-
-        //if (keyUsed != null) {
-            //if (I2CPMessageProducer.END_TO_END_CRYPTO) {
-            //    if (newKey != null)
-            //        keyUsed.setData(newKey.getData());
-            //    else
-            //        keyUsed.setData(key.getData());
-            //} else {
-            //    keyUsed.setData(SessionKey.INVALID_KEY.getData());
-            //}
-        //}
-        //if (tagsSent != null) {
-        //    if (sentTags != null) {
-        //        tagsSent.addAll(sentTags);
-        //    }
-        //}
 
-        //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
-        long beforeSendingSync = _context.clock().now();
-        long inSendingSync = 0;
-        synchronized (_sendingStates) {
-            inSendingSync = _context.clock().now();
-            _sendingStates.add(state);
-        }
-        long afterSendingSync = _context.clock().now();
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
-                       + state.getNonce() + " for best effort "
-                       + " sync took " + (inSendingSync-beforeSendingSync) 
-                       + " add took " + (afterSendingSync-inSendingSync));
-        //_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires);
-        _producer.sendMessage(this, dest, nonce, payload, expires, flags);
-        
         // since this is 'best effort', all we're waiting for is a status update 
         // saying that the router received it - in theory, that should come back
         // immediately, but in practice can take up to a second (though usually
         // much quicker).  setting this to false will short-circuit that delay
         boolean actuallyWait = false; // true;
-        
-        long beforeWaitFor = _context.clock().now();
         if (actuallyWait)
-            state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, 
-                          _context.clock().now() + getTimeout());
-        //long afterWaitFor = _context.clock().now();
-        //long inRemovingSync = 0;
-        synchronized (_sendingStates) {
-            //inRemovingSync = _context.clock().now();
-            _sendingStates.remove(state);
-        }
-        long afterRemovingSync = _context.clock().now();
-        boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
-                       + " / " + state.getNonce() + " found = " + found);
-        
-        long timeToSend = afterRemovingSync - beforeSendingSync;
-        if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
-            _log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
-        }
+            _sendingStates.put(Long.valueOf(nonce), state);
+        _producer.sendMessage(this, dest, nonce, payload, expires, flags);
         
-        if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) {
-            _log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, "
-                      + (afterSendingSync-begin) + "ms to prepare, "
-                      + (beforeWaitFor-afterSendingSync) + "ms to send, "
-                      + (afterRemovingSync-beforeWaitFor) + "ms waiting for reply");
+        if (actuallyWait) {
+            try {
+                state.waitForAccept(_context.clock().now() + getTimeout());
+            } catch (InterruptedException ie) {
+                throw new I2PSessionException("interrupted");
+            } finally {
+                _sendingStates.remove(Long.valueOf(nonce));
+            }
         }
-        
-        _context.statManager().addRateData("i2cp.sendBestEffortTotalTime", afterRemovingSync - begin, 0);
-        //_context.statManager().addRateData("i2cp.sendBestEffortStage0", beforeSendingSync- begin, 0);
-        //_context.statManager().addRateData("i2cp.sendBestEffortStage1", afterSendingSync- beforeSendingSync, 0);
-        //_context.statManager().addRateData("i2cp.sendBestEffortStage2", beforeWaitFor- afterSendingSync, 0);
-        //_context.statManager().addRateData("i2cp.sendBestEffortStage3", afterWaitFor- beforeWaitFor, 0);
-        //_context.statManager().addRateData("i2cp.sendBestEffortStage4", afterRemovingSync- afterWaitFor, 0);
+        boolean found = !actuallyWait || state.wasAccepted();
         
         if (found) {
             if (_log.shouldLog(Log.INFO))
@@ -402,9 +350,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
             if (_log.shouldLog(Log.INFO))
                 _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
                           + payload.length + " bytes");
-            if (_log.shouldLog(Log.ERROR))
-                _log.error(getPrefix() + "Never received *accepted* from the router!  dropping and reconnecting");
-            disconnect();
+            //if (_log.shouldLog(Log.ERROR))
+            //    _log.error(getPrefix() + "Never received *accepted* from the router!  dropping and reconnecting");
+            //disconnect();
             return false;
         }
         return found;
@@ -432,8 +380,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
      *  Even when using sendBestEffort(), this is a waste, because the
      *  MessageState is removed from _sendingStates immediately and
      *  so the lookup here fails.
-     *  And iterating through the HashSet instead of having a map
-     *  is bad too.
      *
      *  This is now pretty much avoided since streaming now sets
      *  i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(),
@@ -443,32 +389,24 @@ class I2PSessionImpl2 extends I2PSessionImpl {
      */
     @Override
     public void receiveStatus(int msgId, long nonce, int status) {
-        if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
+
         MessageState state = null;
-        long beforeSync = _context.clock().now();
-        long inSync = 0;
-        synchronized (_sendingStates) {
-            inSync = _context.clock().now();
-            for (Iterator<MessageState> iter = _sendingStates.iterator(); iter.hasNext();) {
-                state = iter.next();
-                if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce());
-                if (state.getNonce() == nonce) {
-                    if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state");
-                    break;
-                } else if ((state.getMessageId() != null) && (state.getMessageId().getMessageId() == msgId)) {
+        if ((state = _sendingStates.get(Long.valueOf(nonce))) != null) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getPrefix() + "Found a matching state");
+        } else if (!_sendingStates.isEmpty()) {
+            // O(n**2)
+            // shouldn't happen, router sends good nonce for all statuses as of 0.9.14
+            for (MessageState s : _sendingStates.values()) {
+                if (s.getMessageId() != null && s.getMessageId().getMessageId() == msgId) {
                     if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state by msgId");
+                    state = s;
                     break;
-                } else {
-                    if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State does not match");
-                    state = null;
                 }
             }
         }
-        long afterSync = _context.clock().now();
-
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: " 
-                       + (inSync-beforeSync) + "ms, check: " + (afterSync-inSync));
         
         if (state != null) {
             if (state.getMessageId() == null) {
@@ -477,11 +415,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
                 state.setMessageId(id);
             }
             state.receive(status);
+            if (state.wasSuccessful())
+                _sendingStates.remove(Long.valueOf(nonce));
             
             long lifetime = state.getElapsed();
             switch (status) {
                 case 1:
-                    _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0);
+                    _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime);
                     break;
                 // best effort codes unused
                 //case 2:
@@ -491,10 +431,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
                 //    _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0);
                 //    break;
                 case 4:
-                    _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0);
+                    _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime);
                     break;
                 case 5:
-                    _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime, 0);
+                    _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime);
                     break;
             }
             
@@ -503,7 +443,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
                 _log.info(getPrefix() + "No matching state for messageId " + msgId + " / " + nonce
                           + " w/ status = " + status);
         }
-        _context.statManager().addRateData("i2cp.receiveStatusTime", _context.clock().now() - beforeSync, 0);
     }
 
     /**
@@ -522,11 +461,11 @@ class I2PSessionImpl2 extends I2PSessionImpl {
     private void clearStates() {
         if (_sendingStates == null)    // only null if overridden by I2PSimpleSession
             return;
-        synchronized (_sendingStates) {
-            for (MessageState state : _sendingStates)
-                state.cancel();
-            if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
-            _sendingStates.clear();
+        for (MessageState state : _sendingStates.values()) {
+            state.cancel();
         }
+        if (_log.shouldLog(Log.INFO))
+            _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
+        _sendingStates.clear();
     }
 }
diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java
index 89a9161b0f0697fd0b531097d5b0e364048f86cf..48ca6016e8c963ee400cc7f12d08bfd2fce09d7c 100644
--- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java
+++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java
@@ -193,21 +193,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
                                SessionKey keyUsed, Set tagsSent, long expires,
                                int proto, int fromPort, int toPort, int flags)
                    throws I2PSessionException {
-        if (isClosed()) throw new I2PSessionException("Already closed");
-        updateActivity();
-
-        boolean sc = shouldCompress(size);
-        if (sc)
-            payload = DataHelper.compress(payload, offset, size);
-        else
-            payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
-
-        setProto(payload, proto);
-        setFromPort(payload, fromPort);
-        setToPort(payload, toPort);
-
-        _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
-        _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
+        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
         if (_noEffort)
             return sendNoEffort(dest, payload, expires, flags);
         else
@@ -232,11 +218,48 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
     @Override
     public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
                                int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException {
+        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
+        //if (_noEffort) {
+            sendNoEffort(dest, payload, options);
+            return true;
+        //} else {
+            // unimplemented
+            //return sendBestEffort(dest, payload, options);
+        //}
+    }
+
+    /**
+     * Send a message and request an asynchronous notification of delivery status.
+     *
+     * See I2PSessionMuxedImpl for proto/port details.
+     * See SendMessageOptions for option details.
+     *
+     * @return the message ID to be used for later notification to the listener
+     * @throws I2PSessionException on all errors
+     * @since 0.9.14
+     */
+    @Override
+    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
+                            int proto, int fromPort, int toPort,
+                            SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
+        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
+        long nonce = _sendMessageNonce.incrementAndGet();
+        long expires = Math.max(_context.clock().now() + 60*1000L, options.getTime());
+        MessageState state = new MessageState(_context, nonce, this, expires, listener);
+        _sendingStates.put(Long.valueOf(nonce), state);
+        _producer.sendMessage(this, dest, nonce, payload, options);
+        return nonce;
+    }
+
+    /**
+     * @return gzip compressed payload, ready to send
+     * @since 0.9.14
+     */
+    private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException {
         if (isClosed()) throw new I2PSessionException("Already closed");
         updateActivity();
 
-        boolean sc = shouldCompress(size);
-        if (sc)
+        if (shouldCompress(size))
             payload = DataHelper.compress(payload, offset, size);
         else
             payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
@@ -245,15 +268,9 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
         setFromPort(payload, fromPort);
         setToPort(payload, toPort);
 
-        _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
-        _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
-        //if (_noEffort) {
-            sendNoEffort(dest, payload, options);
-            return true;
-        //} else {
-            // unimplemented
-            //return sendBestEffort(dest, payload, options);
-        //}
+        _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length);
+        _context.statManager().addRateData("i2cp.tx.msgExpanded", size);
+        return payload;
     }
 
     /**
diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java
index 4d56f648592e4d76f63a62e2a58115ec12eede25..ea1b4284dc2234620ea6f05613431f1342abe86f 100644
--- a/core/java/src/net/i2p/client/MessageState.java
+++ b/core/java/src/net/i2p/client/MessageState.java
@@ -1,12 +1,8 @@
 package net.i2p.client;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import net.i2p.I2PAppContext;
-import net.i2p.data.Destination;
-import net.i2p.data.SessionKey;
 import net.i2p.data.i2cp.MessageId;
 import net.i2p.data.i2cp.MessageStatusMessage;
 import net.i2p.util.Log;
@@ -14,8 +10,10 @@ import net.i2p.util.Log;
 /**
  * Contains the state of a payload message being sent to a peer.
  *
- * This is mostly unused. See sendNoEffort vs. sendBestEffort in I2PSessionImpl2.
- * TODO delete altogether? This is really bad.
+ * Originally was a general-purpose waiter.
+ * Then we got rid of guaranteed delivery.
+ * Then we stopped waiting for accept in best-effort delivery.
+ * Brought back to life for asynchronous status delivery to the client.
  */
 class MessageState {
     private final I2PAppContext _context;
@@ -23,32 +21,59 @@ class MessageState {
     private final long _nonce;
     private final String _prefix;
     private MessageId _id;
-    private final Set<Integer> _receivedStatus;
-    private SessionKey _key;
-    private SessionKey _newKey;
-    private Set _tags;
-    private Destination _to;
-    private boolean _cancelled;
     private final long _created;
+    private final long _expires;
+    private final SendMessageStatusListener _listener;
+    private final I2PSession _session;
 
-    private static final AtomicLong __stateId = new AtomicLong();
-    private final long _stateId;
-    
+    private enum State { INIT, ACCEPTED, PROBABLE_FAIL, FAIL, SUCCESS };
+    private State _state = State.INIT;
+
+    /**
+     *  For synchronous waiting for accept with waitForAccept().
+     *  UNUSED.
+     */
     public MessageState(I2PAppContext ctx, long nonce, String prefix) {
-        _stateId = __stateId.incrementAndGet();
         _context = ctx;
         _log = ctx.logManager().getLog(MessageState.class);
         _nonce = nonce;
-        _prefix = prefix + "[" + _stateId + "]: ";
-        _receivedStatus = new HashSet<Integer>();
+        _prefix = prefix + '[' + _nonce + "]: ";
+        _created = ctx.clock().now();
+        _expires = _created + 60*1000L;
+        _listener = null;
+        _session = null;
+    }
+
+    /**
+     *  For asynchronous notification
+     *  @param expires absolute time (not interval)
+     *  @since 0.9.14
+     */
+    public MessageState(I2PAppContext ctx, long nonce, I2PSession session,
+                        long expires, SendMessageStatusListener listener) {
+        _context = ctx;
+        _log = ctx.logManager().getLog(MessageState.class);
+        _nonce = nonce;
+        _prefix = session.toString() + " [" + _nonce + "]: ";
         _created = ctx.clock().now();
-        //ctx.statManager().createRateStat("i2cp.checkStatusTime", "how long it takes to go through the states", "i2cp", new long[] { 60*1000 });
+        _expires = expires;
+        _listener = listener;
+        _session = session;
     }
 
     public void receive(int status) {
-        synchronized (_receivedStatus) {
-            _receivedStatus.add(Integer.valueOf(status));
-            _receivedStatus.notifyAll();
+        State oldState;
+        State newState;
+        synchronized (this) {
+            oldState = _state;
+            locked_update(status);
+            newState = _state;
+            this.notifyAll();
+        }
+        if (_listener != null) {
+            // only notify on changing state, and only if we haven't expired
+            if (oldState != newState && _expires > _context.clock().now())
+                _listener.messageStatus(_session, _nonce, status);
         }
     }
 
@@ -60,221 +85,114 @@ class MessageState {
         return _id;
     }
 
-    public long getNonce() {
-        return _nonce;
-    }
-
-    /** @deprecated unused */
-    public void setKey(SessionKey key) {
-        if (_log.shouldLog(Log.DEBUG)) 
-            _log.debug(_prefix + "Setting key [" + _key + "] to [" + key + "]");
-        _key = key;
-    }
-
-    /** @deprecated unused */
-    public SessionKey getKey() {
-        return _key;
-    }
-
-    /** @deprecated unused */
-    public void setNewKey(SessionKey key) {
-        _newKey = key;
-    }
-
-    /** @deprecated unused */
-    public SessionKey getNewKey() {
-        return _newKey;
-    }
-
-    /** @deprecated unused */
-    public void setTags(Set tags) {
-        _tags = tags;
-    }
-
-    /** @deprecated unused */
-    public Set getTags() {
-        return _tags;
-    }
-
-    public void setTo(Destination dest) {
-        _to = dest;
-    }
-
-    /** @deprecated unused */
-    public Destination getTo() {
-        return _to;
-    }
-
     public long getElapsed() {
         return _context.clock().now() - _created;
     }
 
-    public void waitFor(int status, long expiration) {
-        //long checkTime = -1;
-        boolean found = false;
-        while (!found) {
-            if (_cancelled) return;
+    /**
+     *  @since 0.9.14
+     */
+    public long getExpires() {
+        return _expires;
+    }
+
+    /**
+     *  For guaranteed/best effort only. Not really used.
+     */
+    public void waitForAccept(long expiration) throws InterruptedException {
+        while (true) {
             long timeToWait = expiration - _context.clock().now();
             if (timeToWait <= 0) {
                 if (_log.shouldLog(Log.WARN)) 
-                    _log.warn(_prefix + "Expired waiting for the status [" + status + "]");
+                    _log.warn(_prefix + "Expired waiting for the status");
                 return;
             }
-            found = false;
-            synchronized (_receivedStatus) {
-                //long beforeCheck = _context.clock().now();
-                if (locked_isSuccess(status) || locked_isFailure(status)) {
+            synchronized (this) {
+                if (_state != State.INIT) {
                     if (_log.shouldLog(Log.DEBUG)) 
                         _log.debug(_prefix + "Received a confirm (one way or the other)");
-                    found = true;
-                }
-                //checkTime = _context.clock().now() - beforeCheck;
-                if (!found) {
-                    if (timeToWait > 5000) {
-                        timeToWait = 5000;
-                    }
-                    try {
-                        _receivedStatus.wait(timeToWait);
-                    } catch (InterruptedException ie) { // nop
-                    }
+                    return;
                 }
+                if (timeToWait > 5000)
+                    timeToWait = 5000;
+                this.wait(timeToWait);
             }
-            //if (found) 
-            //    _context.statManager().addRateData("i2cp.checkStatusTime", checkTime, 0);
         }
     }
 
-    private boolean locked_isSuccess(int wantedStatus) {
-        boolean rv = false;
-
-        if (_log.shouldLog(Log.DEBUG)) 
-            _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus);
-        for (Integer val : _receivedStatus) {
-            int recv = val.intValue();
-            switch (recv) {
-                case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
-                    if (_log.shouldLog(Log.WARN))
-                         _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
-                                   + toString());
-                    rv = false;
-                    break;
-                case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
-                    if (_log.shouldLog(Log.WARN))
-                         _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
-                                   + toString());
-                    rv = false;
-                    break;
-                case MessageStatusMessage.STATUS_SEND_ACCEPTED:
-                    if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
-                        return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it)
-                    }
-                    // ignore accepted, as we want something better
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString());
-                    continue;
-                case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(_prefix + "Received best effort success after " + getElapsed()
-                                   + " from " + toString());
-                    if (wantedStatus == recv) {
-                        rv = true;
-                    } else {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug(_prefix + "Not guaranteed success, but best effort after "
-                                       + getElapsed() + " will do... from " + toString());
-                        rv = true;
-                    }
-                    break;
-                case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
-                                   + toString());
-                    // even if we're waiting for best effort success, guaranteed is good enough
-                    rv = true;
-                    break;
-                case -1:
-                    continue;
-                default:
-                    if (_log.shouldLog(Log.DEBUG)) 
-                        _log.debug(_prefix + "Received something else [" + recv + "]...");
-            }
+    /**
+     *  Update our flags
+     *  @since 0.9.14
+     */
+    private void locked_update(int status) {
+        switch (status) {
+            case MessageStatusMessage.STATUS_SEND_ACCEPTED:
+                // only trumps init
+                if (_state == State.INIT)
+                    _state = State.ACCEPTED;
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
+            case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
+                // does not trump failure or success
+                if (_state != State.FAIL && _state != State.SUCCESS)
+                    _state = State.PROBABLE_FAIL;
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
+            case SendMessageStatusListener.STATUS_CANCELLED:
+                // does not trump success
+                if (_state != State.SUCCESS)
+                    _state = State.FAIL;
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
+            case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
+            case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
+                // trumps all
+                _state = State.SUCCESS;
+
+            default:
+                break;
         }
-        return rv;
     }
 
-    private boolean locked_isFailure(int wantedStatus) {
-        boolean rv = false;
-
-        if (_log.shouldLog(Log.DEBUG)) 
-            _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus);
-        
-        for (Integer val : _receivedStatus) {
-            int recv = val.intValue();
-            switch (recv) {
-                case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
-                                  + toString());
-                    rv = true;
-                    break;
-                case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
-                                  + toString());
-                    rv = true;
-                    break;
-                case MessageStatusMessage.STATUS_SEND_ACCEPTED:
-                    if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
-                        rv = false;
-                    } else {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug(_prefix + "Got accepted, but we're waiting for more from "
-                                       + toString());
-                        continue;
-                        // ignore accepted, as we want something better
-                    }
-                    break;
-                case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(_prefix + "Received best effort success after " + getElapsed()
-                                   + " from " + toString());
-                    if (wantedStatus == recv) {
-                        rv = false;
-                    } else {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug(_prefix + "Not guaranteed success, but best effort after "
-                                       + getElapsed() + " will do... from " + toString());
-                        rv = false;
-                    }
-                    break;
-                case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
-                                   + toString());
-                    // even if we're waiting for best effort success, guaranteed is good enough
-                    rv = false;
-                    break;
-                case -1:
-                    continue;
-                default:
-                    if (_log.shouldLog(Log.DEBUG)) 
-                        _log.debug(_prefix + "Received something else [" + recv + "]...");
-            }
+    /**
+     *  @return true if accepted (fixme and not failed)
+     *  @since 0.9.14
+     */
+    public boolean wasAccepted() {
+        synchronized (this) {
+            return _state != State.INIT && _state != State.FAIL;
         }
-        return rv;
     }
 
-    /** #return true if the given status (or an equivalent) was received */
-    public boolean received(int status) {
-        synchronized (_receivedStatus) {
-            return locked_isSuccess(status);
+    /**
+     *  @return true if successful
+     *  @since 0.9.14
+     */
+    public boolean wasSuccessful() {
+        synchronized (this) {
+            return _state == State.SUCCESS;
         }
     }
 
     public void cancel() {
-        _cancelled = true;
-        synchronized (_receivedStatus) {
-            _receivedStatus.notifyAll();
-        }
+        // Inject a fake status
+        receive(SendMessageStatusListener.STATUS_CANCELLED);
     }
 }
diff --git a/core/java/src/net/i2p/client/SendMessageStatusListener.java b/core/java/src/net/i2p/client/SendMessageStatusListener.java
new file mode 100644
index 0000000000000000000000000000000000000000..ed4121371f580a129fc31a4fda6cf21c1d43b7f8
--- /dev/null
+++ b/core/java/src/net/i2p/client/SendMessageStatusListener.java
@@ -0,0 +1,25 @@
+package net.i2p.client;
+
+/**
+ * Asynchronously notify the client of the status
+ * of a sent message.
+ *
+ * @since 0.9.14
+ */
+public interface SendMessageStatusListener {
+
+    /** I2CP status codes are 0 - 255. Start our fake ones at 256. */
+    public static final int STATUS_CANCELLED = 256;
+
+    /**
+     * Tell the client of an update in the send status for a message
+     * previously sent with I2PSession.sendMessage().
+     * Multiple calls for a single message ID are possible.
+     *
+     * @param session session notifying
+     * @param msgId message number returned from a previous sendMessage() call
+     * @param status of the message, as defined in MessageStatusMessage and this class.
+     */
+    void messageStatus(I2PSession session, long msgId, int status);
+
+}