From 7b03c95cfd85cf41cc4a19a116d9b1a94eb588cd Mon Sep 17 00:00:00 2001
From: human <human>
Date: Wed, 14 Apr 2004 16:59:47 +0000
Subject: [PATCH] * Added STREAMing support; * added NAMING LOOKUP NAME=ME
 support; * various cleanups & fixes; * what else? (human)

---
 apps/sam/java/src/net/i2p/sam/SAMHandler.java |  22 +-
 .../java/src/net/i2p/sam/SAMRawReceiver.java  |   4 +-
 .../java/src/net/i2p/sam/SAMRawSession.java   |  40 +-
 .../src/net/i2p/sam/SAMStreamReceiver.java    |  51 ++
 .../src/net/i2p/sam/SAMStreamSession.java     | 488 ++++++++++++++++++
 apps/sam/java/src/net/i2p/sam/SAMUtils.java   |  34 +-
 .../java/src/net/i2p/sam/SAMv1Handler.java    | 465 ++++++++++++-----
 7 files changed, 953 insertions(+), 151 deletions(-)
 create mode 100644 apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java
 create mode 100644 apps/sam/java/src/net/i2p/sam/SAMStreamSession.java

diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandler.java b/apps/sam/java/src/net/i2p/sam/SAMHandler.java
index d5d4453cc4..44441a4af1 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMHandler.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMHandler.java
@@ -60,7 +60,7 @@ public abstract class SAMHandler implements Runnable {
      *
      * @param data A byte array to be written
      */
-    protected void writeBytes(byte[] data)  throws IOException {
+    protected void writeBytes(byte[] data) throws IOException {
 	synchronized (socketWLock) {
 	    if (socketOS == null) {
 		socketOS = socket.getOutputStream();
@@ -70,6 +70,26 @@ public abstract class SAMHandler implements Runnable {
 	}
     }
 
+    /**
+     * Write a string to the handler's socket.  This method must
+     * always be used when writing strings, unless you really know what
+     * you're doing.
+     *
+     * @param str A byte array to be written
+     *
+     * @return True is the string was successfully written, false otherwise
+     */
+    protected boolean writeString(String str) {
+	try {
+	    writeBytes(str.getBytes("ISO-8859-1"));
+	} catch (IOException e) {
+	    _log.debug("Caught IOException", e);
+	    return false;
+	}
+
+	return true;
+    }
+
     /**
      * Stop the SAM handler
      *
diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawReceiver.java b/apps/sam/java/src/net/i2p/sam/SAMRawReceiver.java
index 95a0e9df7e..6149f348c5 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMRawReceiver.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMRawReceiver.java
@@ -19,7 +19,7 @@ public interface SAMRawReceiver {
      * Send a byte array to a SAM client, without informations
      * regarding the sender.
      *
-     * @param data Byte array to be written
+     * @param data Byte array to be received
      */
     public void receiveRawBytes(byte data[]) throws IOException;
 
@@ -27,5 +27,5 @@ public interface SAMRawReceiver {
      * Stop receiving data.
      *
      */
-    public void stopReceiving();
+    public void stopRawReceiving();
 }
diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java
index 1078ba6bb9..7ced494cc3 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java
@@ -49,7 +49,7 @@ public class SAMRawSession {
      * @param recv Object that will receive incoming data
      */
     public SAMRawSession(String dest, Properties props,
-			 SAMRawReceiver recv) throws DataFormatException, I2PSessionException {
+			 SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
 	ByteArrayInputStream bais;
 
 	bais = new ByteArrayInputStream(Base64.decode(dest));
@@ -65,22 +65,31 @@ public class SAMRawSession {
      * @param recv Object that will receive incoming data
      */
     public SAMRawSession(InputStream destStream, Properties props,
-			 SAMRawReceiver recv) throws I2PSessionException {
+			 SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
 	initSAMRawSession(destStream, props, recv);
     }
 
     private void initSAMRawSession(InputStream destStream, Properties props,
-				   SAMRawReceiver recv) throws I2PSessionException {
+				   SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
 	this.recv = recv;
 
 	_log.debug("SAM RAW session instantiated");
 
 	handler = new SAMRawSessionHandler(destStream, props);
-	Thread t = new I2PThread(handler, "SAMRawSessionHandler");
 
+	Thread t = new I2PThread(handler, "SAMRawSessionHandler");
 	t.start();
     }
 
+    /**
+     * Get the SAM RAW session Destination.
+     *
+     * @return The SAM RAW session Destination.
+     */
+    public Destination getDestination() {
+	return session.getMyDestination();
+    }
+
     /**
      * Send bytes through a SAM RAW session.
      *
@@ -92,6 +101,10 @@ public class SAMRawSession {
 	Destination d = new Destination();
 	d.fromBase64(dest);
 
+	if (_log.shouldLog(Log.DEBUG)) {
+	    _log.debug("Sending " + data.length + " bytes to " + dest);
+	}
+
 	try {
 	    return session.sendMessage(d, data);
 	} catch (I2PSessionException e) {
@@ -158,17 +171,18 @@ public class SAMRawSession {
 			runningLock.wait();
 		    } catch (InterruptedException ie) {}
 		}
-		_log.debug("Shutting down SAM RAW session handler");
-
-		recv.stopReceiving();
+	    }
 
-		try {
-		    _log.debug("Destroying I2P session...");
-		    session.destroySession();
-		    _log.debug("I2P session destroyed");
-		} catch (I2PSessionException e) {
+	    _log.debug("Shutting down SAM RAW session handler");
+	    
+	    recv.stopRawReceiving();
+	    
+	    try {
+		_log.debug("Destroying I2P session...");
+		session.destroySession();
+		_log.debug("I2P session destroyed");
+	    } catch (I2PSessionException e) {
 		    _log.error("Error destroying I2P session", e);
-		}
 	    }
 	}
 	
diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java
new file mode 100644
index 0000000000..1a0fd48237
--- /dev/null
+++ b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java
@@ -0,0 +1,51 @@
+package net.i2p.sam;
+/*
+ * free (adj.): unencumbered; not under the control of others
+ * Written by human in 2004 and released into the public domain 
+ * with no warranty of any kind, either expressed or implied.  
+ * It probably won't  make your computer catch on fire, or eat 
+ * your children, but it might.  Use at your own risk.
+ *
+ */
+
+import java.io.IOException;
+
+import net.i2p.data.Destination;
+
+/**
+ * Interface for sending streaming data to a SAM client
+ */
+public interface SAMStreamReceiver {
+
+    /**
+     * Notify about a new incoming connection
+     *
+     * @param id New connection id
+     */
+    public void notifyStreamConnection(int id, Destination dest) throws IOException;
+
+    /**
+     * Send a byte array to a SAM client.
+     *
+     * @param id Connection id
+     * @param data Byte array to be received
+     * @param len Number of bytes in data
+     */
+    public void receiveStreamBytes(int id, byte data[], int len) throws IOException;
+
+    /**
+     * Notify that a connection has been closed
+     * FIXME: this interface should be cleaner
+     *
+     * @param id Connection id
+     * @param result Disconnection reason ("OK" or something else)
+     * @param msg Error message, if any
+     */
+    public void notifyStreamDisconnection(int id, String result, String msg) throws IOException;
+
+    /**
+     * Stop receiving data.
+     *
+     */
+    public void stopStreamReceiving();
+}
diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
new file mode 100644
index 0000000000..dada05ceea
--- /dev/null
+++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java
@@ -0,0 +1,488 @@
+package net.i2p.sam;
+/*
+ * free (adj.): unencumbered; not under the control of others
+ * Written by human in 2004 and released into the public domain 
+ * with no warranty of any kind, either expressed or implied.  
+ * It probably won't  make your computer catch on fire, or eat 
+ * your children, but it might.  Use at your own risk.
+ *
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+import net.i2p.client.streaming.I2PServerSocket;
+import net.i2p.client.streaming.I2PSocket;
+import net.i2p.client.streaming.I2PSocketManager;
+import net.i2p.client.streaming.I2PSocketManagerFactory;
+import net.i2p.client.streaming.I2PSocketOptions;
+import net.i2p.data.Base64;
+import net.i2p.data.DataFormatException;
+import net.i2p.data.Destination;
+import net.i2p.I2PException;
+import net.i2p.util.HexDump;
+import net.i2p.util.I2PThread;
+import net.i2p.util.Log;
+
+/**
+ * SAM STREAM session class.
+ *
+ * @author human
+ */
+public class SAMStreamSession {
+
+    private final static Log _log = new Log(SAMStreamSession.class);
+
+    private final static int SOCKET_HANDLER_BUF_SIZE = 32768;
+
+    private SAMStreamReceiver recv = null;
+
+    private SAMStreamSessionServer server = null;
+
+    private I2PSocketManager socketMgr = null;
+
+    private Object handlersMapLock = new Object();
+    private HashMap handlersMap = new HashMap();
+
+    private Object idLock = new Object();
+    private int lastNegativeId = 0;
+
+    /**
+     * Create a new SAM STREAM session.
+     *
+     * @param dest Base64-encoded destination (private key)
+     * @param props Properties to setup the I2P session
+     * @param recv Object that will receive incoming data
+     */
+    public SAMStreamSession(String dest, Properties props,
+			    SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
+	ByteArrayInputStream bais;
+
+	bais = new ByteArrayInputStream(Base64.decode(dest));
+
+	initSAMStreamSession(bais, props, recv);
+    }
+
+    /**
+     * Create a new SAM STREAM session.
+     *
+     * @param destStream Input stream containing the destination keys
+     * @param props Properties to setup the I2P session
+     * @param recv Object that will receive incoming data
+     */
+    public SAMStreamSession(InputStream destStream, Properties props,
+			    SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
+	initSAMStreamSession(destStream, props, recv);
+    }
+
+    private void initSAMStreamSession(InputStream destStream, Properties props,
+				      SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{
+	this.recv = recv;
+
+	_log.debug("SAM STREAM session instantiated");
+
+        Properties allprops = new Properties();
+        allprops.putAll(System.getProperties());
+	allprops.putAll(props);
+
+	// FIXME: we should setup I2CP host and port, too
+	_log.debug("Creating I2PSocketManager...");
+        socketMgr = I2PSocketManagerFactory.createManager(destStream,
+							  "127.0.0.1",
+							  7654, allprops);
+	if (socketMgr == null) {
+	    throw new SAMException("Error creating I2PSocketManager");
+	}
+
+	server = new SAMStreamSessionServer();
+	Thread t = new I2PThread(server, "SAMStreamSessionServer");
+
+	t.start();
+    }
+
+    /**
+     * Get the SAM STREAM session Destination.
+     *
+     * @return The SAM STREAM session Destination.
+     */
+    public Destination getDestination() {
+	return socketMgr.getSession().getMyDestination();
+    }
+
+    /**
+     * Connect the SAM STREAM session to the specified Destination
+     *
+     * @param id Unique id for the connection
+     * @param dest Base64-encoded Destination to connect to
+     * @param props Options to be used for connection
+     */
+    public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException {
+	if (checkSocketHandlerId(id)) {
+	    _log.debug("The specified id (" + id + ") is already in use");
+	    return false;
+	}
+
+	Destination d = new Destination();
+	d.fromBase64(dest);
+
+	// FIXME: we should config I2PSocketOptions here
+        I2PSocketOptions opts = new I2PSocketOptions();
+        opts.setConnectTimeout(60 * 1000);
+
+	_log.debug("Connecting new I2PSocket...");
+	I2PSocket i2ps = socketMgr.connect(d, opts);
+
+	createSocketHandler(i2ps, id);
+
+	return true;
+    }
+
+    /**
+     * Send bytes through a SAM STREAM session.
+     *
+     * @param data Bytes to be sent
+     *
+     * @return True if the data was sent, false otherwise
+     */
+    public boolean sendBytes(int id, byte[] data) {
+	Destination d = new Destination();
+	SAMStreamSessionSocketHandler handler = getSocketHandler(id);
+	
+	if (handler == null) {
+	    _log.error("Trying to send bytes through inexistent handler " +id);
+	    return false;
+	}
+
+	return handler.sendBytes(data);
+    }
+
+    /**
+     * Close a SAM STREAM session.
+     *
+     */
+    public void close() {
+	server.stopRunning();
+	removeAllSocketHandlers();
+	recv.stopStreamReceiving();
+    }
+
+    /**
+     * Close a connection managed by the SAM STREAM session.
+     *
+     * @param id Connection id
+     */
+    public boolean closeConnection(int id) {
+	if (!checkSocketHandlerId(id)) {
+	    _log.debug("The specified id (" + id + ") does not exist!");
+	    return false;
+	}
+	removeSocketHandler(id);
+
+	return true;
+    }
+
+    /** 
+     * Create a new SAM STREAM session socket handler, detaching its thread.
+     *
+     * @param s Socket to be handled
+     * @param id Socket id, or 0 if it must be auto-generated
+     *
+     * @return An id associated to the socket handler
+     */
+    private int createSocketHandler(I2PSocket s, int id) {
+	SAMStreamSessionSocketHandler handler;
+	if (id == 0) {
+	    id = createUniqueId();
+	}
+
+	try {
+	    handler = new SAMStreamSessionSocketHandler(s, id);
+	} catch (IOException e) {
+	    _log.error("IOException when creating SAM STREAM session socket handler", e);
+	    recv.stopStreamReceiving();
+	    return 0;
+	}
+
+	synchronized (handlersMapLock) {
+	    handlersMap.put(new Integer(id), handler);
+	}
+
+	I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler");
+	t.start();
+
+	return id;
+    }
+
+    /* Create an unique id, either positive or negative */
+    private int createUniqueId() {
+	synchronized (idLock) {
+	    return --lastNegativeId;
+	}
+    }
+
+    /**
+     * Get a SAM STREAM session socket handler.
+     *
+     * @param id Handler id
+     */
+    private SAMStreamSessionSocketHandler getSocketHandler(int id) {
+	synchronized (handlersMapLock) {
+	    return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id));
+	}
+    }
+
+    /**
+     * Check whether a SAM STREAM session socket handler id is still in use.
+     *
+     * @param id Handler id
+     */
+    private boolean checkSocketHandlerId(int id) {
+	synchronized (handlersMapLock) {
+	    return (!(handlersMap.get(new Integer(id)) == null));
+	}
+    }
+
+    /**
+     * Remove and close a SAM STREAM session socket handler.
+     *
+     * @param id Handler id to be removed
+     */
+    private void removeSocketHandler(int id) {
+	SAMStreamSessionSocketHandler removed;
+
+	synchronized (handlersMapLock) {
+	    removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id));
+	}
+
+	if (removed == null) {
+	    _log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id);
+	    recv.stopStreamReceiving();
+	} else {
+	    removed.stopRunning();
+	    _log.debug("Removed SAM STREAM session socket handler " + id);
+	}
+    }
+
+    /**
+     * Remove and close all the socket handlers managed by this SAM
+     * STREAM session.
+     *
+     * @param id Handler id to be removed
+     */
+    private void removeAllSocketHandlers() {
+	Integer id;
+	Set keySet;
+	Iterator iter;
+
+	synchronized (handlersMapLock) {
+	    keySet = handlersMap.keySet();
+	    iter = keySet.iterator();
+	    
+	    while (iter.hasNext()) {
+		 id = (Integer)iter.next();
+		 ((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning();
+	    }
+	    handlersMap.clear();
+	}
+    }
+
+    /**
+     * SAM STREAM session server, running in its own thread.  It will
+     * wait for incoming connections from the I2P network.
+     *
+     * @author human
+     */
+    public class SAMStreamSessionServer implements Runnable {
+
+	private Object runningLock = new Object();
+	private boolean stillRunning = true;
+
+	private I2PServerSocket serverSocket = null;
+
+	/**
+	 * Create a new SAM STREAM session server
+	 *
+	 */
+	public SAMStreamSessionServer() {
+	    _log.debug("Instantiating new SAM STREAM session server");
+
+	    serverSocket = socketMgr.getServerSocket();
+	}
+
+	/**
+	 * Stop a SAM STREAM session server
+	 *
+	 */
+	public void stopRunning() {
+	    _log.debug("SAMStreamSessionServer.stopRunning() invoked");
+	    synchronized (runningLock) {
+		if (stillRunning) {
+		    stillRunning = false;
+		    try {
+			serverSocket.close();
+		    } catch (I2PException e) {
+			_log.error("I2PException caught", e);
+		    }
+		}
+	    }
+	}
+
+	public void run() {
+	    _log.debug("SAM STREAM session server running");
+	    I2PSocket i2ps;
+
+	    while (stillRunning) {
+		try {
+		    i2ps = serverSocket.accept();
+
+		    _log.debug("New incoming connection");
+
+		    int id = createSocketHandler(i2ps, 0);
+		    if (id == 0) {
+			_log.error("SAM STREAM session handler not created!");
+			i2ps.close();
+			continue;
+		    }
+
+		    _log.debug("New connection id: " + id);
+		    recv.notifyStreamConnection(id, i2ps.getPeerDestination());
+		} catch (I2PException e) {
+		    _log.debug("Caught I2PException", e);
+		    break;
+		} catch (IOException e) {
+		    _log.debug("Caught IOException", e);
+		    break;
+		}
+	    }
+
+	    try {
+		serverSocket.close(); // In case it wasn't closed, yet
+	    } catch (I2PException e) {
+		_log.debug("Caught I2PException", e);
+	    }
+
+	    _log.debug("Shutting down SAM STREAM session server");
+	}
+		
+    }
+
+    /**
+     * SAM STREAM socket handler, running in its own thread.  It forwards
+     * forward data to/from an I2P socket.
+     *
+     * @author human
+     */
+    public class SAMStreamSessionSocketHandler implements Runnable {
+	
+	private I2PSocket i2pSocket = null;
+	private OutputStream i2pSocketOS = null;
+
+	private Object runningLock = new Object();
+	private boolean stillRunning = true;
+
+	private int id;
+		
+	/**
+	 * Create a new SAM STREAM session socket handler
+	 *
+	 * @param s Socket to be handled
+	 * @param id Unique id assigned to the handler
+	 */
+	public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException {
+	    _log.debug("Instantiating new SAM STREAM session socket handler");
+
+	    i2pSocket = s;
+	    i2pSocketOS = s.getOutputStream();
+	    this.id = id;
+	}
+
+	/**
+	 * Send bytes through the SAM STREAM session socket handler
+	 *
+	 * @param data Data to be sent
+	 *
+	 * @return True if data has been sent without errors, false otherwise
+	 */
+	public boolean sendBytes(byte[] data) {
+	    if (_log.shouldLog(Log.DEBUG)) {
+		_log.debug("Handler " + id + ": sending " + data.length
+			   + " bytes");
+	    }
+	    try {
+		i2pSocketOS.write(data);
+	    } catch (IOException e) {
+		_log.error("Error sending data through I2P socket", e);
+		return false;
+	    }
+
+	    return true;
+	}
+
+	/**
+	 * Stop a SAM STREAM session socket handler
+	 *
+	 */
+	public void stopRunning() {
+	    _log.debug("stopRunning() invoked on socket handler " + id);
+	    synchronized (runningLock) {
+		if (stillRunning) {
+		    stillRunning = false;
+		    try {
+			i2pSocket.close();
+		    } catch (IOException e) {
+			_log.debug("Caught IOException", e);
+		    }
+		}
+	    }
+	}
+
+	public void run() {
+	    _log.debug("SAM STREAM session socket handler running");
+
+	    int read = -1;
+	    byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
+
+	    try {
+		InputStream in = i2pSocket.getInputStream();
+
+		while (stillRunning) {
+		    read = in.read(data);
+		    if (read == -1) {
+			_log.debug("Handler " + id + ": connection closed");
+			break;
+		    }
+		    
+		    recv.receiveStreamBytes(id, data, read);
+		}
+	    } catch (IOException e) {
+		_log.debug("Caught IOException", e);
+	    }
+	    
+	    try {
+		i2pSocket.close();
+	    } catch (IOException e) {
+		_log.debug("Caught IOException", e);
+	    }
+
+	    if (stillRunning) {
+		removeSocketHandler(id);
+		// FIXME: we need error reporting here!
+		try {
+		    recv.notifyStreamDisconnection(id, "OK", null);
+		} catch (IOException e) {
+		    _log.debug("Error sending disconnection notice for handler "
+			       + id, e);
+		}
+	    }
+
+	    _log.debug("Shutting down SAM STREAM session socket handler " +id);
+	}
+    }
+}
diff --git a/apps/sam/java/src/net/i2p/sam/SAMUtils.java b/apps/sam/java/src/net/i2p/sam/SAMUtils.java
index c22d0cec0b..778c0d06f0 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMUtils.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMUtils.java
@@ -8,6 +8,7 @@ package net.i2p.sam;
  *
  */
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Enumeration;
@@ -56,6 +57,28 @@ public class SAMUtils {
         }
     }
 
+    /**
+     * Get the Base64 representation of a Destination public key
+     *
+     * @param d A Destination
+     *
+     * @return A String representing the Destination public key
+     */
+    public static String getBase64DestinationPubKey(Destination d) {
+	ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+	try {
+	    d.writeBytes(baos);
+	    return Base64.encode(baos.toByteArray());
+	} catch (IOException e) {
+	    _log.error("getDestinationPubKey(): caught IOException", e);
+	    return null;
+	} catch (DataFormatException e) {
+	    _log.error("getDestinationPubKey(): caught DataFormatException",e);
+	    return null;
+	}
+    }
+
     /**
      * Check whether a base64-encoded dest is valid
      *
@@ -106,10 +129,10 @@ public class SAMUtils {
      *
      * @param tok A StringTokenizer pointing to the SAM parameters
      *
-     * @return A Properties object with the parsed SAM parameters
+     * @return Properties with the parsed SAM params, or null if none is found
      */
     public static Properties parseParams(StringTokenizer tok) {
-	int pos, ntoks = tok.countTokens();
+	int pos, nprops = 0, ntoks = tok.countTokens();
 	String token, param, value;
 	Properties props = new Properties();
 	
@@ -125,13 +148,18 @@ public class SAMUtils {
 	    value = token.substring(pos + 1);
 
 	    props.setProperty(param, value);
+	    nprops += 1;
 	}
 
 	if (_log.shouldLog(Log.DEBUG)) {
 	    _log.debug("Parsed properties: " + dumpProperties(props));
 	}
 
-	return props;
+	if (nprops != 0) {
+	    return props;
+	} else {
+	    return null;
+	}
     }
 
     /* Dump a Properties object in an human-readable form */
diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
index bc02b5054e..583096c09c 100644
--- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
+++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java
@@ -25,6 +25,7 @@ import net.i2p.client.I2PSessionException;
 import net.i2p.data.Base64;
 import net.i2p.data.DataFormatException;
 import net.i2p.data.Destination;
+import net.i2p.I2PException;
 import net.i2p.util.Log;
 
 /**
@@ -32,7 +33,7 @@ import net.i2p.util.Log;
  *
  * @author human
  */
-public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
+public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStreamReceiver {
     
     private final static Log _log = new Log(SAMv1Handler.class);
 
@@ -40,7 +41,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
 
     private SAMRawSession rawSession = null;
     private SAMRawSession datagramSession = null;
-    private SAMRawSession streamSession = null;
+    private SAMStreamSession streamSession = null;
 
     /**
      * Create a new SAM version 1 handler.  This constructor expects
@@ -69,6 +70,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
 	boolean canContinue = false;
 	ByteArrayOutputStream buf = new ByteArrayOutputStream(IN_BUFSIZE);
 	StringTokenizer tok;
+	Properties props;
 
 	this.thread.setName("SAMv1Handler");
 	_log.debug("SAM handling started");
@@ -108,17 +110,22 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
 		}
 		domain = tok.nextToken();
 		opcode = tok.nextToken();
+		if (_log.shouldLog(Log.DEBUG)) {
+		    _log.debug("Parsing (domain: \"" + domain
+			       + "\"; opcode: \"" + opcode + "\")");
+		}
+		props = SAMUtils.parseParams(tok);
 
-		_log.debug("Parsing (domain: \"" + domain + "\"; opcode: \""
-			   + opcode + "\")");
-		if (domain.equals("RAW")) {
-		    canContinue = execRawMessage(opcode, tok);
+		if (domain.equals("STREAM")) {
+		    canContinue = execStreamMessage(opcode, props);
+		} else if (domain.equals("RAW")) {
+		    canContinue = execRawMessage(opcode, props);
 		} else if (domain.equals("SESSION")) {
-		    canContinue = execSessionMessage(opcode, tok);
+		    canContinue = execSessionMessage(opcode, props);
 		} else if (domain.equals("DEST")) {
-		    canContinue = execDestMessage(opcode, tok);
+		    canContinue = execDestMessage(opcode, props);
 		} else if (domain.equals("NAMING")) {
-		    canContinue = execNamingMessage(opcode, tok);
+		    canContinue = execNamingMessage(opcode, props);
 		} else {
 		    _log.debug("Unrecognized message domain: \""
 			       + domain + "\"");
@@ -157,152 +164,137 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
     }
 
     /* Parse and execute a SESSION message */
-    private boolean execSessionMessage(String opcode, StringTokenizer tok) {
-	Properties props = null;
+    private boolean execSessionMessage(String opcode, Properties props) {
 
-	if (opcode.equals("CREATE")) {
-
-	    if ((rawSession != null) || (datagramSession != null)
-		|| (streamSession != null)) {
-		_log.debug("Trying to create a session, but one still exists");
-		return false;
-	    }
-	    props = SAMUtils.parseParams(tok);
-	    if (props == null) {
-		return false;
-	    }
-	    
-	    String dest = props.getProperty("DESTINATION");
-	    if (dest == null) {
-		_log.debug("SESSION DESTINATION parameter not specified");
-		return false;
-	    }
-	    props.remove("DESTINATION");
+	String dest = "BUG!";
 
-	    String style = props.getProperty("STYLE");
-	    if (style == null) {
-		_log.debug("SESSION STYLE parameter not specified");
-		return false;
-	    }
-	    props.remove("STYLE");
-
-	    try {
+	try{
+	    if (opcode.equals("CREATE")) {
+		if ((rawSession != null) || (datagramSession != null)
+		    || (streamSession != null)) {
+		    _log.debug("Trying to create a session, but one still exists");
+		    return false;
+		}
+		if (props == null) {
+		    _log.debug("No parameters specified in SESSION CREATE message");
+		    return false;
+		}
+		
+		dest = props.getProperty("DESTINATION");
+		if (dest == null) {
+		    _log.debug("SESSION DESTINATION parameter not specified");
+		    return false;
+		}
+		props.remove("DESTINATION");
+		
+		if (dest.equals("TRANSIENT")) {
+		    _log.debug("TRANSIENT destination requested");
+		    ByteArrayOutputStream priv = new ByteArrayOutputStream();
+		    SAMUtils.genRandomKey(priv, null);
+		    
+		    dest = Base64.encode(priv.toByteArray());
+		}
+		
+		String style = props.getProperty("STYLE");
+		if (style == null) {
+		    _log.debug("SESSION STYLE parameter not specified");
+		    return false;
+		}
+		props.remove("STYLE");
+		
 		if (style.equals("RAW")) {
-		    try {
-			if (dest.equals("TRANSIENT")) {
-			    _log.debug("TRANSIENT destination requested");
-			    ByteArrayOutputStream priv = new ByteArrayOutputStream();			
-			    SAMUtils.genRandomKey(priv, null);
-			    
-			    dest = Base64.encode(priv.toByteArray());
-			}
-			rawSession = new SAMRawSession (dest, props, this);
-			writeBytes(("SESSION STATUS RESULT=OK DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
-		    } catch (DataFormatException e) {
-			_log.debug("Invalid destination specified");
-			writeBytes(("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
-			return true;
-		    } catch (I2PSessionException e) {
-			_log.debug("I2P error when instantiating RAW session", e);
-			writeBytes(("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
-			return true;
-		    }
+		    rawSession = new SAMRawSession(dest, props, this);
+		} else if (style.equals("STREAM")) {
+		    streamSession = new SAMStreamSession(dest, props, this);
 		} else {
 		    _log.debug("Unrecognized SESSION STYLE: \"" + style + "\"");
 		    return false;
 		}
-	    } catch (UnsupportedEncodingException e) {
-		_log.error("Caught UnsupportedEncodingException ("
-			   + e.getMessage() + ")");
-		return false;
-	    } catch (IOException e) {
-		_log.error("Caught IOException while parsing SESSION message ("
-			   + e.getMessage() + ")");
+		return writeString("SESSION STATUS RESULT=OK DESTINATION="
+				   + dest + "\n");
+	    } else {
+		_log.debug("Unrecognized SESSION message opcode: \""
+			   + opcode + "\"");
 		return false;
 	    }
-	    
-	    return true;
-	} else {
-	    _log.debug("Unrecognized SESSION message opcode: \""
-		       + opcode + "\"");
-	    return false;
+	} catch (DataFormatException e) {
+	    _log.debug("Invalid destination specified");
+	    return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n");
+	} catch (I2PSessionException e) {
+	    _log.debug("I2P error when instantiating session", e);
+	    return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
+	} catch (SAMException e) {
+	    _log.error("Unexpected SAM error", e);
+	    return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
+	} catch (IOException e) {
+	    _log.error("Unexpected IOException", e);
+	    return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
 	}
     }
 
     /* Parse and execute a DEST message*/
-    private boolean execDestMessage(String opcode, StringTokenizer tok) {
+    private boolean execDestMessage(String opcode, Properties props) {
 
 	if (opcode.equals("GENERATE")) {
-	    if (tok.countTokens() > 0) {
-		_log.debug("Bad format in DEST GENERATE message");
+	    if (props != null) {
+		_log.debug("Properties specified in DEST GENERATE message");
 		return false;
 	    }
 
-	    try {
-		ByteArrayOutputStream priv = new ByteArrayOutputStream();
-		ByteArrayOutputStream pub = new ByteArrayOutputStream();
-		
-		SAMUtils.genRandomKey(priv, pub);
-		writeBytes(("DEST REPLY"
-			    + " PUB="
-			    + Base64.encode(pub.toByteArray())
-			    + " PRIV="
-			    + Base64.encode(priv.toByteArray())
-			    + "\n").getBytes("ISO-8859-1"));
-	    } catch (UnsupportedEncodingException e) {
-		_log.error("Caught UnsupportedEncodingException ("
-			   + e.getMessage() + ")");
-		return false;
-	    } catch (IOException e) {
-		_log.debug("IOException while executing DEST message", e);
-		return false;
-	    }
+	    ByteArrayOutputStream priv = new ByteArrayOutputStream();
+	    ByteArrayOutputStream pub = new ByteArrayOutputStream();
+	    
+	    SAMUtils.genRandomKey(priv, pub);
+	    return writeString("DEST REPLY"
+			       + " PUB="
+			       + Base64.encode(pub.toByteArray())
+			       + " PRIV="
+			       + Base64.encode(priv.toByteArray())
+			       + "\n");
 	} else {
 	    _log.debug("Unrecognized DEST message opcode: \"" + opcode + "\"");
 	    return false;
 	}
-
-	return true;
     }
 
     /* Parse and execute a NAMING message */
-    private boolean execNamingMessage(String opcode, StringTokenizer tok) {
-	Properties props = null;
-
+    private boolean execNamingMessage(String opcode, Properties props) {
 	if (opcode.equals("LOOKUP")) {
-	    props = SAMUtils.parseParams(tok);
 	    if (props == null) {
+		_log.debug("No parameters specified in NAMING LOOKUP message");
 		return false;
 	    }
 	    
 	    String name = props.getProperty("NAME");
 	    if (name == null) {
-		_log.debug("Name to resolve not specified");
+		_log.debug("Name to resolve not specified in NAMING message");
 		return false;
 	    }
 
-	    try {
-		ByteArrayOutputStream pubKey = new ByteArrayOutputStream();
-		Destination dest = SAMUtils.lookupHost(name, pubKey);
-
-		if (dest == null) {
-		    writeBytes("NAMING REPLY RESULT=KEY_NOT_FOUND\n".getBytes("ISP-8859-1"));
-		    return true;
+	    Destination dest;
+	    if (name.equals("ME")) {
+		if (rawSession != null) {
+		    dest = rawSession.getDestination();
+		} else if (streamSession != null) {
+		    dest = streamSession.getDestination();
+		} else if (datagramSession != null) {
+		    dest = datagramSession.getDestination();
+		} else {
+		    _log.debug("Lookup for SESSION destination, but session is null");
+		    return false;
 		}
-		
-		writeBytes(("NAMING REPLY RESULT=OK NAME=" + name
-			    + " VALUE=" + Base64.encode(pubKey.toByteArray())
-			    + "\n").getBytes("ISO-8859-1"));
-		return true;
-	    } catch (UnsupportedEncodingException e) {
-		_log.error("Caught UnsupportedEncodingException ("
-			   + e.getMessage() + ")");
-		return false;
-	    } catch (IOException e) {
-		_log.debug("Caught IOException while parsing NAMING message",
-			   e);
-		return false;
+	    } else {
+		dest = SAMUtils.lookupHost(name, null);
 	    }
+	    
+	    if (dest == null) {
+		return writeString("NAMING REPLY RESULT=KEY_NOT_FOUND\n");
+	    }
+	    
+	    return writeString("NAMING REPLY RESULT=OK NAME=" + name
+			       + " VALUE="
+			       + SAMUtils.getBase64DestinationPubKey(dest)
+			       + "\n");
 	} else {
 	    _log.debug("Unrecognized NAMING message opcode: \""
 		       + opcode + "\"");
@@ -310,24 +302,16 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
 	}
     }
 
-    public String toString() {
-	return "SAM v1 handler (client: "
-	    + this.socket.getInetAddress().toString() + ":"
-	    + this.socket.getPort() + ")";
-    }
-
     /* Parse and execute a RAW message */
-    private boolean execRawMessage(String opcode, StringTokenizer tok) {
-	Properties props = null;
-
+    private boolean execRawMessage(String opcode, Properties props) {
 	if (rawSession == null) {
 	    _log.debug("RAW message received, but no RAW session exists");
 	    return false;
 	}
 
 	if (opcode.equals("SEND")) {
-	    props = SAMUtils.parseParams(tok);
 	    if (props == null) {
+		_log.debug("No parameters specified in RAW SEND message");
 		return false;
 	    }
 	    
@@ -389,6 +373,158 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
 	}
     }
 
+    /* Parse and execute a STREAM message */
+    private boolean execStreamMessage(String opcode, Properties props) {
+	if (streamSession == null) {
+	    _log.debug("STREAM message received, but no STREAM session exists");
+	    return false;
+	}
+
+	if (opcode.equals("SEND")) {
+	    if (props == null) {
+		_log.debug("No parameters specified in STREAM SEND message");
+		return false;
+	    }
+	    
+	    int id;
+	    {
+		String strid = props.getProperty("ID");
+		if (strid == null) {
+		    _log.debug("ID not specified in STREAM SEND message");
+		    return false;
+		}
+		try {
+		    id = Integer.parseInt(strid);
+		} catch (NumberFormatException e) {
+		    _log.debug("Invalid STREAM SEND ID specified: " + strid);
+		    return false;
+		}
+	    }
+
+	    int size;
+	    {
+		String strsize = props.getProperty("SIZE");
+		if (strsize == null) {
+		    _log.debug("Size not specified in STREAM SEND message");
+		    return false;
+		}
+		try {
+		    size = Integer.parseInt(strsize);
+		} catch (NumberFormatException e) {
+		    _log.debug("Invalid STREAM SEND size specified: "+strsize);
+		    return false;
+		}
+		if (!checkSize(size)) {
+		    _log.debug("Specified size (" + size
+			       + ") is out of protocol limits");
+		    return false;
+		}
+	    }
+
+	    try {
+		DataInputStream in = new DataInputStream(socket.getInputStream());
+		byte[] data = new byte[size];
+
+		in.readFully(data);
+
+		if (!streamSession.sendBytes(id, data)) {
+		    _log.error("STREAM SEND failed");
+		    return false;
+		}
+
+		return true;
+	    } catch (EOFException e) {
+		_log.debug("Too few bytes with RAW SEND message (expected: "
+			   + size);
+		return false;
+	    } catch (IOException e) {
+		_log.debug("Caught IOException while parsing RAW SEND message",
+			   e);
+		return false;
+	    }
+	} else if (opcode.equals("CONNECT")) {
+	    if (props == null) {
+		_log.debug("No parameters specified in STREAM CONNECT message");
+		return false;
+	    }
+	    
+	    int id;
+	    {
+		String strid = props.getProperty("ID");
+		if (strid == null) {
+		    _log.debug("ID not specified in STREAM SEND message");
+		    return false;
+		}
+		try {
+		    id = Integer.parseInt(strid);
+		} catch (NumberFormatException e) {
+		    _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
+		    return false;
+		}
+		if (id < 1) {
+		    _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
+		    return false;
+		}
+		props.remove("ID");
+	    }
+
+	    String dest = props.getProperty("DESTINATION");
+	    if (dest == null) {
+		_log.debug("Destination not specified in RAW SEND message");
+		return false;
+	    }
+	    props.remove("DESTINATION");
+
+	    try {
+		if (!streamSession.connect(id, dest, props)) {
+		    _log.debug("STREAM connection failed");
+		    return false;
+		}
+		return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n");
+	    } catch (DataFormatException e) {
+		_log.debug("Invalid destination in STREAM CONNECT message");
+		return writeString("STREAM STATUS RESULT=INVALID_KEY ID="
+				   + id + "\n");
+	    } catch (I2PException e) {
+		_log.debug("STREAM CONNECT failed: " + e.getMessage());
+		return writeString("STREAM STATUS RESULT=I2P_ERROR ID="
+				   + id + "\n");
+	    }
+	} else if (opcode.equals("CLOSE")) {
+	    if (props == null) {
+		_log.debug("No parameters specified in STREAM CLOSE message");
+		return false;
+	    }
+	    
+	    int id;
+	    {
+		String strid = props.getProperty("ID");
+		if (strid == null) {
+		    _log.debug("ID not specified in STREAM CLOSE message");
+		    return false;
+		}
+		try {
+		    id = Integer.parseInt(strid);
+		} catch (NumberFormatException e) {
+		    _log.debug("Invalid STREAM CLOSE ID specified: " +strid);
+		    return false;
+		}
+	    }
+
+	    return streamSession.closeConnection(id);
+	} else {
+	    _log.debug("Unrecognized RAW message opcode: \""
+		       + opcode + "\"");
+	    return false;
+	}
+    }
+
+    public String toString() {
+	return "SAM v1 handler (client: "
+	    + this.socket.getInetAddress().toString() + ":"
+	    + this.socket.getPort() + ")";
+    }
+
     /* Check whether a size is inside the limits allowed by this protocol */
     private boolean checkSize(int size) {
 	return ((size >= 1) && (size <= 32768));
@@ -397,20 +533,85 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
     // SAMRawReceiver implementation
     public void receiveRawBytes(byte data[]) throws IOException {
 	if (rawSession == null) {
-	    _log.error("BUG! Trying to write raw bytes, but session is null!");
+	    _log.error("BUG! Received raw bytes, but session is null!");
 	    throw new NullPointerException("BUG! RAW session is null!");
 	}
 
 	ByteArrayOutputStream msg = new ByteArrayOutputStream();
 
-	msg.write(("RAW RECEIVED SIZE=" + data.length + "\n").getBytes());
+	msg.write(("RAW RECEIVED SIZE=" + data.length
+		   + "\n").getBytes("ISO-8859-1"));
 	msg.write(data);
 
 	writeBytes(msg.toByteArray());
     }
 
-    public void stopReceiving() {
-	_log.debug("stopReceiving() invoked");
+    public void stopRawReceiving() {
+	_log.debug("stopRawReceiving() invoked");
+
+	if (rawSession == null) {
+	    _log.error("BUG! Got raw receiving stop, but session is null!");
+	    throw new NullPointerException("BUG! RAW session is null!");
+	}
+
+	try {
+	    this.socket.close();
+	} catch (IOException e) {
+	    _log.error("Error closing socket: " + e.getMessage());
+	}
+    }
+
+    // SAMStreamReceiver implementation
+    public void notifyStreamConnection(int id, Destination d) throws IOException {
+	if (streamSession == null) {
+	    _log.error("BUG! Received stream connection, but session is null!");
+	    throw new NullPointerException("BUG! STREAM session is null!");
+	}
+
+	if (!writeString("STREAM CONNECTED DESTINATION="
+			 + SAMUtils.getBase64DestinationPubKey(d)
+			 + " ID=" + id + "\n")) {
+	    throw new IOException("Error notifying connection to SAM client");
+	}
+    }
+
+    public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
+	if (streamSession == null) {
+	    _log.error("Received stream bytes, but session is null!");
+	    throw new NullPointerException("BUG! STREAM session is null!");
+	}
+
+	ByteArrayOutputStream msg = new ByteArrayOutputStream();
+
+	msg.write(("STREAM RECEIVED ID=" + id 
+		   +" SIZE=" + len + "\n").getBytes("ISO-8859-1"));
+	msg.write(data);
+
+	writeBytes(msg.toByteArray());
+    }
+
+    public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
+	if (streamSession == null) {
+	    _log.error("BUG! Received stream disconnection, but session is null!");
+	    throw new NullPointerException("BUG! STREAM session is null!");
+	}
+
+	// FIXME: msg should be escaped!
+	if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result
+			 + (msg == null ? "" : (" MESSAGE=" + msg))
+			 + "\n")) {
+	    throw new IOException("Error notifying disconnection to SAM client");
+	}
+    }
+
+    public void stopStreamReceiving() {
+	_log.debug("stopStreamReceiving() invoked");
+
+	if (streamSession == null) {
+	    _log.error("BUG! Got stream receiving stop, but session is null!");
+	    throw new NullPointerException("BUG! STREAM session is null!");
+	}
+
 	try {
 	    this.socket.close();
 	} catch (IOException e) {
-- 
GitLab