forked from I2P_Developers/i2p.i2p
Streaming: Single retransmit timer per connection (ticket #2715)
Only instantiate ResendPacketEvent for fast retransmit Move packet timeout field from ResendPacketEvent to PacketLocal Set window size to 1 on timeout Always adjust window when acked, even if the packet was sent more than once Reduce INITIAL_WINDOW_SIZE from 6 to 3 to reduce router DH overhead Set maximum burst retransmissions to 16 Closer compliance to RFCs 5681 and 6298 Prep for additional changes (Westwood) Other minor cleanups Original patch from zlatinb
This commit is contained in:
@@ -5,6 +5,7 @@ import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@@ -57,7 +58,7 @@ class Connection {
|
||||
private final boolean _isInbound;
|
||||
private boolean _updatedShareOpts;
|
||||
/** Packet ID (Long) to PacketLocal for sent but unacked packets */
|
||||
private final Map<Long, PacketLocal> _outboundPackets;
|
||||
private final SortedMap<Long, PacketLocal> _outboundPackets;
|
||||
private final PacketQueue _outboundQueue;
|
||||
private final ConnectionPacketHandler _handler;
|
||||
private ConnectionOptions _options;
|
||||
@@ -70,7 +71,6 @@ class Connection {
|
||||
private final ActivityTimer _activityTimer;
|
||||
private long _lastCongestionTime;
|
||||
private volatile long _lastCongestionHighestUnacked;
|
||||
private volatile long _nextRetransmitTime;
|
||||
/** has the other side choked us? */
|
||||
private volatile boolean _isChoked;
|
||||
/** are we choking the other side? */
|
||||
@@ -84,6 +84,7 @@ class Connection {
|
||||
/** how many messages have been resent and not yet ACKed? */
|
||||
private final AtomicInteger _activeResends = new AtomicInteger();
|
||||
private final ConEvent _connectionEvent;
|
||||
private final RetransmitEvent _retransmitEvent;
|
||||
private final int _randomWait;
|
||||
private final int _localPort;
|
||||
private final int _remotePort;
|
||||
@@ -96,8 +97,8 @@ class Connection {
|
||||
private final AtomicLong _lifetimeDupMessageSent = new AtomicLong();
|
||||
private final AtomicLong _lifetimeDupMessageReceived = new AtomicLong();
|
||||
|
||||
public static final long MAX_RESEND_DELAY = 45*1000;
|
||||
public static final long MIN_RESEND_DELAY = 100;
|
||||
public static final int MAX_RESEND_DELAY = 45*1000;
|
||||
public static final int MIN_RESEND_DELAY = 100;
|
||||
|
||||
/**
|
||||
* Wait up to 5 minutes after disconnection so we can ack/close packets.
|
||||
@@ -110,6 +111,9 @@ class Connection {
|
||||
|
||||
public static final int MAX_WINDOW_SIZE = 128;
|
||||
private static final int UNCHOKES_TO_SEND = 8;
|
||||
|
||||
/** Maximum number of packets to retransmit when the timer hits */
|
||||
private static final int MAX_RTX = 16;
|
||||
|
||||
/****
|
||||
public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
|
||||
@@ -165,6 +169,7 @@ class Connection {
|
||||
_connectLock = new Object();
|
||||
_nextSendLock = new Object();
|
||||
_connectionEvent = new ConEvent();
|
||||
_retransmitEvent = new RetransmitEvent();
|
||||
_randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
|
||||
// all createRateStats in ConnectionManager
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
@@ -427,12 +432,18 @@ class Connection {
|
||||
**/
|
||||
}
|
||||
|
||||
long timeout = _options.getRTO();
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Resend in " + timeout + " for " + packet);
|
||||
int timeout = _options.getRTO();
|
||||
|
||||
// schedules itself
|
||||
new ResendPacketEvent(packet, timeout);
|
||||
// RFC 6298 section 5.1
|
||||
if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " timer was already running");
|
||||
}
|
||||
|
||||
packet.setTimeout(timeout);
|
||||
}
|
||||
|
||||
// warning, getStatLog() can be null
|
||||
@@ -497,6 +508,7 @@ class Connection {
|
||||
}
|
||||
|
||||
List<PacketLocal> acked = null;
|
||||
boolean anyLeft = false;
|
||||
synchronized (_outboundPackets) {
|
||||
if (!_outboundPackets.isEmpty()) { // short circuit iterator
|
||||
for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
|
||||
@@ -562,11 +574,25 @@ class Connection {
|
||||
_log.info("All outbound packets acked, clearing " + _activeResends);
|
||||
_activeResends.set(0);
|
||||
}
|
||||
anyLeft = !_outboundPackets.isEmpty();
|
||||
_outboundPackets.notifyAll();
|
||||
}
|
||||
if ((acked != null) && (!acked.isEmpty()) ) {
|
||||
_ackSinceCongestion.set(true);
|
||||
_nextRetransmitTime = _context.clock().now() + getOptions().getRTO();
|
||||
if (anyLeft) {
|
||||
// RFC 6298 section 5.3
|
||||
int rto = getOptions().getRTO();
|
||||
_retransmitEvent.pushBackRTO(rto);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
|
||||
} else {
|
||||
// RFC 6298 section 5.2
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
|
||||
|
||||
_retransmitEvent.cancel();
|
||||
}
|
||||
}
|
||||
return acked;
|
||||
}
|
||||
@@ -795,6 +821,7 @@ class Connection {
|
||||
_outputStream.destroy();
|
||||
_receiver.destroy();
|
||||
_activityTimer.cancel();
|
||||
_retransmitEvent.cancel();
|
||||
_inputStream.streamErrorOccurred(new IOException("Socket closed"));
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
@@ -1218,8 +1245,8 @@ class Connection {
|
||||
private void resetActivityTimer() {
|
||||
long howLong = _options.getInactivityTimeout();
|
||||
if (howLong <= 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?"));
|
||||
return;
|
||||
}
|
||||
howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
|
||||
@@ -1397,6 +1424,159 @@ class Connection {
|
||||
return buf.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* A single retransmit timer for all packets.
|
||||
* See RFCs 5681 and 6298.
|
||||
*
|
||||
* @since 0.9.46
|
||||
*/
|
||||
class RetransmitEvent extends SimpleTimer2.TimedEvent {
|
||||
|
||||
private boolean _scheduled;
|
||||
|
||||
RetransmitEvent() {
|
||||
super(_timer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean cancel() {
|
||||
_scheduled = false;
|
||||
return super.cancel();
|
||||
}
|
||||
|
||||
public synchronized boolean scheduleIfNotRunning(long delay) {
|
||||
if (_scheduled)
|
||||
return false;
|
||||
_scheduled = true;
|
||||
schedule(delay);
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized void pushBackRTO(int rto) {
|
||||
if (!_scheduled) {
|
||||
_log.log(Log.ERROR, Connection.this + " timer was not scheduled", new Exception());
|
||||
}
|
||||
reschedule(rto, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void timeReached() {
|
||||
|
||||
if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " rtx event after close or reset");
|
||||
return;
|
||||
}
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " rtx timer timeReached()");
|
||||
|
||||
congestionOccurred();
|
||||
|
||||
// 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
|
||||
final long now = _context.clock().now();
|
||||
pushBackRTO(getOptions().doubleRTO());
|
||||
|
||||
// 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
|
||||
List<PacketLocal> toResend = null;
|
||||
synchronized(_outboundPackets) {
|
||||
if (_outboundPackets.isEmpty()) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
|
||||
return;
|
||||
}
|
||||
|
||||
PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey());
|
||||
if (oldest.getNumSends() == 1) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " cutting ssthresh and window");
|
||||
int flightSize = _outboundPackets.size();
|
||||
_ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
|
||||
getOptions().setWindowSize(1);
|
||||
} else if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " not cutting ssthresh and window");
|
||||
|
||||
toResend = new ArrayList<>(_outboundPackets.values());
|
||||
toResend = toResend.subList(0, Math.min(MAX_RTX, (toResend.size() + 1) / 2));
|
||||
}
|
||||
|
||||
// 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
|
||||
boolean sentAny = false;
|
||||
for (PacketLocal packet : toResend) {
|
||||
final int nResends = packet.getNumSends();
|
||||
if (packet.getNumSends() > getOptions().getMaxResends()) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
|
||||
packet.cancelled();
|
||||
disconnect(false);
|
||||
return;
|
||||
} else if (packet.getNumSends() >= 3 &&
|
||||
packet.isFlagSet(Packet.FLAG_CLOSE) &&
|
||||
packet.getPayloadSize() <= 0 &&
|
||||
getCloseReceivedOn() > 0) {
|
||||
// Bug workaround to prevent 5 minutes of retransmission
|
||||
// Routers before 0.9.9 have bugs, they won't ack anything after
|
||||
// they sent a close. Only send 3 CLOSE packets total, then
|
||||
// shut down normally.
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " too many close resends, closing");
|
||||
packet.cancelled();
|
||||
disconnect(false);
|
||||
return;
|
||||
} else {
|
||||
|
||||
if (_isChoking) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " packet is choking " + packet);
|
||||
packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
} else if (_unchokesToSend.decrementAndGet() > 0) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " packet is unchoking " + packet);
|
||||
// don't worry about wrapping around
|
||||
packet.setOptionalDelay(0);
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " packet clearing flag " + packet);
|
||||
// clear flag
|
||||
packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false);
|
||||
}
|
||||
|
||||
// this seems unnecessary to send the MSS again:
|
||||
//_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
|
||||
// bugfix release 0.7.8, we weren't dividing by 1000
|
||||
packet.setResendDelay(getOptions().getResendDelay() / 1000);
|
||||
if (packet.getReceiveStreamId() <= 0)
|
||||
packet.setReceiveStreamId(_receiveStreamId.get());
|
||||
if (packet.getSendStreamId() <= 0)
|
||||
packet.setSendStreamId(_sendStreamId.get());
|
||||
|
||||
packet.setTimeout(getOptions().getRTO());
|
||||
|
||||
|
||||
if (_outboundQueue.enqueue(packet)) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info(Connection.this + " resent packet " + packet);
|
||||
if (nResends == 1)
|
||||
_activeResends.incrementAndGet();
|
||||
sentAny = true;
|
||||
} else if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " could not resend packet " + packet);
|
||||
}
|
||||
}
|
||||
|
||||
if (sentAny) {
|
||||
_lastSendTime = now;
|
||||
resetActivityTimer();
|
||||
windowAdjusted();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* fired to reschedule event notification
|
||||
*/
|
||||
@@ -1415,36 +1595,38 @@ class Connection {
|
||||
|
||||
/**
|
||||
* If we have been explicitly NACKed three times, retransmit the packet even if
|
||||
* there are other packets in flight. 3 takes forever, let's try 2.
|
||||
* there are other packets in flight.
|
||||
*
|
||||
*/
|
||||
static final int FAST_RETRANSMIT_THRESHOLD = 3;
|
||||
|
||||
/**
|
||||
* Coordinate the resends of a given packet
|
||||
* A new ResendPacketEvent.
|
||||
* @since 0.9.46
|
||||
*/
|
||||
ResendPacketEvent newResendPacketEvent(PacketLocal packet) {
|
||||
return new ResendPacketEvent(packet);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is not normally scheduled. It's now used only for fastRetransmit(),
|
||||
* where it's scheduled with a delay of zero to put it on the timer queue.
|
||||
* Timeout retransmissions are handled by RetransmitEvent above.
|
||||
*/
|
||||
class ResendPacketEvent extends SimpleTimer2.TimedEvent {
|
||||
private final PacketLocal _packet;
|
||||
private long _nextSend;
|
||||
private boolean _fastRetransmit;
|
||||
|
||||
public ResendPacketEvent(PacketLocal packet, long delay) {
|
||||
public ResendPacketEvent(PacketLocal packet) {
|
||||
super(_timer);
|
||||
_packet = packet;
|
||||
_nextSend = delay + _context.clock().now();
|
||||
packet.setResendPacketEvent(ResendPacketEvent.this);
|
||||
schedule(delay);
|
||||
}
|
||||
|
||||
public long getNextSendTime() { return _nextSend; }
|
||||
|
||||
public void timeReached() { retransmit(); }
|
||||
|
||||
/**
|
||||
* @since 0.9.46
|
||||
*/
|
||||
void fastRetransmit() {
|
||||
_fastRetransmit = true;
|
||||
reschedule(0);
|
||||
}
|
||||
|
||||
@@ -1469,50 +1651,7 @@ class Connection {
|
||||
}
|
||||
|
||||
long now = _context.clock().now();
|
||||
long nextRetransmitTime = _nextRetransmitTime;
|
||||
if (nextRetransmitTime > now && !_fastRetransmit) {
|
||||
long delay = nextRetransmitTime - now;
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet);
|
||||
forceReschedule(delay);
|
||||
return false;
|
||||
}
|
||||
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Resend period reached for " + _packet);
|
||||
boolean resend = false;
|
||||
boolean isLowest = false;
|
||||
synchronized (_outboundPackets) {
|
||||
// allow appx. half the window to be "lowest" and be active resends, minimum of 3
|
||||
// Note: we should really pick the N lowest, not the lowest one + N more who
|
||||
// happen to get here next, as the timers get out-of-order esp. after fast retx
|
||||
if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
|
||||
_packet.getNumSends() > 1 ||
|
||||
_activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
|
||||
isLowest = true;
|
||||
if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
|
||||
resend = true;
|
||||
}
|
||||
if ( (resend) && (_packet.getAckTime() <= 0) ) {
|
||||
if ( (!isLowest) && (!_fastRetransmit) ) {
|
||||
// we want to resend this packet, but there are already active
|
||||
// resends in the air and we dont want to make a bad situation
|
||||
// worse. wait another second
|
||||
// BUG? seq# = 0, activeResends = 0, loop forever - why?
|
||||
// also seen with seq# > 0. Is the _activeResends count reliable?
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Delaying resend of " + _packet + " with "
|
||||
+ _activeResends + " active resend, "
|
||||
+ _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
|
||||
forceReschedule(1333);
|
||||
_nextSend = 1333 + _context.clock().now();
|
||||
return false;
|
||||
}
|
||||
|
||||
// It's the lowest, or it's fast retransmit time. Resend the packet.
|
||||
|
||||
if (_fastRetransmit)
|
||||
_context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
|
||||
_context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
|
||||
|
||||
// revamp various fields, in case we need to ack more, etc
|
||||
// updateAcks done in enqueue()
|
||||
@@ -1548,17 +1687,13 @@ class Connection {
|
||||
if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
|
||||
congestionOccurred();
|
||||
_context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
|
||||
newWindowSize /= 2;
|
||||
if (newWindowSize <= 0)
|
||||
newWindowSize = 1;
|
||||
|
||||
// The timeout for _this_ packet will be doubled below, but we also
|
||||
// need to double the RTO for the _next_ packets.
|
||||
// See RFC 6298 section 5 item 5.5
|
||||
// This prevents being stuck at a window size of 1, retransmitting every packet,
|
||||
// never updating the RTT or RTO.
|
||||
getOptions().doubleRTO();
|
||||
getOptions().setWindowSize(newWindowSize);
|
||||
getOptions().setWindowSize(1);
|
||||
|
||||
if (_packet.getNumSends() == 1) {
|
||||
int flightSize = getUnackedPacketsSent();
|
||||
@@ -1605,20 +1740,24 @@ class Connection {
|
||||
disconnect(true);
|
||||
} else {
|
||||
//long timeout = _options.getResendDelay() << numSends;
|
||||
long rto = _options.getRTO();
|
||||
long timeout = rto << (numSends-2);
|
||||
int timeout = _options.getRTO();
|
||||
if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
|
||||
timeout = MAX_RESEND_DELAY;
|
||||
// set this before enqueue() as it passes it on to the router
|
||||
_nextSend = timeout + _context.clock().now();
|
||||
_packet.setTimeout(timeout);
|
||||
|
||||
if (_outboundQueue.enqueue(_packet)) {
|
||||
if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug(Connection.this + " fast retransmit and schedule timer");
|
||||
}
|
||||
|
||||
// first resend for this packet ?
|
||||
if (numSends == 2)
|
||||
_activeResends.incrementAndGet();
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Resent packet " +
|
||||
(_fastRetransmit ? "(fast) " : "(timeout) ") +
|
||||
"(fast) " +
|
||||
_packet +
|
||||
" next resend in " + timeout + "ms" +
|
||||
" activeResends: " + _activeResends +
|
||||
@@ -1627,12 +1766,10 @@ class Connection {
|
||||
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
|
||||
_unackedPacketsReceived.set(0);
|
||||
_lastSendTime = _context.clock().now();
|
||||
_fastRetransmit = false;
|
||||
// timer reset added 0.9.1
|
||||
resetActivityTimer();
|
||||
}
|
||||
|
||||
forceReschedule(timeout);
|
||||
}
|
||||
|
||||
// acked during resending (... or somethin') ????????????
|
||||
@@ -1644,12 +1781,6 @@ class Connection {
|
||||
}
|
||||
|
||||
return true;
|
||||
} else {
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Packet acked before resend (resend="+ resend + "): "
|
||||
// + _packet + " on " + Connection.this);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,7 +134,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
|
||||
|
||||
private static final int TREND_COUNT = 3;
|
||||
static final int INITIAL_WINDOW_SIZE = 6;
|
||||
/** RFC 5681 sec. 3.1 */
|
||||
static final int INITIAL_WINDOW_SIZE = 3;
|
||||
static final int DEFAULT_MAX_SENDS = 8;
|
||||
public static final int DEFAULT_INITIAL_RTT = 8*1000;
|
||||
private static final int MAX_RTT = 60*1000;
|
||||
@@ -636,9 +637,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
}
|
||||
|
||||
if (_rto < Connection.MIN_RESEND_DELAY)
|
||||
_rto = (int)Connection.MIN_RESEND_DELAY;
|
||||
_rto = Connection.MIN_RESEND_DELAY;
|
||||
else if (_rto > Connection.MAX_RESEND_DELAY)
|
||||
_rto = (int)Connection.MAX_RESEND_DELAY;
|
||||
_rto = Connection.MAX_RESEND_DELAY;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -652,7 +653,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
|
||||
// we don't need to switch on _initState, _rto is set in constructor
|
||||
_rto *= 2;
|
||||
if (_rto > Connection.MAX_RESEND_DELAY)
|
||||
_rto = (int)Connection.MAX_RESEND_DELAY;
|
||||
_rto = Connection.MAX_RESEND_DELAY;
|
||||
return _rto;
|
||||
}
|
||||
|
||||
|
||||
@@ -433,7 +433,7 @@ class ConnectionPacketHandler {
|
||||
|
||||
_context.statManager().addRateData("stream.trend", trend, newWindowSize);
|
||||
|
||||
if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
|
||||
if ( (!congested) && (acked > 0) ) {
|
||||
int ssthresh = con.getSSThresh();
|
||||
if (newWindowSize < ssthresh) {
|
||||
// slow start - exponential growth
|
||||
|
||||
@@ -35,7 +35,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
||||
private long _cancelledOn;
|
||||
private final AtomicInteger _nackCount = new AtomicInteger();
|
||||
private volatile boolean _retransmitted;
|
||||
private volatile Connection.ResendPacketEvent _resendEvent;
|
||||
private volatile int _timeout;
|
||||
|
||||
/** not bound to a connection */
|
||||
public PacketLocal(I2PAppContext ctx, Destination to, I2PSession session) {
|
||||
@@ -139,9 +139,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
||||
}
|
||||
|
||||
private void cancelResend() {
|
||||
Connection.ResendPacketEvent ev = _resendEvent;
|
||||
if (ev != null)
|
||||
ev.cancel();
|
||||
// fast restransmits are sent immediately and we don't keep a reference,
|
||||
// can't be cancelled.
|
||||
}
|
||||
|
||||
public void ackReceived() {
|
||||
@@ -166,8 +165,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
||||
_log.debug("Cancelled! " + toString(), new Exception("cancelled"));
|
||||
}
|
||||
|
||||
public Connection.ResendPacketEvent getResendEvent() { return _resendEvent; }
|
||||
|
||||
/** how long after packet creation was it acked?
|
||||
* @return how long after packet creation the packet was ACKed in ms
|
||||
*/
|
||||
@@ -190,10 +187,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
||||
*/
|
||||
public void incrementNACKs() {
|
||||
final int cnt = _nackCount.incrementAndGet();
|
||||
Connection.ResendPacketEvent evt = _resendEvent;
|
||||
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
|
||||
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && (!_retransmitted) &&
|
||||
(_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it
|
||||
_retransmitted = true;
|
||||
Connection.ResendPacketEvent evt = _connection.newResendPacketEvent(this);
|
||||
evt.fastRetransmit();
|
||||
// the predicate used to be '+', changing to '-' --zab
|
||||
|
||||
@@ -213,7 +210,19 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
|
||||
|
||||
public int getNACKs() { return _nackCount.get(); }
|
||||
|
||||
public void setResendPacketEvent(Connection.ResendPacketEvent evt) { _resendEvent = evt; }
|
||||
/**
|
||||
* Used by PacketQueue to feed an expiration to the router.
|
||||
*
|
||||
* @return time from now, not absolute time. May be zero if unset.
|
||||
* @since 0.9.46
|
||||
*/
|
||||
public int getTimeout() { return _timeout; }
|
||||
|
||||
/**
|
||||
* @param timeout time from now, not absolute time
|
||||
* @since 0.9.46
|
||||
*/
|
||||
public void setTimeout(int timeout) { _timeout = timeout; }
|
||||
|
||||
/**
|
||||
* Sign and write the packet to the buffer (starting at the offset) and return
|
||||
|
||||
@@ -117,15 +117,15 @@ class PacketQueue implements SendMessageStatusListener, Closeable {
|
||||
// this should not block!
|
||||
begin = _context.clock().now();
|
||||
long expires = 0;
|
||||
Connection.ResendPacketEvent rpe = packet.getResendEvent();
|
||||
if (rpe != null) {
|
||||
int pktTimeout = packet.getTimeout();
|
||||
if (pktTimeout > 0) {
|
||||
// we want the router to expire it a little before we do,
|
||||
// so if we retransmit it will use a new tunnel/lease combo
|
||||
// If we are really close to the timeout already,
|
||||
// give this packet a chance to be sent,
|
||||
// but it's likely to be dropped on the router side if we're
|
||||
// running this far behind.
|
||||
expires = Math.max(rpe.getNextSendTime() - I2CP_EXPIRATION_ADJUST, begin + 25);
|
||||
expires = Math.max(begin + pktTimeout - I2CP_EXPIRATION_ADJUST, begin + 25);
|
||||
}
|
||||
SendMessageOptions options = new SendMessageOptions();
|
||||
if (expires > 0)
|
||||
|
||||
@@ -40,7 +40,7 @@ class TCBShare {
|
||||
private static final String WDW_DAMP_PROP="i2p.streaming.tcbcache.wdwDampening";
|
||||
private static final String RTTDEV_DAMP_PROP="i2p.streaming.tcbcache.rttdevDampening";
|
||||
/////
|
||||
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
|
||||
private static final int MAX_RTT = Connection.MAX_RESEND_DELAY / 2;
|
||||
private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
|
||||
private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
|
||||
|
||||
|
||||
@@ -1,3 +1,10 @@
|
||||
2020-04-19 zzz
|
||||
* Ratchet: Adaptive order of muxed decrypt based on previous traffic
|
||||
* Streaming: Single retransmit timer per connection (ticket #2715)
|
||||
|
||||
2020-04-18 zzz
|
||||
* i2psnark: Connect out to other seeds to fetch comments (ticket #2288)
|
||||
|
||||
2020-04-17 zzz
|
||||
* Crypto: Disable speculative AES tagset usage
|
||||
* Streaming: Slow start fix
|
||||
|
||||
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 10;
|
||||
public final static long BUILD = 11;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
||||
Reference in New Issue
Block a user