From 5555c52376cac10c240e1d0ab8afa7cee8512f95 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 17 Apr 2015 13:00:16 +0000 Subject: [PATCH] Streaming: Locking for next send time, rename shadowing field in inner class --- .../i2p/client/streaming/impl/Connection.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 72f1022335..977dd98e6a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -41,7 +41,8 @@ class Connection { private final MessageInputStream _inputStream; private final MessageOutputStream _outputStream; private final SchedulerChooser _chooser; - private volatile long _nextSendTime; + /** Locking: _nextSendLock */ + private long _nextSendTime; private long _ackedPackets; private final long _createdOn; private final AtomicLong _closeSentOn = new AtomicLong(); @@ -70,6 +71,8 @@ class Connection { private final AtomicBoolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private final Object _connectLock; + /** Locking for _nextSendTime */ + private final Object _nextSendLock; /** how many messages have been resent and not yet ACKed? */ private final AtomicInteger _activeResends = new AtomicInteger(); private final ConEvent _connectionEvent; @@ -145,6 +148,7 @@ class Connection { _activityTimer = new ActivityTimer(); _ackSinceCongestion = new AtomicBoolean(true); _connectLock = new Object(); + _nextSendLock = new Object(); _connectionEvent = new ConEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager @@ -906,7 +910,11 @@ class Connection { * instance, or want to delay an ACK. * @return the next time the scheduler will want to send a packet, or -1 if never. */ - public long getNextSendTime() { return _nextSendTime; } + public long getNextSendTime() { + synchronized(_nextSendLock) { + return _nextSendTime; + } + } /** * If the next send time is currently >= 0 (i.e. not "never"), @@ -916,25 +924,20 @@ class Connection { * options.getSendAckDelay() from now (1000 ms) */ public void setNextSendTime(long when) { - if (_nextSendTime >= 0) { - if (when < _nextSendTime) - _nextSendTime = when; - } else { - _nextSendTime = when; - } + synchronized(_nextSendLock) { + if (_nextSendTime >= 0) { + if (when < _nextSendTime) + _nextSendTime = when; + } else { + _nextSendTime = when; + } - if (_nextSendTime >= 0) { - long max = _context.clock().now() + _options.getSendAckDelay(); - if (max < _nextSendTime) - _nextSendTime = max; + if (_nextSendTime >= 0) { + long max = _context.clock().now() + _options.getSendAckDelay(); + if (max < _nextSendTime) + _nextSendTime = max; + } } - - //if (_log.shouldLog(Log.DEBUG) && false) { - // if (_nextSendTime <= 0) - // _log.debug("set next send time to an unknown time", new Exception(toString())); - // else - // _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString())); - //} } /** how many packets have we sent and the other side has ACKed? @@ -1259,17 +1262,17 @@ class Connection { */ class ResendPacketEvent extends SimpleTimer2.TimedEvent { private final PacketLocal _packet; - private long _nextSendTime; + private long _nextSend; public ResendPacketEvent(PacketLocal packet, long delay) { super(_timer); _packet = packet; - _nextSendTime = delay + _context.clock().now(); + _nextSend = delay + _context.clock().now(); packet.setResendPacketEvent(ResendPacketEvent.this); schedule(delay); } - public long getNextSendTime() { return _nextSendTime; } + public long getNextSendTime() { return _nextSend; } public void timeReached() { retransmit(); } /** * Retransmit the packet if we need to. @@ -1319,7 +1322,7 @@ class Connection { + _activeResends + " active resend, " + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize()); forceReschedule(1333); - _nextSendTime = 1333 + _context.clock().now(); + _nextSend = 1333 + _context.clock().now(); return false; } @@ -1406,7 +1409,7 @@ class Connection { if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; // set this before enqueue() as it passes it on to the router - _nextSendTime = timeout + _context.clock().now(); + _nextSend = timeout + _context.clock().now(); if (_outboundQueue.enqueue(_packet)) { // first resend for this packet ? -- GitLab