diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 8360c4011..381423356 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -25,7 +25,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ private DecayingBloomFilter _recentlyCompletedMessages; private final OutboundMessageFragments _outbound; private final UDPTransport _transport; - private final ACKSender _ackSender; private final MessageReceiver _messageReceiver; private volatile boolean _alive; @@ -38,7 +37,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ //_inboundMessages = new HashMap(64); _outbound = outbound; _transport = transport; - _ackSender = new ACKSender(_context, _transport); _messageReceiver = new MessageReceiver(_context, _transport); _context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES); @@ -55,7 +53,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ // array size (currently its tuned for 10 minute rates for the // messageValidator) _recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF"); - _ackSender.startup(); _messageReceiver.startup(); } @@ -64,7 +61,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (_recentlyCompletedMessages != null) _recentlyCompletedMessages.stopDecaying(); _recentlyCompletedMessages = null; - _ackSender.shutdown(); _messageReceiver.shutdown(); } @@ -127,7 +123,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (data.readMessageFragmentNum(i) == 0) { _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1); from.messageFullyReceived(messageId, -1); - _ackSender.ackPeer(from); if (_log.shouldLog(Log.INFO)) _log.info("Message received is a dup: " + mid + " dups: " + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " @@ -174,7 +169,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (messageComplete) { _recentlyCompletedMessages.add(mid); from.messageFullyReceived(messageId, state.getCompleteSize()); - _ackSender.ackPeer(from); if (_log.shouldLog(Log.DEBUG)) _log.debug("Message received completely! " + state); @@ -196,7 +190,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (_log.shouldLog(Log.DEBUG)) _log.debug("Queueing up a partial ACK for peer: " + from + " for " + state); from.messagePartiallyReceived(); - _ackSender.ackPeer(from); } // TODO: Why give up on other fragments if one is bad? diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 5eff2780c..717cbe404 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -502,6 +502,8 @@ class OutboundMessageFragments { int sent = rv.size(); peer.packetsTransmitted(sent); + if (newFullAckCount <= 0) + peer.clearWantedACKSendSince(); if (_log.shouldDebug()) _log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() + " messages in " + sent + " packets to " + peer); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 6b3aa142f..10ac88c46 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -25,8 +25,9 @@ import net.i2p.router.util.CachedIteratorCollection; import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.router.util.PriBlockingQueue; import net.i2p.util.BandwidthEstimator; -import net.i2p.util.Log; import net.i2p.util.ConcurrentHashSet; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; /** * Contain all of the state about a UDP connection to a peer. @@ -712,15 +713,27 @@ public class PeerState { _receiveBytes = 0; _receivePeriodBegin = now; } - - if (_wantACKSendSince <= 0) - _wantACKSendSince = now; _currentACKs.add(messageId); + messagePartiallyReceived(now); } + /** + * We received a partial message, or we want to send some acks. + */ void messagePartiallyReceived() { - if (_wantACKSendSince <= 0) - _wantACKSendSince = _context.clock().now(); + messagePartiallyReceived(_context.clock().now()); + } + + /** + * We received a partial message, or we want to send some acks. + * @since 0.9.52 + */ + private synchronized void messagePartiallyReceived(long now) { + if (_wantACKSendSince <= 0) { + _wantACKSendSince = now; +// todo keep the same timer + new ACKTimer(now); + } } /** @@ -891,7 +904,9 @@ public class PeerState { * See above. Only called by ACKSender with alwaysIncludeRetransmissions = false. * So this is only for ACK-only packets, so all the size limiting is useless. * FIXME. - * Side effect - sets _lastACKSend if rv is non-empty + * + * Side effect - sets _lastACKSend to now if rv is non-empty. + * Side effect - sets _wantACKSendSince to -1 if _currentACKs is now empty. * * @return non-null, possibly empty */ @@ -924,7 +939,7 @@ public class PeerState { bytesRemaining -= 4; } if (_currentACKs.isEmpty()) - _wantACKSendSince = -1; + _wantACKSendSince = 0; if (alwaysIncludeRetransmissions || !rv.isEmpty()) { List randomResends = getCurrentResendACKs(); // now repeat by putting in some old ACKs @@ -1282,8 +1297,20 @@ public class PeerState { @Deprecated public void setLastACKSend(long when) { _lastACKSend = when; } + /** ACKSender only, to be removed + * @deprecated unused + */ public long getWantedACKSendSince() { return _wantACKSendSince; } + /** + * All acks have been sent. + * @since 0.9.52 + */ + synchronized void clearWantedACKSendSince() { + _wantACKSendSince = 0; +// TODO we could also cancel ACKTimer if we keep a ref to it + } + /** * Are we out of room to send all the current unsent acks in a single packet? * This is a huge threshold (134 for small MTU and 255 for large MTU) @@ -1372,7 +1399,7 @@ public class PeerState { } // so the ACKSender will drop this peer from its queue - _wantACKSendSince = -1; + _wantACKSendSince = 0; } /** @@ -2087,6 +2114,86 @@ public class PeerState { } } + private static long ackFrequency(long timeSinceACK, long rtt) { + // if we are actively pumping lots of data to them, we can depend upon + // the unsentACKThreshold to figure out when to send an ACK instead of + // using the timer, so we can set the timeout/frequency higher +// TODO move constant to PS + if (timeSinceACK < 2*1000) + return Math.min(rtt/2, ACKSender.ACK_FREQUENCY); + else + return ACKSender.ACK_FREQUENCY; + } + + /** + * A timer to send an ack-only packet. + * @since 0.9.52 + */ + private class ACKTimer extends SimpleTimer2.TimedEvent { + public ACKTimer(long now) { + super(_context.simpleTimer2()); + long delta = ackFrequency(_lastACKSend, _rtt); + if (_log.shouldDebug()) + _log.debug("Sending delayed ack in " + delta + ": " + PeerState.this); + schedule(delta); + } + + /** + * Send an ack-only packet, unless acks were already sent + * as indicated by _wantACKSendSince == 0. + * Will not requeue unless the acks don't all fit (unlikely). + */ + public void timeReached() { + synchronized(PeerState.this) { + long wanted = _wantACKSendSince; + if (wanted <= 0) { + if (_log.shouldDebug()) + _log.debug("Already acked:" + PeerState.this); + return; + } + List ackBitfields = retrieveACKBitfields(false); + +// TODO move stats + if (!ackBitfields.isEmpty()) { + _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size()); +// todo do we need this stat? +/* + long now = _context.clock().now(); + long lastSend = _lastACKSend; + if (lastSend < 0) + lastSend = now - 1; + _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted); +*/ +// todo just create it once, put it in the transport + PacketBuilder builder = new PacketBuilder(_context, _transport); + UDPPacket ack = builder.buildACK(PeerState.this, ackBitfields); + ack.markType(1); + ack.setFragmentCount(-1); + ack.setMessageType(PacketBuilder.TYPE_ACK); + + if (_log.shouldDebug()) + _log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this); + // locking issues, we ignore the result, and acks are small, + // so don't even bother allocating + //peer.allocateSendingBytes(ack.getPacket().getLength(), true); + // ignore whether its ok or not, its a bloody ack. this should be fixed, probably. + _transport.send(ack); + + if (_wantACKSendSince > 0) { + // still full packets left to be ACKed, since wanted time + // is reset by retrieveACKBitfields when all of the IDs are + // removed + if (_log.shouldInfo()) + _log.info("Requeueing more ACKs for " + PeerState.this); + reschedule(25); + } + } else { + _context.statManager().addRateData("udp.abortACK", 1); + } + } + } + } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? /*