diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandler.java b/apps/sam/java/src/net/i2p/sam/SAMHandler.java index 44441a4af1c19813130d54e1f373dca16ea74329..bfd146ffbba7fefc06da0ad51ac9055ed9391a5b 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandler.java @@ -43,8 +43,8 @@ public abstract class SAMHandler implements Runnable { * */ public void startHandling() { - thread = new I2PThread(this, "SAMHandler"); - thread.start(); + thread = new I2PThread(this, "SAMHandler"); + thread.start(); } /** @@ -61,13 +61,13 @@ public abstract class SAMHandler implements Runnable { * @param data A byte array to be written */ protected void writeBytes(byte[] data) throws IOException { - synchronized (socketWLock) { - if (socketOS == null) { - socketOS = socket.getOutputStream(); - } - socketOS.write(data); - socketOS.flush(); - } + synchronized (socketWLock) { + if (socketOS == null) { + socketOS = socket.getOutputStream(); + } + socketOS.write(data); + socketOS.flush(); + } } /** @@ -80,14 +80,14 @@ public abstract class SAMHandler implements Runnable { * @return True is the string was successfully written, false otherwise */ protected boolean writeString(String str) { - try { - writeBytes(str.getBytes("ISO-8859-1")); - } catch (IOException e) { - _log.debug("Caught IOException", e); - return false; - } - - return true; + try { + writeBytes(str.getBytes("ISO-8859-1")); + } catch (IOException e) { + _log.debug("Caught IOException", e); + return false; + } + + return true; } /** @@ -95,9 +95,9 @@ public abstract class SAMHandler implements Runnable { * */ public void stopHandling() { - synchronized (stopLock) { - stopHandler = true; - } + synchronized (stopLock) { + stopHandler = true; + } } /** @@ -106,9 +106,9 @@ public abstract class SAMHandler implements Runnable { * @return True if the handler should be stopped, false otherwise */ protected boolean shouldStop() { - synchronized (stopLock) { - return stopHandler; - } + synchronized (stopLock) { + return stopHandler; + } } /** @@ -119,6 +119,6 @@ public abstract class SAMHandler implements Runnable { public abstract String toString(); public final void run() { - handle(); + handle(); } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index c598a0d501fda2ef606ed62d47d3e3ea86e60e00..f7fdb8249070bc16b3be9e47969cfee66e539fd2 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -35,145 +35,145 @@ public class SAMHandlerFactory { * @return A SAM protocol handler */ public static SAMHandler createSAMHandler(Socket s) throws SAMException { - BufferedReader br; - StringTokenizer tok; - - try { - br = new BufferedReader(new InputStreamReader(s.getInputStream(), - "ISO-8859-1")); - tok = new StringTokenizer(br.readLine(), " "); - } catch (IOException e) { - throw new SAMException("Error reading from socket: " - + e.getMessage()); - } catch (Exception e) { - throw new SAMException("Unexpected error: " - + e.getMessage()); - } - - // Message format: HELLO VERSION MIN=v1 MAX=v2 - if (tok.countTokens() != 4) { - throw new SAMException("Bad format in HELLO message"); - } - if (!tok.nextToken().equals("HELLO")) { - throw new SAMException("Bad domain in HELLO message"); - } - { - String opcode; - if (!(opcode = tok.nextToken()).equals("VERSION")) { - throw new SAMException("Unrecognized HELLO message opcode: \"" - + opcode + "\""); - } - } - - Properties props; - props = SAMUtils.parseParams(tok); - if (props == null) { - throw new SAMException("No parameters in HELLO VERSION message"); - } - - String minVer = props.getProperty("MIN"); - if (minVer == null) { - throw new SAMException("Missing MIN parameter in HELLO VERSION message"); - } - - String maxVer = props.getProperty("MAX"); - if (maxVer == null) { - throw new SAMException("Missing MAX parameter in HELLO VERSION message"); - } - - String ver = chooseBestVersion(minVer, maxVer); - if (ver == null) { - // Let's answer negatively - try { - OutputStream out = s.getOutputStream(); - out.write("HELLO REPLY RESULT=NOVERSION\n".getBytes("ISO-8859-1")); - return null; - } catch (UnsupportedEncodingException e) { - _log.error("Caught UnsupportedEncodingException (" - + e.getMessage() + ")"); - throw new SAMException("Character encoding error: " - + e.getMessage()); - } catch (IOException e) { - throw new SAMException("Error reading from socket: " - + e.getMessage()); - } - } - - // Let's answer positively - try { - OutputStream out = s.getOutputStream(); - out.write(("HELLO REPLY RESULT=OK VERSION=" - + ver + "\n").getBytes("ISO-8859-1")); - } catch (UnsupportedEncodingException e) { - _log.error("Caught UnsupportedEncodingException (" - + e.getMessage() + ")"); - throw new SAMException("Character encoding error: " - + e.getMessage()); - } catch (IOException e) { - throw new SAMException("Error writing to socket: " - + e.getMessage()); - } - - // ...and instantiate the right SAM handler - int verMajor = getMajor(ver); - int verMinor = getMinor(ver); - SAMHandler handler; - switch (verMajor) { - case 1: - handler = new SAMv1Handler(s, verMajor, verMinor); - break; - default: - _log.error("BUG! Trying to initialize the wrong SAM version!"); - throw new SAMException("BUG triggered! (handler instantiation)"); - } - - return handler; + BufferedReader br; + StringTokenizer tok; + + try { + br = new BufferedReader(new InputStreamReader(s.getInputStream(), + "ISO-8859-1")); + tok = new StringTokenizer(br.readLine(), " "); + } catch (IOException e) { + throw new SAMException("Error reading from socket: " + + e.getMessage()); + } catch (Exception e) { + throw new SAMException("Unexpected error: " + + e.getMessage()); + } + + // Message format: HELLO VERSION MIN=v1 MAX=v2 + if (tok.countTokens() != 4) { + throw new SAMException("Bad format in HELLO message"); + } + if (!tok.nextToken().equals("HELLO")) { + throw new SAMException("Bad domain in HELLO message"); + } + { + String opcode; + if (!(opcode = tok.nextToken()).equals("VERSION")) { + throw new SAMException("Unrecognized HELLO message opcode: \"" + + opcode + "\""); + } + } + + Properties props; + props = SAMUtils.parseParams(tok); + if (props == null) { + throw new SAMException("No parameters in HELLO VERSION message"); + } + + String minVer = props.getProperty("MIN"); + if (minVer == null) { + throw new SAMException("Missing MIN parameter in HELLO VERSION message"); + } + + String maxVer = props.getProperty("MAX"); + if (maxVer == null) { + throw new SAMException("Missing MAX parameter in HELLO VERSION message"); + } + + String ver = chooseBestVersion(minVer, maxVer); + if (ver == null) { + // Let's answer negatively + try { + OutputStream out = s.getOutputStream(); + out.write("HELLO REPLY RESULT=NOVERSION\n".getBytes("ISO-8859-1")); + return null; + } catch (UnsupportedEncodingException e) { + _log.error("Caught UnsupportedEncodingException (" + + e.getMessage() + ")"); + throw new SAMException("Character encoding error: " + + e.getMessage()); + } catch (IOException e) { + throw new SAMException("Error reading from socket: " + + e.getMessage()); + } + } + + // Let's answer positively + try { + OutputStream out = s.getOutputStream(); + out.write(("HELLO REPLY RESULT=OK VERSION=" + + ver + "\n").getBytes("ISO-8859-1")); + } catch (UnsupportedEncodingException e) { + _log.error("Caught UnsupportedEncodingException (" + + e.getMessage() + ")"); + throw new SAMException("Character encoding error: " + + e.getMessage()); + } catch (IOException e) { + throw new SAMException("Error writing to socket: " + + e.getMessage()); + } + + // ...and instantiate the right SAM handler + int verMajor = getMajor(ver); + int verMinor = getMinor(ver); + SAMHandler handler; + switch (verMajor) { + case 1: + handler = new SAMv1Handler(s, verMajor, verMinor); + break; + default: + _log.error("BUG! Trying to initialize the wrong SAM version!"); + throw new SAMException("BUG triggered! (handler instantiation)"); + } + + return handler; } /* Return the best version we can use, or null on failure */ private static String chooseBestVersion(String minVer, String maxVer) { - int minMajor = getMajor(minVer), minMinor = getMinor(minVer); - int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer); - - // Consistency checks - if ((minMajor == -1) || (minMinor == -1) - || (maxMajor == -1) || (maxMinor == -1)) { - return null; - } - if (minMajor > maxMajor) { - return null; - } else if ((minMajor == maxMajor) && (minMinor > maxMinor)) { - return null; - } - - if ((minMajor >= 1) && (minMinor >= 0)) { - return "1.0"; - } - - return null; + int minMajor = getMajor(minVer), minMinor = getMinor(minVer); + int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer); + + // Consistency checks + if ((minMajor == -1) || (minMinor == -1) + || (maxMajor == -1) || (maxMinor == -1)) { + return null; + } + if (minMajor > maxMajor) { + return null; + } else if ((minMajor == maxMajor) && (minMinor > maxMinor)) { + return null; + } + + if ((minMajor >= 1) && (minMinor >= 0)) { + return "1.0"; + } + + return null; } /* Get the major protocol version from a string */ private static int getMajor(String ver) { - try { - String major = ver.substring(0, ver.indexOf(".")); - return Integer.parseInt(major); - } catch (NumberFormatException e) { - return -1; - } catch (ArrayIndexOutOfBoundsException e) { - return -1; - } + try { + String major = ver.substring(0, ver.indexOf(".")); + return Integer.parseInt(major); + } catch (NumberFormatException e) { + return -1; + } catch (ArrayIndexOutOfBoundsException e) { + return -1; + } } /* Get the minor protocol version from a string */ private static int getMinor(String ver) { - try { - String major = ver.substring(ver.indexOf(".") + 1); - return Integer.parseInt(major); - } catch (NumberFormatException e) { - return -1; - } catch (ArrayIndexOutOfBoundsException e) { - return -1; - } + try { + String major = ver.substring(ver.indexOf(".") + 1); + return Integer.parseInt(major); + } catch (NumberFormatException e) { + return -1; + } catch (ArrayIndexOutOfBoundsException e) { + return -1; + } } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index dada05ceea3c0d40bb2df966830bf49e0f75b393..1be21a9df2f60fd513bde301fc09be61544d55a4 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -54,57 +54,78 @@ public class SAMStreamSession { private Object idLock = new Object(); private int lastNegativeId = 0; + // Can we create outgoing connections? + private boolean canCreate = false; + /** * Create a new SAM STREAM session. * * @param dest Base64-encoded destination (private key) + * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data */ - public SAMStreamSession(String dest, Properties props, - SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { - ByteArrayInputStream bais; + public SAMStreamSession(String dest, String dir, Properties props, + SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { + ByteArrayInputStream bais; - bais = new ByteArrayInputStream(Base64.decode(dest)); + bais = new ByteArrayInputStream(Base64.decode(dest)); - initSAMStreamSession(bais, props, recv); + initSAMStreamSession(bais, dir, props, recv); } /** * Create a new SAM STREAM session. * * @param destStream Input stream containing the destination keys + * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data */ - public SAMStreamSession(InputStream destStream, Properties props, - SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { - initSAMStreamSession(destStream, props, recv); + public SAMStreamSession(InputStream destStream, String dir, + Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { + initSAMStreamSession(destStream, dir, props, recv); } - private void initSAMStreamSession(InputStream destStream, Properties props, - SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{ - this.recv = recv; + private void initSAMStreamSession(InputStream destStream, String dir, + Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{ + this.recv = recv; - _log.debug("SAM STREAM session instantiated"); + _log.debug("SAM STREAM session instantiated"); Properties allprops = new Properties(); allprops.putAll(System.getProperties()); - allprops.putAll(props); + allprops.putAll(props); - // FIXME: we should setup I2CP host and port, too - _log.debug("Creating I2PSocketManager..."); + // FIXME: we should setup I2CP host and port, too + _log.debug("Creating I2PSocketManager..."); socketMgr = I2PSocketManagerFactory.createManager(destStream, - "127.0.0.1", - 7654, allprops); - if (socketMgr == null) { - throw new SAMException("Error creating I2PSocketManager"); - } - - server = new SAMStreamSessionServer(); - Thread t = new I2PThread(server, "SAMStreamSessionServer"); - - t.start(); + "127.0.0.1", + 7654, allprops); + if (socketMgr == null) { + throw new SAMException("Error creating I2PSocketManager"); + } + + boolean canReceive = false; + if (dir.equals("BOTH")) { + canCreate = true; + canReceive = true; + } else if (dir.equals("CREATE")) { + canCreate = true; + } else if (dir.equals("RECEIVE")) { + canReceive = true; + } else { + _log.error("BUG! Wrong direction passed to SAMStreamSession: " + + dir); + throw new SAMException("BUG! Wrong direction specified!"); + } + + if (canReceive) { + server = new SAMStreamSessionServer(); + Thread t = new I2PThread(server, "SAMStreamSessionServer"); + + t.start(); + } } /** @@ -113,7 +134,7 @@ public class SAMStreamSession { * @return The SAM STREAM session Destination. */ public Destination getDestination() { - return socketMgr.getSession().getMyDestination(); + return socketMgr.getSession().getMyDestination(); } /** @@ -123,25 +144,30 @@ public class SAMStreamSession { * @param dest Base64-encoded Destination to connect to * @param props Options to be used for connection */ - public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException { - if (checkSocketHandlerId(id)) { - _log.debug("The specified id (" + id + ") is already in use"); - return false; - } + public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException, SAMInvalidDirectionException { + if (!canCreate) { + _log.debug("Trying to create an outgoing connection using a receive-only session"); + throw new SAMInvalidDirectionException("Trying to create connections through a receive-only session"); + } - Destination d = new Destination(); - d.fromBase64(dest); + if (checkSocketHandlerId(id)) { + _log.debug("The specified id (" + id + ") is already in use"); + return false; + } - // FIXME: we should config I2PSocketOptions here + Destination d = new Destination(); + d.fromBase64(dest); + + // FIXME: we should config I2PSocketOptions here I2PSocketOptions opts = new I2PSocketOptions(); opts.setConnectTimeout(60 * 1000); - _log.debug("Connecting new I2PSocket..."); - I2PSocket i2ps = socketMgr.connect(d, opts); + _log.debug("Connecting new I2PSocket..."); + I2PSocket i2ps = socketMgr.connect(d, opts); - createSocketHandler(i2ps, id); + createSocketHandler(i2ps, id); - return true; + return true; } /** @@ -152,15 +178,15 @@ public class SAMStreamSession { * @return True if the data was sent, false otherwise */ public boolean sendBytes(int id, byte[] data) { - Destination d = new Destination(); - SAMStreamSessionSocketHandler handler = getSocketHandler(id); - - if (handler == null) { - _log.error("Trying to send bytes through inexistent handler " +id); - return false; - } - - return handler.sendBytes(data); + Destination d = new Destination(); + SAMStreamSessionSocketHandler handler = getSocketHandler(id); + + if (handler == null) { + _log.error("Trying to send bytes through inexistent handler " +id); + return false; + } + + return handler.sendBytes(data); } /** @@ -168,9 +194,11 @@ public class SAMStreamSession { * */ public void close() { - server.stopRunning(); - removeAllSocketHandlers(); - recv.stopStreamReceiving(); + if (server != null) { + server.stopRunning(); + } + removeAllSocketHandlers(); + recv.stopStreamReceiving(); } /** @@ -179,13 +207,13 @@ public class SAMStreamSession { * @param id Connection id */ public boolean closeConnection(int id) { - if (!checkSocketHandlerId(id)) { - _log.debug("The specified id (" + id + ") does not exist!"); - return false; - } - removeSocketHandler(id); + if (!checkSocketHandlerId(id)) { + _log.debug("The specified id (" + id + ") does not exist!"); + return false; + } + removeSocketHandler(id); - return true; + return true; } /** @@ -197,34 +225,34 @@ public class SAMStreamSession { * @return An id associated to the socket handler */ private int createSocketHandler(I2PSocket s, int id) { - SAMStreamSessionSocketHandler handler; - if (id == 0) { - id = createUniqueId(); - } - - try { - handler = new SAMStreamSessionSocketHandler(s, id); - } catch (IOException e) { - _log.error("IOException when creating SAM STREAM session socket handler", e); - recv.stopStreamReceiving(); - return 0; - } - - synchronized (handlersMapLock) { - handlersMap.put(new Integer(id), handler); - } - - I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler"); - t.start(); - - return id; + SAMStreamSessionSocketHandler handler; + if (id == 0) { + id = createUniqueId(); + } + + try { + handler = new SAMStreamSessionSocketHandler(s, id); + } catch (IOException e) { + _log.error("IOException when creating SAM STREAM session socket handler", e); + recv.stopStreamReceiving(); + return 0; + } + + synchronized (handlersMapLock) { + handlersMap.put(new Integer(id), handler); + } + + I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler"); + t.start(); + + return id; } /* Create an unique id, either positive or negative */ private int createUniqueId() { - synchronized (idLock) { - return --lastNegativeId; - } + synchronized (idLock) { + return --lastNegativeId; + } } /** @@ -233,9 +261,9 @@ public class SAMStreamSession { * @param id Handler id */ private SAMStreamSessionSocketHandler getSocketHandler(int id) { - synchronized (handlersMapLock) { - return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id)); - } + synchronized (handlersMapLock) { + return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id)); + } } /** @@ -244,9 +272,9 @@ public class SAMStreamSession { * @param id Handler id */ private boolean checkSocketHandlerId(int id) { - synchronized (handlersMapLock) { - return (!(handlersMap.get(new Integer(id)) == null)); - } + synchronized (handlersMapLock) { + return (!(handlersMap.get(new Integer(id)) == null)); + } } /** @@ -255,42 +283,41 @@ public class SAMStreamSession { * @param id Handler id to be removed */ private void removeSocketHandler(int id) { - SAMStreamSessionSocketHandler removed; - - synchronized (handlersMapLock) { - removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id)); - } - - if (removed == null) { - _log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id); - recv.stopStreamReceiving(); - } else { - removed.stopRunning(); - _log.debug("Removed SAM STREAM session socket handler " + id); - } + SAMStreamSessionSocketHandler removed; + + synchronized (handlersMapLock) { + removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id)); + } + + if (removed == null) { + _log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id); + recv.stopStreamReceiving(); + } else { + removed.stopRunning(); + _log.debug("Removed SAM STREAM session socket handler " + id); + } } /** * Remove and close all the socket handlers managed by this SAM * STREAM session. * - * @param id Handler id to be removed */ private void removeAllSocketHandlers() { - Integer id; - Set keySet; - Iterator iter; - - synchronized (handlersMapLock) { - keySet = handlersMap.keySet(); - iter = keySet.iterator(); - - while (iter.hasNext()) { - id = (Integer)iter.next(); - ((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning(); - } - handlersMap.clear(); - } + Integer id; + Set keySet; + Iterator iter; + + synchronized (handlersMapLock) { + keySet = handlersMap.keySet(); + iter = keySet.iterator(); + + while (iter.hasNext()) { + id = (Integer)iter.next(); + ((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning(); + } + handlersMap.clear(); + } } /** @@ -301,76 +328,76 @@ public class SAMStreamSession { */ public class SAMStreamSessionServer implements Runnable { - private Object runningLock = new Object(); - private boolean stillRunning = true; - - private I2PServerSocket serverSocket = null; - - /** - * Create a new SAM STREAM session server - * - */ - public SAMStreamSessionServer() { - _log.debug("Instantiating new SAM STREAM session server"); - - serverSocket = socketMgr.getServerSocket(); - } - - /** - * Stop a SAM STREAM session server - * - */ - public void stopRunning() { - _log.debug("SAMStreamSessionServer.stopRunning() invoked"); - synchronized (runningLock) { - if (stillRunning) { - stillRunning = false; - try { - serverSocket.close(); - } catch (I2PException e) { - _log.error("I2PException caught", e); - } - } - } - } - - public void run() { - _log.debug("SAM STREAM session server running"); - I2PSocket i2ps; - - while (stillRunning) { - try { - i2ps = serverSocket.accept(); - - _log.debug("New incoming connection"); - - int id = createSocketHandler(i2ps, 0); - if (id == 0) { - _log.error("SAM STREAM session handler not created!"); - i2ps.close(); - continue; - } - - _log.debug("New connection id: " + id); - recv.notifyStreamConnection(id, i2ps.getPeerDestination()); - } catch (I2PException e) { - _log.debug("Caught I2PException", e); - break; - } catch (IOException e) { - _log.debug("Caught IOException", e); - break; - } - } - - try { - serverSocket.close(); // In case it wasn't closed, yet - } catch (I2PException e) { - _log.debug("Caught I2PException", e); - } - - _log.debug("Shutting down SAM STREAM session server"); - } - + private Object runningLock = new Object(); + private boolean stillRunning = true; + + private I2PServerSocket serverSocket = null; + + /** + * Create a new SAM STREAM session server + * + */ + public SAMStreamSessionServer() { + _log.debug("Instantiating new SAM STREAM session server"); + + serverSocket = socketMgr.getServerSocket(); + } + + /** + * Stop a SAM STREAM session server + * + */ + public void stopRunning() { + _log.debug("SAMStreamSessionServer.stopRunning() invoked"); + synchronized (runningLock) { + if (stillRunning) { + stillRunning = false; + try { + serverSocket.close(); + } catch (I2PException e) { + _log.error("I2PException caught", e); + } + } + } + } + + public void run() { + _log.debug("SAM STREAM session server running"); + I2PSocket i2ps; + + while (stillRunning) { + try { + i2ps = serverSocket.accept(); + + _log.debug("New incoming connection"); + + int id = createSocketHandler(i2ps, 0); + if (id == 0) { + _log.error("SAM STREAM session handler not created!"); + i2ps.close(); + continue; + } + + _log.debug("New connection id: " + id); + recv.notifyStreamConnection(id, i2ps.getPeerDestination()); + } catch (I2PException e) { + _log.debug("Caught I2PException", e); + break; + } catch (IOException e) { + _log.debug("Caught IOException", e); + break; + } + } + + try { + serverSocket.close(); // In case it wasn't closed, yet + } catch (I2PException e) { + _log.debug("Caught I2PException", e); + } + + _log.debug("Shutting down SAM STREAM session server"); + } + } /** @@ -380,109 +407,109 @@ public class SAMStreamSession { * @author human */ public class SAMStreamSessionSocketHandler implements Runnable { - - private I2PSocket i2pSocket = null; - private OutputStream i2pSocketOS = null; - - private Object runningLock = new Object(); - private boolean stillRunning = true; - - private int id; - - /** - * Create a new SAM STREAM session socket handler - * - * @param s Socket to be handled - * @param id Unique id assigned to the handler - */ - public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException { - _log.debug("Instantiating new SAM STREAM session socket handler"); - - i2pSocket = s; - i2pSocketOS = s.getOutputStream(); - this.id = id; - } - - /** - * Send bytes through the SAM STREAM session socket handler - * - * @param data Data to be sent - * - * @return True if data has been sent without errors, false otherwise - */ - public boolean sendBytes(byte[] data) { - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Handler " + id + ": sending " + data.length - + " bytes"); - } - try { - i2pSocketOS.write(data); - } catch (IOException e) { - _log.error("Error sending data through I2P socket", e); - return false; - } - - return true; - } - - /** - * Stop a SAM STREAM session socket handler - * - */ - public void stopRunning() { - _log.debug("stopRunning() invoked on socket handler " + id); - synchronized (runningLock) { - if (stillRunning) { - stillRunning = false; - try { - i2pSocket.close(); - } catch (IOException e) { - _log.debug("Caught IOException", e); - } - } - } - } - - public void run() { - _log.debug("SAM STREAM session socket handler running"); - - int read = -1; - byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; - - try { - InputStream in = i2pSocket.getInputStream(); - - while (stillRunning) { - read = in.read(data); - if (read == -1) { - _log.debug("Handler " + id + ": connection closed"); - break; - } - - recv.receiveStreamBytes(id, data, read); - } - } catch (IOException e) { - _log.debug("Caught IOException", e); - } - - try { - i2pSocket.close(); - } catch (IOException e) { - _log.debug("Caught IOException", e); - } - - if (stillRunning) { - removeSocketHandler(id); - // FIXME: we need error reporting here! - try { - recv.notifyStreamDisconnection(id, "OK", null); - } catch (IOException e) { - _log.debug("Error sending disconnection notice for handler " - + id, e); - } - } - - _log.debug("Shutting down SAM STREAM session socket handler " +id); - } + + private I2PSocket i2pSocket = null; + private OutputStream i2pSocketOS = null; + + private Object runningLock = new Object(); + private boolean stillRunning = true; + + private int id; + + /** + * Create a new SAM STREAM session socket handler + * + * @param s Socket to be handled + * @param id Unique id assigned to the handler + */ + public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException { + _log.debug("Instantiating new SAM STREAM session socket handler"); + + i2pSocket = s; + i2pSocketOS = s.getOutputStream(); + this.id = id; + } + + /** + * Send bytes through the SAM STREAM session socket handler + * + * @param data Data to be sent + * + * @return True if data has been sent without errors, false otherwise + */ + public boolean sendBytes(byte[] data) { + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Handler " + id + ": sending " + data.length + + " bytes"); + } + try { + i2pSocketOS.write(data); + } catch (IOException e) { + _log.error("Error sending data through I2P socket", e); + return false; + } + + return true; + } + + /** + * Stop a SAM STREAM session socket handler + * + */ + public void stopRunning() { + _log.debug("stopRunning() invoked on socket handler " + id); + synchronized (runningLock) { + if (stillRunning) { + stillRunning = false; + try { + i2pSocket.close(); + } catch (IOException e) { + _log.debug("Caught IOException", e); + } + } + } + } + + public void run() { + _log.debug("SAM STREAM session socket handler running"); + + int read = -1; + byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; + + try { + InputStream in = i2pSocket.getInputStream(); + + while (stillRunning) { + read = in.read(data); + if (read == -1) { + _log.debug("Handler " + id + ": connection closed"); + break; + } + + recv.receiveStreamBytes(id, data, read); + } + } catch (IOException e) { + _log.debug("Caught IOException", e); + } + + try { + i2pSocket.close(); + } catch (IOException e) { + _log.debug("Caught IOException", e); + } + + if (stillRunning) { + removeSocketHandler(id); + // FIXME: we need error reporting here! + try { + recv.notifyStreamDisconnection(id, "OK", null); + } catch (IOException e) { + _log.debug("Error sending disconnection notice for handler " + + id, e); + } + } + + _log.debug("Shutting down SAM STREAM session socket handler " +id); + } } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index 583096c09cd5aaf010849b36d569ede78bdb8a01..1d930d1499f7298694e0d009ae11c8d8c17fb271 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -51,571 +51,587 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea * @param s Socket attached to a SAM client */ public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException{ - _log.debug("SAM version 1 handler instantiated"); + _log.debug("SAM version 1 handler instantiated"); - this.verMajor = verMajor; - this.verMinor = verMinor; + this.verMajor = verMajor; + this.verMinor = verMinor; - if ((this.verMajor != 1) || (this.verMinor != 0)) { - throw new SAMException("BUG! Wrong protocol version!"); - } + if ((this.verMajor != 1) || (this.verMinor != 0)) { + throw new SAMException("BUG! Wrong protocol version!"); + } - this.socket = s; - this.verMajor = verMajor; - this.verMinor = verMinor; + this.socket = s; + this.verMajor = verMajor; + this.verMinor = verMinor; } public void handle() { - String msg, domain, opcode; - boolean canContinue = false; - ByteArrayOutputStream buf = new ByteArrayOutputStream(IN_BUFSIZE); - StringTokenizer tok; - Properties props; - - this.thread.setName("SAMv1Handler"); - _log.debug("SAM handling started"); - - try { - InputStream in = socket.getInputStream(); - int b = -1; - - while (true) { - if (shouldStop()) { - _log.debug("Stop request found"); - break; - } - - while ((b = in.read()) != -1) { - if (b == '\n') { - break; - } - buf.write(b); - } - if (b == -1) { - _log.debug("Connection closed by client"); - break; - } - - msg = buf.toString("ISO-8859-1"); - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("New message received: " + msg); - } - buf.reset(); - - tok = new StringTokenizer(msg, " "); - if (tok.countTokens() < 2) { - // This is not a correct message, for sure - _log.debug("Error in message format"); - break; - } - domain = tok.nextToken(); - opcode = tok.nextToken(); - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Parsing (domain: \"" + domain - + "\"; opcode: \"" + opcode + "\")"); - } - props = SAMUtils.parseParams(tok); - - if (domain.equals("STREAM")) { - canContinue = execStreamMessage(opcode, props); - } else if (domain.equals("RAW")) { - canContinue = execRawMessage(opcode, props); - } else if (domain.equals("SESSION")) { - canContinue = execSessionMessage(opcode, props); - } else if (domain.equals("DEST")) { - canContinue = execDestMessage(opcode, props); - } else if (domain.equals("NAMING")) { - canContinue = execNamingMessage(opcode, props); - } else { - _log.debug("Unrecognized message domain: \"" - + domain + "\""); - break; - } - - if (!canContinue) { - break; - } - } - } catch (UnsupportedEncodingException e) { - _log.error("Caught UnsupportedEncodingException (" - + e.getMessage() + ")"); - } catch (IOException e) { - _log.debug("Caught IOException (" - + e.getMessage() + ")"); - } catch (Exception e) { - _log.error("Unexpected exception", e); - } finally { - _log.debug("Stopping handler"); - try { - this.socket.close(); - } catch (IOException e) { - _log.error("Error closing socket: " + e.getMessage()); - } - if (rawSession != null) { - rawSession.close(); - } - if (datagramSession != null) { - datagramSession.close(); - } - if (streamSession != null) { - streamSession.close(); - } - } + String msg, domain, opcode; + boolean canContinue = false; + ByteArrayOutputStream buf = new ByteArrayOutputStream(IN_BUFSIZE); + StringTokenizer tok; + Properties props; + + this.thread.setName("SAMv1Handler"); + _log.debug("SAM handling started"); + + try { + InputStream in = socket.getInputStream(); + int b = -1; + + while (true) { + if (shouldStop()) { + _log.debug("Stop request found"); + break; + } + + while ((b = in.read()) != -1) { + if (b == '\n') { + break; + } + buf.write(b); + } + if (b == -1) { + _log.debug("Connection closed by client"); + break; + } + + msg = buf.toString("ISO-8859-1"); + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("New message received: " + msg); + } + buf.reset(); + + tok = new StringTokenizer(msg, " "); + if (tok.countTokens() < 2) { + // This is not a correct message, for sure + _log.debug("Error in message format"); + break; + } + domain = tok.nextToken(); + opcode = tok.nextToken(); + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Parsing (domain: \"" + domain + + "\"; opcode: \"" + opcode + "\")"); + } + props = SAMUtils.parseParams(tok); + + if (domain.equals("STREAM")) { + canContinue = execStreamMessage(opcode, props); + } else if (domain.equals("RAW")) { + canContinue = execRawMessage(opcode, props); + } else if (domain.equals("SESSION")) { + canContinue = execSessionMessage(opcode, props); + } else if (domain.equals("DEST")) { + canContinue = execDestMessage(opcode, props); + } else if (domain.equals("NAMING")) { + canContinue = execNamingMessage(opcode, props); + } else { + _log.debug("Unrecognized message domain: \"" + + domain + "\""); + break; + } + + if (!canContinue) { + break; + } + } + } catch (UnsupportedEncodingException e) { + _log.error("Caught UnsupportedEncodingException (" + + e.getMessage() + ")"); + } catch (IOException e) { + _log.debug("Caught IOException (" + + e.getMessage() + ")"); + } catch (Exception e) { + _log.error("Unexpected exception", e); + } finally { + _log.debug("Stopping handler"); + try { + this.socket.close(); + } catch (IOException e) { + _log.error("Error closing socket: " + e.getMessage()); + } + if (rawSession != null) { + rawSession.close(); + } + if (datagramSession != null) { + datagramSession.close(); + } + if (streamSession != null) { + streamSession.close(); + } + } } /* Parse and execute a SESSION message */ private boolean execSessionMessage(String opcode, Properties props) { - String dest = "BUG!"; - - try{ - if (opcode.equals("CREATE")) { - if ((rawSession != null) || (datagramSession != null) - || (streamSession != null)) { - _log.debug("Trying to create a session, but one still exists"); - return false; - } - if (props == null) { - _log.debug("No parameters specified in SESSION CREATE message"); - return false; - } - - dest = props.getProperty("DESTINATION"); - if (dest == null) { - _log.debug("SESSION DESTINATION parameter not specified"); - return false; - } - props.remove("DESTINATION"); - - if (dest.equals("TRANSIENT")) { - _log.debug("TRANSIENT destination requested"); - ByteArrayOutputStream priv = new ByteArrayOutputStream(); - SAMUtils.genRandomKey(priv, null); - - dest = Base64.encode(priv.toByteArray()); - } - - String style = props.getProperty("STYLE"); - if (style == null) { - _log.debug("SESSION STYLE parameter not specified"); - return false; - } - props.remove("STYLE"); - - if (style.equals("RAW")) { - rawSession = new SAMRawSession(dest, props, this); - } else if (style.equals("STREAM")) { - streamSession = new SAMStreamSession(dest, props, this); - } else { - _log.debug("Unrecognized SESSION STYLE: \"" + style + "\""); - return false; - } - return writeString("SESSION STATUS RESULT=OK DESTINATION=" - + dest + "\n"); - } else { - _log.debug("Unrecognized SESSION message opcode: \"" - + opcode + "\""); - return false; - } - } catch (DataFormatException e) { - _log.debug("Invalid destination specified"); - return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n"); - } catch (I2PSessionException e) { - _log.debug("I2P error when instantiating session", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); - } catch (SAMException e) { - _log.error("Unexpected SAM error", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); - } catch (IOException e) { - _log.error("Unexpected IOException", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); - } + String dest = "BUG!"; + + try{ + if (opcode.equals("CREATE")) { + if ((rawSession != null) || (datagramSession != null) + || (streamSession != null)) { + _log.debug("Trying to create a session, but one still exists"); + return false; + } + if (props == null) { + _log.debug("No parameters specified in SESSION CREATE message"); + return false; + } + + dest = props.getProperty("DESTINATION"); + if (dest == null) { + _log.debug("SESSION DESTINATION parameter not specified"); + return false; + } + props.remove("DESTINATION"); + + if (dest.equals("TRANSIENT")) { + _log.debug("TRANSIENT destination requested"); + ByteArrayOutputStream priv = new ByteArrayOutputStream(); + SAMUtils.genRandomKey(priv, null); + + dest = Base64.encode(priv.toByteArray()); + } + + String style = props.getProperty("STYLE"); + if (style == null) { + _log.debug("SESSION STYLE parameter not specified"); + return false; + } + props.remove("STYLE"); + + if (style.equals("RAW")) { + rawSession = new SAMRawSession(dest, props, this); + } else if (style.equals("STREAM")) { + String dir = props.getProperty("DIRECTION"); + if (dir == null) { + _log.debug("No DIRECTION parameter in STREAM session"); + return false; + } + if (!dir.equals("CREATE") && !dir.equals("RECEIVE") + && !dir.equals("BOTH")) { + _log.debug("Unknow DIRECTION parameter value: " + dir); + return false; + } + props.remove("DIRECTION"); + + streamSession = new SAMStreamSession(dest, dir,props,this); + } else { + _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); + return false; + } + return writeString("SESSION STATUS RESULT=OK DESTINATION=" + + dest + "\n"); + } else { + _log.debug("Unrecognized SESSION message opcode: \"" + + opcode + "\""); + return false; + } + } catch (DataFormatException e) { + _log.debug("Invalid destination specified"); + return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n"); + } catch (I2PSessionException e) { + _log.debug("I2P error when instantiating session", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); + } catch (SAMException e) { + _log.error("Unexpected SAM error", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); + } catch (IOException e) { + _log.error("Unexpected IOException", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n"); + } } /* Parse and execute a DEST message*/ private boolean execDestMessage(String opcode, Properties props) { - if (opcode.equals("GENERATE")) { - if (props != null) { - _log.debug("Properties specified in DEST GENERATE message"); - return false; - } - - ByteArrayOutputStream priv = new ByteArrayOutputStream(); - ByteArrayOutputStream pub = new ByteArrayOutputStream(); - - SAMUtils.genRandomKey(priv, pub); - return writeString("DEST REPLY" - + " PUB=" - + Base64.encode(pub.toByteArray()) - + " PRIV=" - + Base64.encode(priv.toByteArray()) - + "\n"); - } else { - _log.debug("Unrecognized DEST message opcode: \"" + opcode + "\""); - return false; - } + if (opcode.equals("GENERATE")) { + if (props != null) { + _log.debug("Properties specified in DEST GENERATE message"); + return false; + } + + ByteArrayOutputStream priv = new ByteArrayOutputStream(); + ByteArrayOutputStream pub = new ByteArrayOutputStream(); + + SAMUtils.genRandomKey(priv, pub); + return writeString("DEST REPLY" + + " PUB=" + + Base64.encode(pub.toByteArray()) + + " PRIV=" + + Base64.encode(priv.toByteArray()) + + "\n"); + } else { + _log.debug("Unrecognized DEST message opcode: \"" + opcode + "\""); + return false; + } } /* Parse and execute a NAMING message */ private boolean execNamingMessage(String opcode, Properties props) { - if (opcode.equals("LOOKUP")) { - if (props == null) { - _log.debug("No parameters specified in NAMING LOOKUP message"); - return false; - } - - String name = props.getProperty("NAME"); - if (name == null) { - _log.debug("Name to resolve not specified in NAMING message"); - return false; - } - - Destination dest; - if (name.equals("ME")) { - if (rawSession != null) { - dest = rawSession.getDestination(); - } else if (streamSession != null) { - dest = streamSession.getDestination(); - } else if (datagramSession != null) { - dest = datagramSession.getDestination(); - } else { - _log.debug("Lookup for SESSION destination, but session is null"); - return false; - } - } else { - dest = SAMUtils.lookupHost(name, null); - } - - if (dest == null) { - return writeString("NAMING REPLY RESULT=KEY_NOT_FOUND\n"); - } - - return writeString("NAMING REPLY RESULT=OK NAME=" + name - + " VALUE=" - + SAMUtils.getBase64DestinationPubKey(dest) - + "\n"); - } else { - _log.debug("Unrecognized NAMING message opcode: \"" - + opcode + "\""); - return false; - } + if (opcode.equals("LOOKUP")) { + if (props == null) { + _log.debug("No parameters specified in NAMING LOOKUP message"); + return false; + } + + String name = props.getProperty("NAME"); + if (name == null) { + _log.debug("Name to resolve not specified in NAMING message"); + return false; + } + + Destination dest; + if (name.equals("ME")) { + if (rawSession != null) { + dest = rawSession.getDestination(); + } else if (streamSession != null) { + dest = streamSession.getDestination(); + } else if (datagramSession != null) { + dest = datagramSession.getDestination(); + } else { + _log.debug("Lookup for SESSION destination, but session is null"); + return false; + } + } else { + dest = SAMUtils.lookupHost(name, null); + } + + if (dest == null) { + return writeString("NAMING REPLY RESULT=KEY_NOT_FOUND\n"); + } + + return writeString("NAMING REPLY RESULT=OK NAME=" + name + + " VALUE=" + + SAMUtils.getBase64DestinationPubKey(dest) + + "\n"); + } else { + _log.debug("Unrecognized NAMING message opcode: \"" + + opcode + "\""); + return false; + } } /* Parse and execute a RAW message */ private boolean execRawMessage(String opcode, Properties props) { - if (rawSession == null) { - _log.debug("RAW message received, but no RAW session exists"); - return false; - } - - if (opcode.equals("SEND")) { - if (props == null) { - _log.debug("No parameters specified in RAW SEND message"); - return false; - } - - String dest = props.getProperty("DESTINATION"); - if (dest == null) { - _log.debug("Destination not specified in RAW SEND message"); - return false; - } - - int size; - { - String strsize = props.getProperty("SIZE"); - if (strsize == null) { - _log.debug("Size not specified in RAW SEND message"); - return false; - } - try { - size = Integer.parseInt(strsize); - } catch (NumberFormatException e) { - _log.debug("Invalid RAW SEND size specified: " + strsize); - return false; - } - if (!checkSize(size)) { - _log.debug("Specified size (" + size - + ") is out of protocol limits"); - return false; - } - } - - try { - DataInputStream in = new DataInputStream(socket.getInputStream()); - byte[] data = new byte[size]; - - in.readFully(data); - - if (!rawSession.sendBytes(dest, data)) { - _log.error("RAW SEND failed"); - return false; - } - - return true; - } catch (EOFException e) { - _log.debug("Too few bytes with RAW SEND message (expected: " - + size); - return false; - } catch (IOException e) { - _log.debug("Caught IOException while parsing RAW SEND message", - e); - return false; - } catch (DataFormatException e) { - _log.debug("Invalid key specified with RAW SEND message", - e); - return false; - } - } else { - _log.debug("Unrecognized RAW message opcode: \"" - + opcode + "\""); - return false; - } + if (rawSession == null) { + _log.debug("RAW message received, but no RAW session exists"); + return false; + } + + if (opcode.equals("SEND")) { + if (props == null) { + _log.debug("No parameters specified in RAW SEND message"); + return false; + } + + String dest = props.getProperty("DESTINATION"); + if (dest == null) { + _log.debug("Destination not specified in RAW SEND message"); + return false; + } + + int size; + { + String strsize = props.getProperty("SIZE"); + if (strsize == null) { + _log.debug("Size not specified in RAW SEND message"); + return false; + } + try { + size = Integer.parseInt(strsize); + } catch (NumberFormatException e) { + _log.debug("Invalid RAW SEND size specified: " + strsize); + return false; + } + if (!checkSize(size)) { + _log.debug("Specified size (" + size + + ") is out of protocol limits"); + return false; + } + } + + try { + DataInputStream in = new DataInputStream(socket.getInputStream()); + byte[] data = new byte[size]; + + in.readFully(data); + + if (!rawSession.sendBytes(dest, data)) { + _log.error("RAW SEND failed"); + return false; + } + + return true; + } catch (EOFException e) { + _log.debug("Too few bytes with RAW SEND message (expected: " + + size); + return false; + } catch (IOException e) { + _log.debug("Caught IOException while parsing RAW SEND message", + e); + return false; + } catch (DataFormatException e) { + _log.debug("Invalid key specified with RAW SEND message", + e); + return false; + } + } else { + _log.debug("Unrecognized RAW message opcode: \"" + + opcode + "\""); + return false; + } } /* Parse and execute a STREAM message */ private boolean execStreamMessage(String opcode, Properties props) { - if (streamSession == null) { - _log.debug("STREAM message received, but no STREAM session exists"); - return false; - } - - if (opcode.equals("SEND")) { - if (props == null) { - _log.debug("No parameters specified in STREAM SEND message"); - return false; - } - - int id; - { - String strid = props.getProperty("ID"); - if (strid == null) { - _log.debug("ID not specified in STREAM SEND message"); - return false; - } - try { - id = Integer.parseInt(strid); - } catch (NumberFormatException e) { - _log.debug("Invalid STREAM SEND ID specified: " + strid); - return false; - } - } - - int size; - { - String strsize = props.getProperty("SIZE"); - if (strsize == null) { - _log.debug("Size not specified in STREAM SEND message"); - return false; - } - try { - size = Integer.parseInt(strsize); - } catch (NumberFormatException e) { - _log.debug("Invalid STREAM SEND size specified: "+strsize); - return false; - } - if (!checkSize(size)) { - _log.debug("Specified size (" + size - + ") is out of protocol limits"); - return false; - } - } - - try { - DataInputStream in = new DataInputStream(socket.getInputStream()); - byte[] data = new byte[size]; - - in.readFully(data); - - if (!streamSession.sendBytes(id, data)) { - _log.error("STREAM SEND failed"); - return false; - } - - return true; - } catch (EOFException e) { - _log.debug("Too few bytes with RAW SEND message (expected: " - + size); - return false; - } catch (IOException e) { - _log.debug("Caught IOException while parsing RAW SEND message", - e); - return false; - } - } else if (opcode.equals("CONNECT")) { - if (props == null) { - _log.debug("No parameters specified in STREAM CONNECT message"); - return false; - } - - int id; - { - String strid = props.getProperty("ID"); - if (strid == null) { - _log.debug("ID not specified in STREAM SEND message"); - return false; - } - try { - id = Integer.parseInt(strid); - } catch (NumberFormatException e) { - _log.debug("Invalid STREAM CONNECT ID specified: " +strid); - return false; - } - if (id < 1) { - _log.debug("Invalid STREAM CONNECT ID specified: " +strid); - return false; - } - props.remove("ID"); - } - - String dest = props.getProperty("DESTINATION"); - if (dest == null) { - _log.debug("Destination not specified in RAW SEND message"); - return false; - } - props.remove("DESTINATION"); - - try { - if (!streamSession.connect(id, dest, props)) { - _log.debug("STREAM connection failed"); - return false; - } - return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n"); - } catch (DataFormatException e) { - _log.debug("Invalid destination in STREAM CONNECT message"); - return writeString("STREAM STATUS RESULT=INVALID_KEY ID=" - + id + "\n"); - } catch (I2PException e) { - _log.debug("STREAM CONNECT failed: " + e.getMessage()); - return writeString("STREAM STATUS RESULT=I2P_ERROR ID=" - + id + "\n"); - } - } else if (opcode.equals("CLOSE")) { - if (props == null) { - _log.debug("No parameters specified in STREAM CLOSE message"); - return false; - } - - int id; - { - String strid = props.getProperty("ID"); - if (strid == null) { - _log.debug("ID not specified in STREAM CLOSE message"); - return false; - } - try { - id = Integer.parseInt(strid); - } catch (NumberFormatException e) { - _log.debug("Invalid STREAM CLOSE ID specified: " +strid); - return false; - } - } - - return streamSession.closeConnection(id); - } else { - _log.debug("Unrecognized RAW message opcode: \"" - + opcode + "\""); - return false; - } + if (streamSession == null) { + _log.debug("STREAM message received, but no STREAM session exists"); + return false; + } + + if (opcode.equals("SEND")) { + if (props == null) { + _log.debug("No parameters specified in STREAM SEND message"); + return false; + } + + int id; + { + String strid = props.getProperty("ID"); + if (strid == null) { + _log.debug("ID not specified in STREAM SEND message"); + return false; + } + try { + id = Integer.parseInt(strid); + } catch (NumberFormatException e) { + _log.debug("Invalid STREAM SEND ID specified: " + strid); + return false; + } + } + + int size; + { + String strsize = props.getProperty("SIZE"); + if (strsize == null) { + _log.debug("Size not specified in STREAM SEND message"); + return false; + } + try { + size = Integer.parseInt(strsize); + } catch (NumberFormatException e) { + _log.debug("Invalid STREAM SEND size specified: "+strsize); + return false; + } + if (!checkSize(size)) { + _log.debug("Specified size (" + size + + ") is out of protocol limits"); + return false; + } + } + + try { + DataInputStream in = new DataInputStream(socket.getInputStream()); + byte[] data = new byte[size]; + + in.readFully(data); + + if (!streamSession.sendBytes(id, data)) { + _log.error("STREAM SEND failed"); + return false; + } + + return true; + } catch (EOFException e) { + _log.debug("Too few bytes with RAW SEND message (expected: " + + size); + return false; + } catch (IOException e) { + _log.debug("Caught IOException while parsing RAW SEND message", + e); + return false; + } + } else if (opcode.equals("CONNECT")) { + if (props == null) { + _log.debug("No parameters specified in STREAM CONNECT message"); + return false; + } + + int id; + { + String strid = props.getProperty("ID"); + if (strid == null) { + _log.debug("ID not specified in STREAM SEND message"); + return false; + } + try { + id = Integer.parseInt(strid); + } catch (NumberFormatException e) { + _log.debug("Invalid STREAM CONNECT ID specified: " +strid); + return false; + } + if (id < 1) { + _log.debug("Invalid STREAM CONNECT ID specified: " +strid); + return false; + } + props.remove("ID"); + } + + String dest = props.getProperty("DESTINATION"); + if (dest == null) { + _log.debug("Destination not specified in RAW SEND message"); + return false; + } + props.remove("DESTINATION"); + + try { + if (!streamSession.connect(id, dest, props)) { + _log.debug("STREAM connection failed"); + return false; + } + return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n"); + } catch (DataFormatException e) { + _log.debug("Invalid destination in STREAM CONNECT message"); + return writeString("STREAM STATUS RESULT=INVALID_KEY ID=" + + id + "\n"); + } catch (I2PException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + return writeString("STREAM STATUS RESULT=I2P_ERROR ID=" + + id + "\n"); + } catch (SAMInvalidDirectionException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + return writeString("STREAM STATUS RESULT=INVALID_DIRECTION ID=" + + id + "\n"); + } + } else if (opcode.equals("CLOSE")) { + if (props == null) { + _log.debug("No parameters specified in STREAM CLOSE message"); + return false; + } + + int id; + { + String strid = props.getProperty("ID"); + if (strid == null) { + _log.debug("ID not specified in STREAM CLOSE message"); + return false; + } + try { + id = Integer.parseInt(strid); + } catch (NumberFormatException e) { + _log.debug("Invalid STREAM CLOSE ID specified: " +strid); + return false; + } + } + + return streamSession.closeConnection(id); + } else { + _log.debug("Unrecognized RAW message opcode: \"" + + opcode + "\""); + return false; + } } public String toString() { - return "SAM v1 handler (client: " - + this.socket.getInetAddress().toString() + ":" - + this.socket.getPort() + ")"; + return "SAM v1 handler (client: " + + this.socket.getInetAddress().toString() + ":" + + this.socket.getPort() + ")"; } /* Check whether a size is inside the limits allowed by this protocol */ private boolean checkSize(int size) { - return ((size >= 1) && (size <= 32768)); + return ((size >= 1) && (size <= 32768)); } // SAMRawReceiver implementation public void receiveRawBytes(byte data[]) throws IOException { - if (rawSession == null) { - _log.error("BUG! Received raw bytes, but session is null!"); - throw new NullPointerException("BUG! RAW session is null!"); - } + if (rawSession == null) { + _log.error("BUG! Received raw bytes, but session is null!"); + throw new NullPointerException("BUG! RAW session is null!"); + } - ByteArrayOutputStream msg = new ByteArrayOutputStream(); + ByteArrayOutputStream msg = new ByteArrayOutputStream(); - msg.write(("RAW RECEIVED SIZE=" + data.length - + "\n").getBytes("ISO-8859-1")); - msg.write(data); + msg.write(("RAW RECEIVED SIZE=" + data.length + + "\n").getBytes("ISO-8859-1")); + msg.write(data); - writeBytes(msg.toByteArray()); + writeBytes(msg.toByteArray()); } public void stopRawReceiving() { - _log.debug("stopRawReceiving() invoked"); - - if (rawSession == null) { - _log.error("BUG! Got raw receiving stop, but session is null!"); - throw new NullPointerException("BUG! RAW session is null!"); - } - - try { - this.socket.close(); - } catch (IOException e) { - _log.error("Error closing socket: " + e.getMessage()); - } + _log.debug("stopRawReceiving() invoked"); + + if (rawSession == null) { + _log.error("BUG! Got raw receiving stop, but session is null!"); + throw new NullPointerException("BUG! RAW session is null!"); + } + + try { + this.socket.close(); + } catch (IOException e) { + _log.error("Error closing socket: " + e.getMessage()); + } } // SAMStreamReceiver implementation public void notifyStreamConnection(int id, Destination d) throws IOException { - if (streamSession == null) { - _log.error("BUG! Received stream connection, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); - } - - if (!writeString("STREAM CONNECTED DESTINATION=" - + SAMUtils.getBase64DestinationPubKey(d) - + " ID=" + id + "\n")) { - throw new IOException("Error notifying connection to SAM client"); - } + if (streamSession == null) { + _log.error("BUG! Received stream connection, but session is null!"); + throw new NullPointerException("BUG! STREAM session is null!"); + } + + if (!writeString("STREAM CONNECTED DESTINATION=" + + SAMUtils.getBase64DestinationPubKey(d) + + " ID=" + id + "\n")) { + throw new IOException("Error notifying connection to SAM client"); + } } public void receiveStreamBytes(int id, byte data[], int len) throws IOException { - if (streamSession == null) { - _log.error("Received stream bytes, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); - } + if (streamSession == null) { + _log.error("Received stream bytes, but session is null!"); + throw new NullPointerException("BUG! STREAM session is null!"); + } - ByteArrayOutputStream msg = new ByteArrayOutputStream(); + ByteArrayOutputStream msg = new ByteArrayOutputStream(); - msg.write(("STREAM RECEIVED ID=" + id - +" SIZE=" + len + "\n").getBytes("ISO-8859-1")); - msg.write(data); + msg.write(("STREAM RECEIVED ID=" + id + +" SIZE=" + len + "\n").getBytes("ISO-8859-1")); + msg.write(data); - writeBytes(msg.toByteArray()); + writeBytes(msg.toByteArray()); } public void notifyStreamDisconnection(int id, String result, String msg) throws IOException { - if (streamSession == null) { - _log.error("BUG! Received stream disconnection, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); - } - - // FIXME: msg should be escaped! - if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result - + (msg == null ? "" : (" MESSAGE=" + msg)) - + "\n")) { - throw new IOException("Error notifying disconnection to SAM client"); - } + if (streamSession == null) { + _log.error("BUG! Received stream disconnection, but session is null!"); + throw new NullPointerException("BUG! STREAM session is null!"); + } + + // FIXME: msg should be escaped! + if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result + + (msg == null ? "" : (" MESSAGE=" + msg)) + + "\n")) { + throw new IOException("Error notifying disconnection to SAM client"); + } } public void stopStreamReceiving() { - _log.debug("stopStreamReceiving() invoked"); - - if (streamSession == null) { - _log.error("BUG! Got stream receiving stop, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); - } - - try { - this.socket.close(); - } catch (IOException e) { - _log.error("Error closing socket: " + e.getMessage()); - } + _log.debug("stopStreamReceiving() invoked"); + + if (streamSession == null) { + _log.error("BUG! Got stream receiving stop, but session is null!"); + throw new NullPointerException("BUG! STREAM session is null!"); + } + + try { + this.socket.close(); + } catch (IOException e) { + _log.error("Error closing socket: " + e.getMessage()); + } } }