diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index d2eee7c89c72d0d51a0e36e1c1f3d0a0ec1ff3f0..2e29f2a9dd37dfe8bc5841926c8f52ace4197dbb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -53,9 +53,15 @@ public class Connection { private I2PSocketFull _socket; /** set to an error cause if the connection could not be established */ private String _connectionError; + private boolean _disconnectScheduled; + private long _lastReceivedOn; + private ActivityTimer _activityTimer; public static final long MAX_RESEND_DELAY = 60*1000; public static final long MIN_RESEND_DELAY = 20*1000; + + /** wait up to 5 minutes after disconnection so we can ack/close packets */ + public static long DISCONNECT_TIMEOUT = 5*60*1000; public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); @@ -83,6 +89,9 @@ public class Connection { _connectionManager = manager; _resetReceived = false; _connected = true; + _disconnectScheduled = false; + _lastReceivedOn = -1; + _activityTimer = new ActivityTimer(); } public long getNextOutboundPacketNum() { @@ -190,6 +199,7 @@ public class Connection { _lastSendTime = _context.clock().now(); _outboundQueue.enqueue(packet); + resetActivityTimer(); if (ackOnly) { // ACK only, don't schedule this packet for retries @@ -304,13 +314,39 @@ public class Connection { _outboundPackets.clear(); _outboundPackets.notifyAll(); } - if (removeFromConMgr) - _connectionManager.removeConnection(this); + if (removeFromConMgr) { + if (!_disconnectScheduled) { + _disconnectScheduled = true; + SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); + } + } } } void disconnectComplete() { - _connectionManager.removeConnection(this); + _connected = false; + if (!_disconnectScheduled) { + _disconnectScheduled = true; + + if (_log.shouldLog(Log.INFO)) + _log.info("Connection disconnect complete from dead, drop the con " + + toString()); + _connectionManager.removeConnection(this); + } + } + + private class DisconnectEvent implements SimpleTimer.TimedEvent { + public DisconnectEvent() { + if (_log.shouldLog(Log.INFO)) + _log.info("Connection disconnect timer initiated: 5 minutes to drop " + + Connection.this.toString()); + } + public void timeReached() { + if (_log.shouldLog(Log.INFO)) + _log.info("Connection disconnect timer complete, drop the con " + + Connection.this.toString()); + _connectionManager.removeConnection(Connection.this); + } } private void doClose() { @@ -398,6 +434,64 @@ public class Connection { public long getHighestAckedThrough() { return _highestAckedThrough; } public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } + public long getLastActivityOn() { + return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); + } + + void packetReceived() { + _lastReceivedOn = _context.clock().now(); + resetActivityTimer(); + } + + private void resetActivityTimer() { + if (_options.getInactivityTimeout() <= 0) return; + long howLong = _activityTimer.getTimeLeft(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resetting the inactivity timer to " + howLong); + // this will get rescheduled, and rescheduled, and rescheduled... + SimpleTimer.getInstance().addEvent(_activityTimer, howLong); + } + + private class ActivityTimer implements SimpleTimer.TimedEvent { + public void timeReached() { + // uh, nothing more to do... + if (!_connected) return; + // we got rescheduled already + if (getTimeLeft() > 0) return; + // these are either going to time out or cause further rescheduling + if (getUnackedPacketsSent() > 0) return; + // wtf, this shouldn't have been scheduled + if (_options.getInactivityTimeout() <= 0) return; + + // bugger it, might as well do the hard work now + switch (_options.getInactivityAction()) { + case ConnectionOptions.INACTIVITY_ACTION_SEND: + if (_log.shouldLog(Log.WARN)) + _log.warn("Sending some data due to inactivity"); + _receiver.send(null, 0, 0, true); + break; + case ConnectionOptions.INACTIVITY_ACTION_NOOP: + if (_log.shouldLog(Log.WARN)) + _log.warn("Inactivity timer expired, but we aint doin' shit"); + break; + case ConnectionOptions.INACTIVITY_ACTION_DISCONNECT: + // fall through + default: + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing connection due to inactivity"); + disconnect(true); + break; + } + } + + public final long getTimeLeft() { + if (getLastActivityOn() > 0) + return getLastActivityOn() + _options.getInactivityTimeout() - _context.clock().now(); + else + return _createdOn + _options.getInactivityTimeout() - _context.clock().now(); + } + } + /** stream that the local peer receives data on */ public MessageInputStream getInputStream() { return _inputStream; } /** stream that the local peer sends data to the remote peer on */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index ee014ccb182db2d075d838d8b378243cb0f46c7d..c855991e30b192fbfd79e69fc34cdfd1c8638439 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -55,7 +55,14 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { public PacketLocal send(byte buf[], int off, int size) { - PacketLocal packet = buildPacket(buf, off, size); + return send(buf, off, size, false); + } + /** + * @param forceIncrement even if the buffer is empty, increment the packetId + * so we get an ACK back + */ + public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { + PacketLocal packet = buildPacket(buf, off, size, forceIncrement); _connection.sendPacket(packet); return packet; } @@ -69,14 +76,14 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { return ackOnly; } - private PacketLocal buildPacket(byte buf[], int off, int size) { + private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { boolean ackOnly = isAckOnly(size); PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); byte data[] = new byte[size]; if (size > 0) System.arraycopy(buf, off, data, 0, size); packet.setPayload(data); - if (ackOnly) + if (ackOnly && !forceIncrement) packet.setSequenceNum(0); else packet.setSequenceNum(_connection.getNextOutboundPacketNum()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 49e41229a91a73a80ae316d886575cf2550489fd..a35de82975f1df9b0fdf0bba032bece311ba2c96 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -18,10 +18,19 @@ public class ConnectionOptions extends I2PSocketOptions { private int _maxMessageSize; private int _choke; private int _maxResends; + private int _inactivityTimeout; + private int _inactivityAction; public static final int PROFILE_BULK = 1; public static final int PROFILE_INTERACTIVE = 2; + /** on inactivity timeout, do nothing */ + public static final int INACTIVITY_ACTION_NOOP = 0; + /** on inactivity timeout, close the connection */ + public static final int INACTIVITY_ACTION_DISCONNECT = 1; + /** on inactivity timeout, send a payload message */ + public static final int INACTIVITY_ACTION_SEND = 2; + public ConnectionOptions() { super(); init(null); @@ -48,6 +57,8 @@ public class ConnectionOptions extends I2PSocketOptions { setMaxMessageSize(opts.getMaxMessageSize()); setChoke(opts.getChoke()); setMaxResends(opts.getMaxResends()); + setInactivityTimeout(opts.getInactivityTimeout()); + setInactivityAction(opts.getInactivityAction()); } else { setConnectDelay(2*1000); setProfile(PROFILE_BULK); @@ -59,6 +70,8 @@ public class ConnectionOptions extends I2PSocketOptions { setWindowSize(1); setMaxResends(5); setWriteTimeout(-1); + setInactivityTimeout(5*60*1000); + setInactivityAction(INACTIVITY_ACTION_SEND); } } @@ -151,7 +164,11 @@ public class ConnectionOptions extends I2PSocketOptions { * */ public int getProfile() { return _profile; } - public void setProfile(int profile) { _profile = profile; } + public void setProfile(int profile) { + if (profile != PROFILE_BULK) + throw new IllegalArgumentException("Only bulk is supported so far"); + _profile = profile; + } /** * How many times will we try to send a message before giving up? @@ -159,4 +176,14 @@ public class ConnectionOptions extends I2PSocketOptions { */ public int getMaxResends() { return _maxResends; } public void setMaxResends(int numSends) { _maxResends = numSends; } + + /** + * What period of inactivity qualifies as "too long"? + * + */ + public int getInactivityTimeout() { return _inactivityTimeout; } + public void setInactivityTimeout(int timeout) { _inactivityTimeout = timeout; } + + public int getInactivityAction() { return _inactivityAction; } + public void setInactivityAction(int action) { _inactivityAction = action; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 7c4e34d73e856ae8882dc5238df5482c25604feb..e77d33b0bec60941f52f31f6beae1198ab946f66 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -26,6 +26,7 @@ public class ConnectionPacketHandler { void receivePacket(Packet packet, Connection con) throws I2PException { boolean ok = verifyPacket(packet, con); if (!ok) return; + con.packetReceived(); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); // close *after* receiving the data, as well as after verifying the signatures / etc diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java index 1d6bf58309f7f81f3ad182144d58faed742c2f0d..613c1b0674046a653a9133316203e53bd0206be0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java @@ -31,8 +31,6 @@ class SchedulerClosed extends SchedulerImpl { _log = ctx.logManager().getLog(SchedulerClosed.class); } - static final long CLOSE_TIMEOUT = 30*1000; - public boolean accept(Connection con) { boolean ok = (con != null) && (con.getCloseSentOn() > 0) && @@ -40,12 +38,12 @@ class SchedulerClosed extends SchedulerImpl { (con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsSent() <= 0) && (!con.getResetReceived()) && - (con.getCloseSentOn() + CLOSE_TIMEOUT > _context.clock().now()); + (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT > _context.clock().now()); return ok; } public void eventOccurred(Connection con) { - long timeLeft = con.getCloseSentOn() + CLOSE_TIMEOUT - _context.clock().now(); + long timeLeft = con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT - _context.clock().now(); reschedule(timeLeft, con); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java index ae8a71e887bd336192274548b159433f9c75e70c..f77325c50327b911fb9729fc70009bb5acde0705 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java @@ -45,9 +45,14 @@ class SchedulerClosing extends SchedulerImpl { if (con.getNextSendTime() <= 0) con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); long remaining = con.getNextSendTime() - _context.clock().now(); - if (remaining <= 0) + if (remaining <= 0) { con.sendAvailable(); - else + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + } else { + //if (remaining < 5*1000) + // remaining = 5*1000; + //con.setNextSendTime(when reschedule(remaining, con); + } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java index 0f11690bdc5a7ff3f1f32c34cc6c08ae9b189b8c..5f0981c58168bd2adfbbf3f2b8e0d8e699d8bcec 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java @@ -37,7 +37,7 @@ class SchedulerDead extends SchedulerImpl { (con.getCloseReceivedOn() > 0) && (con.getUnackedPacketsReceived() <= 0) && (con.getUnackedPacketsSent() <= 0) && - (con.getCloseSentOn() + SchedulerClosed.CLOSE_TIMEOUT <= _context.clock().now())); + (con.getCloseSentOn() + Connection.DISCONNECT_TIMEOUT <= _context.clock().now())); return ok; }