From f341e5566b3efdf3d51c4eebc6fd238b5d0e91f8 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 10 Jun 2015 19:14:33 +0000 Subject: [PATCH] Pass session in connect(); Store the session in Connection; Don't create a new ConnectionManager for a subsession, now that all components track the session properly. @since updates --- .../i2p/client/streaming/impl/Connection.java | 7 +- .../streaming/impl/ConnectionManager.java | 13 +++- .../streaming/impl/I2PSocketManagerFull.java | 75 ++++++++++++------- 3 files changed, 62 insertions(+), 33 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 83fd802291..1d43a6cef4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -28,6 +28,7 @@ class Connection { private final I2PAppContext _context; private final Log _log; private final ConnectionManager _connectionManager; + private final I2PSession _session; private Destination _remotePeer; private final AtomicLong _sendStreamId = new AtomicLong(); private final AtomicLong _receiveStreamId = new AtomicLong(); @@ -112,12 +113,14 @@ class Connection { /** * @param opts may be null */ - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + public Connection(I2PAppContext ctx, ConnectionManager manager, + I2PSession session, SchedulerChooser chooser, SimpleTimer2 timer, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts, boolean isInbound) { _context = ctx; _connectionManager = manager; + _session = session; _chooser = chooser; _outboundQueue = queue; _handler = handler; @@ -877,7 +880,7 @@ class Connection { /** @since 0.9.21 */ public ConnectionManager getConnectionManager() { return _connectionManager; } - public I2PSession getSession() { return _connectionManager.getSession(); } + public I2PSession getSession() { return _session; } public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index e712cd1cab..c55c8a67d8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -214,7 +214,8 @@ class ConnectionManager { ConnectionOptions opts = new ConnectionOptions(_defaultOptions); opts.setPort(synPacket.getRemotePort()); opts.setLocalPort(synPacket.getLocalPort()); - Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true); + Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser, + _timer, _outboundQueue, _conPacketHandler, opts, true); _tcbShare.updateOptsFromShare(con); boolean reject = false; int active = 0; @@ -393,9 +394,10 @@ class ConnectionManager { * * @param peer Destination to contact * @param opts Connection's options + * @param session generally the session from the constructor, but could be a subsession * @return new connection, or null if we have exceeded our limit */ - public Connection connect(Destination peer, ConnectionOptions opts) { + public Connection connect(Destination peer, ConnectionOptions opts, I2PSession session) { Connection con = null; long expiration = _context.clock().now(); long tmout = opts.getConnectTimeout(); @@ -429,7 +431,7 @@ class ConnectionManager { // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { - con = new Connection(_context, this, _schedulerChooser, _timer, + con = new Connection(_context, this, session, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false); con.setRemotePeer(peer); assignReceiveStreamId(con); @@ -591,7 +593,12 @@ class ConnectionManager { public MessageHandler getMessageHandler() { return _messageHandler; } public PacketHandler getPacketHandler() { return _packetHandler; } + + /** + * This is the primary session only + */ public I2PSession getSession() { return _session; } + public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); } public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); } // Both of these methods are diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java index 7ea8c90bd0..452f715db4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -35,6 +34,7 @@ import net.i2p.data.PrivateKey; import net.i2p.data.PublicKey; import net.i2p.data.SimpleDataStructure; import net.i2p.util.ConvertToHash; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -51,7 +51,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final I2PAppContext _context; private final Log _log; private final I2PSession _session; - private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions; + private final Set<I2PSession> _subsessions; private final I2PServerSocketFull _serverSocket; private StandardServerSocket _realServerSocket; private final ConnectionOptions _defaultOptions; @@ -61,7 +61,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { private final ConnectionManager _connectionManager; private final AtomicBoolean _isDestroyed = new AtomicBoolean(); - /** @since 0.9.20 */ + /** @since 0.9.21 */ private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16); private static final String[] DSA_ONLY_HASHES = { // list from http://zzz.i2p/topics/1682?page=1#p8414 @@ -140,7 +140,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; _session = session; - _subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4); + _subsessions = new ConcurrentHashSet<I2PSession>(4); _log = _context.logManager().getLog(I2PSocketManagerFull.class); _name = name + " " + (__managerId.incrementAndGet()); @@ -186,7 +186,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { * @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session * and different signing keys * @param opts subsession options if any, may be null - * @since 0.9.19 + * @since 0.9.21 */ public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException { if (privateKeyStream == null) { @@ -214,15 +214,15 @@ public class I2PSocketManagerFull implements I2PSocketManager { privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray()); } I2PSession rv = _session.addSubsession(privateKeyStream, opts); - ConnectionOptions defaultOptions = new ConnectionOptions(opts); - ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions); - ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager); - if (old != null) { + boolean added = _subsessions.add(rv); + if (!added) { // shouldn't happen _session.removeSubsession(rv); - connectionManager.shutdown(); throw new I2PSessionException("dup"); } + ConnectionOptions defaultOptions = new ConnectionOptions(opts); + int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY; + rv.addMuxedSessionListener(_connectionManager.getMessageHandler(), protocol, defaultOptions.getLocalPort()); if (_log.shouldLog(Log.WARN)) _log.warn("Added subsession " + rv); return rv; @@ -230,7 +230,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * @param opts may be null - * @since 0.9.20 copied from I2PSocketManagerFactory + * @since 0.9.21 copied from I2PSocketManagerFactory */ private SigType getSigType(Properties opts) { if (opts != null) { @@ -252,13 +252,12 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * Remove the subsession * - * @since 0.9.19 + * @since 0.9.21 */ public void removeSubsession(I2PSession session) { _session.removeSubsession(session); - ConnectionManager cm = _subsessions.remove(session); - if (cm != null) { - cm.shutdown(); + boolean removed = _subsessions.remove(session); + if (removed) { if (_log.shouldLog(Log.WARN)) _log.warn("Removeed subsession " + session); } else { @@ -269,7 +268,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * @return a list of subsessions, non-null, does not include the primary session - * @since 0.9.19 + * @since 0.9.21 */ public List<I2PSession> getSubsessions() { return _session.getSubsessions(); @@ -282,6 +281,9 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * The accept() call. * + * This only listens on the primary session. There is no way to get + * incoming connections on a subsession. + * * @return connected I2PSocket, or null through 0.9.16, non-null as of 0.9.17 * @throws I2PException if session is closed * @throws ConnectException (since 0.9.17; I2PServerSocket interface always declared it) @@ -301,6 +303,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { * * Uses the ports from the default options. * + * TODO There is no way to ping on a subsession. + * * @param peer * @param timeoutMs timeout in ms, greater than zero * @return true on success, false on failure @@ -318,6 +322,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { * the timeout specified, false otherwise. This call blocks. * * Uses the ports specified. + * + * TODO There is no way to ping on a subsession. * * @param peer Destination to ping * @param localPort 0 - 65535 @@ -341,6 +347,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { * the timeout specified, false otherwise. This call blocks. * * Uses the ports specified. + * + * TODO There is no way to ping on a subsession. * * @param peer Destination to ping * @param localPort 0 - 65535 @@ -374,6 +382,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { * Parameters in the I2PSocketOptions interface may be changed directly * with the setters; no need to use this method for those. * This does NOT update the underlying I2CP or tunnel options; use getSession().updateOptions() for that. + * + * TODO There is no way to update the options on a subsession. * * @param options as created from a call to buildOptions(properties), non-null */ @@ -388,6 +398,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * Current options, not a copy, setters may be used to make changes. + * + * TODO There is no facility to specify the session. */ public I2PSocketOptions getDefaultOptions() { return _defaultOptions; @@ -397,6 +409,9 @@ public class I2PSocketManagerFull implements I2PSocketManager { * Returns non-null socket. * This method does not throw exceptions, but methods on the returned socket * may throw exceptions if the socket or socket manager is closed. + * + * This only listens on the primary session. There is no way to get + * incoming connections on a subsession. * * @return non-null */ @@ -407,6 +422,10 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * Like getServerSocket but returns a real ServerSocket for easier porting of apps. + * + * This only listens on the primary session. There is no way to get + * incoming connections on a subsession. + * * @since 0.8.4 */ public synchronized ServerSocket getStandardServerSocket() throws IOException { @@ -417,16 +436,16 @@ public class I2PSocketManagerFull implements I2PSocketManager { } private void verifySession() throws I2PException { - verifySession(_connectionManager); + verifySession(_connectionManager.getSession()); } - /** @since 0.9.20 */ - private void verifySession(ConnectionManager cm) throws I2PException { + /** @since 0.9.21 */ + private void verifySession(I2PSession session) throws I2PException { if (_isDestroyed.get()) throw new I2PException("Session was closed"); - if (!cm.getSession().isClosed()) + if (!session.isClosed()) return; - cm.getSession().connect(); + session.connect(); } /** @@ -457,22 +476,22 @@ public class I2PSocketManagerFull implements I2PSocketManager { _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) + " with options: " + opts); // pick the subsession here - ConnectionManager cm = _connectionManager; + I2PSession session = _session; if (!_subsessions.isEmpty()) { Hash h = peer.calculateHash(); if (_dsaOnly.contains(h)) { // FIXME just taking the first one for now - for (Map.Entry<I2PSession, ConnectionManager> e : _subsessions.entrySet()) { - if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) { - cm = e.getValue(); + for (I2PSession sess : _subsessions) { + if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1) { + session = sess; break; } } } } - verifySession(cm); + verifySession(session); // the following blocks unless connect delay > 0 - Connection con = cm.connect(peer, opts); + Connection con = _connectionManager.connect(peer, opts, session); if (con == null) throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns()); I2PSocketFull socket = new I2PSocketFull(con,_context); @@ -556,7 +575,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { _connectionManager.setAllowIncomingConnections(false); _connectionManager.shutdown(); if (!_subsessions.isEmpty()) { - for (I2PSession sess : _subsessions.keySet()) { + for (I2PSession sess : _subsessions) { removeSubsession(sess); } } -- GitLab