diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 2ab965993f4ee03b2aeee4f8b18fb1c0cf9bc1f8..c437c2b2a1cbb199d27853d6e69e6cdd3088c76a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -59,6 +59,8 @@ public class Connection { /** window size when we last saw congestion */ private int _lastCongestionSeenAt; private boolean _ackSinceCongestion; + /** Notify this on connection (or connection failure) */ + private Object _connectLock; public static final long MAX_RESEND_DELAY = 60*1000; public static final long MIN_RESEND_DELAY = 20*1000; @@ -100,6 +102,7 @@ public class Connection { _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; + _connectLock = new Object(); } public long getNextOutboundPacketNum() { @@ -111,6 +114,7 @@ public class Connection { void closeReceived() { setCloseReceivedOn(_context.clock().now()); _inputStream.closeReceived(); + synchronized (_connectLock) { _connectLock.notifyAll(); } } /** @@ -296,6 +300,8 @@ public class Connection { _resetReceived = true; _outputStream.streamErrorOccurred(new IOException("Reset received")); _inputStream.streamErrorOccurred(new IOException("Reset received")); + _connectionError = "Connection reset"; + synchronized (_connectLock) { _connectLock.notifyAll(); } } public boolean getResetReceived() { return _resetReceived; } @@ -307,6 +313,7 @@ public class Connection { void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { if (!_connected) return; _connected = false; + synchronized (_connectLock) { _connectLock.notifyAll(); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Disconnecting " + toString(), new Exception("discon")); @@ -362,6 +369,7 @@ public class Connection { private void doClose() { _outputStream.streamErrorOccurred(new IOException("Hard disconnect")); _inputStream.closeReceived(); + synchronized (_connectLock) { _connectLock.notifyAll(); } } /** who are we talking with */ @@ -374,7 +382,10 @@ public class Connection { /** stream the peer sends data to us on. (may be null) */ public byte[] getReceiveStreamId() { return _receiveStreamId; } - public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; } + public void setReceiveStreamId(byte[] id) { + _receiveStreamId = id; + synchronized (_connectLock) { _connectLock.notifyAll(); } + } /** when did we last send anything to the peer? */ public long getLastSendTime() { return _lastSendTime; } @@ -394,6 +405,8 @@ public class Connection { public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } + public long getLifetime() { return _context.clock().now() - _createdOn; } + public ConnectionPacketHandler getPacketHandler() { return _handler; } /** @@ -462,6 +475,57 @@ public class Connection { void packetReceived() { _lastReceivedOn = _context.clock().now(); resetActivityTimer(); + synchronized (_connectLock) { _connectLock.notifyAll(); } + } + + /** + * wait until a connection is made or the connection fails within the + * timeout period, setting the error accordingly. + */ + void waitForConnect() { + long expiration = _context.clock().now() + _options.getConnectTimeout(); + while (true) { + if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) { + // w00t + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): Connected and we have stream IDs"); + return; + } + if (_connectionError != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): connection error found: " + _connectionError); + return; + } + if (!_connected) { + _connectionError = "Connection failed"; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): not connected"); + return; + } + + long timeLeft = expiration - _context.clock().now(); + if ( (timeLeft <= 0) && (_options.getConnectTimeout() > 0) ) { + if (_connectionError == null) { + _connectionError = "Connection timed out"; + disconnect(false); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): timed out: " + _connectionError); + return; + } + if (timeLeft > 60*1000) + timeLeft = 60*1000; + if (_options.getConnectTimeout() <= 0) + timeLeft = 60*1000; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): wait " + timeLeft); + try { + synchronized (_connectLock) { + _connectLock.wait(timeLeft); + } + } catch (InterruptedException ie) {} + } } private void resetActivityTimer() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 3241adef739105dd039cfb5447d69e3d9954f765..c7b39c2997b9ac969d6cc9ceefc144a13f33d126 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -103,7 +103,9 @@ public class ConnectionManager { } /** - * Build a new connection to the given peer + * Build a new connection to the given peer. This blocks if there is no + * connection delay, otherwise it returns immediately. + * */ public Connection connect(Destination peer, ConnectionOptions opts) { Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); @@ -120,6 +122,11 @@ public class ConnectionManager { con.setReceiveStreamId(receiveId); con.eventOccurred(); + + _log.debug("Connect() conDelay = " + opts.getConnectDelay()); + if (opts.getConnectDelay() <= 0) { + con.waitForConnect(); + } return con; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 03fa3b5d4743faabec3c63f19c1bac9838900257..2ac2ce215b80aca9228bb966b8efdaa24543048b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -32,22 +32,32 @@ public class ConnectionOptions extends I2PSocketOptions { /** on inactivity timeout, send a payload message */ public static final int INACTIVITY_ACTION_SEND = 2; + public static final String PROP_CONNECT_DELAY = "i2p.streaming.connectDelay"; + public static final String PROP_PROFILE = "i2p.streaming.profile"; + public static final String PROP_MAX_MESSAGE_SIZE = "i2p.streaming.maxMessageSize"; + public static final String PROP_MAX_RESENDS = "i2p.streaming.maxResends"; + public static final String PROP_INITIAL_RTT = "i2p.streaming.initialRTT"; + public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay"; + public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay"; + public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize"; + public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow"; + public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout"; + public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction"; + public ConnectionOptions() { super(); - init(null); } - public ConnectionOptions(I2PSocketOptions opts) { + public ConnectionOptions(Properties opts) { super(opts); - init(null); } - public ConnectionOptions(ConnectionOptions opts) { + public ConnectionOptions(I2PSocketOptions opts) { super(opts); - init(opts); } - private void init(ConnectionOptions opts) { + public ConnectionOptions(ConnectionOptions opts) { + super(opts); if (opts != null) { setConnectDelay(opts.getConnectDelay()); setProfile(opts.getProfile()); @@ -61,26 +71,24 @@ public class ConnectionOptions extends I2PSocketOptions { setInactivityTimeout(opts.getInactivityTimeout()); setInactivityAction(opts.getInactivityAction()); setInboundBufferSize(opts.getInboundBufferSize()); - } else { - setConnectDelay(2*1000); - setProfile(PROFILE_BULK); - setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE); - setRTT(30*1000); - setReceiveWindow(1); - setResendDelay(5*1000); - setSendAckDelay(2*1000); - setWindowSize(1); - setMaxResends(5); - setWriteTimeout(-1); - setInactivityTimeout(5*60*1000); - setInactivityAction(INACTIVITY_ACTION_SEND); - setInboundBufferSize((Packet.MAX_PAYLOAD_SIZE + 2) * Connection.MAX_WINDOW_SIZE); } } - public ConnectionOptions(Properties opts) { - super(opts); - // load the options; + protected void init(Properties opts) { + super.init(opts); + setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); + setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); + setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE)); + setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); + setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); + setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000)); + setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); + setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5)); + setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 3c3ba65f3b731066f77fed36a54b87e7e2d9c40d..422d49451e6035ccbb099a3b49b91136fec2381b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -134,7 +134,12 @@ public class I2PSocketManagerFull implements I2PSocketManager { throws I2PException, NoRouteToHostException { if (_connectionManager.getSession().isClosed()) throw new I2PException("Session is closed"); - Connection con = _connectionManager.connect(peer, new ConnectionOptions(options)); + ConnectionOptions opts = null; + if (options instanceof ConnectionOptions) + opts = (ConnectionOptions)options; + else + opts = new ConnectionOptions(options); + Connection con = _connectionManager.connect(peer, opts); I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java index 613c1b0674046a653a9133316203e53bd0206be0..3b665291d684a3531a709b56d1ce0a9b5e648018 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java @@ -32,14 +32,18 @@ class SchedulerClosed extends SchedulerImpl { } public boolean accept(Connection con) { - boolean ok = (con != null) && - (con.getCloseSentOn() > 0) && + if (con == null) return false; + long timeSinceClose = _context.clock().now() - con.getCloseSentOn(); + boolean ok = (con.getCloseSentOn() > 0) && (con.getCloseReceivedOn() > 0) && (con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsSent() <= 0) && (!con.getResetReceived()) && - (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT > _context.clock().now()); - return ok; + (timeSinceClose < Connection.DISCONNECT_TIMEOUT); + boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) && + con.getSendStreamId() == null && + con.getLifetime() < Connection.DISCONNECT_TIMEOUT; + return (ok || conTimeout); } public void eventOccurred(Connection con) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java index 45cfc1d3a9c9638546c19a741510f3fda8663e1d..9701310a62eaa0ee51b6989501db45f6b7895ee0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java @@ -35,10 +35,13 @@ class SchedulerConnecting extends SchedulerImpl { } public boolean accept(Connection con) { - return (con != null) && - (con.getLastSendId() >= 0) && - (con.getAckedPackets() <= 0) && - (!con.getResetReceived()); + if (con == null) return false; + boolean notYetConnected = (con.getIsConnected()) && + (con.getSendStreamId() == null) && + (con.getLastSendId() >= 0) && + (con.getAckedPackets() <= 0) && + (!con.getResetReceived()); + return notYetConnected; } public void eventOccurred(Connection con) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java index 5f0981c58168bd2adfbbf3f2b8e0d8e699d8bcec..6c14981ccb627a9d07c786243281f946d4f0973c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java @@ -31,14 +31,17 @@ class SchedulerDead extends SchedulerImpl { } public boolean accept(Connection con) { - boolean ok = (con != null) && - (con.getResetReceived()) || - ((con.getCloseSentOn() > 0) && - (con.getCloseReceivedOn() > 0) && - (con.getUnackedPacketsReceived() <= 0) && - (con.getUnackedPacketsSent() <= 0) && - (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT <= _context.clock().now())); - return ok; + if (con == null) return false; + long timeSinceClose = _context.clock().now() - con.getCloseSentOn(); + boolean nothingLeftToDo = (con.getCloseSentOn() > 0) && + (con.getCloseReceivedOn() > 0) && + (con.getUnackedPacketsReceived() <= 0) && + (con.getUnackedPacketsSent() <= 0) && + (timeSinceClose >= Connection.DISCONNECT_TIMEOUT); + boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) && + con.getSendStreamId() == null && + con.getLifetime() >= Connection.DISCONNECT_TIMEOUT; + return con.getResetReceived() || nothingLeftToDo || timedOut; } public void eventOccurred(Connection con) {