diff --git a/apps/sam/java/src/net/i2p/sam/MasterSession.java b/apps/sam/java/src/net/i2p/sam/MasterSession.java index de4ebfde3e44dfbdb1ef43057f4c2017ce4647b8..7d5efd439f081297006515f918d8003ec67649c8 100644 --- a/apps/sam/java/src/net/i2p/sam/MasterSession.java +++ b/apps/sam/java/src/net/i2p/sam/MasterSession.java @@ -56,10 +56,6 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props) throws IOException, DataFormatException, SAMException { super(nick); - props.setProperty("net.i2p.streaming.enforceProtocol", "true"); - props.setProperty("i2cp.dontPublishLeaseSet", "false"); - props.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); - props.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); dgs = dgServer; sessions = new ConcurrentHashMap<String, SAMMessageSess>(4); this.handler = handler; @@ -84,6 +80,8 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S * @return null for success, or error message */ public synchronized String add(String nick, String style, Properties props) { + if (props.containsKey("DESTINATION")) + return "SESSION ADD may not contain DESTINATION"; SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if (rec != null || sessions.containsKey(nick)) return "Duplicate ID " + nick; @@ -103,9 +101,11 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S } int listenProtocol; SAMMessageSess sess; - // temp + SAMv3Handler subhandler; try { I2PSession isess = socketMgr.getSession(); + subhandler = new SAMv3Handler(handler.getClientSocket(), handler.verMajor, + handler.verMinor, handler.getBridge()); if (style.equals("RAW")) { if (!props.containsKey("PORT")) return "RAW subsession must specify PORT"; @@ -124,21 +124,32 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S return "Bad LISTEN_PROTOCOL " + spr; } } - sess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs); + SAMv3RawSession ssess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs); + subhandler.setSession(ssess); + sess = ssess; } else if (style.equals("DATAGRAM")) { if (!props.containsKey("PORT")) return "DATAGRAM subsession must specify PORT"; listenProtocol = I2PSession.PROTO_DATAGRAM; - sess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs); + SAMv3DatagramSession ssess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs); + subhandler.setSession(ssess); + sess = ssess; } else if (style.equals("STREAM")) { listenProtocol = I2PSession.PROTO_STREAMING; // FIXME need something that hangs off an existing dest - sess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort); + SAMv3StreamSession ssess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort); + subhandler.setSession(ssess); + sess = ssess; } else { return "Unrecognized SESSION STYLE " + style; } - } catch (Exception e) { - // temp + } catch (IOException e) { + return e.toString(); + } catch (DataFormatException e) { + return e.toString(); + } catch (SAMException e) { + return e.toString(); + } catch (I2PSessionException e) { return e.toString(); } @@ -148,8 +159,7 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S return "Duplicate protocol " + listenProtocol + " and port " + listenPort; } - // add to session db and our map - rec = new SessionRecord(getDestination().toBase64(), props, handler); + rec = new SessionRecord(getDestination().toBase64(), props, subhandler); try { if (!SAMv3Handler.sSessionsHash.put(nick, rec)) return "Duplicate ID " + nick; diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index eb347b3ad2670dc4484248fcf59ed1ea5544fc19..a223b49f56bf0ebbdc9ed7e52b7b4bc2015852c0 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -29,6 +29,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PClient; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -51,13 +52,9 @@ import net.i2p.util.Log; class SAMStreamSession implements SAMMessageSess { protected final Log _log; - protected final static int SOCKET_HANDLER_BUF_SIZE = 32768; - protected final SAMStreamReceiver recv; - protected final SAMStreamSessionServer server; - protected final I2PSocketManager socketMgr; /** stream id (Long) to SAMStreamSessionSocketReader */ @@ -163,12 +160,13 @@ class SAMStreamSession implements SAMMessageSess { if (_log.shouldLog(Log.DEBUG)) _log.debug("Creating I2PSocketManager..."); - socketMgr = I2PSocketManagerFactory.createManager(destStream, - i2cpHost, - i2cpPort, - allprops); - if (socketMgr == null) { - throw new SAMException("Error creating I2PSocketManager"); + try { + // we do it this way so we get exceptions + socketMgr = I2PSocketManagerFactory.createDisconnectedManager(destStream, + i2cpHost, i2cpPort, allprops); + socketMgr.getSession().connect(); + } catch (I2PSessionException ise) { + throw new SAMException("Error creating I2PSocketManager: " + ise.getMessage(), ise); } socketMgr.addDisconnectListener(new DisconnectListener()); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index 66f76efc30723a8305bc9df41f6c279f6e1298a0..0033b3ee236a1df50f7981992b72ba9d400fea4d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -133,7 +133,31 @@ class SAMv3Handler extends SAMv1Handler Session getSession() { return session; } - + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3RawSession sess) { + rawSession = sess; session = sess; + } + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3DatagramSession sess) { + datagramSession = sess; session = sess; + } + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3StreamSession sess) { + streamSession = sess; session = sess; + } + @Override public void handle() { String msg = null; @@ -435,7 +459,16 @@ class SAMv3Handler extends SAMv1Handler Properties allProps = new Properties(); allProps.putAll(i2cpProps); allProps.putAll(props); - + + if (style.equals("MASTER")) { + // We must put these here, as SessionRecord.getProps() makes a copy, + // and the socket manager is instantiated in the + // SAMStreamSession constructor. + allProps.setProperty("i2p.streaming.enforceProtocol", "true"); + allProps.setProperty("i2cp.dontPublishLeaseSet", "false"); + allProps.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); + allProps.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); + } try { sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ; @@ -486,7 +519,7 @@ class SAMv3Handler extends SAMv1Handler } else if (opcode.equals("ADD") || opcode.equals("REMOVE")) { // prevent trouble in finally block ok = true; - if (streamSession != null || datagramSession != null || rawSession != null) + if (streamSession == null || datagramSession == null || rawSession == null) return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Not a MASTER session\"\n"); MasterSession msess = (MasterSession) session; String msg; @@ -512,14 +545,14 @@ class SAMv3Handler extends SAMv1Handler } catch (I2PSessionException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P error when instantiating session", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n"); } catch (SAMException e) { if (_log.shouldLog(Log.INFO)) _log.info("Funny SAM error", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n"); } catch (IOException e) { _log.error("Unexpected IOException", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n"); } finally { // unregister the session if it has not been created if ( !ok && nick!=null ) { @@ -566,23 +599,21 @@ class SAMv3Handler extends SAMv1Handler } rec = sSessionsHash.get(nick); - if ( rec==null ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM SESSION ID does not exist"); try { - notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist"); + notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID " + nick + " does not exist"); } catch (IOException e) {} return false ; } streamSession = rec.getHandler().streamSession ; - if (streamSession==null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("specified ID is not a stream session"); try { - notifyStreamResult(true, "I2P_ERROR", "specified ID is not a STREAM session"); + notifyStreamResult(true, "I2P_ERROR", "specified ID " + nick + " is not a STREAM session"); } catch (IOException e) {} return false ; } @@ -638,19 +669,19 @@ class SAMv3Handler extends SAMv1Handler } catch (DataFormatException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Invalid destination in STREAM CONNECT message"); - notifyStreamResult ( verbose, "INVALID_KEY", null ); + notifyStreamResult ( verbose, "INVALID_KEY", e.getMessage()); } catch (ConnectException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "CONNECTION_REFUSED", null ); + notifyStreamResult ( verbose, "CONNECTION_REFUSED", e.getMessage()); } catch (NoRouteToHostException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "CANT_REACH_PEER", null ); + notifyStreamResult ( verbose, "CANT_REACH_PEER", e.getMessage()); } catch (InterruptedIOException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "TIMEOUT", null ); + notifyStreamResult ( verbose, "TIMEOUT", e.getMessage()); } catch (I2PException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); @@ -702,7 +733,7 @@ class SAMv3Handler extends SAMv1Handler } catch (SAMException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM ACCEPT failed", e); - notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null ); + notifyStreamResult ( verbose, "ALREADY_ACCEPTING", e.getMessage()); } } catch (IOException e) { } @@ -710,6 +741,11 @@ class SAMv3Handler extends SAMv1Handler } + /** + * @param verbose if false, does nothing + * @param result non-null + * @param message may be null + */ public void notifyStreamResult(boolean verbose, String result, String message) throws IOException { if (!verbose) return ; String msgString = createMessageString(message); diff --git a/apps/sam/java/src/net/i2p/sam/SessionRecord.java b/apps/sam/java/src/net/i2p/sam/SessionRecord.java index c154a4ae8add668a83dd2b7ded16d5ad792e619c..f50e6d5206b5b1bfc7618b4de8261286cced2197 100644 --- a/apps/sam/java/src/net/i2p/sam/SessionRecord.java +++ b/apps/sam/java/src/net/i2p/sam/SessionRecord.java @@ -42,6 +42,10 @@ class SessionRecord { return m_dest; } + /** + * Warning - returns a copy. + * @return a copy + */ synchronized public Properties getProps() { Properties p = new Properties(); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java index dc90d3fb2423dd31688e7e7cbcef8a0411158617..04fea78bf302018ca3a8ac75d65b2dc2abb54645 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java @@ -18,6 +18,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { private String _version; private final Object _helloLock = new Object(); private Boolean _sessionCreateOk; + private Boolean _sessionAddOk; private Boolean _streamStatusOk; private final Object _sessionCreateLock = new Object(); private final Object _namingReplyLock = new Object(); @@ -41,13 +42,19 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } + /** may be called twice, first for CREATE and second for ADD */ @Override public void sessionStatusReceived(String result, String destination, String msg) { synchronized (_sessionCreateLock) { + Boolean ok; if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result)) - _sessionCreateOk = Boolean.TRUE; + ok = Boolean.TRUE; else - _sessionCreateOk = Boolean.FALSE; + ok = Boolean.FALSE; + if (_sessionCreateOk == null) + _sessionCreateOk = ok; + else if (_sessionAddOk == null) + _sessionAddOk = ok; _sessionCreateLock.notifyAll(); } } @@ -120,6 +127,25 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } + /** + * Wait for the session to be added, returning true if everything went ok + * + * @return true if everything ok + * @since 0.9.25 + */ + public boolean waitForSessionAddReply() { + while (true) { + try { + synchronized (_sessionCreateLock) { + if (_sessionAddOk == null) + _sessionCreateLock.wait(); + else + return _sessionAddOk.booleanValue(); + } + } catch (InterruptedException ie) { return false; } + } + } + /** * Wait for the stream to be created, returning true if everything went ok * diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 761e50e814aac27b7e3090146d42495cd569af80..1eb867ce435c49238f7f707c7df7735b0599e6e2 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -55,14 +55,17 @@ public class SAMStreamSend { private static I2PSSLSocketFactory _sslSocketFactory; private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4; - private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" + + private static final int MASTER=8; + private static final String USAGE = "Usage: SAMStreamSend [-s] [-x] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" + " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" + " -s: use SSL\n" + + " -x: use master session (forces -v 3.3)\n" + " multiple -o session options are allowed"; public static void main(String args[]) { - Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:"); + Getopt g = new Getopt("SAM", args, "sxb:m:o:p:u:v:w:"); boolean isSSL = false; + boolean isMaster = false; int mode = STREAM; String version = "1.0"; String host = "127.0.0.1"; @@ -77,6 +80,10 @@ public class SAMStreamSend { isSSL = true; break; + case 'x': + isMaster = true; + break; + case 'm': mode = Integer.parseInt(g.getOptarg()); if (mode < 0 || mode > V1RAW) { @@ -123,6 +130,10 @@ public class SAMStreamSend { System.err.println(USAGE); return; } + if (isMaster) { + mode += MASTER; + version = "3.3"; + } if ((user == null && password != null) || (user != null && password == null)) { System.err.println("both user and password or neither"); @@ -162,6 +173,8 @@ public class SAMStreamSend { _log.debug("Reader created"); OutputStream out = sock.getOutputStream(); String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts); + if (mode >= MASTER) + mode -= MASTER; if (ourDest == null) throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) @@ -230,7 +243,10 @@ public class SAMStreamSend { return sock; } - /** @return our b64 dest or null */ + /** + * @param isMaster is this the control socket + * @return our b64 dest or null + */ private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode, String user, String password, String opts) { @@ -261,6 +277,16 @@ public class SAMStreamSend { _v3ID = "xx€€xx" + _v3ID; _conOptions = "ID=" + _v3ID; } + boolean masterMode; // are we using v3.3 master session + String command; + if (mode >= MASTER) { + masterMode = true; + command = "ADD"; + mode -= MASTER; + } else { + masterMode = false; + command = "CREATE DESTINATION=TRANSIENT"; + } String style; if (mode == STREAM) style = "STREAM"; @@ -268,16 +294,33 @@ public class SAMStreamSend { style = "DATAGRAM"; else style = "RAW"; - String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n'; + + if (masterMode) { + String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=master " + opts + '\n'; + samOut.write(req.getBytes("UTF-8")); + samOut.flush(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER sent"); + boolean ok = eventHandler.waitForSessionCreateReply(); + if (!ok) + throw new IOException("SESSION CREATE STYLE=MASTER failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok); + } + String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + opts + '\n'; samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create sent"); - boolean ok = eventHandler.waitForSessionCreateReply(); + _log.debug("SESSION " + command + " sent"); + boolean ok; + if (masterMode) + ok = eventHandler.waitForSessionAddReply(); + else + ok = eventHandler.waitForSessionCreateReply(); if (!ok) - throw new IOException("Session create failed"); + throw new IOException("SESSION " + command + " failed"); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create reply found: " + ok); + _log.debug("SESSION " + command + " reply found: " + ok); req = "NAMING LOOKUP NAME=ME\n"; samOut.write(req.getBytes("UTF-8"));