From 299e5528bcfaaaa1dba2c86f0c25d00d19a7603b Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Wed, 10 Nov 2004 12:55:06 +0000 Subject: [PATCH] * deal with nondeferred connections (block the mgr.connect(..) until either success or failure) * allow loading the connection options from the env (or another Properties specified) --- .../net/i2p/client/streaming/Connection.java | 66 ++++++++++++++++++- .../client/streaming/ConnectionManager.java | 9 ++- .../client/streaming/ConnectionOptions.java | 54 ++++++++------- .../streaming/I2PSocketManagerFull.java | 7 +- .../i2p/client/streaming/SchedulerClosed.java | 12 ++-- .../client/streaming/SchedulerConnecting.java | 11 ++-- .../i2p/client/streaming/SchedulerDead.java | 19 +++--- 7 files changed, 136 insertions(+), 42 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 2ab965993f..c437c2b2a1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -59,6 +59,8 @@ public class Connection { /** window size when we last saw congestion */ private int _lastCongestionSeenAt; private boolean _ackSinceCongestion; + /** Notify this on connection (or connection failure) */ + private Object _connectLock; public static final long MAX_RESEND_DELAY = 60*1000; public static final long MIN_RESEND_DELAY = 20*1000; @@ -100,6 +102,7 @@ public class Connection { _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; + _connectLock = new Object(); } public long getNextOutboundPacketNum() { @@ -111,6 +114,7 @@ public class Connection { void closeReceived() { setCloseReceivedOn(_context.clock().now()); _inputStream.closeReceived(); + synchronized (_connectLock) { _connectLock.notifyAll(); } } /** @@ -296,6 +300,8 @@ public class Connection { _resetReceived = true; _outputStream.streamErrorOccurred(new IOException("Reset received")); _inputStream.streamErrorOccurred(new IOException("Reset received")); + _connectionError = "Connection reset"; + synchronized (_connectLock) { _connectLock.notifyAll(); } } public boolean getResetReceived() { return _resetReceived; } @@ -307,6 +313,7 @@ public class Connection { void disconnect(boolean cleanDisconnect, boolean removeFromConMgr) { if (!_connected) return; _connected = false; + synchronized (_connectLock) { _connectLock.notifyAll(); } if (_log.shouldLog(Log.DEBUG)) _log.debug("Disconnecting " + toString(), new Exception("discon")); @@ -362,6 +369,7 @@ public class Connection { private void doClose() { _outputStream.streamErrorOccurred(new IOException("Hard disconnect")); _inputStream.closeReceived(); + synchronized (_connectLock) { _connectLock.notifyAll(); } } /** who are we talking with */ @@ -374,7 +382,10 @@ public class Connection { /** stream the peer sends data to us on. (may be null) */ public byte[] getReceiveStreamId() { return _receiveStreamId; } - public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; } + public void setReceiveStreamId(byte[] id) { + _receiveStreamId = id; + synchronized (_connectLock) { _connectLock.notifyAll(); } + } /** when did we last send anything to the peer? */ public long getLastSendTime() { return _lastSendTime; } @@ -394,6 +405,8 @@ public class Connection { public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } + public long getLifetime() { return _context.clock().now() - _createdOn; } + public ConnectionPacketHandler getPacketHandler() { return _handler; } /** @@ -462,6 +475,57 @@ public class Connection { void packetReceived() { _lastReceivedOn = _context.clock().now(); resetActivityTimer(); + synchronized (_connectLock) { _connectLock.notifyAll(); } + } + + /** + * wait until a connection is made or the connection fails within the + * timeout period, setting the error accordingly. + */ + void waitForConnect() { + long expiration = _context.clock().now() + _options.getConnectTimeout(); + while (true) { + if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) { + // w00t + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): Connected and we have stream IDs"); + return; + } + if (_connectionError != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): connection error found: " + _connectionError); + return; + } + if (!_connected) { + _connectionError = "Connection failed"; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): not connected"); + return; + } + + long timeLeft = expiration - _context.clock().now(); + if ( (timeLeft <= 0) && (_options.getConnectTimeout() > 0) ) { + if (_connectionError == null) { + _connectionError = "Connection timed out"; + disconnect(false); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): timed out: " + _connectionError); + return; + } + if (timeLeft > 60*1000) + timeLeft = 60*1000; + if (_options.getConnectTimeout() <= 0) + timeLeft = 60*1000; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waitForConnect(): wait " + timeLeft); + try { + synchronized (_connectLock) { + _connectLock.wait(timeLeft); + } + } catch (InterruptedException ie) {} + } } private void resetActivityTimer() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 3241adef73..c7b39c2997 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -103,7 +103,9 @@ public class ConnectionManager { } /** - * Build a new connection to the given peer + * Build a new connection to the given peer. This blocks if there is no + * connection delay, otherwise it returns immediately. + * */ public Connection connect(Destination peer, ConnectionOptions opts) { Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); @@ -120,6 +122,11 @@ public class ConnectionManager { con.setReceiveStreamId(receiveId); con.eventOccurred(); + + _log.debug("Connect() conDelay = " + opts.getConnectDelay()); + if (opts.getConnectDelay() <= 0) { + con.waitForConnect(); + } return con; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 03fa3b5d47..2ac2ce215b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -32,22 +32,32 @@ public class ConnectionOptions extends I2PSocketOptions { /** on inactivity timeout, send a payload message */ public static final int INACTIVITY_ACTION_SEND = 2; + public static final String PROP_CONNECT_DELAY = "i2p.streaming.connectDelay"; + public static final String PROP_PROFILE = "i2p.streaming.profile"; + public static final String PROP_MAX_MESSAGE_SIZE = "i2p.streaming.maxMessageSize"; + public static final String PROP_MAX_RESENDS = "i2p.streaming.maxResends"; + public static final String PROP_INITIAL_RTT = "i2p.streaming.initialRTT"; + public static final String PROP_INITIAL_RESEND_DELAY = "i2p.streaming.initialResendDelay"; + public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay"; + public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize"; + public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow"; + public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout"; + public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction"; + public ConnectionOptions() { super(); - init(null); } - public ConnectionOptions(I2PSocketOptions opts) { + public ConnectionOptions(Properties opts) { super(opts); - init(null); } - public ConnectionOptions(ConnectionOptions opts) { + public ConnectionOptions(I2PSocketOptions opts) { super(opts); - init(opts); } - private void init(ConnectionOptions opts) { + public ConnectionOptions(ConnectionOptions opts) { + super(opts); if (opts != null) { setConnectDelay(opts.getConnectDelay()); setProfile(opts.getProfile()); @@ -61,26 +71,24 @@ public class ConnectionOptions extends I2PSocketOptions { setInactivityTimeout(opts.getInactivityTimeout()); setInactivityAction(opts.getInactivityAction()); setInboundBufferSize(opts.getInboundBufferSize()); - } else { - setConnectDelay(2*1000); - setProfile(PROFILE_BULK); - setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE); - setRTT(30*1000); - setReceiveWindow(1); - setResendDelay(5*1000); - setSendAckDelay(2*1000); - setWindowSize(1); - setMaxResends(5); - setWriteTimeout(-1); - setInactivityTimeout(5*60*1000); - setInactivityAction(INACTIVITY_ACTION_SEND); - setInboundBufferSize((Packet.MAX_PAYLOAD_SIZE + 2) * Connection.MAX_WINDOW_SIZE); } } - public ConnectionOptions(Properties opts) { - super(opts); - // load the options; + protected void init(Properties opts) { + super.init(opts); + setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); + setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); + setMaxMessageSize(getInt(opts, PROP_MAX_MESSAGE_SIZE, Packet.MAX_PAYLOAD_SIZE)); + setRTT(getInt(opts, PROP_INITIAL_RTT, 30*1000)); + setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); + setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 5*1000)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2*1000)); + setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); + setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5)); + setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 3c3ba65f3b..422d49451e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -134,7 +134,12 @@ public class I2PSocketManagerFull implements I2PSocketManager { throws I2PException, NoRouteToHostException { if (_connectionManager.getSession().isClosed()) throw new I2PException("Session is closed"); - Connection con = _connectionManager.connect(peer, new ConnectionOptions(options)); + ConnectionOptions opts = null; + if (options instanceof ConnectionOptions) + opts = (ConnectionOptions)options; + else + opts = new ConnectionOptions(options); + Connection con = _connectionManager.connect(peer, opts); I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java index 613c1b0674..3b665291d6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java @@ -32,14 +32,18 @@ class SchedulerClosed extends SchedulerImpl { } public boolean accept(Connection con) { - boolean ok = (con != null) && - (con.getCloseSentOn() > 0) && + if (con == null) return false; + long timeSinceClose = _context.clock().now() - con.getCloseSentOn(); + boolean ok = (con.getCloseSentOn() > 0) && (con.getCloseReceivedOn() > 0) && (con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsSent() <= 0) && (!con.getResetReceived()) && - (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT > _context.clock().now()); - return ok; + (timeSinceClose < Connection.DISCONNECT_TIMEOUT); + boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) && + con.getSendStreamId() == null && + con.getLifetime() < Connection.DISCONNECT_TIMEOUT; + return (ok || conTimeout); } public void eventOccurred(Connection con) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java index 45cfc1d3a9..9701310a62 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java @@ -35,10 +35,13 @@ class SchedulerConnecting extends SchedulerImpl { } public boolean accept(Connection con) { - return (con != null) && - (con.getLastSendId() >= 0) && - (con.getAckedPackets() <= 0) && - (!con.getResetReceived()); + if (con == null) return false; + boolean notYetConnected = (con.getIsConnected()) && + (con.getSendStreamId() == null) && + (con.getLastSendId() >= 0) && + (con.getAckedPackets() <= 0) && + (!con.getResetReceived()); + return notYetConnected; } public void eventOccurred(Connection con) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java index 5f0981c581..6c14981ccb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java @@ -31,14 +31,17 @@ class SchedulerDead extends SchedulerImpl { } public boolean accept(Connection con) { - boolean ok = (con != null) && - (con.getResetReceived()) || - ((con.getCloseSentOn() > 0) && - (con.getCloseReceivedOn() > 0) && - (con.getUnackedPacketsReceived() <= 0) && - (con.getUnackedPacketsSent() <= 0) && - (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT <= _context.clock().now())); - return ok; + if (con == null) return false; + long timeSinceClose = _context.clock().now() - con.getCloseSentOn(); + boolean nothingLeftToDo = (con.getCloseSentOn() > 0) && + (con.getCloseReceivedOn() > 0) && + (con.getUnackedPacketsReceived() <= 0) && + (con.getUnackedPacketsSent() <= 0) && + (timeSinceClose >= Connection.DISCONNECT_TIMEOUT); + boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) && + con.getSendStreamId() == null && + con.getLifetime() >= Connection.DISCONNECT_TIMEOUT; + return con.getResetReceived() || nothingLeftToDo || timedOut; } public void eventOccurred(Connection con) { -- GitLab