forked from I2P_Developers/i2p.i2p
WIP: UDP: Replace ACKSender thread with per-PeerState delayed ack timers
Don't send a delayed ack-only packet if acks are sent in a data packet first. Timer is created in PeerState messageFullyReceived() and messagePartiallyReceived(). This patch for now doesn't delete ACKSender.java, it just doesn't create the thread. WIP, cleanups and optimizations TODO
This commit is contained in:
@@ -25,7 +25,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
private DecayingBloomFilter _recentlyCompletedMessages;
|
||||
private final OutboundMessageFragments _outbound;
|
||||
private final UDPTransport _transport;
|
||||
private final ACKSender _ackSender;
|
||||
private final MessageReceiver _messageReceiver;
|
||||
private volatile boolean _alive;
|
||||
|
||||
@@ -38,7 +37,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
//_inboundMessages = new HashMap(64);
|
||||
_outbound = outbound;
|
||||
_transport = transport;
|
||||
_ackSender = new ACKSender(_context, _transport);
|
||||
_messageReceiver = new MessageReceiver(_context, _transport);
|
||||
_context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
|
||||
@@ -55,7 +53,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
// array size (currently its tuned for 10 minute rates for the
|
||||
// messageValidator)
|
||||
_recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
|
||||
_ackSender.startup();
|
||||
_messageReceiver.startup();
|
||||
}
|
||||
|
||||
@@ -64,7 +61,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (_recentlyCompletedMessages != null)
|
||||
_recentlyCompletedMessages.stopDecaying();
|
||||
_recentlyCompletedMessages = null;
|
||||
_ackSender.shutdown();
|
||||
_messageReceiver.shutdown();
|
||||
}
|
||||
|
||||
@@ -127,7 +123,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (data.readMessageFragmentNum(i) == 0) {
|
||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||
from.messageFullyReceived(messageId, -1);
|
||||
_ackSender.ackPeer(from);
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Message received is a dup: " + mid + " dups: "
|
||||
+ _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of "
|
||||
@@ -174,7 +169,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (messageComplete) {
|
||||
_recentlyCompletedMessages.add(mid);
|
||||
from.messageFullyReceived(messageId, state.getCompleteSize());
|
||||
_ackSender.ackPeer(from);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Message received completely! " + state);
|
||||
@@ -196,7 +190,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Queueing up a partial ACK for peer: " + from + " for " + state);
|
||||
from.messagePartiallyReceived();
|
||||
_ackSender.ackPeer(from);
|
||||
}
|
||||
|
||||
// TODO: Why give up on other fragments if one is bad?
|
||||
|
||||
@@ -502,6 +502,8 @@ class OutboundMessageFragments {
|
||||
|
||||
int sent = rv.size();
|
||||
peer.packetsTransmitted(sent);
|
||||
if (newFullAckCount <= 0)
|
||||
peer.clearWantedACKSendSince();
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() +
|
||||
" messages in " + sent + " packets to " + peer);
|
||||
|
||||
@@ -25,8 +25,9 @@ import net.i2p.router.util.CachedIteratorCollection;
|
||||
import net.i2p.router.util.CoDelPriorityBlockingQueue;
|
||||
import net.i2p.router.util.PriBlockingQueue;
|
||||
import net.i2p.util.BandwidthEstimator;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer2;
|
||||
|
||||
/**
|
||||
* Contain all of the state about a UDP connection to a peer.
|
||||
@@ -712,15 +713,27 @@ public class PeerState {
|
||||
_receiveBytes = 0;
|
||||
_receivePeriodBegin = now;
|
||||
}
|
||||
|
||||
if (_wantACKSendSince <= 0)
|
||||
_wantACKSendSince = now;
|
||||
_currentACKs.add(messageId);
|
||||
messagePartiallyReceived(now);
|
||||
}
|
||||
|
||||
/**
|
||||
* We received a partial message, or we want to send some acks.
|
||||
*/
|
||||
void messagePartiallyReceived() {
|
||||
if (_wantACKSendSince <= 0)
|
||||
_wantACKSendSince = _context.clock().now();
|
||||
messagePartiallyReceived(_context.clock().now());
|
||||
}
|
||||
|
||||
/**
|
||||
* We received a partial message, or we want to send some acks.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
private synchronized void messagePartiallyReceived(long now) {
|
||||
if (_wantACKSendSince <= 0) {
|
||||
_wantACKSendSince = now;
|
||||
// todo keep the same timer
|
||||
new ACKTimer(now);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -891,7 +904,9 @@ public class PeerState {
|
||||
* See above. Only called by ACKSender with alwaysIncludeRetransmissions = false.
|
||||
* So this is only for ACK-only packets, so all the size limiting is useless.
|
||||
* FIXME.
|
||||
* Side effect - sets _lastACKSend if rv is non-empty
|
||||
*
|
||||
* Side effect - sets _lastACKSend to now if rv is non-empty.
|
||||
* Side effect - sets _wantACKSendSince to -1 if _currentACKs is now empty.
|
||||
*
|
||||
* @return non-null, possibly empty
|
||||
*/
|
||||
@@ -924,7 +939,7 @@ public class PeerState {
|
||||
bytesRemaining -= 4;
|
||||
}
|
||||
if (_currentACKs.isEmpty())
|
||||
_wantACKSendSince = -1;
|
||||
_wantACKSendSince = 0;
|
||||
if (alwaysIncludeRetransmissions || !rv.isEmpty()) {
|
||||
List<Long> randomResends = getCurrentResendACKs();
|
||||
// now repeat by putting in some old ACKs
|
||||
@@ -1282,8 +1297,20 @@ public class PeerState {
|
||||
@Deprecated
|
||||
public void setLastACKSend(long when) { _lastACKSend = when; }
|
||||
|
||||
/** ACKSender only, to be removed
|
||||
* @deprecated unused
|
||||
*/
|
||||
public long getWantedACKSendSince() { return _wantACKSendSince; }
|
||||
|
||||
/**
|
||||
* All acks have been sent.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
synchronized void clearWantedACKSendSince() {
|
||||
_wantACKSendSince = 0;
|
||||
// TODO we could also cancel ACKTimer if we keep a ref to it
|
||||
}
|
||||
|
||||
/**
|
||||
* Are we out of room to send all the current unsent acks in a single packet?
|
||||
* This is a huge threshold (134 for small MTU and 255 for large MTU)
|
||||
@@ -1372,7 +1399,7 @@ public class PeerState {
|
||||
}
|
||||
|
||||
// so the ACKSender will drop this peer from its queue
|
||||
_wantACKSendSince = -1;
|
||||
_wantACKSendSince = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2087,6 +2114,86 @@ public class PeerState {
|
||||
}
|
||||
}
|
||||
|
||||
private static long ackFrequency(long timeSinceACK, long rtt) {
|
||||
// if we are actively pumping lots of data to them, we can depend upon
|
||||
// the unsentACKThreshold to figure out when to send an ACK instead of
|
||||
// using the timer, so we can set the timeout/frequency higher
|
||||
// TODO move constant to PS
|
||||
if (timeSinceACK < 2*1000)
|
||||
return Math.min(rtt/2, ACKSender.ACK_FREQUENCY);
|
||||
else
|
||||
return ACKSender.ACK_FREQUENCY;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timer to send an ack-only packet.
|
||||
* @since 0.9.52
|
||||
*/
|
||||
private class ACKTimer extends SimpleTimer2.TimedEvent {
|
||||
public ACKTimer(long now) {
|
||||
super(_context.simpleTimer2());
|
||||
long delta = ackFrequency(_lastACKSend, _rtt);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Sending delayed ack in " + delta + ": " + PeerState.this);
|
||||
schedule(delta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an ack-only packet, unless acks were already sent
|
||||
* as indicated by _wantACKSendSince == 0.
|
||||
* Will not requeue unless the acks don't all fit (unlikely).
|
||||
*/
|
||||
public void timeReached() {
|
||||
synchronized(PeerState.this) {
|
||||
long wanted = _wantACKSendSince;
|
||||
if (wanted <= 0) {
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Already acked:" + PeerState.this);
|
||||
return;
|
||||
}
|
||||
List<ACKBitfield> ackBitfields = retrieveACKBitfields(false);
|
||||
|
||||
// TODO move stats
|
||||
if (!ackBitfields.isEmpty()) {
|
||||
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size());
|
||||
// todo do we need this stat?
|
||||
/*
|
||||
long now = _context.clock().now();
|
||||
long lastSend = _lastACKSend;
|
||||
if (lastSend < 0)
|
||||
lastSend = now - 1;
|
||||
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
|
||||
*/
|
||||
// todo just create it once, put it in the transport
|
||||
PacketBuilder builder = new PacketBuilder(_context, _transport);
|
||||
UDPPacket ack = builder.buildACK(PeerState.this, ackBitfields);
|
||||
ack.markType(1);
|
||||
ack.setFragmentCount(-1);
|
||||
ack.setMessageType(PacketBuilder.TYPE_ACK);
|
||||
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this);
|
||||
// locking issues, we ignore the result, and acks are small,
|
||||
// so don't even bother allocating
|
||||
//peer.allocateSendingBytes(ack.getPacket().getLength(), true);
|
||||
// ignore whether its ok or not, its a bloody ack. this should be fixed, probably.
|
||||
_transport.send(ack);
|
||||
|
||||
if (_wantACKSendSince > 0) {
|
||||
// still full packets left to be ACKed, since wanted time
|
||||
// is reset by retrieveACKBitfields when all of the IDs are
|
||||
// removed
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Requeueing more ACKs for " + PeerState.this);
|
||||
reschedule(25);
|
||||
}
|
||||
} else {
|
||||
_context.statManager().addRateData("udp.abortACK", 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user