From 270bc24b627ce9eb482b35ea731f6b2030e811ce Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 5 Feb 2016 18:44:35 +0000 Subject: [PATCH] SAM: Add start() to session interface, don't start threads in constructors. Start master acceptor thread. Javadocs, SAMv2StreamSession cleanup --- .../java/src/net/i2p/sam/MasterSession.java | 31 ++++++- .../src/net/i2p/sam/SAMDatagramSession.java | 3 + .../java/src/net/i2p/sam/SAMMessageSess.java | 6 ++ .../src/net/i2p/sam/SAMMessageSession.java | 10 ++- .../java/src/net/i2p/sam/SAMRawSession.java | 2 + .../src/net/i2p/sam/SAMStreamSession.java | 16 +++- .../java/src/net/i2p/sam/SAMv1Handler.java | 3 + .../src/net/i2p/sam/SAMv2StreamSession.java | 87 +++++++------------ .../src/net/i2p/sam/SAMv3DatagramSession.java | 4 + .../java/src/net/i2p/sam/SAMv3Handler.java | 4 + .../java/src/net/i2p/sam/SAMv3RawSession.java | 4 + .../src/net/i2p/sam/SAMv3StreamSession.java | 6 +- 12 files changed, 110 insertions(+), 66 deletions(-) diff --git a/apps/sam/java/src/net/i2p/sam/MasterSession.java b/apps/sam/java/src/net/i2p/sam/MasterSession.java index 6e6a5f07f0..de4ebfde3e 100644 --- a/apps/sam/java/src/net/i2p/sam/MasterSession.java +++ b/apps/sam/java/src/net/i2p/sam/MasterSession.java @@ -22,6 +22,7 @@ import net.i2p.client.streaming.I2PSocket; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; +import net.i2p.util.I2PAppThread; import net.i2p.util.Log; /** @@ -39,11 +40,14 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S private final SAMv3Handler handler; private final SAMv3DatagramServer dgs; private final Map<String, SAMMessageSess> sessions; + private final StreamAcceptor streamAcceptor; /** * Build a Session according to information - * registered with the given nickname - * + * registered with the given nickname. + * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException @@ -63,6 +67,16 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S // if we get a RAW session added with 0/0, it will replace this, // and we won't add this back if removed. isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + streamAcceptor = new StreamAcceptor(); + } + + /** + * Overridden to start the acceptor. + */ + @Override + public void start() { + Thread t = new I2PAppThread(streamAcceptor, "SAMMasterAcceptor"); + t.start(); } /** @@ -147,6 +161,7 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S } // listeners etc + sess.start(); // all ok return null; } @@ -236,10 +251,12 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S /** * Close the master session + * Overridden to stop the acceptor. */ @Override public void close() { // close sessions? + streamAcceptor.stopRunning(); super.close(); } @@ -290,11 +307,17 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S private class StreamAcceptor implements Runnable { + private volatile boolean stop; + public StreamAcceptor() { } + public void stopRunning() { + stop = true; + } + public void run() { - while (getSocketServer()!=null) { + while (!stop && getSocketServer() != null) { // wait and accept a connection from I2P side I2PSocket i2ps; @@ -353,6 +376,8 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S _log.warn("No subsession found for incoming streaming connection on port " + port); } } + if (_log.shouldWarn()) + _log.warn("Stream acceptor stopped"); } } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java index d72dfb5a48..e9f289e360 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java @@ -35,6 +35,7 @@ class SAMDatagramSession extends SAMMessageSession { private final I2PDatagramMaker dgramMaker; private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector(); + /** * Create a new SAM DATAGRAM session. * @@ -57,6 +58,8 @@ class SAMDatagramSession extends SAMMessageSession { /** * Create a new SAM DATAGRAM session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination keys * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java index 97af742125..189a8f6de3 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java @@ -15,6 +15,12 @@ import net.i2p.data.Destination; */ interface SAMMessageSess extends Closeable { + /** + * Start a SAM message-based session. + * MUST be called after constructor. + */ + public void start(); + /** * Close a SAM message-based session. */ diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java index eca08f1df0..a65f387cf1 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java @@ -72,9 +72,6 @@ abstract class SAMMessageSession implements SAMMessageSess { session = handler.getSession(); listenProtocol = I2PSession.PROTO_ANY; listenPort = I2PSession.PORT_ANY; - // FIXME don't start threads in constructors - Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); - t.start(); } /** @@ -97,7 +94,12 @@ abstract class SAMMessageSession implements SAMMessageSess { handler = new SAMMessageSessionHandler(session); this.listenProtocol = listenProtocol; this.listenPort = listenPort; - // FIXME don't start threads in constructors + } + + /* + * @since 0.9.25 + */ + public void start() { Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); t.start(); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java index b92a5db698..fa2fef9151 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java @@ -49,6 +49,8 @@ class SAMRawSession extends SAMMessageSession { /** * Create a new SAM RAW session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param props Properties to setup the I2P session * @param recv Object that will receive incoming data diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 44c7b1d0b2..eb347b3ad2 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -84,6 +84,8 @@ class SAMStreamSession implements SAMMessageSess { /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") or "__v3__" if extended by SAMv3StreamSession * @param props Properties to setup the I2P session @@ -181,11 +183,7 @@ class SAMStreamSession implements SAMMessageSess { if (startAcceptor) { - // FIXME don't start threads in constructors server = new SAMStreamSessionServer(); - Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); - - t.start(); } else { server = null; } @@ -219,6 +217,16 @@ class SAMStreamSession implements SAMMessageSess { server = null; } + /* + * @since 0.9.25 + */ + public void start() { + if (server != null) { + Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); + t.start(); + } + } + /* * @since 0.9.25 */ diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index c181fba6b7..8f16c1c160 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -274,8 +274,10 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (style.equals("RAW")) { rawSession = new SAMRawSession(destKeystream, props, this); + rawSession.start(); } else if (style.equals("DATAGRAM")) { datagramSession = new SAMDatagramSession(destKeystream, props,this); + datagramSession.start(); } else if (style.equals("STREAM")) { String dir = (String) props.remove("DIRECTION"); if (dir == null) { @@ -290,6 +292,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } streamSession = newSAMStreamSession(destKeystream, dir,props); + streamSession.start(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java index dcd8ca4be4..a1df41eb77 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java @@ -36,13 +36,13 @@ import net.i2p.util.Log; * * @author mkvore */ - class SAMv2StreamSession extends SAMStreamSession { - /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session @@ -60,6 +60,8 @@ class SAMv2StreamSession extends SAMStreamSession /** * Create a new SAM STREAM session. * + * Caller MUST call start(). + * * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param props Properties to setup the I2P session @@ -86,7 +88,6 @@ class SAMv2StreamSession extends SAMStreamSession * receive-only session * @return true if the communication with the SAM client is ok */ - @Override public boolean connect ( int id, String dest, Properties props ) throws DataFormatException, SAMInvalidDirectionException @@ -120,31 +121,25 @@ class SAMv2StreamSession extends SAMStreamSession return true ; } - - - /** - * SAM STREAM socket connecter, running in its own thread. - * - * @author mkvore - */ - + * SAM STREAM socket connecter, running in its own thread. + * + * @author mkvore + */ private class StreamConnector implements Runnable { - private final int id; private final Destination dest ; private final I2PSocketOptions opts ; /** - * Create a new SAM STREAM session socket reader - * - * @param id Unique id assigned to the handler - * @param dest Destination to reach - * @param opts Socket options (I2PSocketOptions) + * Create a new SAM STREAM session socket reader + * + * @param id Unique id assigned to the handler + * @param dest Destination to reach + * @param opts Socket options (I2PSocketOptions) */ - public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException { if (_log.shouldLog(Log.DEBUG)) @@ -155,7 +150,6 @@ class SAMv2StreamSession extends SAMStreamSession this.dest = dest ; } - public void run() { if (_log.shouldLog(Log.DEBUG)) @@ -215,18 +209,15 @@ class SAMv2StreamSession extends SAMStreamSession } } - - /** - * Lets us push data through the stream without blocking, (even after exceeding - * the I2PSocket's buffer) + * Lets us push data through the stream without blocking, (even after exceeding + * the I2PSocket's buffer) * * @param s I2PSocket * @param id Socket ID * @return v2StreamSender * @throws IOException */ - @Override protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException { @@ -241,7 +232,6 @@ class SAMv2StreamSession extends SAMStreamSession } private class V2StreamSender extends StreamSender - { private final List<ByteArray> _data; private int _dataSize; @@ -260,12 +250,12 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Send bytes through the SAM STREAM session socket sender - * + * Send bytes through the SAM STREAM session socket sender + * * @param in Data stream of data to send * @param size Count of bytes to send * @throws IOException if the client didnt provide enough data - */ + */ @Override public void sendBytes ( InputStream in, int size ) throws IOException { @@ -307,9 +297,9 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Stop a SAM STREAM session socket sender thread immediately - * - */ + * Stop a SAM STREAM session socket sender thread immediately + * + */ @Override public void stopRunning() { @@ -342,9 +332,9 @@ class SAMv2StreamSession extends SAMStreamSession } /** - * Stop a SAM STREAM session socket sender gracefully: stop the - * sender thread once all pending data has been sent. - */ + * Stop a SAM STREAM session socket sender gracefully: stop the + * sender thread once all pending data has been sent. + */ @Override public void shutDownGracefully() { @@ -431,8 +421,6 @@ class SAMv2StreamSession extends SAMStreamSession } } - - /** * Send bytes through a SAM STREAM session. * @@ -459,30 +447,24 @@ class SAMv2StreamSession extends SAMStreamSession return true; } - /** - * SAM STREAM socket reader, running in its own thread. It forwards - * forward data to/from an I2P socket. - * - * @author human + * SAM STREAM socket reader, running in its own thread. It forwards + * forward data to/from an I2P socket. + * + * @author human */ - - - public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader { - protected boolean nolimit ; protected long limit ; protected long totalReceived ; - /** - * Create a new SAM STREAM session socket reader - * - * @param s Socket to be handled - * @param id Unique id assigned to the handler - */ + * Create a new SAM STREAM session socket reader + * + * @param s Socket to be handled + * @param id Unique id assigned to the handler + */ public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException { super ( s, id ); @@ -581,7 +563,4 @@ class SAMv2StreamSession extends SAMStreamSession _log.debug ( "Shutting down SAM STREAM session socket handler " + id ); } } - - - } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java index 6022b1c798..c118e3dba2 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java @@ -32,6 +32,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDat * build a DatagramSession according to informations registered * with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException @@ -61,6 +63,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDat * Build a Datagram Session on an existing i2p session * registered with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index ce61f16871..66f76efc30 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -455,15 +455,18 @@ class SAMv3Handler extends SAMv1Handler SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs); rawSession = v3; this.session = v3; + v3.start(); } else if (style.equals("DATAGRAM")) { SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs); datagramSession = v3; this.session = v3; + v3.start(); } else if (style.equals("STREAM")) { SAMv3StreamSession v3 = newSAMStreamSession(nick); streamSession = v3; this.session = v3; + v3.start(); } else if (style.equals("MASTER")) { SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); MasterSession v3 = new MasterSession(nick, dgs, this, allProps); @@ -471,6 +474,7 @@ class SAMv3Handler extends SAMv1Handler datagramSession = v3; rawSession = v3; this.session = v3; + v3.start(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java index 2137fbb56b..5ce5c06f69 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java @@ -34,6 +34,8 @@ class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver { * Build a Raw Datagram Session according to information * registered with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException @@ -62,6 +64,8 @@ class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver { * Build a Raw Session on an existing i2p session * registered with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index bd6a33f37f..2adec41238 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -71,6 +71,8 @@ class SAMv3StreamSession extends SAMStreamSession implements Session * Create a new SAM STREAM session, according to information * registered with the given nickname * + * Caller MUST call start(). + * * @param login The nickname * @throws IOException * @throws DataFormatException @@ -88,9 +90,11 @@ class SAMv3StreamSession extends SAMStreamSession implements Session } /** - * Build a Datagram Session on an existing I2P session + * Build a Stream Session on an existing I2P session * registered with the given nickname * + * Caller MUST call start(). + * * @param nick nickname of the session * @throws IOException * @throws DataFormatException -- GitLab