I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 5555c523 authored by zzz's avatar zzz
Browse files

Streaming: Locking for next send time,

rename shadowing field in inner class
parent 7825f0f8
No related branches found
No related tags found
No related merge requests found
...@@ -41,7 +41,8 @@ class Connection { ...@@ -41,7 +41,8 @@ class Connection {
private final MessageInputStream _inputStream; private final MessageInputStream _inputStream;
private final MessageOutputStream _outputStream; private final MessageOutputStream _outputStream;
private final SchedulerChooser _chooser; private final SchedulerChooser _chooser;
private volatile long _nextSendTime; /** Locking: _nextSendLock */
private long _nextSendTime;
private long _ackedPackets; private long _ackedPackets;
private final long _createdOn; private final long _createdOn;
private final AtomicLong _closeSentOn = new AtomicLong(); private final AtomicLong _closeSentOn = new AtomicLong();
...@@ -70,6 +71,8 @@ class Connection { ...@@ -70,6 +71,8 @@ class Connection {
private final AtomicBoolean _ackSinceCongestion; private final AtomicBoolean _ackSinceCongestion;
/** Notify this on connection (or connection failure) */ /** Notify this on connection (or connection failure) */
private final Object _connectLock; private final Object _connectLock;
/** Locking for _nextSendTime */
private final Object _nextSendLock;
/** how many messages have been resent and not yet ACKed? */ /** how many messages have been resent and not yet ACKed? */
private final AtomicInteger _activeResends = new AtomicInteger(); private final AtomicInteger _activeResends = new AtomicInteger();
private final ConEvent _connectionEvent; private final ConEvent _connectionEvent;
...@@ -145,6 +148,7 @@ class Connection { ...@@ -145,6 +148,7 @@ class Connection {
_activityTimer = new ActivityTimer(); _activityTimer = new ActivityTimer();
_ackSinceCongestion = new AtomicBoolean(true); _ackSinceCongestion = new AtomicBoolean(true);
_connectLock = new Object(); _connectLock = new Object();
_nextSendLock = new Object();
_connectionEvent = new ConEvent(); _connectionEvent = new ConEvent();
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
// all createRateStats in ConnectionManager // all createRateStats in ConnectionManager
...@@ -906,7 +910,11 @@ class Connection { ...@@ -906,7 +910,11 @@ class Connection {
* instance, or want to delay an ACK. * instance, or want to delay an ACK.
* @return the next time the scheduler will want to send a packet, or -1 if never. * @return the next time the scheduler will want to send a packet, or -1 if never.
*/ */
public long getNextSendTime() { return _nextSendTime; } public long getNextSendTime() {
synchronized(_nextSendLock) {
return _nextSendTime;
}
}
/** /**
* If the next send time is currently >= 0 (i.e. not "never"), * If the next send time is currently >= 0 (i.e. not "never"),
...@@ -916,25 +924,20 @@ class Connection { ...@@ -916,25 +924,20 @@ class Connection {
* options.getSendAckDelay() from now (1000 ms) * options.getSendAckDelay() from now (1000 ms)
*/ */
public void setNextSendTime(long when) { public void setNextSendTime(long when) {
if (_nextSendTime >= 0) { synchronized(_nextSendLock) {
if (when < _nextSendTime) if (_nextSendTime >= 0) {
_nextSendTime = when; if (when < _nextSendTime)
} else { _nextSendTime = when;
_nextSendTime = when; } else {
} _nextSendTime = when;
}
if (_nextSendTime >= 0) { if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay(); long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime) if (max < _nextSendTime)
_nextSendTime = max; _nextSendTime = max;
}
} }
//if (_log.shouldLog(Log.DEBUG) && false) {
// if (_nextSendTime <= 0)
// _log.debug("set next send time to an unknown time", new Exception(toString()));
// else
// _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
//}
} }
/** how many packets have we sent and the other side has ACKed? /** how many packets have we sent and the other side has ACKed?
...@@ -1259,17 +1262,17 @@ class Connection { ...@@ -1259,17 +1262,17 @@ class Connection {
*/ */
class ResendPacketEvent extends SimpleTimer2.TimedEvent { class ResendPacketEvent extends SimpleTimer2.TimedEvent {
private final PacketLocal _packet; private final PacketLocal _packet;
private long _nextSendTime; private long _nextSend;
public ResendPacketEvent(PacketLocal packet, long delay) { public ResendPacketEvent(PacketLocal packet, long delay) {
super(_timer); super(_timer);
_packet = packet; _packet = packet;
_nextSendTime = delay + _context.clock().now(); _nextSend = delay + _context.clock().now();
packet.setResendPacketEvent(ResendPacketEvent.this); packet.setResendPacketEvent(ResendPacketEvent.this);
schedule(delay); schedule(delay);
} }
public long getNextSendTime() { return _nextSendTime; } public long getNextSendTime() { return _nextSend; }
public void timeReached() { retransmit(); } public void timeReached() { retransmit(); }
/** /**
* Retransmit the packet if we need to. * Retransmit the packet if we need to.
...@@ -1319,7 +1322,7 @@ class Connection { ...@@ -1319,7 +1322,7 @@ class Connection {
+ _activeResends + " active resend, " + _activeResends + " active resend, "
+ _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize()); + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
forceReschedule(1333); forceReschedule(1333);
_nextSendTime = 1333 + _context.clock().now(); _nextSend = 1333 + _context.clock().now();
return false; return false;
} }
...@@ -1406,7 +1409,7 @@ class Connection { ...@@ -1406,7 +1409,7 @@ class Connection {
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
timeout = MAX_RESEND_DELAY; timeout = MAX_RESEND_DELAY;
// set this before enqueue() as it passes it on to the router // set this before enqueue() as it passes it on to the router
_nextSendTime = timeout + _context.clock().now(); _nextSend = timeout + _context.clock().now();
if (_outboundQueue.enqueue(_packet)) { if (_outboundQueue.enqueue(_packet)) {
// first resend for this packet ? // first resend for this packet ?
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment