diff --git a/apps/sam/java/src/net/i2p/sam/MasterSession.java b/apps/sam/java/src/net/i2p/sam/MasterSession.java new file mode 100644 index 0000000000000000000000000000000000000000..67ff566b5627a8349f17e451349e086b7ae1543a --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/MasterSession.java @@ -0,0 +1,404 @@ +package net.i2p.sam; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.I2PException; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.client.I2PSessionMuxedListener; +import net.i2p.client.streaming.I2PServerSocket; +import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.I2PAppThread; +import net.i2p.util.Log; + +/** + * A session that does nothing, but implements interfaces for raw, datagram, and streaming + * for convenience. + * + * We extend SAMv3StreamSession as we must have it set up the I2PSession, in case + * user adds a STREAM session (and he probably will). + * This session receives all data from I2P, but you can't send any data on it. + * + * @since 0.9.25 + */ +class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, SAMRawReceiver, + SAMMessageSess, I2PSessionMuxedListener { + private final SAMv3Handler handler; + private final SAMv3DatagramServer dgs; + private final Map<String, SAMMessageSess> sessions; + private final StreamAcceptor streamAcceptor; + private static final String[] INVALID_OPTS = { "PORT", "HOST", "FROM_PORT", "TO_PORT", + "PROTOCOL", "LISTEN_PORT", "LISTEN_PROTOCOL" }; + + /** + * Build a Session according to information + * registered with the given nickname. + * + * Caller MUST call start(). + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + */ + public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props) + throws IOException, DataFormatException, SAMException { + super(nick); + for (int i = 0; i < INVALID_OPTS.length; i++) { + String p = INVALID_OPTS[i]; + if (props.containsKey(p)) + throw new SAMException("MASTER session options may not contain " + p); + } + dgs = dgServer; + sessions = new ConcurrentHashMap<String, SAMMessageSess>(4); + this.handler = handler; + I2PSession isess = socketMgr.getSession(); + // if we get a RAW session added with 0/0, it will replace this, + // and we won't add this back if removed. + isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + streamAcceptor = new StreamAcceptor(); + } + + /** + * Overridden to start the acceptor. + */ + @Override + public void start() { + Thread t = new I2PAppThread(streamAcceptor, "SAMMasterAcceptor"); + t.start(); + } + + /** + * Add a session + * @return null for success, or error message + */ + public synchronized String add(String nick, String style, Properties props) { + if (props.containsKey("DESTINATION")) + return "SESSION ADD may not contain DESTINATION"; + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + if (rec != null || sessions.containsKey(nick)) + return "Duplicate ID " + nick; + int listenPort = I2PSession.PORT_ANY; + String slp = (String) props.remove("LISTEN_PORT"); + if (slp == null) + slp = props.getProperty("FROM_PORT"); + if (slp != null) { + try { + listenPort = Integer.parseInt(slp); + if (listenPort < 0 || listenPort > 65535) + return "Bad LISTEN_PORT " + slp; + // TODO enforce streaming listen port must be 0 or from port + } catch (NumberFormatException nfe) { + return "Bad LISTEN_PORT " + slp; + } + } + int listenProtocol; + SAMMessageSess sess; + SAMv3Handler subhandler; + try { + I2PSession isess = socketMgr.getSession(); + subhandler = new SAMv3Handler(handler.getClientSocket(), handler.verMajor, + handler.verMinor, handler.getBridge()); + if (style.equals("RAW")) { + if (!props.containsKey("PORT")) + return "RAW subsession must specify PORT"; + listenProtocol = I2PSession.PROTO_DATAGRAM_RAW; + String spr = (String) props.remove("LISTEN_PROTOCOL"); + if (spr == null) + spr = props.getProperty("PROTOCOL"); + if (spr != null) { + try { + listenProtocol = Integer.parseInt(spr); + // RAW can't listen on streaming protocol + if (listenProtocol < 0 || listenProtocol > 255 || + listenProtocol == I2PSession.PROTO_STREAMING) + return "Bad RAW LISTEN_PPROTOCOL " + spr; + } catch (NumberFormatException nfe) { + return "Bad LISTEN_PROTOCOL " + spr; + } + } + SAMv3RawSession ssess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs); + subhandler.setSession(ssess); + sess = ssess; + } else if (style.equals("DATAGRAM")) { + if (!props.containsKey("PORT")) + return "DATAGRAM subsession must specify PORT"; + listenProtocol = I2PSession.PROTO_DATAGRAM; + SAMv3DatagramSession ssess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs); + subhandler.setSession(ssess); + sess = ssess; + } else if (style.equals("STREAM")) { + listenProtocol = I2PSession.PROTO_STREAMING; + // FIXME need something that hangs off an existing dest + SAMv3StreamSession ssess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort); + subhandler.setSession(ssess); + sess = ssess; + } else { + return "Unrecognized SESSION STYLE " + style; + } + } catch (IOException e) { + return e.toString(); + } catch (DataFormatException e) { + return e.toString(); + } catch (SAMException e) { + return e.toString(); + } catch (I2PSessionException e) { + return e.toString(); + } + + for (SAMMessageSess s : sessions.values()) { + if (listenProtocol == s.getListenProtocol() && + listenPort == s.getListenPort()) + return "Duplicate protocol " + listenProtocol + " and port " + listenPort; + } + + rec = new SessionRecord(getDestination().toBase64(), props, subhandler); + try { + SAMv3Handler.sSessionsHash.putDupDestOK(nick, rec); + sessions.put(nick, sess); + } catch (SessionsDB.ExistingIdException e) { + return "Duplicate ID " + nick; + } + if (_log.shouldWarn()) + _log.warn("added " + style + " proto " + listenProtocol + " port " + listenPort); + + sess.start(); + // all ok + return null; + } + + /** + * Remove a session + * @return null for success, or error message + */ + public synchronized String remove(String nick, Properties props) { + boolean ok; + SAMMessageSess sess = sessions.remove(nick); + if (sess != null) { + ok = SAMv3Handler.sSessionsHash.del(nick); + sess.close(); + // TODO if 0/0, add back this as listener? + if (_log.shouldWarn()) + _log.warn("removed " + sess + " proto " + sess.getListenProtocol() + " port " + sess.getListenPort()); + } else { + ok = false; + } + if (!ok) + return "ID " + nick + " not found"; + // all ok + return null; + } + + /** + * @throws IOException always + */ + public void receiveDatagramBytes(Destination sender, byte[] data, int proto, + int fromPort, int toPort) throws IOException { + throw new IOException("master session"); + } + + /** + * Does nothing. + */ + public void stopDatagramReceiving() {} + + /** + * @throws IOException always + */ + public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException { + throw new IOException("master session"); + } + + /** + * Does nothing. + */ + public void stopRawReceiving() {} + + + + /////// stream session overrides + + /** @throws I2PException always */ + @Override + public void connect(SAMv3Handler handler, String dest, Properties props) throws I2PException { + throw new I2PException("master session"); + } + + /** @throws SAMException always */ + @Override + public void accept(SAMv3Handler handler, boolean verbose) throws SAMException { + throw new SAMException("master session"); + } + + /** @throws SAMException always */ + @Override + public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException { + throw new SAMException("master session"); + } + + /** does nothing */ + @Override + public void stopForwardingIncoming() {} + + + ///// SAMMessageSess interface + + @Override + public int getListenProtocol() { + return I2PSession.PROTO_ANY; + } + + @Override + public int getListenPort() { + return I2PSession.PORT_ANY; + } + + /** + * Close the master session + * Overridden to stop the acceptor. + */ + @Override + public void close() { + // close sessions? + streamAcceptor.stopRunning(); + super.close(); + } + + // I2PSessionMuxedImpl interface + + public void disconnected(I2PSession session) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P session disconnected"); + close(); + } + + public void errorOccurred(I2PSession session, String message, + Throwable error) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P error: " + message, error); + close(); + } + + public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** @since 0.9.24 */ + public void messageAvailable(I2PSession session, int msgId, long size, + int proto, int fromPort, int toPort) { + try { + byte msg[] = session.receiveMessage(msgId); + if (msg == null) + return; + messageReceived(msg, proto, fromPort, toPort); + } catch (I2PSessionException e) { + _log.error("Error fetching I2P message", e); + close(); + } + } + + public void reportAbuse(I2PSession session, int severity) { + _log.warn("Abuse reported (severity: " + severity + ")"); + close(); + } + + private void messageReceived(byte[] msg, int proto, int fromPort, int toPort) { + if (_log.shouldWarn()) + _log.warn("Unhandled message received, length = " + msg.length + + " protocol: " + proto + " from port: " + fromPort + " to port: " + toPort); + } + + private class StreamAcceptor implements Runnable { + + private volatile boolean stop; + + public StreamAcceptor() { + } + + public void stopRunning() { + stop = true; + } + + public void run() { + if (_log.shouldWarn()) + _log.warn("Stream acceptor started"); + final I2PServerSocket i2pss = socketMgr.getServerSocket(); + while (!stop) { + // wait and accept a connection from I2P side + I2PSocket i2ps; + try { + i2ps = i2pss.accept(); + if (i2ps == null) // never null as of 0.9.17 + continue; + } catch (SocketTimeoutException ste) { + continue; + } catch (ConnectException ce) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error accepting", ce); + try { Thread.sleep(50); } catch (InterruptedException ie) {} + continue; + } catch (I2PException ipe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error accepting", ipe); + break; + } + int port = i2ps.getLocalPort(); + SAMMessageSess foundSess = null; + Collection<SAMMessageSess> all = sessions.values(); + for (Iterator<SAMMessageSess> iter = all.iterator(); iter.hasNext(); ) { + SAMMessageSess sess = iter.next(); + if (sess.getListenProtocol() != I2PSession.PROTO_STREAMING) { + // remove as we may be going around again below + iter.remove(); + continue; + } + if (sess.getListenPort() == port) { + foundSess = sess; + break; + } + } + // We never send streaming out as a raw packet to a default listener, + // and we don't allow raw to listen on streaming protocol, + // so we don't have to look for a default protocol, + // but we do have to look for a default port listener. + if (foundSess == null) { + for (SAMMessageSess sess : all) { + if (sess.getListenPort() == 0) { + foundSess = sess; + break; + } + } + } + if (foundSess != null) { + SAMv3StreamSession ssess = (SAMv3StreamSession) foundSess; + boolean ok = ssess.queueSocket(i2ps); + if (!ok) { + _log.logAlways(Log.WARN, "Accept queue overflow for " + ssess); + try { i2ps.close(); } catch (IOException ioe) {} + } + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("No subsession found for incoming streaming connection on port " + port); + } + } + if (_log.shouldWarn()) + _log.warn("Stream acceptor stopped"); + } + } +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java index b25d6ad0b3b1911b11f7a75c5f8422f1a855b2b5..baef6f454cc7598eb2db7b98342c024868069c08 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java @@ -32,9 +32,9 @@ class SAMDatagramSession extends SAMMessageSession { // FIXME make final after fixing SAMv3DatagramSession override protected SAMDatagramReceiver recv; - private final I2PDatagramMaker dgramMaker; private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector(); + /** * Create a new SAM DATAGRAM session. * @@ -45,11 +45,10 @@ class SAMDatagramSession extends SAMMessageSession { * @throws DataFormatException * @throws I2PSessionException */ - public SAMDatagramSession(String dest, Properties props, + protected SAMDatagramSession(String dest, Properties props, SAMDatagramReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(dest, props); - this.recv = recv; dgramMaker = new I2PDatagramMaker(getI2PSession()); } @@ -57,6 +56,8 @@ class SAMDatagramSession extends SAMMessageSession { /** * Create a new SAM DATAGRAM session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination keys * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data @@ -68,7 +69,20 @@ class SAMDatagramSession extends SAMMessageSession { SAMDatagramReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(destStream, props); + this.recv = recv; + dgramMaker = new I2PDatagramMaker(getI2PSession()); + } + /** + * Create a new SAM DATAGRAM session on an existing I2P session. + * + * @param props unused for now + * @since 0.9.25 + */ + protected SAMDatagramSession(I2PSession sess, Properties props, int listenPort, + SAMDatagramReceiver recv) throws IOException, + DataFormatException, I2PSessionException { + super(sess, I2PSession.PROTO_DATAGRAM, listenPort); this.recv = recv; dgramMaker = new I2PDatagramMaker(getI2PSession()); } @@ -90,11 +104,31 @@ class SAMDatagramSession extends SAMMessageSession { throw new DataFormatException("Datagram size exceeded (" + data.length + ")"); byte[] dgram ; synchronized (dgramMaker) { - dgram = dgramMaker.makeI2PDatagram(data); + dgram = dgramMaker.makeI2PDatagram(data); } return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort); } + /** + * Send bytes through a SAM DATAGRAM session. + * + * @since 0.9.25 + */ + public boolean sendBytes(String dest, byte[] data, int proto, + int fromPort, int toPort, + boolean sendLeaseSet, int sendTags, + int tagThreshold, int expiration) + throws DataFormatException, I2PSessionException { + if (data.length > DGRAM_SIZE_MAX) + throw new DataFormatException("Datagram size exceeded (" + data.length + ")"); + byte[] dgram ; + synchronized (dgramMaker) { + dgram = dgramMaker.makeI2PDatagram(data); + } + return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort, + sendLeaseSet, sendTags,tagThreshold, expiration); + } + protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) { byte[] payload; Destination sender; diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index d40b5352e9d834101f2a4570af0f5617c97f5ef7..54e9c56b56446e853ad1917bd2e6cc29746cac6b 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -25,7 +25,7 @@ import net.i2p.util.VersionComparator; */ class SAMHandlerFactory { - private static final String VERSION = "3.2"; + private static final String VERSION = "3.3"; private static final int HELLO_TIMEOUT = 60*1000; @@ -139,6 +139,9 @@ class SAMHandlerFactory { if (VersionComparator.comp(VERSION, minVer) >= 0 && VersionComparator.comp(VERSION, maxVer) <= 0) return VERSION; + if (VersionComparator.comp("3.2", minVer) >= 0 && + VersionComparator.comp("3.2", maxVer) <= 0) + return "3.2"; if (VersionComparator.comp("3.1", minVer) >= 0 && VersionComparator.comp("3.1", maxVer) <= 0) return "3.1"; diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java new file mode 100644 index 0000000000000000000000000000000000000000..2fc45431f94badd9392f2f8f90642b2a7e41b09f --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java @@ -0,0 +1,63 @@ +package net.i2p.sam; + +import java.io.Closeable; + +import net.i2p.client.I2PSessionException; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; + +/** + * Base interface for SAMMessageSession, which is the base for + * v1/v3 datagram and raw sessions. + * Also implemented by SAMStreamSession. + * + * @since 0.9.25 pulled from SAMMessageSession + */ +interface SAMMessageSess extends Closeable { + + /** + * Start a SAM message-based session. + * MUST be called after constructor. + */ + public void start(); + + /** + * Close a SAM message-based session. + */ + public void close(); + + /** + * Get the SAM message-based session Destination. + * + * @return The SAM message-based session Destination. + */ + public Destination getDestination(); + + /** + * Send bytes through a SAM message-based session. + * + * @param dest Destination + * @param data Bytes to be sent + * + * @return True if the data was sent, false otherwise + * @throws DataFormatException on unknown / bad dest + * @throws I2PSessionException on serious error, probably session closed + */ + public boolean sendBytes(String dest, byte[] data, int proto, + int fromPort, int toPort) throws DataFormatException, I2PSessionException; + + /** + * Send bytes through a SAM message-based session. + * + * @since 0.9.25 + */ + public boolean sendBytes(String dest, byte[] data, int proto, + int fromPort, int toPort, + boolean sendLeaseSet, int sendTags, + int tagThreshold, int expiration) + throws DataFormatException, I2PSessionException; + + public int getListenProtocol(); + + public int getListenPort(); +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java index c95b6a9ab22c8faff38daeb7c99d507096662ec8..0bbecb77c7780e843b6d1b4fc4a6c867f03ee98a 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java @@ -33,11 +33,14 @@ import net.i2p.util.Log; * * @author human */ -abstract class SAMMessageSession implements Closeable { +abstract class SAMMessageSession implements SAMMessageSess { protected final Log _log; - private I2PSession session; - private SAMMessageSessionHandler handler; + private final I2PSession session; + protected final boolean _isOwnSession; + private final SAMMessageSessionHandler handler; + private final int listenProtocol; + private final int listenPort; /** * Initialize a new SAM message-based session. @@ -49,9 +52,7 @@ abstract class SAMMessageSession implements Closeable { * @throws I2PSessionException */ protected SAMMessageSession(String dest, Properties props) throws IOException, DataFormatException, I2PSessionException { - _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); - ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(dest)); - initSAMMessageSession(bais, props); + this(new ByteArrayInputStream(Base64.decode(dest)), props); } /** @@ -64,17 +65,42 @@ abstract class SAMMessageSession implements Closeable { * @throws I2PSessionException */ protected SAMMessageSession(InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException { - _log = new Log(getClass()); - initSAMMessageSession(destStream, props); + _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Initializing SAM message-based session"); + listenProtocol = I2PSession.PROTO_ANY; + listenPort = I2PSession.PORT_ANY; + _isOwnSession = true; + handler = new SAMMessageSessionHandler(destStream, props); + session = handler.getSession(); } - private void initSAMMessageSession (InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException { + /** + * Initialize a new SAM message-based session using an existing I2PSession. + * + * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) + * @param props Properties to setup the I2P session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + protected SAMMessageSession(I2PSession sess, int listenProtocol, int listenPort) + throws IOException, DataFormatException, I2PSessionException { + _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Initializing SAM message-based session"); + this.listenProtocol = listenProtocol; + this.listenPort = listenPort; + _isOwnSession = false; + session = sess; + handler = new SAMMessageSessionHandler(session); + } - handler = new SAMMessageSessionHandler(destStream, props); - - // FIXME don't start threads in constructors + /* + * @since 0.9.25 + */ + public void start() { Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); t.start(); } @@ -88,6 +114,20 @@ abstract class SAMMessageSession implements Closeable { return session.getMyDestination(); } + /** + * @since 0.9.25 + */ + public int getListenProtocol() { + return listenProtocol; + } + + /** + * @since 0.9.25 + */ + public int getListenPort() { + return listenPort; + } + /** * Send bytes through a SAM message-based session. * @@ -128,14 +168,19 @@ abstract class SAMMessageSession implements Closeable { } /** - * Actually send bytes through the SAM message-based session I2PSession. - * TODO unused, umimplemented in the sessions and handlers + * Actually send bytes through the SAM message-based session I2PSession, + * using per-message extended options. + * For efficiency, use the method without all the extra options if they are all defaults. * * @param dest Destination * @param data Bytes to be sent * @param proto I2CP protocol * @param fromPort I2CP from port * @param toPort I2CP to port + * @param sendLeaseSet true is the usual setting and the I2CP default + * @param sendTags 0 to leave as default + * @param tagThreshold 0 to leave as default + * @param expiration SECONDS from now, NOT absolute time, 0 to leave as default * * @return True if the data was sent, false otherwise * @throws DataFormatException on unknown / bad dest @@ -145,7 +190,7 @@ abstract class SAMMessageSession implements Closeable { protected boolean sendBytesThroughMessageSession(String dest, byte[] data, int proto, int fromPort, int toPort, boolean sendLeaseSet, int sendTags, - int tagThreshold, long expires) + int tagThreshold, int expiration) throws DataFormatException, I2PSessionException { Destination d = SAMUtils.getDest(dest); @@ -153,10 +198,14 @@ abstract class SAMMessageSession implements Closeable { _log.debug("Sending " + data.length + " bytes to " + dest); } SendMessageOptions opts = new SendMessageOptions(); - opts.setSendLeaseSet(sendLeaseSet); - opts.setTagsToSend(sendTags); - opts.setTagThreshold(tagThreshold); - opts.setDate(expires); + if (!sendLeaseSet) + opts.setSendLeaseSet(false); + if (sendTags > 0) + opts.setTagsToSend(sendTags); + if (tagThreshold > 0) + opts.setTagThreshold(tagThreshold); + if (expiration > 0) + opts.setDate(I2PAppContext.getGlobalContext().clock().now() + (expiration * 1000)); return session.sendMessage(d, data, 0, data.length, proto, fromPort, toPort, opts); } @@ -194,8 +243,9 @@ abstract class SAMMessageSession implements Closeable { * * @author human */ - class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener { + private class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener { + private final I2PSession _session; private final Object runningLock = new Object(); private volatile boolean stillRunning = true; @@ -203,8 +253,8 @@ abstract class SAMMessageSession implements Closeable { * Create a new SAM message-based session handler * * @param destStream Input stream containing the destination keys - * @param props Properties to setup the I2P session - * @throws I2PSessionException + * @param props Properties to setup the I2P session + * @throws I2PSessionException */ public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) @@ -215,15 +265,33 @@ abstract class SAMMessageSession implements Closeable { props.setProperty("inbound.nickname", "SAM UDP Client"); props.setProperty("outbound.nickname", "SAM UDP Client"); } - session = client.createSession(destStream, props); + _session = client.createSession(destStream, props); if (_log.shouldLog(Log.DEBUG)) _log.debug("Connecting I2P session..."); - session.connect(); + _session.connect(); if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P session connected"); - session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + _session.addMuxedSessionListener(this, listenProtocol, listenPort); + } + + /** + * Create a new SAM message-based session handler on an existing I2PSession + * + * @since 0.9.25 + */ + public SAMMessageSessionHandler(I2PSession sess) throws I2PSessionException { + _session = sess; + _session.addMuxedSessionListener(this, listenProtocol, listenPort); + } + + /** + * The session. + * @since 0.9.25 + */ + public final I2PSession getSession() { + return _session; } /** @@ -254,16 +322,18 @@ abstract class SAMMessageSession implements Closeable { _log.debug("Shutting down SAM message-based session handler"); shutDown(); - session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + session.removeListener(listenProtocol, listenPort); - try { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destroying I2P session..."); - session.destroySession(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("I2P session destroyed"); - } catch (I2PSessionException e) { - _log.error("Error destroying I2P session", e); + if (_isOwnSession) { + try { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Destroying I2P session..."); + session.destroySession(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P session destroyed"); + } catch (I2PSessionException e) { + _log.error("Error destroying I2P session", e); + } } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java index ab1cd76322e36d6ca305510d14b9f0b1f82fffb9..3857f92c755f2581db3a5b57137a2588489c4bd1 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java @@ -39,16 +39,17 @@ class SAMRawSession extends SAMMessageSession { * @throws DataFormatException * @throws I2PSessionException */ - public SAMRawSession(String dest, Properties props, + protected SAMRawSession(String dest, Properties props, SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(dest, props); - this.recv = recv; } /** * Create a new SAM RAW session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data @@ -59,7 +60,19 @@ class SAMRawSession extends SAMMessageSession { public SAMRawSession(InputStream destStream, Properties props, SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(destStream, props); + this.recv = recv; + } + /** + * Create a new SAM RAW session on an existing I2P session. + * + * @param props unused for now + * @since 0.9.25 + */ + protected SAMRawSession(I2PSession sess, Properties props, int listenProtocol, int listenPort, + SAMRawReceiver recv) throws IOException, + DataFormatException, I2PSessionException { + super(sess, listenProtocol, listenPort); this.recv = recv; } @@ -82,6 +95,24 @@ class SAMRawSession extends SAMMessageSession { return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort); } + /** + * Send bytes through a SAM RAW session. + * + * @since 0.9.25 + */ + public boolean sendBytes(String dest, byte[] data, int proto, + int fromPort, int toPort, + boolean sendLeaseSet, int sendTags, + int tagThreshold, int expiration) + throws DataFormatException, I2PSessionException { + if (data.length > RAW_SIZE_MAX) + throw new DataFormatException("Data size limit exceeded (" + data.length + ")"); + if (proto == I2PSession.PROTO_UNSPECIFIED) + proto = I2PSession.PROTO_DATAGRAM_RAW; + return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort, + sendLeaseSet, sendTags,tagThreshold, expiration); + } + protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) { try { recv.receiveRawBytes(msg, proto, fromPort, toPort); diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 14ed92b3eec9057c3f99449563c15045237e2565..8cc9b56620d388f30eec64b44cae27324b1260e8 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PClient; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -47,16 +49,12 @@ import net.i2p.util.Log; * * @author human */ -class SAMStreamSession { +class SAMStreamSession implements SAMMessageSess { protected final Log _log; - protected final static int SOCKET_HANDLER_BUF_SIZE = 32768; - protected final SAMStreamReceiver recv; - protected final SAMStreamSessionServer server; - protected final I2PSocketManager socketMgr; /** stream id (Long) to SAMStreamSessionSocketReader */ @@ -68,6 +66,9 @@ class SAMStreamSession { // Can we create outgoing connections? protected final boolean canCreate; + private final int listenProtocol; + private final int listenPort; + protected final boolean _isOwnSession; /** * should we flush every time we get a STREAM SEND, or leave that up to @@ -81,6 +82,8 @@ class SAMStreamSession { /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") or "__v3__" if extended by SAMv3StreamSession * @param props Properties to setup the I2P session @@ -105,8 +108,8 @@ class SAMStreamSession { * @throws DataFormatException * @throws SAMException */ - public SAMStreamSession(InputStream destStream, String dir, - Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { + protected SAMStreamSession(InputStream destStream, String dir, + Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { this.recv = recv; _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); @@ -156,30 +159,88 @@ class SAMStreamSession { allprops.setProperty("outbound.nickname", "SAM TCP Client"); } + _isOwnSession = true; if (_log.shouldLog(Log.DEBUG)) _log.debug("Creating I2PSocketManager..."); - socketMgr = I2PSocketManagerFactory.createManager(destStream, - i2cpHost, - i2cpPort, - allprops); - if (socketMgr == null) { - throw new SAMException("Error creating I2PSocketManager"); + try { + // we do it this way so we get exceptions + socketMgr = I2PSocketManagerFactory.createDisconnectedManager(destStream, + i2cpHost, i2cpPort, allprops); + socketMgr.getSession().connect(); + } catch (I2PSessionException ise) { + throw new SAMException("Error creating I2PSocketManager: " + ise.getMessage(), ise); } socketMgr.addDisconnectListener(new DisconnectListener()); forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)); + if (Boolean.parseBoolean(props.getProperty("i2p.streaming.enforceProtocol"))) + listenProtocol = I2PSession.PROTO_STREAMING; + else + listenProtocol = I2PSession.PROTO_ANY; + listenPort = I2PSession.PORT_ANY; + if (startAcceptor) { server = new SAMStreamSessionServer(); - Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); - - t.start(); } else { server = null; } } + + /** + * Create a new SAM STREAM session on an existing socket manager. + * v3 only. + * + * @param props Properties to setup the I2P session + * @param recv Object that will receive incoming data + * @throws IOException + * @throws DataFormatException + * @throws SAMException + * @since 0.9.25 + */ + protected SAMStreamSession(I2PSocketManager mgr, Properties props, SAMStreamReceiver recv, int listenport) + throws IOException, DataFormatException, SAMException { + this.recv = recv; + _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SAM STREAM session instantiated"); + canCreate = true; + Properties allprops = (Properties) System.getProperties().clone(); + allprops.putAll(props); + _isOwnSession = false; + socketMgr = mgr; + socketMgr.addDisconnectListener(new DisconnectListener()); + forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)); + listenProtocol = I2PSession.PROTO_STREAMING; + listenPort = listenport; + server = null; + } + + /* + * @since 0.9.25 + */ + public void start() { + if (server != null) { + Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); + t.start(); + } + } + + /* + * @since 0.9.25 + */ + public int getListenProtocol() { + return listenProtocol; + } + + /* + * @since 0.9.25 + */ + public int getListenPort() { + return listenPort; + } protected class DisconnectListener implements I2PSocketManager.DisconnectListener { public void sessionDisconnected() { @@ -284,7 +345,8 @@ class SAMStreamSession { } removeAllSocketHandlers(); recv.stopStreamReceiving(); - socketMgr.destroySocketManager(); + if (_isOwnSession) + socketMgr.destroySocketManager(); } /** @@ -304,6 +366,27 @@ class SAMStreamSession { return true; } + /** + * Unsupported + * @throws I2PSessionException always + * @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess + */ + public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws I2PSessionException { + throw new I2PSessionException("Unsupported in STREAM or MASTER session"); + } + + /** + * Unsupported + * @throws I2PSessionException always + * @since 0.9.25 + */ + public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp, + boolean sendLeaseSet, int sendTags, + int tagThreshold, int expiration) + throws I2PSessionException { + throw new I2PSessionException("Unsupported in STREAM or MASTER session"); + } + /** * Create a new SAM STREAM session socket handler, detaching its thread. * diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index d2cba4b82d5c4ceb7731539520e433dbd2f0e471..73192da826c9ef0bbbc482c71409b3f56a70734c 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -40,17 +40,18 @@ import net.i2p.util.Log; */ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver { - protected SAMRawSession rawSession; - protected SAMDatagramSession datagramSession; + protected SAMMessageSess rawSession; + protected SAMMessageSess datagramSession; protected SAMStreamSession streamSession; - protected SAMRawSession getRawSession() {return rawSession ;} - protected SAMDatagramSession getDatagramSession() {return datagramSession ;} - protected SAMStreamSession getStreamSession() {return streamSession ;} + protected final SAMMessageSess getRawSession() { return rawSession; } + protected final SAMMessageSess getDatagramSession() { return datagramSession; } + protected final SAMStreamSession getStreamSession() { return streamSession; } protected final long _id; private static final AtomicLong __id = new AtomicLong(); private static final int FIRST_READ_TIMEOUT = 60*1000; + protected static final String SESSION_ERROR = "SESSION STATUS RESULT=I2P_ERROR"; /** * Create a new SAM version 1 handler. This constructor expects @@ -132,7 +133,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece ReadLine.readLine(sock, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT); sock.setSoTimeout(0); } catch (SocketTimeoutException ste) { - writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n"); + writeString(SESSION_ERROR, "command timeout, bye"); break; } msg = buf.toString(); @@ -199,14 +200,14 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldWarn()) _log.warn("Error closing socket", e); } - if (getRawSession() != null) { - getRawSession().close(); + if (rawSession != null) { + rawSession.close(); } - if (getDatagramSession() != null) { - getDatagramSession().close(); + if (datagramSession != null) { + datagramSession.close(); } - if (getStreamSession() != null) { - getStreamSession().close(); + if (streamSession != null) { + streamSession.close(); } } } @@ -218,25 +219,24 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece try{ if (opcode.equals("CREATE")) { - if ((getRawSession() != null) || (getDatagramSession() != null) - || (getStreamSession() != null)) { + if ((rawSession != null) || (datagramSession != null) + || (streamSession != null)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Trying to create a session, but one still exists"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n"); + return writeString(SESSION_ERROR, "Session already exists"); } if (props.isEmpty()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("No parameters specified in SESSION CREATE message"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n"); + return writeString(SESSION_ERROR, "No parameters for SESSION CREATE"); } - dest = props.getProperty("DESTINATION"); + dest = (String) props.remove("DESTINATION"); if (dest == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("SESSION DESTINATION parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n"); + return writeString(SESSION_ERROR, "DESTINATION not specified"); } - props.remove("DESTINATION"); String destKeystream = null; @@ -261,13 +261,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } - String style = props.getProperty("STYLE"); + String style = (String) props.remove("STYLE"); if (style == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("SESSION STYLE parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n"); + return writeString(SESSION_ERROR, "No SESSION STYLE specified"); } - props.remove("STYLE"); // Unconditionally override what the client may have set // (iMule sets BestEffort) as None is more efficient @@ -276,10 +275,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (style.equals("RAW")) { rawSession = new SAMRawSession(destKeystream, props, this); + rawSession.start(); } else if (style.equals("DATAGRAM")) { datagramSession = new SAMDatagramSession(destKeystream, props,this); + datagramSession.start(); } else if (style.equals("STREAM")) { - String dir = props.getProperty("DIRECTION"); + String dir = (String) props.remove("DIRECTION"); if (dir == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("No DIRECTION parameter in STREAM session, defaulting to BOTH"); @@ -288,16 +289,15 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece && !dir.equals("BOTH")) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unknown DIRECTION parameter value: [" + dir + "]"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown DIRECTION parameter\"\n"); - } else { - props.remove("DIRECTION"); + return writeString(SESSION_ERROR, "Unknown DIRECTION parameter"); } streamSession = newSAMStreamSession(destKeystream, dir,props); + streamSession.start(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n"); + return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE"); } return writeString("SESSION STATUS RESULT=OK DESTINATION=" + dest + "\n"); @@ -305,22 +305,22 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION message opcode: \"" + opcode + "\""); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n"); + return writeString(SESSION_ERROR, "Unrecognized opcode"); } } catch (DataFormatException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Invalid destination specified"); - return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage()); } catch (I2PSessionException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P error when instantiating session", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } catch (SAMException e) { _log.error("Unexpected SAM error", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } catch (IOException e) { _log.error("Unexpected IOException", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } } @@ -378,12 +378,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece Destination dest = null ; if (name.equals("ME")) { - if (getRawSession() != null) { - dest = getRawSession().getDestination(); - } else if (getStreamSession() != null) { - dest = getStreamSession().getDestination(); - } else if (getDatagramSession() != null) { - dest = getDatagramSession().getDestination(); + if (rawSession != null) { + dest = rawSession.getDestination(); + } else if (streamSession != null) { + dest = streamSession.getDestination(); + } else if (datagramSession != null) { + dest = datagramSession.getDestination(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Lookup for SESSION destination, but session is null"); @@ -415,126 +415,46 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /* Parse and execute a DATAGRAM message */ protected boolean execDatagramMessage(String opcode, Properties props) { - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("DATAGRAM message received, but no DATAGRAM session exists"); return false; } - - if (opcode.equals("SEND")) { - if (props.isEmpty()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("No parameters specified in DATAGRAM SEND message"); - return false; - } - - String dest = props.getProperty("DESTINATION"); - if (dest == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination not specified in DATAGRAM SEND message"); - return false; - } - - int size; - String strsize = props.getProperty("SIZE"); - if (strsize == null) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Size not specified in DATAGRAM SEND message"); - return false; - } - try { - size = Integer.parseInt(strsize); - } catch (NumberFormatException e) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid DATAGRAM SEND size specified: " + strsize); - return false; - } - if (!checkDatagramSize(size)) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Specified size (" + size - + ") is out of protocol limits"); - return false; - } - int proto = I2PSession.PROTO_DATAGRAM; - int fromPort = I2PSession.PORT_UNSPECIFIED; - int toPort = I2PSession.PORT_UNSPECIFIED; - String s = props.getProperty("FROM_PORT"); - if (s != null) { - try { - fromPort = Integer.parseInt(s); - } catch (NumberFormatException e) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid DATAGRAM SEND port specified: " + s); - } - } - s = props.getProperty("TO_PORT"); - if (s != null) { - try { - toPort = Integer.parseInt(s); - } catch (NumberFormatException e) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid RAW SEND port specified: " + s); - } - } - - try { - DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream()); - byte[] data = new byte[size]; - - in.readFully(data); - - if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) { - _log.error("DATAGRAM SEND failed"); - // a message send failure is no reason to drop the SAM session - // for raw and repliable datagrams, just carry on our merry way - return true; - } - - return true; - } catch (EOFException e) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Too few bytes with DATAGRAM SEND message (expected: " - + size); - return false; - } catch (IOException e) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Caught IOException while parsing DATAGRAM SEND message", - e); - return false; - } catch (DataFormatException e) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Invalid key specified with DATAGRAM SEND message", - e); - return false; - } catch (I2PSessionException e) { - _log.error("Session error with DATAGRAM SEND message", e); - return false; - } - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Unrecognized DATAGRAM message opcode: \"" - + opcode + "\""); - return false; - } + return execDgOrRawMessage(false, opcode, props); } /* Parse and execute a RAW message */ protected boolean execRawMessage(String opcode, Properties props) { - if (getRawSession() == null) { + if (rawSession == null) { _log.error("RAW message received, but no RAW session exists"); return false; } + return execDgOrRawMessage(true, opcode, props); + } + + /* + * Parse and execute a RAW or DATAGRAM SEND message. + * This is for v1/v2 compatible sending only. + * For v3 sending, see SAMv3DatagramServer. + * + * Note that props are from the command line only. + * Session defaults from CREATE are NOT honored here. + * FIXME if we care, but nobody's probably using v3.2 options for v1/v2 sending. + * + * @since 0.9.25 consolidated from execDatagramMessage() and execRawMessage() + */ + private boolean execDgOrRawMessage(boolean isRaw, String opcode, Properties props) { if (opcode.equals("SEND")) { if (props.isEmpty()) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("No parameters specified in RAW SEND message"); + _log.debug("No parameters specified in SEND message"); return false; } String dest = props.getProperty("DESTINATION"); if (dest == null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination not specified in RAW SEND message"); + _log.debug("Destination not specified in SEND message"); return false; } @@ -542,41 +462,47 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece String strsize = props.getProperty("SIZE"); if (strsize == null) { if (_log.shouldLog(Log.WARN)) - _log.warn("Size not specified in RAW SEND message"); + _log.warn("Size not specified in SEND message"); return false; } try { size = Integer.parseInt(strsize); } catch (NumberFormatException e) { if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid RAW SEND size specified: " + strsize); + _log.warn("Invalid SEND size specified: " + strsize); return false; } - if (!checkSize(size)) { + boolean ok = isRaw ? checkSize(size) : checkDatagramSize(size); + if (!ok) { if (_log.shouldLog(Log.WARN)) _log.warn("Specified size (" + size + ") is out of protocol limits"); return false; } - int proto = I2PSession.PROTO_DATAGRAM_RAW; int fromPort = I2PSession.PORT_UNSPECIFIED; int toPort = I2PSession.PORT_UNSPECIFIED; - String s = props.getProperty("PROTOCOL"); - if (s != null) { - try { - proto = Integer.parseInt(s); - } catch (NumberFormatException e) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid RAW SEND protocol specified: " + s); + int proto; + if (isRaw) { + proto = I2PSession.PROTO_DATAGRAM_RAW; + String s = props.getProperty("PROTOCOL"); + if (s != null) { + try { + proto = Integer.parseInt(s); + } catch (NumberFormatException e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Invalid SEND protocol specified: " + s); + } } + } else { + proto = I2PSession.PROTO_DATAGRAM; } - s = props.getProperty("FROM_PORT"); + String s = props.getProperty("FROM_PORT"); if (s != null) { try { fromPort = Integer.parseInt(s); } catch (NumberFormatException e) { if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid RAW SEND port specified: " + s); + _log.warn("Invalid SEND port specified: " + s); } } s = props.getProperty("TO_PORT"); @@ -585,7 +511,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece toPort = Integer.parseInt(s); } catch (NumberFormatException e) { if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid RAW SEND port specified: " + s); + _log.warn("Invalid SEND port specified: " + s); } } @@ -595,8 +521,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece in.readFully(data); - if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) { - _log.error("RAW SEND failed"); + SAMMessageSess sess = isRaw ? rawSession : datagramSession; + if (sess.sendBytes(dest, data, proto, fromPort, toPort)) { + _log.error("SEND failed"); // a message send failure is no reason to drop the SAM session // for raw and repliable datagrams, just carry on our merry way return true; @@ -605,26 +532,26 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece return true; } catch (EOFException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Too few bytes with RAW SEND message (expected: " + _log.debug("Too few bytes with SEND message (expected: " + size); return false; } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Caught IOException while parsing RAW SEND message", + _log.debug("Caught IOException while parsing SEND message", e); return false; } catch (DataFormatException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Invalid key specified with RAW SEND message", + _log.debug("Invalid key specified with SEND message", e); return false; } catch (I2PSessionException e) { - _log.error("Session error with RAW SEND message", e); + _log.error("Session error with SEND message", e); return false; } } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Unrecognized RAW message opcode: \"" + _log.debug("Unrecognized message opcode: \"" + opcode + "\""); return false; } @@ -632,7 +559,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /* Parse and execute a STREAM message */ protected boolean execStreamMessage(String opcode, Properties props) { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("STREAM message received, but no STREAM session exists"); return false; } @@ -645,7 +572,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece return execStreamClose(props); } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Unrecognized RAW message opcode: \"" + _log.debug("Unrecognized STREAM message opcode: \"" + opcode + "\""); return false; } @@ -699,13 +626,13 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } try { - if (!getStreamSession().sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) { + if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) { if (_log.shouldLog(Log.WARN)) _log.warn("STREAM SEND [" + size + "] failed"); // a message send failure is no reason to drop the SAM session // for style=stream, tell the client the stream failed, and kill the virtual connection.. boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n"); - getStreamSession().closeConnection(id); + streamSession.closeConnection(id); return rv; } @@ -732,7 +659,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece int id; { - String strid = props.getProperty("ID"); + String strid = (String) props.remove("ID"); if (strid == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("ID not specified in STREAM SEND message"); @@ -750,19 +677,17 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece _log.debug("Invalid STREAM CONNECT ID specified: " +strid); return false; } - props.remove("ID"); } - String dest = props.getProperty("DESTINATION"); + String dest = (String) props.remove("DESTINATION"); if (dest == null) { _log.debug("Destination not specified in RAW SEND message"); return false; } - props.remove("DESTINATION"); try { try { - if (!getStreamSession().connect(id, dest, props)) { + if (!streamSession.connect(id, dest, props)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM connection failed"); return false; @@ -823,7 +748,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } - boolean closed = getStreamSession().closeConnection(id); + boolean closed = streamSession.closeConnection(id); if ( (!closed) && (_log.shouldLog(Log.WARN)) ) _log.warn("Stream unable to be closed, but this is non fatal"); return true; @@ -841,7 +766,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece // SAMRawReceiver implementation public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException { - if (getRawSession() == null) { + if (rawSession == null) { _log.error("BUG! Received raw bytes, but session is null!"); return; } @@ -867,7 +792,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopRawReceiving() invoked"); - if (getRawSession() == null) { + if (rawSession == null) { _log.error("BUG! Got raw receiving stop, but session is null!"); return; } @@ -883,7 +808,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece // SAMDatagramReceiver implementation public void receiveDatagramBytes(Destination sender, byte data[], int proto, int fromPort, int toPort) throws IOException { - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("BUG! Received datagram bytes, but session is null!"); return; } @@ -910,7 +835,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopDatagramReceiving() invoked"); - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("BUG! Got datagram receiving stop, but session is null!"); return; } @@ -927,7 +852,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void streamSendAnswer( int id, String result, String bufferState ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Want to answer to stream SEND, but session is null!" ); return; @@ -945,7 +870,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void notifyStreamSendBufferFree( int id ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" ); return; @@ -959,7 +884,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void notifyStreamIncomingConnection(int id, Destination d) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Received stream connection, but session is null!"); return; } @@ -974,7 +899,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /** @param msg may be null */ public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Received stream connection, but session is null!" ); return; @@ -1015,9 +940,21 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } return rv; } + + /** + * Write a string and message, escaping the message. + * Writes s + createMessageString(msg) + \n + * + * @param s The string, non-null + * @param s The message may be null + * @since 0.9.25 + */ + protected boolean writeString(String s, String msg) { + return writeString(s + createMessageString(msg) + '\n'); + } public void receiveStreamBytes(int id, ByteBuffer data) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("Received stream bytes, but session is null!"); return; } @@ -1038,7 +975,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /** @param msg may be null */ public void notifyStreamDisconnection(int id, String result, String msg) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Received stream disconnection, but session is null!"); return; } @@ -1053,7 +990,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopStreamReceiving() invoked", new Exception("stopped")); - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Got stream receiving stop, but session is null!"); return; } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java index dcd8ca4be48bfadf74ca28e6c890baa1f3143662..a1df41eb7740f994ce3a00fb6e39196998cc49e7 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java @@ -36,13 +36,13 @@ import net.i2p.util.Log; * * @author mkvore */ - class SAMv2StreamSession extends SAMStreamSession { - /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session @@ -60,6 +60,8 @@ class SAMv2StreamSession extends SAMStreamSession /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session @@ -86,7 +88,6 @@ class SAMv2StreamSession extends SAMStreamSession * receive-only session * @return true if the communication with the SAM client is ok */ - @Override public boolean connect ( int id, String dest, Properties props ) throws DataFormatException, SAMInvalidDirectionException @@ -120,31 +121,25 @@ class SAMv2StreamSession extends SAMStreamSession return true ; } - - - /** - * SAM STREAM socket connecter, running in its own thread. - * - * @author mkvore - */ - + * SAM STREAM socket connecter, running in its own thread. + * + * @author mkvore + */ private class StreamConnector implements Runnable { - private final int id; private final Destination dest ; private final I2PSocketOptions opts ; /** - * Create a new SAM STREAM session socket reader - * - * @param id Unique id assigned to the handler - * @param dest Destination to reach - * @param opts Socket options (I2PSocketOptions) + * Create a new SAM STREAM session socket reader + * + * @param id Unique id assigned to the handler + * @param dest Destination to reach + * @param opts Socket options (I2PSocketOptions) */ - public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException { if (_log.shouldLog(Log.DEBUG)) @@ -155,7 +150,6 @@ class SAMv2StreamSession extends SAMStreamSession this.dest = dest ; } - public void run() { if (_log.shouldLog(Log.DEBUG)) @@ -215,18 +209,15 @@ class SAMv2StreamSession extends SAMStreamSession } } - - /** - * Lets us push data through the stream without blocking, (even after exceeding - * the I2PSocket's buffer) + * Lets us push data through the stream without blocking, (even after exceeding + * the I2PSocket's buffer) * * @param s I2PSocket * @param id Socket ID * @return v2StreamSender * @throws IOException */ - @Override protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException { @@ -241,7 +232,6 @@ class SAMv2StreamSession extends SAMStreamSession } private class V2StreamSender extends StreamSender - { private final List<ByteArray> _data; private int _dataSize; @@ -260,12 +250,12 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Send bytes through the SAM STREAM session socket sender - * + * Send bytes through the SAM STREAM session socket sender + * * @param in Data stream of data to send * @param size Count of bytes to send * @throws IOException if the client didnt provide enough data - */ + */ @Override public void sendBytes ( InputStream in, int size ) throws IOException { @@ -307,9 +297,9 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Stop a SAM STREAM session socket sender thread immediately - * - */ + * Stop a SAM STREAM session socket sender thread immediately + * + */ @Override public void stopRunning() { @@ -342,9 +332,9 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Stop a SAM STREAM session socket sender gracefully: stop the - * sender thread once all pending data has been sent. - */ + * Stop a SAM STREAM session socket sender gracefully: stop the + * sender thread once all pending data has been sent. + */ @Override public void shutDownGracefully() { @@ -431,8 +421,6 @@ class SAMv2StreamSession extends SAMStreamSession } } - - /** * Send bytes through a SAM STREAM session. * @@ -459,30 +447,24 @@ class SAMv2StreamSession extends SAMStreamSession return true; } - /** - * SAM STREAM socket reader, running in its own thread. It forwards - * forward data to/from an I2P socket. - * - * @author human + * SAM STREAM socket reader, running in its own thread. It forwards + * forward data to/from an I2P socket. + * + * @author human */ - - - public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader { - protected boolean nolimit ; protected long limit ; protected long totalReceived ; - /** - * Create a new SAM STREAM session socket reader - * - * @param s Socket to be handled - * @param id Unique id assigned to the handler - */ + * Create a new SAM STREAM session socket reader + * + * @param s Socket to be handled + * @param id Unique id assigned to the handler + */ public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException { super ( s, id ); @@ -581,7 +563,4 @@ class SAMv2StreamSession extends SAMStreamSession _log.debug ( "Shutting down SAM STREAM session socket handler " + id ); } } - - - } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java index e23f4e066dcdf65054aa34572d5de7a7c914339d..b5d4aa058cf5fe2397352293f35a8407726d0c94 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java @@ -137,6 +137,7 @@ class SAMv3DatagramServer implements Handler { private static class MessageDispatcher implements Runnable { private final ByteArrayInputStream is; + private static final int MAX_LINE_LENGTH = 2*1024; public MessageDispatcher(byte[] buf) { this.is = new ByteArrayInputStream(buf) ; @@ -144,8 +145,21 @@ class SAMv3DatagramServer implements Handler { public void run() { try { - String header = DataHelper.readLine(is).trim(); + // not UTF-8 + //String header = DataHelper.readLine(is).trim(); // we cannot use SAMUtils.parseParams() here + final UTF8Reader reader = new UTF8Reader(is); + final StringBuilder buf = new StringBuilder(MAX_LINE_LENGTH); + int c; + int i = 0; + while ((c = reader.read()) != -1) { + if (++i > MAX_LINE_LENGTH) + throw new IOException("Line too long - max " + MAX_LINE_LENGTH); + if (c == '\n') + break; + buf.append((char)c); + } + String header = buf.toString(); StringTokenizer tok = new StringTokenizer(header, " "); if (tok.countTokens() < 3) { // This is not a correct message, for sure @@ -160,57 +174,89 @@ class SAMv3DatagramServer implements Handler { String nick = tok.nextToken(); String dest = tok.nextToken(); - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if (rec!=null) { Properties sprops = rec.getProps(); + // 3.2 props String pr = sprops.getProperty("PROTOCOL"); String fp = sprops.getProperty("FROM_PORT"); String tp = sprops.getProperty("TO_PORT"); + // 3.3 props + // If this is a straight DATAGRAM or RAW session, we + // don't need to send these, the router already got them in + // the options, but if a subsession, we must, so just + // do it all the time. + String st = sprops.getProperty("crypto.tagsToSend"); + String tt = sprops.getProperty("crypto.lowTagThreshold"); + String sl = sprops.getProperty("shouldBundleReplyInfo"); + String exms = sprops.getProperty("clientMessageTimeout"); // ms + String exs = null; // seconds while (tok.hasMoreTokens()) { String t = tok.nextToken(); + // 3.2 props if (t.startsWith("PROTOCOL=")) pr = t.substring("PROTOCOL=".length()); else if (t.startsWith("FROM_PORT=")) fp = t.substring("FROM_PORT=".length()); else if (t.startsWith("TO_PORT=")) tp = t.substring("TO_PORT=".length()); + // 3.3 props + else if (t.startsWith("SEND_TAGS=")) + st = t.substring("SEND_TAGS=".length()); + else if (t.startsWith("TAG_THRESHOLD=")) + tt = t.substring("TAG_THRESHOLD=".length()); + else if (t.startsWith("EXPIRES=")) + exs = t.substring("EXPIRES=".length()); + else if (t.startsWith("SEND_LEASESET=")) + sl = t.substring("SEND_LEASESET=".length()); } + // 3.2 props int proto = I2PSession.PROTO_UNSPECIFIED; int fromPort = I2PSession.PORT_UNSPECIFIED; int toPort = I2PSession.PORT_UNSPECIFIED; - if (pr != null) { - try { + // 3.3 props + int sendTags = 0; + int tagThreshold = 0; + int expires = 0; // seconds + boolean sendLeaseSet = true; + try { + // 3.2 props + if (pr != null) proto = Integer.parseInt(pr); - } catch (NumberFormatException nfe) { - warn("Bad datagram header received"); - return; - } - } - if (fp != null) { - try { + if (fp != null) fromPort = Integer.parseInt(fp); - } catch (NumberFormatException nfe) { - warn("Bad datagram header received"); - return; - } - } - if (tp != null) { - try { + if (tp != null) toPort = Integer.parseInt(tp); - } catch (NumberFormatException nfe) { - warn("Bad datagram header received"); - return; - } + // 3.3 props + if (st != null) + sendTags = Integer.parseInt(st); + if (tt != null) + tagThreshold = Integer.parseInt(tt); + if (exs != null) + expires = Integer.parseInt(exs); + else if (exms != null) + expires = Integer.parseInt(exms) / 1000; + if (sl != null) + sendLeaseSet = Boolean.parseBoolean(sl); + } catch (NumberFormatException nfe) { + warn("Bad datagram header received"); + return; } // TODO too many allocations and copies. One here and one in Listener above. byte[] data = new byte[is.available()]; is.read(data); - SAMv3Handler.Session sess = rec.getHandler().getSession(); - if (sess != null) - sess.sendBytes(dest, data, proto, fromPort, toPort); - else + Session sess = rec.getHandler().getSession(); + if (sess != null) { + if (sendTags > 0 || tagThreshold > 0 || expires > 0 || !sendLeaseSet) { + sess.sendBytes(dest, data, proto, fromPort, toPort, + sendLeaseSet, sendTags, tagThreshold, expires); + } else { + sess.sendBytes(dest, data, proto, fromPort, toPort); + } + } else { warn("Dropping datagram, no session for " + nick); + } } else { warn("Dropping datagram, no session for " + nick); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java index ee2782559266ceb1bb820069d796155a1fd02666..979d467ff2f9d43f509c24d2085e86da7a77ad87 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java @@ -6,19 +6,20 @@ package net.i2p.sam; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress ; +import java.nio.ByteBuffer; import java.util.Properties; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.Log; -import java.net.InetSocketAddress; -import java.net.SocketAddress ; -import java.nio.ByteBuffer; -class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Session, SAMDatagramReceiver { +class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDatagramReceiver { private final SAMv3Handler handler; private final SAMv3DatagramServer server; @@ -31,6 +32,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Se * build a DatagramSession according to informations registered * with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException @@ -46,28 +49,37 @@ class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Se this.recv = this; // replacement this.server = dgServer; - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if (rec == null) throw new SAMException("Record disappeared for nickname : \""+nick+"\""); this.handler = rec.getHandler(); Properties props = rec.getProps(); - String portStr = props.getProperty("PORT"); - if (portStr == null) { - if (_log.shouldDebug()) - _log.debug("receiver port not specified. Current socket will be used."); - this.clientAddress = null; - } else { - int port = Integer.parseInt(portStr); - String host = props.getProperty("HOST"); - if (host == null) { - host = rec.getHandler().getClientIP(); - if (_log.shouldDebug()) - _log.debug("no host specified. Taken from the client socket : " + host+':'+port); - } - this.clientAddress = new InetSocketAddress(host, port); - } + clientAddress = SAMv3RawSession.getSocketAddress(props, handler); + } + + /** + * Build a Datagram Session on an existing i2p session + * registered with the given nickname + * + * Caller MUST call start(). + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3DatagramSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess, + int listenPort, SAMv3DatagramServer dgServer) + throws IOException, DataFormatException, I2PSessionException { + super(isess, props, listenPort, null); // to be replaced by this + this.nick = nick ; + this.recv = this ; // replacement + this.server = dgServer; + this.handler = handler; + clientAddress = SAMv3RawSession.getSocketAddress(props, handler); } public void receiveDatagramBytes(Destination sender, byte[] data, int proto, diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index 032538b0c8962f9412dc2d7c7358cccd6774f337..48774d5652ed1183ec6f0edc8a5b94656922a8a7 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -23,7 +23,6 @@ import java.net.NoRouteToHostException; import java.nio.channels.SocketChannel; import java.nio.ByteBuffer; import java.util.Properties; -import java.util.HashMap; import net.i2p.I2PAppContext; import net.i2p.I2PException; @@ -49,6 +48,7 @@ class SAMv3Handler extends SAMv1Handler { private Session session; + // TODO remove singleton, hang off SAMBridge like dgserver public static final SessionsDB sSessionsHash = new SessionsDB(); private volatile boolean stolenSocket; private volatile boolean streamForwardingSocket; @@ -56,13 +56,7 @@ class SAMv3Handler extends SAMv1Handler private long _lastPing; private static final int FIRST_READ_TIMEOUT = 60*1000; private static final int READ_TIMEOUT = 3*60*1000; - - interface Session { - String getNick(); - void close(); - boolean sendBytes(String dest, byte[] data, int proto, - int fromPort, int toPort) throws DataFormatException, I2PSessionException; - } + private static final String AUTH_ERROR = "AUTH STATUS RESULT=I2P_ERROR"; /** * Create a new SAM version 3 handler. This constructor expects @@ -104,121 +98,6 @@ class SAMv3Handler extends SAMv1Handler { return (verMajor == 3); } - - /** - * The values in the SessionsDB - */ - public static class SessionRecord - { - private final String m_dest ; - private final Properties m_props ; - private ThreadGroup m_threadgroup ; - private final SAMv3Handler m_handler ; - - public SessionRecord( String dest, Properties props, SAMv3Handler handler ) - { - m_dest = dest; - m_props = new Properties() ; - m_props.putAll(props); - m_handler = handler ; - } - - public SessionRecord( SessionRecord in ) - { - m_dest = in.getDest(); - m_props = in.getProps(); - m_threadgroup = in.getThreadGroup(); - m_handler = in.getHandler(); - } - - public String getDest() - { - return m_dest; - } - - synchronized public Properties getProps() - { - Properties p = new Properties(); - p.putAll(m_props); - return m_props; - } - - public SAMv3Handler getHandler() - { - return m_handler ; - } - - synchronized public ThreadGroup getThreadGroup() - { - return m_threadgroup ; - } - - synchronized public void createThreadGroup(String name) - { - if (m_threadgroup == null) - m_threadgroup = new ThreadGroup(name); - } - } - - /** - * basically a HashMap from String to SessionRecord - */ - public static class SessionsDB - { - private static final long serialVersionUID = 0x1; - - static class ExistingIdException extends Exception { - private static final long serialVersionUID = 0x1; - } - - static class ExistingDestException extends Exception { - private static final long serialVersionUID = 0x1; - } - - private final HashMap<String, SessionRecord> map; - - public SessionsDB() { - map = new HashMap<String, SessionRecord>() ; - } - - /** @return success */ - synchronized public boolean put( String nick, SessionRecord session ) - throws ExistingIdException, ExistingDestException - { - if ( map.containsKey(nick) ) { - throw new ExistingIdException(); - } - for ( SessionRecord r : map.values() ) { - if (r.getDest().equals(session.getDest())) { - throw new ExistingDestException(); - } - } - - if ( !map.containsKey(nick) ) { - session.createThreadGroup("SAM session "+nick); - map.put(nick, session) ; - return true ; - } - else - return false ; - } - - /** @return true if removed */ - synchronized public boolean del( String nick ) - { - return map.remove(nick) != null; - } - - synchronized public SessionRecord get(String nick) - { - return map.get(nick); - } - - synchronized public boolean containsKey( String nick ) - { - return map.containsKey(nick); - } - } public String getClientIP() { @@ -255,7 +134,31 @@ class SAMv3Handler extends SAMv1Handler Session getSession() { return session; } - + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3RawSession sess) { + rawSession = sess; session = sess; + } + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3DatagramSession sess) { + datagramSession = sess; session = sess; + } + + /** + * For subsessions created by MasterSession + * @since 0.9.25 + */ + void setSession(SAMv3StreamSession sess) { + streamSession = sess; session = sess; + } + @Override public void handle() { String msg = null; @@ -294,7 +197,7 @@ class SAMv3Handler extends SAMv1Handler if (now - _lastPing >= READ_TIMEOUT) { if (_log.shouldWarn()) _log.warn("Failed to respond to PING"); - writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + writeString(SESSION_ERROR, "PONG timeout"); break; } } else { @@ -309,13 +212,13 @@ class SAMv3Handler extends SAMv1Handler if (now - _lastPing >= 2*READ_TIMEOUT) { if (_log.shouldWarn()) _log.warn("Failed to respond to PING"); - writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + writeString(SESSION_ERROR, "PONG timeout"); break; } } else if (_lastPing < 0) { if (_log.shouldWarn()) _log.warn("2nd timeout"); - writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n"); + writeString(SESSION_ERROR, "command timeout, bye"); break; } else { // don't clear buffer, don't send ping, @@ -336,7 +239,7 @@ class SAMv3Handler extends SAMv1Handler ReadLine.readLine(socket, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT); socket.setSoTimeout(0); } catch (SocketTimeoutException ste) { - writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n"); + writeString(SESSION_ERROR, "command timeout, bye"); break; } line = buf.toString(); @@ -373,7 +276,7 @@ class SAMv3Handler extends SAMv1Handler if (opcode == null) { // This is not a correct message, for sure - if (writeString(domain + " STATUS RESULT=I2P_ERROR MESSAGE=\"command not specified\"\n")) + if (writeString(domain + " STATUS RESULT=I2P_ERROR", "command not specified")) continue; else break; @@ -411,10 +314,13 @@ class SAMv3Handler extends SAMv1Handler } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Caught IOException in handler", e); + writeString(SESSION_ERROR, e.getMessage()); } catch (SAMException e) { _log.error("Unexpected exception for message [" + msg + ']', e); + writeString(SESSION_ERROR, e.getMessage()); } catch (RuntimeException e) { _log.error("Unexpected exception for message [" + msg + ']', e); + writeString(SESSION_ERROR, e.getMessage()); } finally { if (_log.shouldLog(Log.DEBUG)) _log.debug("Stopping handler"); @@ -492,43 +398,48 @@ class SAMv3Handler extends SAMv1Handler protected boolean execSessionMessage(String opcode, Properties props) { String dest = "BUG!"; - String nick = null ; boolean ok = false ; + String nick = (String) props.remove("ID"); + if (nick == null) + return writeString(SESSION_ERROR, "ID not specified"); + + String style = (String) props.remove("STYLE"); + if (style == null && !opcode.equals("REMOVE")) + return writeString(SESSION_ERROR, "No SESSION STYLE specified"); + try{ if (opcode.equals("CREATE")) { if ((this.getRawSession()!= null) || (this.getDatagramSession() != null) || (this.getStreamSession() != null)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Trying to create a session, but one still exists"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n"); + return writeString(SESSION_ERROR, "Session already exists"); } if (props.isEmpty()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("No parameters specified in SESSION CREATE message"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n"); + return writeString(SESSION_ERROR, "No parameters for SESSION CREATE"); } - dest = props.getProperty("DESTINATION"); + dest = (String) props.remove("DESTINATION"); if (dest == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("SESSION DESTINATION parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n"); + return writeString(SESSION_ERROR, "DESTINATION not specified"); } - props.remove("DESTINATION"); if (dest.equals("TRANSIENT")) { if (_log.shouldLog(Log.DEBUG)) _log.debug("TRANSIENT destination requested"); - String sigTypeStr = props.getProperty("SIGNATURE_TYPE"); + String sigTypeStr = (String) props.remove("SIGNATURE_TYPE"); SigType sigType; if (sigTypeStr != null) { sigType = SigType.parseSigType(sigTypeStr); if (sigType == null) { - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"SIGNATURE_TYPE " - + sigTypeStr + " unsupported\"\n"); + return writeString(SESSION_ERROR, "SIGNATURE_TYPE " + + sigTypeStr + " unsupported"); } - props.remove("SIGNATURE_TYPE"); } else { sigType = SigType.DSA_SHA1; } @@ -543,24 +454,6 @@ class SAMv3Handler extends SAMv1Handler return writeString("SESSION STATUS RESULT=INVALID_KEY\n"); } - - nick = props.getProperty("ID"); - if (nick == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SESSION ID parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n"); - } - props.remove("ID"); - - - String style = props.getProperty("STYLE"); - if (style == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SESSION STYLE parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n"); - } - props.remove("STYLE"); - // Unconditionally override what the client may have set // (iMule sets BestEffort) as None is more efficient // and the client has no way to access delivery notifications @@ -570,7 +463,14 @@ class SAMv3Handler extends SAMv1Handler Properties allProps = new Properties(); allProps.putAll(i2cpProps); allProps.putAll(props); - + + if (style.equals("MASTER")) { + // We must put these here, as SessionRecord.getProps() makes a copy, + // and the socket manager is instantiated in the + // SAMStreamSession constructor. + allProps.setProperty("i2p.streaming.enforceProtocol", "true"); + allProps.setProperty("i2cp.dontPublishLeaseSet", "false"); + } try { sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ; @@ -590,44 +490,71 @@ class SAMv3Handler extends SAMv1Handler SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs); rawSession = v3; this.session = v3; + v3.start(); } else if (style.equals("DATAGRAM")) { SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs); datagramSession = v3; this.session = v3; + v3.start(); } else if (style.equals("STREAM")) { SAMv3StreamSession v3 = newSAMStreamSession(nick); streamSession = v3; this.session = v3; + v3.start(); + } else if (style.equals("MASTER")) { + SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); + MasterSession v3 = new MasterSession(nick, dgs, this, allProps); + streamSession = v3; + datagramSession = v3; + rawSession = v3; + this.session = v3; + v3.start(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n"); + return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE"); } ok = true ; return writeString("SESSION STATUS RESULT=OK DESTINATION=" + dest + "\n"); + } else if (opcode.equals("ADD") || opcode.equals("REMOVE")) { + // prevent trouble in finally block + ok = true; + if (streamSession == null || datagramSession == null || rawSession == null) + return writeString(SESSION_ERROR, "Not a MASTER session"); + MasterSession msess = (MasterSession) session; + String msg; + if (opcode.equals("ADD")) { + msg = msess.add(nick, style, props); + } else { + msg = msess.remove(nick, props); + } + if (msg == null) + return writeString("SESSION STATUS RESULT=OK ID=\"" + nick + '"', opcode + ' ' + nick); + else + return writeString(SESSION_ERROR + " ID=\"" + nick + '"', msg); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION message opcode: \"" + opcode + "\""); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n"); + return writeString(SESSION_ERROR, "Unrecognized opcode"); } } catch (DataFormatException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Invalid destination specified"); - return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage()); } catch (I2PSessionException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P error when instantiating session", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } catch (SAMException e) { if (_log.shouldLog(Log.INFO)) _log.info("Funny SAM error", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } catch (IOException e) { _log.error("Unexpected IOException", e); - return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + return writeString(SESSION_ERROR, e.getMessage()); } finally { // unregister the session if it has not been created if ( !ok && nick!=null ) { @@ -655,15 +582,14 @@ class SAMv3Handler extends SAMv1Handler if ( session != null ) { - _log.error ( "STREAM message received, but this session is a master session" ); - + _log.error("v3 control socket cannot be used for STREAM"); try { - notifyStreamResult(true, "I2P_ERROR", "master session cannot be used for streams"); + notifyStreamResult(true, "I2P_ERROR", "v3 control socket cannot be used for STREAM"); } catch (IOException e) {} return false; } - nick = props.getProperty("ID"); + nick = (String) props.remove("ID"); if (nick == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("SESSION ID parameter not specified"); @@ -672,26 +598,23 @@ class SAMv3Handler extends SAMv1Handler } catch (IOException e) {} return false ; } - props.remove("ID"); rec = sSessionsHash.get(nick); - if ( rec==null ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM SESSION ID does not exist"); try { - notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist"); + notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID " + nick + " does not exist"); } catch (IOException e) {} return false ; } streamSession = rec.getHandler().streamSession ; - if (streamSession==null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("specified ID is not a stream session"); try { - notifyStreamResult(true, "I2P_ERROR", "specified ID is not a STREAM session"); + notifyStreamResult(true, "I2P_ERROR", "specified ID " + nick + " is not a STREAM session"); } catch (IOException e) {} return false ; } @@ -733,14 +656,13 @@ class SAMv3Handler extends SAMv1Handler return false; } - String dest = props.getProperty("DESTINATION"); + String dest = (String) props.remove("DESTINATION"); if (dest == null) { notifyStreamResult(verbose, "I2P_ERROR", "Destination not specified in STREAM CONNECT message"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Destination not specified in STREAM CONNECT message"); return false; } - props.remove("DESTINATION"); try { ((SAMv3StreamSession)streamSession).connect( this, dest, props ); @@ -748,19 +670,19 @@ class SAMv3Handler extends SAMv1Handler } catch (DataFormatException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Invalid destination in STREAM CONNECT message"); - notifyStreamResult ( verbose, "INVALID_KEY", null ); + notifyStreamResult ( verbose, "INVALID_KEY", e.getMessage()); } catch (ConnectException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "CONNECTION_REFUSED", null ); + notifyStreamResult ( verbose, "CONNECTION_REFUSED", e.getMessage()); } catch (NoRouteToHostException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "CANT_REACH_PEER", null ); + notifyStreamResult ( verbose, "CANT_REACH_PEER", e.getMessage()); } catch (InterruptedIOException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); - notifyStreamResult ( verbose, "TIMEOUT", null ); + notifyStreamResult ( verbose, "TIMEOUT", e.getMessage()); } catch (I2PException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM CONNECT failed", e); @@ -812,7 +734,7 @@ class SAMv3Handler extends SAMv1Handler } catch (SAMException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM ACCEPT failed", e); - notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null ); + notifyStreamResult ( verbose, "ALREADY_ACCEPTING", e.getMessage()); } } catch (IOException e) { } @@ -820,6 +742,11 @@ class SAMv3Handler extends SAMv1Handler } + /** + * @param verbose if false, does nothing + * @param result non-null + * @param message may be null + */ public void notifyStreamResult(boolean verbose, String result, String message) throws IOException { if (!verbose) return ; String msgString = createMessageString(message); @@ -870,29 +797,29 @@ class SAMv3Handler extends SAMv1Handler String user = props.getProperty("USER"); String pw = props.getProperty("PASSWORD"); if (user == null || pw == null) - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER and PASSWORD required\"\n"); + return writeString(AUTH_ERROR, "USER and PASSWORD required"); String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX; if (i2cpProps.containsKey(prop)) - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " already exists\"\n"); + return writeString(AUTH_ERROR, "user " + user + " already exists"); PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext()); String shash = pm.createHash(pw); i2cpProps.setProperty(prop, shash); } else if (opcode.equals("REMOVE")) { String user = props.getProperty("USER"); if (user == null) - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER required\"\n"); + return writeString(AUTH_ERROR, "USER required"); String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX; if (!i2cpProps.containsKey(prop)) - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " not found\"\n"); + return writeString(AUTH_ERROR, "user " + user + " not found"); i2cpProps.remove(prop); } else { - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown AUTH command\"\n"); + return writeString(AUTH_ERROR, "Unknown AUTH command"); } try { bridge.saveConfig(); return writeString("AUTH STATUS RESULT=OK\n"); } catch (IOException ioe) { - return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Config save failed: " + ioe + "\"\n"); + return writeString(AUTH_ERROR, "Config save failed: " + ioe); } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java index b68f3a74a151f7d5126b68e3be8234fc18906002..9cbd4f489a0b87eb0faa674ffcbf59f02455da43 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java @@ -10,6 +10,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Properties; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -19,7 +20,7 @@ import net.i2p.util.Log; * @author MKVore * */ -class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver { +class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver { private final String nick; private final SAMv3Handler handler; @@ -33,6 +34,8 @@ class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SA * Build a Raw Datagram Session according to information * registered with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException @@ -42,36 +45,64 @@ class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SA throws IOException, DataFormatException, I2PSessionException { super(SAMv3Handler.sSessionsHash.get(nick).getDest(), SAMv3Handler.sSessionsHash.get(nick).getProps(), - SAMv3Handler.sSessionsHash.get(nick).getHandler() // to be replaced by this + null // to be replaced by this ); this.nick = nick ; this.recv = this ; // replacement this.server = dgServer; - - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if (rec == null) throw new InterruptedIOException() ; this.handler = rec.getHandler(); Properties props = rec.getProps(); + clientAddress = getSocketAddress(props, handler); + _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && + Boolean.parseBoolean(props.getProperty("HEADER")); + } + + /** + * Build a Raw Session on an existing i2p session + * registered with the given nickname + * + * Caller MUST call start(). + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3RawSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess, + int listenProtocol, int listenPort, SAMv3DatagramServer dgServer) + throws IOException, DataFormatException, I2PSessionException { + super(isess, props, listenProtocol, listenPort, null); // to be replaced by this + this.nick = nick ; + this.recv = this ; // replacement + this.server = dgServer; + this.handler = handler; + clientAddress = getSocketAddress(props, handler); + _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && + Boolean.parseBoolean(props.getProperty("HEADER")); + } + + /** + * @return null if PORT not set + * @since 0.9.25 moved from constructor + */ + static SocketAddress getSocketAddress(Properties props, SAMv3Handler handler) { String portStr = props.getProperty("PORT") ; if (portStr == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("receiver port not specified. Current socket will be used."); - this.clientAddress = null; + return null; } else { int port = Integer.parseInt(portStr); String host = props.getProperty("HOST"); if ( host==null ) { - host = rec.getHandler().getClientIP(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("no host specified. Taken from the client socket : " + host +':'+port); + host = handler.getClientIP(); } - this.clientAddress = new InetSocketAddress(host, port); + return new InetSocketAddress(host, port); } - _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && - Boolean.parseBoolean(props.getProperty("HEADER")); } - + public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException { if (this.clientAddress==null) { this.handler.receiveRawBytes(data, proto, fromPort, toPort); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index 54f58054f6b7e74e66bafed57f05ae60fb4dbe37..ffeb7142a6979f67a15c93327b8776f6aa0012b4 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.GeneralSecurityException; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; @@ -30,6 +31,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; @@ -43,16 +45,19 @@ import net.i2p.util.Log; * @author mkvore */ -class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Session +class SAMv3StreamSession extends SAMStreamSession implements Session { - private static final int BUFFER_SIZE = 1024 ; + private static final int BUFFER_SIZE = 1024; + private static final int MAX_ACCEPT_QUEUE = 64; private final Object socketServerLock = new Object(); /** this is ONLY set for FORWARD, not for ACCEPT */ private I2PServerSocket socketServer; /** this is the count of active ACCEPT sockets */ private final AtomicInteger _acceptors = new AtomicInteger(); + /** for subsession only, null otherwise */ + private final LinkedBlockingQueue<I2PSocket> _acceptQueue; private static I2PSSLSocketFactory _sslSocketFactory; @@ -66,6 +71,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi * Create a new SAM STREAM session, according to information * registered with the given nickname * + * Caller MUST call start(). + * * @param login The nickname * @throws IOException * @throws DataFormatException @@ -79,9 +86,61 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi getDB().get(login).getProps(), getDB().get(login).getHandler()); this.nick = login ; + _acceptQueue = null; + } + + /** + * Build a Stream Session on an existing I2P session + * registered with the given nickname + * + * Caller MUST call start(). + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3StreamSession(String login, Properties props, SAMv3Handler handler, I2PSocketManager mgr, + int listenPort) throws IOException, DataFormatException, SAMException { + super(mgr, props, handler, listenPort); + this.nick = login ; + _acceptQueue = new LinkedBlockingQueue<I2PSocket>(MAX_ACCEPT_QUEUE); + } + + /** + * Put a socket on the accept queue. + * Only for subsession, throws IllegalStateException otherwise. + * + * @return success, false if full + * @since 0.9.25 + */ + public boolean queueSocket(I2PSocket sock) { + if (_acceptQueue == null) + throw new IllegalStateException(); + return _acceptQueue.offer(sock); } - public static SAMv3Handler.SessionsDB getDB() + /** + * Take a socket from the accept queue. + * Only for subsession, throws IllegalStateException otherwise. + * + * @since 0.9.25 + */ + private I2PSocket acceptSocket() throws ConnectException { + if (_acceptQueue == null) + throw new IllegalStateException(); + try { + // TODO there's no CoDel or expiration in this queue + return _acceptQueue.take(); + } catch (InterruptedException ie) { + ConnectException ce = new ConnectException("interrupted"); + ce.initCause(ie); + throw ce; + } + } + + public static SessionsDB getDB() { return SAMv3Handler.sSessionsHash ; } @@ -135,7 +194,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi I2PSocket i2ps = socketMgr.connect(d, opts); - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if ( rec==null ) throw new InterruptedIOException() ; @@ -185,15 +244,18 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } } - I2PSocket i2ps; + I2PSocket i2ps = null; _acceptors.incrementAndGet(); try { - i2ps = socketMgr.getServerSocket().accept(); + if (_acceptQueue != null) + i2ps = acceptSocket(); + else + i2ps = socketMgr.getServerSocket().accept(); } finally { _acceptors.decrementAndGet(); } - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if ( rec==null || i2ps==null ) throw new InterruptedIOException() ; @@ -223,7 +285,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi */ public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException { - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT")); if ( rec==null ) throw new InterruptedIOException() ; @@ -257,25 +319,23 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi this.socketServer = this.socketMgr.getServerSocket(); } - SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts); + SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, verbose, sendPorts); (new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start(); } /** * Forward sockets from I2P to the host/port provided */ - private static class SocketForwarder implements Runnable + private class SocketForwarder implements Runnable { private final String host; private final int port; - private final SAMv3StreamSession session; private final boolean isSSL, verbose, sendPorts; SocketForwarder(String host, int port, boolean isSSL, - SAMv3StreamSession session, boolean verbose, boolean sendPorts) { + boolean verbose, boolean sendPorts) { this.host = host ; this.port = port ; - this.session = session ; this.verbose = verbose ; this.sendPorts = sendPorts; this.isSSL = isSSL; @@ -283,12 +343,15 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi public void run() { - while (session.getSocketServer()!=null) { + while (getSocketServer() != null) { // wait and accept a connection from I2P side I2PSocket i2ps; try { - i2ps = session.getSocketServer().accept(); + if (_acceptQueue != null) + i2ps = acceptSocket(); + else + i2ps = getSocketServer().accept(); if (i2ps == null) continue; } catch (SocketTimeoutException ste) { @@ -437,7 +500,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } } - private I2PServerSocket getSocketServer() + protected I2PServerSocket getSocketServer() { synchronized ( this.socketServerLock ) { return this.socketServer ; @@ -450,7 +513,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi */ public void stopForwardingIncoming() throws SAMException, InterruptedIOException { - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if ( rec==null ) throw new InterruptedIOException() ; @@ -474,18 +537,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi /** * Close the stream session + * TODO Why do we override? */ @Override public void close() { - socketMgr.destroySocketManager(); - } - - /** - * Unsupported - * @throws DataFormatException always - */ - public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException - { - throw new DataFormatException(null); + if (_isOwnSession) + socketMgr.destroySocketManager(); } } diff --git a/apps/sam/java/src/net/i2p/sam/Session.java b/apps/sam/java/src/net/i2p/sam/Session.java new file mode 100644 index 0000000000000000000000000000000000000000..b3c13adf94f6225239810e6adf9966c68c9125d6 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/Session.java @@ -0,0 +1,11 @@ +package net.i2p.sam; + +/** + * A V3 session. + * + * @since 0.9.25 moved from SAMv3Handler + */ +interface Session extends SAMMessageSess { + String getNick(); +} + diff --git a/apps/sam/java/src/net/i2p/sam/SessionRecord.java b/apps/sam/java/src/net/i2p/sam/SessionRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..f50e6d5206b5b1bfc7618b4de8261286cced2197 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SessionRecord.java @@ -0,0 +1,71 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.util.Properties; + +/** + * The values in the SessionsDB + * + * @since 0.9.25 moved from SAMv3Handler + */ +class SessionRecord { + private final String m_dest ; + private final Properties m_props ; + private ThreadGroup m_threadgroup ; + private final SAMv3Handler m_handler ; + + public SessionRecord( String dest, Properties props, SAMv3Handler handler ) + { + m_dest = dest; + m_props = new Properties() ; + m_props.putAll(props); + m_handler = handler ; + } + + public SessionRecord( SessionRecord in ) + { + m_dest = in.getDest(); + m_props = in.getProps(); + m_threadgroup = in.getThreadGroup(); + m_handler = in.getHandler(); + } + + public String getDest() + { + return m_dest; + } + + /** + * Warning - returns a copy. + * @return a copy + */ + synchronized public Properties getProps() + { + Properties p = new Properties(); + p.putAll(m_props); + return m_props; + } + + public SAMv3Handler getHandler() + { + return m_handler ; + } + + synchronized public ThreadGroup getThreadGroup() + { + return m_threadgroup ; + } + + synchronized public void createThreadGroup(String name) + { + if (m_threadgroup == null) + m_threadgroup = new ThreadGroup(name); + } +} diff --git a/apps/sam/java/src/net/i2p/sam/SessionsDB.java b/apps/sam/java/src/net/i2p/sam/SessionsDB.java new file mode 100644 index 0000000000000000000000000000000000000000..dfdc3c0041a143ce5904ce2a1f7e7c224d91c8be --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SessionsDB.java @@ -0,0 +1,76 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.util.HashMap; + +/** + * basically a HashMap from String to SessionRecord + * + * @since 0.9.25 moved from SAMv3Handler + */ +class SessionsDB { + private static final long serialVersionUID = 0x1; + + static class ExistingIdException extends Exception { + private static final long serialVersionUID = 0x1; + } + + static class ExistingDestException extends Exception { + private static final long serialVersionUID = 0x1; + } + + private final HashMap<String, SessionRecord> map; + + public SessionsDB() { + map = new HashMap<String, SessionRecord>() ; + } + + public synchronized void put(String nick, SessionRecord session) + throws ExistingIdException, ExistingDestException + { + if ( map.containsKey(nick) ) { + throw new ExistingIdException(); + } + for ( SessionRecord r : map.values() ) { + if (r.getDest().equals(session.getDest())) { + throw new ExistingDestException(); + } + } + session.createThreadGroup("SAM session "+nick); + map.put(nick, session) ; + } + + /** @since 0.9.25 */ + public synchronized void putDupDestOK(String nick, SessionRecord session) + throws ExistingIdException + { + if (map.containsKey(nick)) { + throw new ExistingIdException(); + } + session.createThreadGroup("SAM session "+nick); + map.put(nick, session) ; + } + + /** @return true if removed */ + synchronized public boolean del( String nick ) + { + return map.remove(nick) != null; + } + + synchronized public SessionRecord get(String nick) + { + return map.get(nick); + } + + synchronized public boolean containsKey( String nick ) + { + return map.containsKey(nick); + } +} diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java index dc90d3fb2423dd31688e7e7cbcef8a0411158617..04fea78bf302018ca3a8ac75d65b2dc2abb54645 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java @@ -18,6 +18,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { private String _version; private final Object _helloLock = new Object(); private Boolean _sessionCreateOk; + private Boolean _sessionAddOk; private Boolean _streamStatusOk; private final Object _sessionCreateLock = new Object(); private final Object _namingReplyLock = new Object(); @@ -41,13 +42,19 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } + /** may be called twice, first for CREATE and second for ADD */ @Override public void sessionStatusReceived(String result, String destination, String msg) { synchronized (_sessionCreateLock) { + Boolean ok; if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result)) - _sessionCreateOk = Boolean.TRUE; + ok = Boolean.TRUE; else - _sessionCreateOk = Boolean.FALSE; + ok = Boolean.FALSE; + if (_sessionCreateOk == null) + _sessionCreateOk = ok; + else if (_sessionAddOk == null) + _sessionAddOk = ok; _sessionCreateLock.notifyAll(); } } @@ -120,6 +127,25 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } + /** + * Wait for the session to be added, returning true if everything went ok + * + * @return true if everything ok + * @since 0.9.25 + */ + public boolean waitForSessionAddReply() { + while (true) { + try { + synchronized (_sessionCreateLock) { + if (_sessionAddOk == null) + _sessionCreateLock.wait(); + else + return _sessionAddOk.booleanValue(); + } + } catch (InterruptedException ie) { return false; } + } + } + /** * Wait for the stream to be created, returning true if everything went ok * diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java index 0ab88cfcee93bc4b1916401632dc88ba5d0dea97..7153998e94d7a21a0734d722ac67cb1571753a7b 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java @@ -137,6 +137,10 @@ public class SAMReader { if ( (eq > 0) && (eq < pair.length() - 1) ) { String name = pair.substring(0, eq); String val = pair.substring(eq+1); + if (val.length() <= 0) { + _log.error("Empty value for " + name); + continue; + } while ( (val.charAt(0) == '\"') && (val.length() > 0) ) val = val.substring(1); while ( (val.length() > 0) && (val.charAt(val.length()-1) == '\"') ) diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 761e50e814aac27b7e3090146d42495cd569af80..30f0af0864e54bc5d2cdc9038c9d285134a039d3 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -55,16 +55,21 @@ public class SAMStreamSend { private static I2PSSLSocketFactory _sslSocketFactory; private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4; - private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" + + private static final int MASTER=8; + private static final String USAGE = "Usage: SAMStreamSend [-s] [-x] [-m mode] [-v version] [-b samHost] [-p samPort]\n" + + " [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" + " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" + + " default is stream\n" + " -s: use SSL\n" + + " -x: use master session (forces -v 3.3)\n" + " multiple -o session options are allowed"; public static void main(String args[]) { - Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:"); + Getopt g = new Getopt("SAM", args, "sxhb:m:o:p:u:v:w:"); boolean isSSL = false; + boolean isMaster = false; int mode = STREAM; - String version = "1.0"; + String version = "3.3"; String host = "127.0.0.1"; String port = "7656"; String user = null; @@ -77,6 +82,10 @@ public class SAMStreamSend { isSSL = true; break; + case 'x': + isMaster = true; + break; + case 'm': mode = Integer.parseInt(g.getOptarg()); if (mode < 0 || mode > V1RAW) { @@ -123,6 +132,10 @@ public class SAMStreamSend { System.err.println(USAGE); return; } + if (isMaster) { + mode += MASTER; + version = "3.3"; + } if ((user == null && password != null) || (user != null && password == null)) { System.err.println("both user and password or neither"); @@ -162,6 +175,8 @@ public class SAMStreamSend { _log.debug("Reader created"); OutputStream out = sock.getOutputStream(); String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts); + if (mode >= MASTER) + mode -= MASTER; if (ourDest == null) throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) @@ -230,7 +245,10 @@ public class SAMStreamSend { return sock; } - /** @return our b64 dest or null */ + /** + * @param isMaster is this the control socket + * @return our b64 dest or null + */ private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode, String user, String password, String opts) { @@ -261,24 +279,66 @@ public class SAMStreamSend { _v3ID = "xx€€xx" + _v3ID; _conOptions = "ID=" + _v3ID; } + boolean masterMode; // are we using v3.3 master session + String command; + if (mode >= MASTER) { + masterMode = true; + command = "ADD"; + mode -= MASTER; + } else { + masterMode = false; + command = "CREATE DESTINATION=TRANSIENT"; + } String style; if (mode == STREAM) style = "STREAM"; else if (mode == DG || mode == V1DG) style = "DATAGRAM"; - else + else // RAW or V1RAW style = "RAW"; - String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n'; + + if (masterMode) { + if (mode == V1DG || mode == V1RAW) + throw new IllegalArgumentException("v1 dg/raw incompatible with master session"); + String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=masterSend " + opts + '\n'; + samOut.write(req.getBytes("UTF-8")); + samOut.flush(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER sent"); + boolean ok = eventHandler.waitForSessionCreateReply(); + if (!ok) + throw new IOException("SESSION CREATE STYLE=MASTER failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok); + // PORT required even if we aren't listening for this test + if (mode != STREAM) + opts += " PORT=9999"; + } + String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + opts + '\n'; samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create sent"); - boolean ok = eventHandler.waitForSessionCreateReply(); + _log.debug("SESSION " + command + " sent"); + boolean ok; + if (masterMode) + ok = eventHandler.waitForSessionAddReply(); + else + ok = eventHandler.waitForSessionCreateReply(); if (!ok) - throw new IOException("Session create failed"); + throw new IOException("SESSION " + command + " failed"); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create reply found: " + ok); + _log.debug("SESSION " + command + " reply found: " + ok); + if (masterMode) { + // do a bunch more + req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION REMOVE ID=stream99\n"; + samOut.write(req.getBytes("UTF-8")); + samOut.flush(); + } req = "NAMING LOOKUP NAME=ME\n"; samOut.write(req.getBytes("UTF-8")); samOut.flush(); @@ -453,6 +513,7 @@ public class SAMStreamSend { baos.write(DataHelper.getUTF8(" PROTOCOL=123 TO_PORT=5678")); else baos.write(DataHelper.getUTF8(" TO_PORT=5678")); + baos.write(DataHelper.getUTF8(" SEND_TAGS=19 TAG_THRESHOLD=13 EXPIRES=33 SEND_LEASESET=true")); } baos.write((byte) '\n'); baos.write(data, 0, read); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index d59ebb0a37e303cf43d78ed6ecf71a21fd85c430..0b67b7abdfebd8be97d283625c06ee4121bd4181 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -58,21 +58,25 @@ public class SAMStreamSink { private static I2PSSLSocketFactory _sslSocketFactory; private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6, FORWARDSSL=7; + private static final int MASTER=8; private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort]\n" + " [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" + " modes: stream: 0; datagram: 1; v1datagram: 2;\n" + " raw: 3; v1raw: 4; raw-with-headers: 5;\n" + " stream-forward: 6; stream-forward-ssl: 7\n" + + " default is stream\n" + " -s: use SSL to connect to bridge\n" + + " -x: use master session (forces -v 3.3)\n" + " multiple -o session options are allowed"; private static final int V3FORWARDPORT=9998; private static final int V3DGPORT=9999; public static void main(String args[]) { - Getopt g = new Getopt("SAM", args, "sb:m:p:u:v:w:"); + Getopt g = new Getopt("SAM", args, "sxhb:m:p:u:v:w:"); boolean isSSL = false; + boolean isMaster = false; int mode = STREAM; - String version = "1.0"; + String version = "3.3"; String host = "127.0.0.1"; String port = "7656"; String user = null; @@ -85,6 +89,10 @@ public class SAMStreamSink { isSSL = true; break; + case 'x': + isMaster = true; + break; + case 'm': mode = Integer.parseInt(g.getOptarg()); if (mode < 0 || mode > FORWARDSSL) { @@ -131,6 +139,10 @@ public class SAMStreamSink { System.err.println(USAGE); return; } + if (isMaster) { + mode += MASTER; + version = "3.3"; + } if ((user == null && password != null) || (user != null && password == null)) { System.err.println("both user and password or neither"); @@ -169,6 +181,8 @@ public class SAMStreamSink { if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts); + if (mode >= MASTER) + mode -= MASTER; if (ourDest == null) throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) @@ -560,7 +574,10 @@ public class SAMStreamSink { return sock; } - /** @return our b64 dest or null */ + /** + * @param isMaster is this the control socket + * @return our b64 dest or null + */ private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode, String user, String password, String sopts) { @@ -641,6 +658,16 @@ public class SAMStreamSink { // and give it to the SAM server dest = _destFile; } + boolean masterMode; // are we using v3.3 master session + String command; + if (mode >= MASTER) { + masterMode = true; + command = "ADD"; + mode -= MASTER; + } else { + masterMode = false; + command = "CREATE DESTINATION=" + dest; + } String style; if (mode == STREAM || mode == FORWARD || mode == FORWARDSSL) style = "STREAM"; @@ -654,17 +681,61 @@ public class SAMStreamSink { style = "RAW PORT=" + V3DGPORT; else style = "RAW HEADER=true PORT=" + V3DGPORT; - String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n'; + + if (masterMode) { + if (mode == V1DG || mode == V1RAW) + throw new IllegalArgumentException("v1 dg/raw incompatible with master session"); + String req = "SESSION CREATE DESTINATION=" + dest + " STYLE=MASTER ID=masterSink " + sopts + '\n'; + samOut.write(req.getBytes("UTF-8")); + samOut.flush(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER sent"); + boolean ok = eventHandler.waitForSessionCreateReply(); + if (!ok) + throw new IOException("SESSION CREATE STYLE=MASTER failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok); + } + + String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + sopts + '\n'; samOut.write(req.getBytes("UTF-8")); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create sent"); - if (mode == STREAM) { - boolean ok = eventHandler.waitForSessionCreateReply(); + _log.debug("SESSION " + command + " sent"); + //if (mode == STREAM) { + boolean ok; + if (masterMode) + ok = eventHandler.waitForSessionAddReply(); + else + ok = eventHandler.waitForSessionCreateReply(); if (!ok) - throw new IOException("Session create failed"); + throw new IOException("SESSION " + command + " failed"); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Session create reply found: " + ok); + _log.debug("SESSION " + command + " reply found: " + ok); + //} + if (masterMode) { + // do a bunch more + req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=DATAGRAM PORT=9997 LISTEN_PORT=97 ID=dg97\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=DATAGRAM PORT=9996 FROM_PORT=96 ID=dg96\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=RAW PORT=9995 LISTEN_PORT=95 ID=raw95\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION ADD STYLE=RAW PORT=9994 FROM_PORT=94 LISTEN_PROTOCOL=222 ID=raw94\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION REMOVE ID=stream99\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION REMOVE ID=raw95\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION REMOVE ID=notfound\n"; + samOut.write(req.getBytes("UTF-8")); + req = "SESSION REMOVE ID=masterSink\n"; // shouldn't remove ourselves + samOut.write(req.getBytes("UTF-8")); + samOut.flush(); } req = "NAMING LOOKUP NAME=ME\n"; samOut.write(req.getBytes("UTF-8"));