diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 1d43a6cef4dd7869c30dbaf0e1cd7b7eeccdd9f5..f5e93ddf01d7281f65b322a12eba9dda1cae5c10 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -793,7 +793,7 @@ class Connection { private boolean scheduleDisconnectEvent() { if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) return false; - _context.simpleTimer2().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); + schedule(new DisconnectEvent(), DISCONNECT_TIMEOUT); return true; } @@ -808,6 +808,24 @@ class Connection { } } + /** + * Called from SchedulerImpl + * + * @since 0.9.23 moved here so we can use our timer + */ + public void scheduleConnectionEvent(long msToWait) { + schedule(_connectionEvent, msToWait); + } + + /** + * Schedule something on our timer. + * + * @since 0.9.23 + */ + public void schedule(SimpleTimer.TimedEvent event, long msToWait) { + _timer.addEvent(event, msToWait); + } + private boolean _remotePeerSet = false; /** who are we talking with * @return peer Destination @@ -1254,8 +1272,6 @@ class Connection { return buf.toString(); } - public SimpleTimer.TimedEvent getConnectionEvent() { return _connectionEvent; } - /** * fired to reschedule event notification */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java index c06ae4a82603573a0c0224e2d692d9eeed0c24e8..b92648d0b86a6dbddd0e34db0e1de2432dcc852e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java @@ -9,6 +9,7 @@ import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** * Receive new connection attempts @@ -23,6 +24,7 @@ class ConnectionHandler { private final Log _log; private final ConnectionManager _manager; private final LinkedBlockingQueue<Packet> _synQueue; + private final SimpleTimer2 _timer; private volatile boolean _active; private int _acceptTimeout; @@ -37,10 +39,11 @@ class ConnectionHandler { private static final int MAX_QUEUE_SIZE = 64; /** Creates a new instance of ConnectionHandler */ - public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) { + public ConnectionHandler(I2PAppContext context, ConnectionManager mgr, SimpleTimer2 timer) { _context = context; _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; + _timer = timer; _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } @@ -96,7 +99,7 @@ class ConnectionHandler { // also check if expiration of the head is long past for overload detection with peek() ? boolean success = _synQueue.offer(packet); // fail immediately if full if (success) { - _context.simpleTimer2().addEvent(new TimeoutSyn(packet), _acceptTimeout); + _timer.addEvent(new TimeoutSyn(packet), _acceptTimeout); } else { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping new SYN request, as the queue is full"); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java index c55c8a67d8b953e755387a9e6701146fbc4f2aa0..be2bbed1116008391f1af66cf6e153430f8ca66e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java @@ -74,11 +74,11 @@ class ConnectionManager { _pendingPings = new ConcurrentHashMap<Long,PingRequest>(4); _messageHandler = new MessageHandler(_context, this); _packetHandler = new PacketHandler(_context, this); - _connectionHandler = new ConnectionHandler(_context, this); _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); _timer = new RetransmissionTimer(_context, "Streaming Timer " + session.getMyDestination().calculateHash().toBase64().substring(0, 4)); + _connectionHandler = new ConnectionHandler(_context, this, _timer); _tcbShare = new TCBShare(_context, _timer); // PROTO_ANY is for backward compatibility (pre-0.7.1) // TODO change proto to PROTO_STREAMING someday. @@ -88,7 +88,7 @@ class ConnectionManager { // As of 0.9.1, listen on configured port (default 0 = all) int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY; _session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort()); - _outboundQueue = new PacketQueue(_context); + _outboundQueue = new PacketQueue(_context, _timer); _recentlyClosed = new LHMCache<Long, Object>(32); /** Socket timeout for accept() */ _soTimeout = -1; @@ -839,7 +839,7 @@ class ConnectionManager { private final PingNotifier _notifier; public PingFailed(Long id, PingNotifier notifier) { - super(_context.simpleTimer2()); + super(_timer); _id = id; _notifier = notifier; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 51516c98d6b0f274a9454fcea9d21180d017a55e..f87b21e2511e795102a28516994f6b55e71cda03 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -207,7 +207,7 @@ class ConnectionPacketHandler { final long delay = nextSendTime - now; if (_log.shouldLog(Log.INFO)) _log.info("scheduling ack in " + delay); - _context.simpleTimer2().addEvent(new AckDup(con), delay); + con.schedule(new AckDup(con), delay); } } else { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 34ab4393983a3c10082672cb1139e535ed31238f..943a0c03f5bb76f71fbb96697762f75c599090f4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -45,11 +45,11 @@ class PacketQueue implements SendMessageStatusListener, Closeable { private static final long REMOVE_EXPIRED_TIME = 67*1000; private static final boolean ENABLE_STATUS_LISTEN = true; - public PacketQueue(I2PAppContext context) { + public PacketQueue(I2PAppContext context, SimpleTimer2 timer) { _context = context; _log = context.logManager().getLog(PacketQueue.class); _messageStatusMap = new ConcurrentHashMap<Long, Connection>(16); - new RemoveExpired(); + new RemoveExpired(timer); // all createRateStats in ConnectionManager } @@ -328,8 +328,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable { */ private class RemoveExpired extends SimpleTimer2.TimedEvent { - public RemoveExpired() { - super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME); + public RemoveExpired(SimpleTimer2 timer) { + super(timer, REMOVE_EXPIRED_TIME); } public void timeReached() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/SchedulerImpl.java index 41085a30d25760ea56a7f4f4c54774571d9cd223..bc54ba3c8016e2952129a3b08cdff068c9d97ef3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/SchedulerImpl.java @@ -16,7 +16,7 @@ abstract class SchedulerImpl implements TaskScheduler { } protected void reschedule(long msToWait, Connection con) { - _context.simpleTimer2().addEvent(con.getConnectionEvent(), msToWait); + con.scheduleConnectionEvent(msToWait); } @Override