forked from I2P_Developers/i2p.i2p
UDP: Replace ACKSender thread with per-PeerState delayed ack timers
(low latency improvements part 2) Timer is created in PeerState messageFullyReceived() and messagePartiallyReceived(). Don't send a delayed ack-only packet if acks are sent in a data packet first. Reviewed and tested by zlatinb. Related MRs: !36 !37 !38
This commit is contained in:
12
history.txt
12
history.txt
@@ -1,3 +1,15 @@
|
||||
2021-09-22 zzz
|
||||
* UDP: Replace ACKSender thread with per-PeerState ack timers
|
||||
|
||||
2021-09-10 zzz
|
||||
* I2CP: Don't call listener.readError() after external client disconnect
|
||||
|
||||
2021-09-05 zzz
|
||||
* Update: Get backup URLs from news feed
|
||||
|
||||
2021-08-28 zzz
|
||||
* Console: Catch error checking systray availability (gitlab issue #331)
|
||||
|
||||
2021-08-26 zzz
|
||||
* Debian: Fix dh_installdocs build error
|
||||
* Router: Increase rekey probability
|
||||
|
||||
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Git";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 1;
|
||||
public final static long BUILD = 2;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
||||
@@ -1,212 +0,0 @@
|
||||
package net.i2p.router.transport.udp;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Blocking thread that is given peers by the inboundFragment pool, sending out
|
||||
* any outstanding ACKs.
|
||||
* The ACKs are sent directly to UDPSender,
|
||||
* bypassing OutboundMessageFragments and PacketPusher.
|
||||
*/
|
||||
class ACKSender implements Runnable {
|
||||
private final RouterContext _context;
|
||||
private final Log _log;
|
||||
private final UDPTransport _transport;
|
||||
private final PacketBuilder _builder;
|
||||
/** list of peers (PeerState) who we have received data from but not yet ACKed to */
|
||||
private final BlockingQueue<PeerState> _peersToACK;
|
||||
private volatile boolean _alive;
|
||||
private static final long POISON_PS = -9999999999l;
|
||||
|
||||
/** how frequently do we want to send ACKs to a peer? */
|
||||
static final int ACK_FREQUENCY = 150;
|
||||
|
||||
public ACKSender(RouterContext ctx, UDPTransport transport) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(ACKSender.class);
|
||||
_transport = transport;
|
||||
_peersToACK = new LinkedBlockingQueue<PeerState>();
|
||||
_builder = new PacketBuilder(_context, transport);
|
||||
_alive = true;
|
||||
_context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.ackFrequency", "how long ago did we send an ACK to this peer?", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.sendACKRemaining", "when we ack a peer, how many peers are left waiting to ack?", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.abortACK", "How often do we schedule up an ACK send only to find it had already been sent (through piggyback)?", "udp", UDPTransport.RATES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add to the queue.
|
||||
* For speed, don't check for duplicates here.
|
||||
* The runner will remove them in its own thread.
|
||||
*/
|
||||
public void ackPeer(PeerState peer) {
|
||||
if (_alive)
|
||||
_peersToACK.offer(peer);
|
||||
}
|
||||
|
||||
public synchronized void startup() {
|
||||
_alive = true;
|
||||
_peersToACK.clear();
|
||||
I2PThread t = new I2PThread(this, "UDP ACK sender", true);
|
||||
t.start();
|
||||
}
|
||||
|
||||
public synchronized void shutdown() {
|
||||
_alive = false;
|
||||
PeerState poison = new PeerState(_context, _transport, new byte[4], 0, null, false, 0);
|
||||
poison.setTheyRelayToUsAs(POISON_PS);
|
||||
_peersToACK.offer(poison);
|
||||
for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) {
|
||||
try {
|
||||
Thread.sleep(i * 50);
|
||||
} catch (InterruptedException ie) {}
|
||||
}
|
||||
_peersToACK.clear();
|
||||
}
|
||||
|
||||
private static long ackFrequency(long timeSinceACK, long rtt) {
|
||||
// if we are actively pumping lots of data to them, we can depend upon
|
||||
// the unsentACKThreshold to figure out when to send an ACK instead of
|
||||
// using the timer, so we can set the timeout/frequency higher
|
||||
if (timeSinceACK < 2*1000)
|
||||
return Math.min(rtt/2, ACK_FREQUENCY);
|
||||
else
|
||||
return ACK_FREQUENCY;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
run2();
|
||||
} finally {
|
||||
// prevent OOM on thread death
|
||||
if (_alive) {
|
||||
_alive = false;
|
||||
_log.error("ACK Sender died");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void run2() {
|
||||
// we use a Set to strip out dups that come in on the Queue
|
||||
Set<PeerState> notYet = new HashSet<PeerState>();
|
||||
while (_alive) {
|
||||
PeerState peer = null;
|
||||
long now = 0;
|
||||
long remaining = -1;
|
||||
long wanted = 0;
|
||||
|
||||
while (_alive) {
|
||||
// Pull from the queue until we find one ready to ack
|
||||
// Any that are not ready we will put back on the queue
|
||||
PeerState cur = null;
|
||||
try {
|
||||
if (notYet.isEmpty())
|
||||
// wait forever
|
||||
cur = _peersToACK.take();
|
||||
else
|
||||
// Don't wait if nothing there, just put everybody back and sleep below
|
||||
cur = _peersToACK.poll();
|
||||
} catch (InterruptedException ie) {}
|
||||
|
||||
if (cur != null) {
|
||||
if (cur.getTheyRelayToUsAs() == POISON_PS)
|
||||
return;
|
||||
wanted = cur.getWantedACKSendSince();
|
||||
now = _context.clock().now();
|
||||
long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now;
|
||||
if (wanted <= 0) {
|
||||
// it got acked by somebody - discard, remove any dups, and go around again
|
||||
notYet.remove(cur);
|
||||
} else if ( (delta <= 0) || (cur.unsentACKThresholdReached()) ) {
|
||||
// found one to ack
|
||||
peer = cur;
|
||||
notYet.remove(cur); // in case a dup
|
||||
try {
|
||||
// bulk operations may throw an exception
|
||||
_peersToACK.addAll(notYet);
|
||||
} catch (NoSuchElementException nsee) {}
|
||||
notYet.clear();
|
||||
break;
|
||||
} else {
|
||||
// not yet, go around again
|
||||
// moving from the Queue to the Set and then back removes duplicates
|
||||
boolean added = notYet.add(cur);
|
||||
if (added && _log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Pending ACK (delta = " + delta + ") for " + cur);
|
||||
}
|
||||
} else if (!notYet.isEmpty()) {
|
||||
// put them all back and wait a while
|
||||
try {
|
||||
// bulk operations may throw an exception
|
||||
_peersToACK.addAll(notYet);
|
||||
} catch (RuntimeException e) {}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("sleeping, pending size = " + notYet.size());
|
||||
notYet.clear();
|
||||
try {
|
||||
// sleep a little longer than the divided frequency,
|
||||
// so it will be ready after we circle around a few times
|
||||
Thread.sleep(5 + (ACK_FREQUENCY / 3));
|
||||
} catch (InterruptedException ie) {}
|
||||
} // else go around again where we will wait at take()
|
||||
} // inner while()
|
||||
|
||||
if (peer != null) {
|
||||
long lastSend = peer.getLastACKSend();
|
||||
// set above before the break
|
||||
//long wanted = peer.getWantedACKSendSince();
|
||||
List<ACKBitfield> ackBitfields = peer.retrieveACKBitfields(false);
|
||||
|
||||
if (wanted < 0) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!ackBitfields.isEmpty()) {
|
||||
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size());
|
||||
if (remaining > 0)
|
||||
_context.statManager().addRateData("udp.sendACKRemaining", remaining);
|
||||
// set above before the break
|
||||
//now = _context.clock().now();
|
||||
if (lastSend < 0)
|
||||
lastSend = now - 1;
|
||||
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
|
||||
//_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size());
|
||||
UDPPacket ack = _builder.buildACK(peer, ackBitfields);
|
||||
ack.markType(1);
|
||||
ack.setFragmentCount(-1);
|
||||
ack.setMessageType(PacketBuilder.TYPE_ACK);
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Sending " + ackBitfields + " to " + peer);
|
||||
// locking issues, we ignore the result, and acks are small,
|
||||
// so don't even bother allocating
|
||||
//peer.allocateSendingBytes(ack.getPacket().getLength(), true);
|
||||
// ignore whether its ok or not, its a bloody ack. this should be fixed, probably.
|
||||
_transport.send(ack);
|
||||
|
||||
if ( (wanted > 0) && (wanted <= peer.getWantedACKSendSince()) ) {
|
||||
// still full packets left to be ACKed, since wanted time
|
||||
// is reset by retrieveACKBitfields when all of the IDs are
|
||||
// removed
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Precautionary rerequest ACK for peer " + peer);
|
||||
ackPeer(peer);
|
||||
}
|
||||
} else {
|
||||
_context.statManager().addRateData("udp.abortACK", 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,7 +25,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
private DecayingBloomFilter _recentlyCompletedMessages;
|
||||
private final OutboundMessageFragments _outbound;
|
||||
private final UDPTransport _transport;
|
||||
private final ACKSender _ackSender;
|
||||
private final MessageReceiver _messageReceiver;
|
||||
private volatile boolean _alive;
|
||||
|
||||
@@ -38,7 +37,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
//_inboundMessages = new HashMap(64);
|
||||
_outbound = outbound;
|
||||
_transport = transport;
|
||||
_ackSender = new ACKSender(_context, _transport);
|
||||
_messageReceiver = new MessageReceiver(_context, _transport);
|
||||
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
|
||||
@@ -55,7 +53,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
// array size (currently its tuned for 10 minute rates for the
|
||||
// messageValidator)
|
||||
_recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
|
||||
_ackSender.startup();
|
||||
_messageReceiver.startup();
|
||||
}
|
||||
|
||||
@@ -64,7 +61,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (_recentlyCompletedMessages != null)
|
||||
_recentlyCompletedMessages.stopDecaying();
|
||||
_recentlyCompletedMessages = null;
|
||||
_ackSender.shutdown();
|
||||
_messageReceiver.shutdown();
|
||||
}
|
||||
|
||||
@@ -127,7 +123,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (data.readMessageFragmentNum(i) == 0) {
|
||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||
from.messageFullyReceived(messageId, -1);
|
||||
_ackSender.ackPeer(from);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message received is a dup: " + mid + " dups: "
|
||||
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
|
||||
@@ -174,7 +169,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (messageComplete) {
|
||||
_recentlyCompletedMessages.add(mid);
|
||||
from.messageFullyReceived(messageId, state.getCompleteSize());
|
||||
_ackSender.ackPeer(from);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Message received completely! " + state);
|
||||
@@ -196,7 +190,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
|
||||
from.messagePartiallyReceived();
|
||||
_ackSender.ackPeer(from);
|
||||
}
|
||||
|
||||
// TODO: Why give up on other fragments if one is bad?
|
||||
|
||||
@@ -502,6 +502,8 @@ class OutboundMessageFragments {
|
||||
|
||||
int sent = rv.size();
|
||||
peer.packetsTransmitted(sent);
|
||||
if (newFullAckCount <= 0)
|
||||
peer.clearWantedACKSendSince();
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() +
|
||||
" messages in " + sent + " packets to " + peer);
|
||||
|
||||
@@ -25,8 +25,9 @@ import net.i2p.router.util.CachedIteratorCollection;
|
||||
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
||||
import net.i2p.router.util.PriBlockingQueue;
|
||||
import net.i2p.util.BandwidthEstimator;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer2;
|
||||
|
||||
/**
|
||||
* Contain all of the state about a UDP connection to a peer.
|
||||
@@ -301,7 +302,9 @@ public class PeerState {
|
||||
private static final int INIT_RTO = 1000;
|
||||
private static final int INIT_RTT = 0;
|
||||
private static final int MAX_RTO = 60*1000;
|
||||
private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3;
|
||||
/** how frequently do we want to send ACKs to a peer? */
|
||||
private static final int ACK_FREQUENCY = 150;
|
||||
private static final int CLOCK_SKEW_FUDGE = (ACK_FREQUENCY * 2) / 3;
|
||||
|
||||
/**
|
||||
* The max number of acks we save to send as duplicates
|
||||
@@ -712,15 +715,26 @@ public class PeerState {
|
||||
_receiveBytes = 0;
|
||||
_receivePeriodBegin = now;
|
||||
}
|
||||
|
||||
if (_wantACKSendSince <= 0)
|
||||
_wantACKSendSince = now;
|
||||
_currentACKs.add(messageId);
|
||||
messagePartiallyReceived(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* We received a partial message, or we want to send some acks.
|
||||
*/
|
||||
void messagePartiallyReceived() {
|
||||
if (_wantACKSendSince <= 0)
|
||||
_wantACKSendSince = _context.clock().now();
|
||||
messagePartiallyReceived(_context.clock().now());
|
||||
}
|
||||
|
||||
/**
|
||||
* We received a partial message, or we want to send some acks.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
private synchronized void messagePartiallyReceived(long now) {
|
||||
if (_wantACKSendSince <= 0) {
|
||||
_wantACKSendSince = now;
|
||||
new ACKTimer();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -874,28 +888,18 @@ public class PeerState {
|
||||
}
|
||||
|
||||
/**
|
||||
* grab a list of ACKBitfield instances, some of which may fully
|
||||
* ACK a message while others may only partially ACK a message.
|
||||
* the values returned are limited in size so that they will fit within
|
||||
* the peer's current MTU as an ACK - as such, not all messages may be
|
||||
* ACKed with this call. Be sure to check getWantedACKSendSince() which
|
||||
* will be unchanged if there are ACKs remaining.
|
||||
*
|
||||
* @return non-null, possibly empty
|
||||
* @deprecated unused
|
||||
*/
|
||||
@Deprecated
|
||||
List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); }
|
||||
|
||||
/**
|
||||
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
|
||||
* Only called by ACKTimer with alwaysIncludeRetransmissions = false.
|
||||
* So this is only for ACK-only packets, so all the size limiting is useless.
|
||||
* FIXME.
|
||||
* Side effect - sets _lastACKSend if rv is non-empty
|
||||
*
|
||||
* Caller should sync on this.
|
||||
*
|
||||
* Side effect - sets _lastACKSend to now if rv is non-empty.
|
||||
* Side effect - sets _wantACKSendSince to 0 if _currentACKs is now empty.
|
||||
*
|
||||
* @return non-null, possibly empty
|
||||
*/
|
||||
List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
|
||||
private List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
|
||||
int bytesRemaining = countMaxACKData();
|
||||
|
||||
// Limit the overhead of all the resent acks when using small MTU
|
||||
@@ -924,7 +928,7 @@ public class PeerState {
|
||||
bytesRemaining -= 4;
|
||||
}
|
||||
if (_currentACKs.isEmpty())
|
||||
_wantACKSendSince = -1;
|
||||
_wantACKSendSince = 0;
|
||||
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
|
||||
List<Long> randomResends = getCurrentResendACKs();
|
||||
// now repeat by putting in some old ACKs
|
||||
@@ -1278,11 +1282,15 @@ public class PeerState {
|
||||
/** when did we last send an ACK to the peer? */
|
||||
public long getLastACKSend() { return _lastACKSend; }
|
||||
|
||||
/** @deprecated unused */
|
||||
@Deprecated
|
||||
public void setLastACKSend(long when) { _lastACKSend = when; }
|
||||
|
||||
public long getWantedACKSendSince() { return _wantACKSendSince; }
|
||||
/**
|
||||
* All acks have been sent.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
synchronized void clearWantedACKSendSince() {
|
||||
// race prevention
|
||||
if (_currentACKs.isEmpty())
|
||||
_wantACKSendSince = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Are we out of room to send all the current unsent acks in a single packet?
|
||||
@@ -1372,7 +1380,7 @@ public class PeerState {
|
||||
}
|
||||
|
||||
// so the ACKSender will drop this peer from its queue
|
||||
_wantACKSendSince = -1;
|
||||
_wantACKSendSince = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2087,6 +2095,67 @@ public class PeerState {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A timer to send an ack-only packet.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
private class ACKTimer extends SimpleTimer2.TimedEvent {
|
||||
public ACKTimer() {
|
||||
super(_context.simpleTimer2());
|
||||
long delta = Math.min(_rtt/2, ACK_FREQUENCY);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Sending delayed ack in " + delta + ": " + PeerState.this);
|
||||
schedule(delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an ack-only packet, unless acks were already sent
|
||||
* as indicated by _wantACKSendSince == 0.
|
||||
* Will not requeue unless the acks don't all fit (unlikely).
|
||||
*/
|
||||
public void timeReached() {
|
||||
synchronized(PeerState.this) {
|
||||
long wanted = _wantACKSendSince;
|
||||
if (wanted <= 0) {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Already acked:" + PeerState.this);
|
||||
return;
|
||||
}
|
||||
List<ACKBitfield> ackBitfields = retrieveACKBitfields(false);
|
||||
|
||||
if (!ackBitfields.isEmpty()) {
|
||||
PacketBuilder builder = new PacketBuilder(_context, _transport);
|
||||
UDPPacket ack = builder.buildACK(PeerState.this, ackBitfields);
|
||||
ack.markType(1);
|
||||
ack.setFragmentCount(-1);
|
||||
ack.setMessageType(PacketBuilder.TYPE_ACK);
|
||||
|
||||
if (_log.shouldDebug()) {
|
||||
//_log.debug("Sending " + ackBitfields + " to " + PeerState.this);
|
||||
_log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this);
|
||||
}
|
||||
// locking issues, we ignore the result, and acks are small,
|
||||
// so don't even bother allocating
|
||||
//peer.allocateSendingBytes(ack.getPacket().getLength(), true);
|
||||
// ignore whether its ok or not, its a bloody ack. this should be fixed, probably.
|
||||
_transport.send(ack);
|
||||
|
||||
if (_wantACKSendSince > 0) {
|
||||
// still full packets left to be ACKed, since wanted time
|
||||
// is reset by retrieveACKBitfields when all of the IDs are
|
||||
// removed
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Requeueing more ACKs for " + PeerState.this);
|
||||
reschedule(25);
|
||||
}
|
||||
} else {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("No more acks:" + PeerState.this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user