diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 24e2c58eab..1b16007b74 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadFactory; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -37,17 +38,16 @@ import net.i2p.util.Log; public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected final Log _log; - protected I2PSocketManager sockMgr; + protected final I2PSocketManager sockMgr; protected I2PServerSocket i2pss; private final Object lock = new Object(); protected final Object slock = new Object(); - protected InetAddress remoteHost; - protected int remotePort; - private boolean _usePool; - - protected Logging l; + protected final InetAddress remoteHost; + protected final int remotePort; + private final boolean _usePool; + protected final Logging l; private static final long DEFAULT_READ_TIMEOUT = 5*60*1000; /** default timeout to 5 minutes - override if desired */ @@ -56,6 +56,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** do we use threads? default true (ignored for standard servers, always false) */ private static final String PROP_USE_POOL = "i2ptunnel.usePool"; private static final boolean DEFAULT_USE_POOL = true; + /** apparently unused */ protected static volatile long __serverId = 0; /** max number of threads - this many slowlorisses will DOS this server, but too high could OOM the JVM */ private static final String PROP_HANDLER_COUNT = "i2ptunnel.blockingHandlerCount"; @@ -65,8 +66,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** how long to wait before dropping an idle thread */ private static final long HANDLER_KEEPALIVE_MS = 30*1000; - protected I2PTunnelTask task = null; - protected boolean bidir = false; + protected I2PTunnelTask task; + protected boolean bidir; private ThreadPoolExecutor _executor; /** unused? port should always be specified */ @@ -74,8 +75,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected int localPort = DEFAULT_LOCALPORT; /** - * Warning, blocks in constructor while connecting to router and building tunnels; - * TODO move that to startRunning() + * Non-blocking * * @param privData Base64-encoded private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} @@ -86,16 +86,19 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(privData)); - init(host, port, bais, privData, l); + this.l = l; + this.remoteHost = host; + this.remotePort = port; + _usePool = getUsePool(); + sockMgr = createManager(bais); } /** - * Warning, blocks in constructor while connecting to router and building tunnels; - * TODO move that to startRunning() + * Non-blocking * * @param privkey file containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} - * @param privkeyname the name of the privKey file, not clear why we need this too + * @param privkeyname the name of the privKey file, just for logging * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ @@ -103,13 +106,18 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { EventDispatcher notifyThis, I2PTunnel tunnel) { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); + this.l = l; + this.remoteHost = host; + this.remotePort = port; + _usePool = getUsePool(); FileInputStream fis = null; try { fis = new FileInputStream(privkey); - init(host, port, fis, privkeyname, l); + sockMgr = createManager(fis); } catch (IOException ioe) { - _log.error("Error starting server", ioe); + _log.error("Cannot read private key data for " + privkeyname, ioe); notifyEvent("openServerResult", "error"); + throw new IllegalArgumentException("Error starting server", ioe); } finally { if (fis != null) try { fis.close(); } catch (IOException ioe) {} @@ -117,19 +125,22 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } /** - * Warning, blocks in constructor while connecting to router and building tunnels; - * TODO move that to startRunning() + * Non-blocking * * @param privData stream containing the private key data, * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} - * @param privkeyname the name of the privKey file, not clear why we need this too + * @param privkeyname the name of the privKey file, just for logging * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager */ public I2PTunnelServer(InetAddress host, int port, InputStream privData, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super("Server at " + host + ':' + port, notifyThis, tunnel); _log = tunnel.getContext().logManager().getLog(getClass()); - init(host, port, privData, privkeyname, l); + this.l = l; + this.remoteHost = host; + this.remotePort = port; + _usePool = getUsePool(); + sockMgr = createManager(privData); } /** @@ -145,27 +156,35 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { this.remoteHost = host; this.remotePort = port; _log = tunnel.getContext().logManager().getLog(getClass()); + _usePool = false; sockMgr = sktMgr; open = true; } + /** @since 0.9.8 */ + private boolean getUsePool() { + // extending classes default to threaded, but for a standard server, we can't get slowlorissed + boolean rv = !getClass().equals(I2PTunnelServer.class); + if (rv) { + String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL); + if (usePool != null) + rv = Boolean.parseBoolean(usePool); + else + rv = DEFAULT_USE_POOL; + } + return rv; + } + private static final int RETRY_DELAY = 20*1000; private static final int MAX_RETRIES = 4; /** - * Warning, blocks while connecting to router and building tunnels; - * TODO move that to startRunning() * - * @param privData stream containing the private key data, - * format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} - * @param privkeyname the name of the privKey file, not clear why we need this too * @throws IllegalArgumentException if the I2CP configuration is b0rked so * badly that we cant create a socketManager + * @since 0.9.8 */ - private void init(InetAddress host, int port, InputStream privData, String privkeyname, Logging l) { - this.l = l; - this.remoteHost = host; - this.remotePort = port; + private I2PSocketManager createManager(InputStream privData) { Properties props = new Properties(); props.putAll(getTunnel().getClientOptions()); int portNum = 7654; @@ -176,54 +195,54 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { _log.error("Invalid port specified [" + getTunnel().port + "], reverting to " + portNum); } } - - // copy the privData to a new BAIS, so we can always reset() it if we have to retry - ByteArrayInputStream privDataCopy; try { - privDataCopy = copyOfInputStream(privData); - } catch (IOException ioe) { - _log.log(Log.CRIT, "Cannot read private key data for " + privkeyname, ioe); - return; + I2PSocketManager rv = I2PSocketManagerFactory.createDisconnectedManager(privData, getTunnel().host, + portNum, props); + rv.setName("Server"); + getTunnel().addSession(rv.getSession()); + return rv; + } catch (I2PSessionException ise) { + throw new IllegalArgumentException("Can't create socket manager", ise); + } finally { + try { privData.close(); } catch (IOException ioe) {} } + } - // extending classes default to threaded, but for a standard server, we can't get slowlorissed - _usePool = !getClass().equals(I2PTunnelServer.class); - if (_usePool) { - String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL); - if (usePool != null) - _usePool = Boolean.parseBoolean(usePool); - else - _usePool = DEFAULT_USE_POOL; - } - // Todo: Can't stop a tunnel from the UI while it's in this loop (no session yet) + /** + * Warning, blocks while connecting to router and building tunnels; + * + * @throws IllegalArgumentException if the I2CP configuration is b0rked so + * badly that we cant create a socketManager + * @since 0.9.8 + */ + private void connectManager() { int retries = 0; - while (sockMgr == null) { - synchronized (slock) { - sockMgr = I2PSocketManagerFactory.createManager(privDataCopy, getTunnel().host, portNum, - props); - - } - if (sockMgr == null) { + while (sockMgr.getSession().isClosed()) { + try { + sockMgr.getSession().connect(); + } catch (I2PSessionException ise) { // try to make this error sensible as it will happen... + String portNum = getTunnel().port; + if (portNum == null) + portNum = "7654"; String msg = "Unable to connect to the router at " + getTunnel().host + ':' + portNum + - " and build tunnels for the server at " + host.getHostAddress() + ':' + port; + " and build tunnels for the server at " + remoteHost.getHostAddress() + ':' + remotePort; if (++retries < MAX_RETRIES) { - this.l.log(msg + ", retrying in " + (RETRY_DELAY / 1000) + " seconds"); - _log.error(msg + ", retrying in " + (RETRY_DELAY / 1000) + " seconds"); + msg += ", retrying in " + (RETRY_DELAY / 1000) + " seconds"; + this.l.log(msg); + _log.error(msg); } else { - this.l.log(msg + ", giving up"); - _log.log(Log.CRIT, msg + ", giving up"); - throw new IllegalArgumentException(msg); + msg += ", giving up"; + this.l.log(msg); + _log.log(Log.CRIT, msg, ise); + throw new IllegalArgumentException(msg, ise); } try { Thread.sleep(RETRY_DELAY); } catch (InterruptedException ie) {} - privDataCopy.reset(); } } - sockMgr.setName("Server"); - getTunnel().addSession(sockMgr.getSession()); - l.log("Tunnels ready for server at " + host.getHostAddress() + ':' + port); + l.log("Tunnels ready for server at " + remoteHost.getHostAddress() + ':' + remotePort); notifyEvent("openServerResult", "ok"); open = true; } @@ -249,10 +268,13 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** * Start running the I2PTunnelServer. + * Warning, blocks while connecting to router and building tunnels; * - * TODO: Wait to connect to router until here. + * @throws IllegalArgumentException if the I2CP configuration is b0rked so + * badly that we cant create a socketManager */ - public void startRunning() { + public synchronized void startRunning() { + connectManager(); // prevent JVM exit when running outside the router boolean isDaemon = getTunnel().getContext().isRouterContext(); Thread t = new I2PAppThread(this, "Server " + remoteHost + ':' + remotePort, isDaemon); @@ -405,7 +427,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** just to set the name and set Daemon */ private static class CustomThreadFactory implements ThreadFactory { - private String _name; + private final String _name; public CustomThreadFactory(String name) { _name = name; @@ -425,7 +447,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { * Run the blockingHandler. */ private class Handler implements Runnable { - private I2PSocket _i2ps; + private final I2PSocket _i2ps; public Handler(I2PSocket socket) { _i2ps = socket; diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index e7990f28f6..71199960cd 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -26,7 +26,6 @@ import net.i2p.util.Log; public class I2PSocketManagerFactory { public static final String PROP_MANAGER = "i2p.streaming.manager"; - //public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerImpl"; public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerFull"; /** @@ -47,7 +46,7 @@ public class I2PSocketManagerFactory { * * Blocks for a long time while the router builds tunnels. * - * @param opts I2CP options + * @param opts Streaming and I2CP options, may be null * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(Properties opts) { @@ -60,8 +59,8 @@ public class I2PSocketManagerFactory { * * Blocks for a long time while the router builds tunnels. * - * @param host I2CP host - * @param port I2CP port + * @param host I2CP host null to use default + * @param port I2CP port <= 0 to use default * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(String host, int port) { @@ -74,9 +73,9 @@ public class I2PSocketManagerFactory { * * Blocks for a long time while the router builds tunnels. * - * @param i2cpHost I2CP host - * @param i2cpPort I2CP port - * @param opts I2CP options + * @param i2cpHost I2CP host null to use default + * @param i2cpPort I2CP port <= 0 to use default + * @param opts Streaming and I2CP options, may be null * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(String i2cpHost, int i2cpPort, Properties opts) { @@ -102,6 +101,7 @@ public class I2PSocketManagerFactory { * Blocks for a long time while the router builds tunnels. * * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * or null for a transient destination. Caller must close. * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { @@ -115,7 +115,8 @@ public class I2PSocketManagerFactory { * Blocks for a long time while the router builds tunnels. * * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} - * @param opts I2CP options + * or null for a transient destination. Caller must close. + * @param opts Streaming and I2CP options, may be null * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream, Properties opts) { @@ -130,13 +131,73 @@ public class I2PSocketManagerFactory { * Blocks for a long time while the router builds tunnels. * * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} - * @param i2cpHost I2CP host - * @param i2cpPort I2CP port - * @param opts I2CP options + * or null for a transient destination. Caller must close. + * @param i2cpHost I2CP host null to use default + * @param i2cpPort I2CP port <= 0 to use default + * @param opts Streaming and I2CP options, may be null * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, Properties opts) { + try { + return createManager(myPrivateKeyStream, i2cpHost, i2cpPort, opts, true); + } catch (I2PSessionException ise) { + getLog().error("Error creating session for socket manager", ise); + return null; + } + } + + /** + * Create a disconnected socket manager using the destination loaded from the given private key + * stream, or null for a transient destination. + * + * Non-blocking. Does not connect to the router or build tunnels. + * For servers, caller MUST call getSession().connect() to build tunnels and start listening. + * For clients, caller may do that to build tunnels in advance; + * otherwise, the first call to connect() will initiate a connection to the router, + * with significant delay for tunnel building. + * + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * or null for a transient destination. Caller must close. + * @param i2cpHost I2CP host null to use default + * @param i2cpPort I2CP port <= 0 to use default + * @param opts Streaming and I2CP options, may be null + * @return the newly created socket manager, non-null (throws on error) + * @since 0.9.8 + */ + public static I2PSocketManager createDisconnectedManager(InputStream myPrivateKeyStream, String i2cpHost, + int i2cpPort, Properties opts) throws I2PSessionException { + if (myPrivateKeyStream == null) { + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512); + try { + client.createDestination(keyStream); + } catch (Exception e) { + throw new I2PSessionException("Error creating keys", e); + } + myPrivateKeyStream = new ByteArrayInputStream(keyStream.toByteArray()); + } + return createManager(myPrivateKeyStream, i2cpHost, i2cpPort, opts, false); + } + + /** + * Create a socket manager using the destination loaded from the given private key + * stream and connected to the I2CP router on the specified machine on the given + * port. + * + * Blocks for a long time while the router builds tunnels if connect is true. + * + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} + * non-null. Caller must close. + * @param i2cpHost I2CP host null to use default + * @param i2cpPort I2CP port <= 0 to use default + * @param opts Streaming and I2CP options, may be null + * @param connect true to connect (blocking) + * @return the newly created socket manager, non-null (throws on error) + * @since 0.9.7 + */ + private static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, + Properties opts, boolean connect) throws I2PSessionException { I2PClient client = I2PClientFactory.createClient(); if (opts == null) opts = new Properties(); @@ -146,34 +207,20 @@ public class I2PSocketManagerFactory { if (!opts.containsKey(name)) opts.setProperty(name, (String) e.getValue()); } - //boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER)); - //if (oldLib && false) { - // for the old streaming lib - // opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); - //opts.setProperty("tunnels.depthInbound", "0"); - //} else { - // for new streaming lib: - //opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); - // as of 0.8.1 (I2CP default is BestEffort) - if (!opts.containsKey(I2PClient.PROP_RELIABILITY)) - opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE); - //p.setProperty("tunnels.depthInbound", "0"); - //} + // as of 0.8.1 (I2CP default is BestEffort) + if (!opts.containsKey(I2PClient.PROP_RELIABILITY)) + opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE); if (i2cpHost != null) opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); if (i2cpPort > 0) opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort); - try { - I2PSession session = client.createSession(myPrivateKeyStream, opts); + I2PSession session = client.createSession(myPrivateKeyStream, opts); + if (connect) session.connect(); - I2PSocketManager sockMgr = createManager(session, opts, "manager"); - return sockMgr; - } catch (I2PSessionException ise) { - getLog().error("Error creating session for socket manager", ise); - return null; - } + I2PSocketManager sockMgr = createManager(session, opts, "manager"); + return sockMgr; } private static I2PSocketManager createManager(I2PSession session, Properties opts, String name) { diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index e43be08f8e..667c05d654 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -80,12 +80,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa protected I2CPMessageReader _reader; /** writer message queue */ protected ClientWriterRunner _writer; - /** where we pipe our messages */ - protected /* FIXME final FIXME */OutputStream _out; /** * Used for internal connections to the router. - * If this is set, _socket, _writer, and _out will be null. + * If this is set, _socket and _writer will be null. * @since 0.8.3 */ protected I2CPMessageQueue _queue; @@ -111,22 +109,24 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** monitor for waiting until a lease set has been granted */ private final Object _leaseSetWait = new Object(); - /** whether the session connection has already been closed (or not yet opened) */ - protected volatile boolean _closed; + /** + * @since 0.9.8 + */ + protected enum State { + OPENING, + OPEN, + CLOSING, + CLOSED + } - /** whether the session connection is in the process of being closed */ - protected volatile boolean _closing; + protected State _state = State.CLOSED; + protected final Object _stateLock = new Object(); /** have we received the current date from the router yet? */ private volatile boolean _dateReceived; /** lock that we wait upon, that the SetDateMessageHandler notifies */ private final Object _dateReceivedLock = new Object(); - /** whether the session connection is in the process of being opened */ - protected volatile boolean _opening; - - /** monitor for waiting until opened */ - private final Object _openingWait = new Object(); /** * thread that we tell when new messages are available who then tells us * to fetch them. The point of this is so that the fetch doesn't block the @@ -168,7 +168,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public static final int LISTEN_PORT = 7654; private static final int BUF_SIZE = 32*1024; - + /** * for extension by SimpleSession (no dest) */ @@ -183,7 +183,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa private I2PSessionImpl(I2PAppContext context, Properties options, boolean hasDest) { _context = context; _log = context.logManager().getLog(getClass()); - _closed = true; if (options == null) options = (Properties) System.getProperties().clone(); _options = loadConfig(options); @@ -351,17 +350,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa return _leaseSet; } - void setOpening(boolean ls) { - _opening = ls; - synchronized (_openingWait) { - _openingWait.notifyAll(); + protected void changeState(State state) { + synchronized (_stateLock) { + _state = state; + _stateLock.notifyAll(); } } - boolean getOpening() { - return _opening; - } - /** * Load up the destKeyFile for our Destination, PrivateKey, and SigningPrivateKey * @@ -378,12 +373,39 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Connect to the router and establish a session. This call blocks until * a session is granted. * + * Should be threadsafe, other threads will block until complete. + * Disconnect / destroy from another thread may be called simultaneously and + * will (should?) interrupt the connect. + * * @throws I2PSessionException if there is a configuration error or the router is * not reachable */ public void connect() throws I2PSessionException { - setOpening(true); - _closed = false; + synchronized(_stateLock) { + boolean wasOpening = false; + boolean loop = true; + while (loop) { + switch (_state) { + case CLOSED: + if (wasOpening) + throw new I2PSessionException("connect by other thread failed"); + loop = false; + break; + case OPENING: + wasOpening = true; + try { + _stateLock.wait(); + } catch (InterruptedException ie) {} + break; + case CLOSING: + throw new I2PSessionException("close in progress"); + case OPEN: + return; + } + } + changeState(State.OPENING); + } + _availabilityNotifier.stopNotifying(); if ( (_options != null) && @@ -392,29 +414,33 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _log.error("I2CP guaranteed delivery mode has been removed, using best effort."); } + boolean success = false; long startConnect = _context.clock().now(); try { - // If we are in the router JVM, connect using the interal queue - if (_context.isRouterContext()) { - // _socket, _out, and _writer remain null - InternalClientManager mgr = _context.internalClientManager(); - if (mgr == null) - throw new I2PSessionException("Router is not ready for connections"); - // the following may throw an I2PSessionException - _queue = mgr.connect(); - _reader = new QueuedI2CPMessageReader(_queue, this); - } else { - if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL))) - _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum); - else - _socket = new Socket(_hostname, _portNum); - // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. - _out = _socket.getOutputStream(); - _out.write(I2PClient.PROTOCOL_BYTE); - _out.flush(); - _writer = new ClientWriterRunner(_out, this); - InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE); - _reader = new I2CPMessageReader(in, this); + // protect w/ closeSocket() + synchronized(_stateLock) { + // If we are in the router JVM, connect using the interal queue + if (_context.isRouterContext()) { + // _socket and _writer remain null + InternalClientManager mgr = _context.internalClientManager(); + if (mgr == null) + throw new I2PSessionException("Router is not ready for connections"); + // the following may throw an I2PSessionException + _queue = mgr.connect(); + _reader = new QueuedI2CPMessageReader(_queue, this); + } else { + if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL))) + _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum); + else + _socket = new Socket(_hostname, _portNum); + // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. + OutputStream out = _socket.getOutputStream(); + out.write(I2PClient.PROTOCOL_BYTE); + out.flush(); + _writer = new ClientWriterRunner(out, this); + InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE); + _reader = new I2CPMessageReader(in, this); + } } Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); notifier.start(); @@ -466,15 +492,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa + "ms - ready to participate in the network!"); startIdleMonitor(); startVerifyUsage(); - setOpening(false); + success = true; } catch (UnknownHostException uhe) { - _closed = true; - setOpening(false); throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe); } catch (IOException ioe) { - _closed = true; - setOpening(false); throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe); + } finally { + changeState(success ? State.OPEN : State.CLOSED); } } @@ -570,8 +594,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Needs work. */ protected class AvailabilityNotifier implements Runnable { - private final List _pendingIds; - private final List _pendingSizes; + private final List _pendingIds; + private final List _pendingSizes; private volatile boolean _alive; public AvailabilityNotifier() { @@ -606,8 +630,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } } if (!_pendingIds.isEmpty()) { - msgId = (Long)_pendingIds.remove(0); - size = (Integer)_pendingSizes.remove(0); + msgId = _pendingIds.remove(0); + size = _pendingSizes.remove(0); } } if ( (msgId != null) && (size != null) ) { @@ -695,8 +719,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** configure the listener */ public void setSessionListener(I2PSessionListener lsnr) { _sessionListener = lsnr; } - /** has the session been closed (or not yet connected)? */ - public boolean isClosed() { return _closed; } + /** + * Has the session been closed (or not yet connected)? + * False when open and during transitions. Unsynchronized. + */ + public boolean isClosed() { return _state == State.CLOSED; } /** * Deliver an I2CP message to the router @@ -756,21 +783,16 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * Tear down the session, and do NOT reconnect. * - * Blocks if session has not been fully started. + * Will interrupt an open in progress. */ public void destroySession(boolean sendDisconnect) { - while (_opening) { - synchronized (_openingWait) { - try { - _openingWait.wait(1000); - } catch (InterruptedException ie) { // nop - } - } + synchronized(_stateLock) { + if (_state == State.CLOSING || _state == State.CLOSED) + return; + changeState(State.CLOSING); } - if (_closed) return; if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()")); - _closing = true; // we use this to prevent a race if (sendDisconnect && _producer != null) { // only null if overridden by I2PSimpleSession try { _producer.disconnect(this); @@ -783,19 +805,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa // SimpleSession does not initialize if (_availabilityNotifier != null) _availabilityNotifier.stopNotifying(); - _closed = true; - _closing = false; closeSocket(); if (_sessionListener != null) _sessionListener.disconnected(this); } /** - * Close the socket carefully - * + * Close the socket carefully. */ private void closeSocket() { + synchronized(_stateLock) { + changeState(State.CLOSING); + locked_closeSocket(); + changeState(State.CLOSED); + } + } + + /** + * Close the socket carefully. + * Caller must change state. + */ + private void locked_closeSocket() { if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Closing the socket", new Exception("closeSocket")); - _closed = true; if (_reader != null) { _reader.stopReading(); _reader = null; @@ -830,8 +860,15 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa disconnect(); } + /** + * Will interrupt a connect in progress. + */ protected void disconnect() { - if (_closed || _closing) return; + synchronized(_stateLock) { + if (_state == State.CLOSING || _state == State.CLOSED) + return; + changeState(State.CLOSING); + } if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Disconnect() called", new Exception("Disconnect")); if (shouldReconnect()) { if (reconnect()) { @@ -842,11 +879,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } if (_log.shouldLog(Log.ERROR)) - _log.error(getPrefix() + "Disconned from the router, and not trying to reconnect further. I hope you're not hoping anything else will happen"); + _log.error(getPrefix() + "Disconned from the router, and not trying to reconnect"); if (_sessionListener != null) _sessionListener.disconnected(this); - _closed = true; closeSocket(); + changeState(State.CLOSED); } private final static int MAX_RECONNECT_DELAY = 320*1000; @@ -970,7 +1007,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (rv != null) return rv; } - if (_closed) + if (isClosed()) return null; LookupWaiter waiter = new LookupWaiter(h); _pendingLookups.offer(waiter); @@ -996,7 +1033,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * @return null on failure */ public int[] bandwidthLimits() throws I2PSessionException { - if (_closed) + if (isClosed()) return null; sendMessage(new GetBandwidthLimitsMessage()); try { diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index 9962c325d7..66daa01fb3 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -8,6 +8,7 @@ package net.i2p.client; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.Properties; @@ -45,45 +46,50 @@ class I2PSimpleSession extends I2PSessionImpl2 { * Connect to the router and establish a session. This call blocks until * a session is granted. * + * NOT threadsafe, do not call from multiple threads. + * * @throws I2PSessionException if there is a configuration error or the router is * not reachable */ @Override public void connect() throws I2PSessionException { - _closed = false; - + changeState(State.OPENING); + boolean success = false; try { - // If we are in the router JVM, connect using the interal queue - if (_context.isRouterContext()) { - // _socket, _out, and _writer remain null - InternalClientManager mgr = _context.internalClientManager(); - if (mgr == null) - throw new I2PSessionException("Router is not ready for connections"); - // the following may throw an I2PSessionException - _queue = mgr.connect(); - _reader = new QueuedI2CPMessageReader(_queue, this); - } else { - if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL))) - _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum); - else - _socket = new Socket(_hostname, _portNum); - _out = _socket.getOutputStream(); - _out.write(I2PClient.PROTOCOL_BYTE); - _out.flush(); - _writer = new ClientWriterRunner(_out, this); - InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE); - _reader = new I2CPMessageReader(in, this); + // protect w/ closeSocket() + synchronized(_stateLock) { + // If we are in the router JVM, connect using the interal queue + if (_context.isRouterContext()) { + // _socket and _writer remain null + InternalClientManager mgr = _context.internalClientManager(); + if (mgr == null) + throw new I2PSessionException("Router is not ready for connections"); + // the following may throw an I2PSessionException + _queue = mgr.connect(); + _reader = new QueuedI2CPMessageReader(_queue, this); + } else { + if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL))) + _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum); + else + _socket = new Socket(_hostname, _portNum); + OutputStream out = _socket.getOutputStream(); + out.write(I2PClient.PROTOCOL_BYTE); + out.flush(); + _writer = new ClientWriterRunner(out, this); + InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE); + _reader = new I2CPMessageReader(in, this); + } } // we do not receive payload messages, so we do not need an AvailabilityNotifier // ... or an Idle timer, or a VerifyUsage _reader.startReading(); - + success = true; } catch (UnknownHostException uhe) { - _closed = true; throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe); } catch (IOException ioe) { - _closed = true; throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe); + } finally { + changeState(success ? State.OPEN : State.CLOSED); } }