From d32b4e9f24d7ab9f8c828031c9924177d511c956 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Sun, 18 May 2014 00:05:13 +0000
Subject: [PATCH]  * I2CP: Per-message status codes back through streaming
 (ticket #788)    - New I2PSessionException    - Streaming PacketQueue
 requests status for SYNs on outbound conns    - PacketQueue throws
 I2PSessionException in streams

---
 .../client/streaming/I2PSocketException.java  | 112 +++++++++++++
 .../streaming/impl/ConnectionManager.java     |   1 +
 .../client/streaming/impl/PacketQueue.java    | 149 +++++++++++++++++-
 .../java/src/net/i2p/client/MessageState.java |   1 +
 4 files changed, 259 insertions(+), 4 deletions(-)
 create mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java

diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java
new file mode 100644
index 0000000000..c657ac6723
--- /dev/null
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketException.java
@@ -0,0 +1,112 @@
+package net.i2p.client.streaming;
+
+import java.net.SocketException;
+
+import net.i2p.client.SendMessageStatusListener;
+import net.i2p.data.i2cp.MessageStatusMessage;
+
+/**
+ *  An I2P-specific IOException thrown from input and output streams.
+ *  with a stored status code to be used for programmatic responses.
+ *
+ *  @since 0.9.14
+ */
+public class I2PSocketException extends SocketException {
+
+    private final int _status;
+    private static final int CUSTOM = -1;
+
+    /**
+     *  Use canned message for this status code
+     *  @param status >= 0 from MessageStatusMessage or SendMessageStatusListener
+     */
+    public I2PSocketException(int status) {
+        super();
+        _status = status;
+    }
+
+    /**
+     *  Use message provided
+     */
+    public I2PSocketException(String message) {
+        super(message);
+        _status = CUSTOM;
+    }
+
+    /**
+     *  For programmatic action based on specific failure code
+     *
+     *  @return value from int constructor or -1 for String constructor
+     */
+    public int getStatus() {
+        return _status;
+    }
+
+    /**
+     *  For programmatic action based on specific failure code
+     *
+     *  @return canned message based on status in int constructor or message from String constructor
+     */
+    @Override
+    public String getMessage() {
+        switch (_status) {
+            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
+            case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
+                return "Message timeout";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
+                return "Failed delivery to local destination";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
+                return "Local router failure";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
+                return "Local network failure";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
+                return "Session closed";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
+                return "Invalid message";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
+                return "Invalid message options";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
+                return "Buffer overflow";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
+                return "Message expired";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
+                return "Local lease set invalid";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
+                return "No local tunnels";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
+                return "Unsupported encryption options";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
+                return "Invalid destination";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
+                return "Local router failure";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
+                return "Destination lease set expired";
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
+                return "Destination lease set not found";
+
+            case SendMessageStatusListener.STATUS_CANCELLED:
+                return "Local destination shutdown";
+
+            case CUSTOM:
+                return super.getMessage();
+
+            default:
+                return "Failure code: " + _status;
+        }
+    }
+}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
index 805234f4a9..196038eea6 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
@@ -617,6 +617,7 @@ class ConnectionManager {
         disconnectAllHard();
         _tcbShare.stop();
         _timer.stop();
+        _outboundQueue.close();
     }
     
     /**
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
index dd44087cec..18c9746281 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
@@ -1,12 +1,21 @@
 package net.i2p.client.streaming.impl;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import net.i2p.I2PAppContext;
 import net.i2p.client.I2PSession;
 import net.i2p.client.I2PSessionException;
 import net.i2p.client.SendMessageOptions;
+import net.i2p.client.SendMessageStatusListener;
+import net.i2p.client.streaming.I2PSocketException;
 import net.i2p.data.ByteArray;
+import net.i2p.data.i2cp.MessageStatusMessage;
 import net.i2p.util.ByteCache;
 import net.i2p.util.Log;
+import net.i2p.util.SimpleTimer2;
 
 /**
  * Queue out packets to be sent through the session.  
@@ -16,12 +25,14 @@ import net.i2p.util.Log;
  *<p>
  * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession
  */
-class PacketQueue {
+class PacketQueue implements SendMessageStatusListener {
     private final I2PAppContext _context;
     private final Log _log;
     private final I2PSession _session;
     private final ConnectionManager _connectionManager;
     private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
+    private final Map<Long, Connection> _messageStatusMap;
+    private volatile boolean _dead;
     
     private static final int FLAGS_INITIAL_TAGS = Packet.FLAG_SYNCHRONIZE;
     private static final int FLAGS_FINAL_TAGS = Packet.FLAG_CLOSE |
@@ -32,14 +43,27 @@ class PacketQueue {
     private static final int TAG_WINDOW_FACTOR = 5;
     private static final int FINAL_TAGS_TO_SEND = 4;
     private static final int FINAL_TAG_THRESHOLD = 2;
+    private static final long REMOVE_EXPIRED_TIME = 67*1000;
 
     public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
         _context = context;
         _session = session;
         _connectionManager = mgr;
         _log = context.logManager().getLog(PacketQueue.class);
+        _messageStatusMap = new ConcurrentHashMap<Long, Connection>(16);
+        new RemoveExpired();
         // all createRateStats in ConnectionManager
     }
+
+    /**
+     * Cannot be restarted.
+     *
+     * @since 0.9.14
+     */
+    public void close() {
+        _dead = true;
+        _messageStatusMap.clear();
+    }
     
     /**
      * Add a new packet to be sent out ASAP
@@ -48,6 +72,8 @@ class PacketQueue {
      * @return true if sent
      */
     public boolean enqueue(PacketLocal packet) {
+        if (_dead)
+            return false;
         // this updates the ack/nack field
         packet.prepare();
         
@@ -102,10 +128,15 @@ class PacketQueue {
             SendMessageOptions options = new SendMessageOptions();
             if (expires > 0)
                 options.setDate(expires);
+            boolean listenForStatus = false;
             if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
                 Connection con = packet.getConnection();
-                if (con != null && con.isInbound())
-                    options.setSendLeaseSet(false);
+                if (con != null) {
+                    if (con.isInbound())
+                        options.setSendLeaseSet(false);
+                    else
+                        listenForStatus = true;
+                }
                 options.setTagsToSend(INITIAL_TAGS_TO_SEND);
                 options.setTagThreshold(MIN_TAG_THRESHOLD);
             } else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) {
@@ -130,9 +161,17 @@ class PacketQueue {
                     options.setTagThreshold(thresh);
                 }
             }
-            sent = _session.sendMessage(packet.getTo(), buf, 0, size,
+            if (listenForStatus) {
+                long id = _session.sendMessage(packet.getTo(), buf, 0, size,
+                                 I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
+                                 options, this);
+                _messageStatusMap.put(Long.valueOf(id), packet.getConnection());
+                sent = true;
+            } else {
+                sent = _session.sendMessage(packet.getTo(), buf, 0, size,
                                  I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
                                  options);
+            }
             end = _context.clock().now();
             
             if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) 
@@ -192,5 +231,107 @@ class PacketQueue {
         }
         return sent;
     }
+
+    /**
+     * SendMessageStatusListener interface
+     *
+     * Tell the client of an update in the send status for a message
+     * previously sent with I2PSession.sendMessage().
+     * Multiple calls for a single message ID are possible.
+     *
+     * @param session session notifying
+     * @param msgId message number returned from a previous sendMessage() call
+     * @param status of the message, as defined in MessageStatusMessage and this class.
+     * @since 0.9.14
+     */
+    public void messageStatus(I2PSession session, long msgId, int status) {
+        if (_dead)
+            return;
+        Connection con = _messageStatusMap.get(Long.valueOf(msgId));
+        if (con == null) {
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Rcvd status " + status + " for msg " + msgId + " on unknown connection");
+            return;
+        }
+
+        switch (status) {
+            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
+            case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
+                _messageStatusMap.remove(Long.valueOf(msgId));
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Rcvd soft failure status " + status + " for msg " + msgId + " on " + con);
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
+            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
+            case SendMessageStatusListener.STATUS_CANCELLED:
+                IOException ioe = new I2PSocketException(status);
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Rcvd hard failure status " + status + " for msg " + msgId + " on " + con);
+                _messageStatusMap.remove(Long.valueOf(msgId));
+                con.getOutputStream().streamErrorOccurred(ioe);
+                con.getInputStream().streamErrorOccurred(ioe);
+                con.setConnectionError("barf boof bazzle code " + status);
+                con.disconnect(false);
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
+            case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
+            case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Rcvd success status " + status + " for msg " + msgId + " on " + con);
+                _messageStatusMap.remove(Long.valueOf(msgId));
+                break;
+
+            case MessageStatusMessage.STATUS_SEND_ACCEPTED:
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Rcvd accept status " + status + " for msg " + msgId + " on " + con);
+                break;
+
+            default:
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Rcvd unknown status " + status + " for msg " + msgId + " on " + con);
+                _messageStatusMap.remove(Long.valueOf(msgId));
+                break;
+        }
     
+    }
+
+    /**
+     *  Check for expired message states, without wastefully setting a timer for each
+     *  message.
+     *  @since 0.9.14
+     */
+    private class RemoveExpired extends SimpleTimer2.TimedEvent {
+        
+        public RemoveExpired() {
+             super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
+        }
+
+        public void timeReached() {
+            if (_dead)
+                return;
+            if (!_messageStatusMap.isEmpty()) {
+                for (Iterator<Connection> iter = _messageStatusMap.values().iterator(); iter.hasNext(); ) {
+                    Connection con = iter.next();
+                    if (!con.getIsConnected() || con.getLifetime() > 2*60*1000L)
+                        iter.remove();
+                }
+            }
+            schedule(REMOVE_EXPIRED_TIME);
+        }
+    }
 }
diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java
index ea1b4284dc..e56eb9a8a3 100644
--- a/core/java/src/net/i2p/client/MessageState.java
+++ b/core/java/src/net/i2p/client/MessageState.java
@@ -165,6 +165,7 @@ class MessageState {
             case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
                 // trumps all
                 _state = State.SUCCESS;
+                break;
 
             default:
                 break;
-- 
GitLab