diff --git a/history.txt b/history.txt index e505044a7..9dd7dbfb1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,15 @@ +2021-09-22 zzz + * UDP: Replace ACKSender thread with per-PeerState ack timers + +2021-09-10 zzz + * I2CP: Don't call listener.readError() after external client disconnect + +2021-09-05 zzz + * Update: Get backup URLs from news feed + +2021-08-28 zzz + * Console: Catch error checking systray availability (gitlab issue #331) + 2021-08-26 zzz * Debian: Fix dh_installdocs build error * Router: Increase rekey probability diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 24c6e3ec7..aee15848b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Git"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 1; + public final static long BUILD = 2; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java deleted file mode 100644 index 8d806f74d..000000000 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ /dev/null @@ -1,212 +0,0 @@ -package net.i2p.router.transport.udp; - -import java.util.HashSet; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import net.i2p.router.RouterContext; -import net.i2p.util.I2PThread; -import net.i2p.util.Log; - -/** - * Blocking thread that is given peers by the inboundFragment pool, sending out - * any outstanding ACKs. - * The ACKs are sent directly to UDPSender, - * bypassing OutboundMessageFragments and PacketPusher. - */ -class ACKSender implements Runnable { - private final RouterContext _context; - private final Log _log; - private final UDPTransport _transport; - private final PacketBuilder _builder; - /** list of peers (PeerState) who we have received data from but not yet ACKed to */ - private final BlockingQueue _peersToACK; - private volatile boolean _alive; - private static final long POISON_PS = -9999999999l; - - /** how frequently do we want to send ACKs to a peer? */ - static final int ACK_FREQUENCY = 150; - - public ACKSender(RouterContext ctx, UDPTransport transport) { - _context = ctx; - _log = ctx.logManager().getLog(ACKSender.class); - _transport = transport; - _peersToACK = new LinkedBlockingQueue(); - _builder = new PacketBuilder(_context, transport); - _alive = true; - _context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.ackFrequency", "how long ago did we send an ACK to this peer?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendACKRemaining", "when we ack a peer, how many peers are left waiting to ack?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.abortACK", "How often do we schedule up an ACK send only to find it had already been sent (through piggyback)?", "udp", UDPTransport.RATES); - } - - /** - * Add to the queue. - * For speed, don't check for duplicates here. - * The runner will remove them in its own thread. - */ - public void ackPeer(PeerState peer) { - if (_alive) - _peersToACK.offer(peer); - } - - public synchronized void startup() { - _alive = true; - _peersToACK.clear(); - I2PThread t = new I2PThread(this, "UDP ACK sender", true); - t.start(); - } - - public synchronized void shutdown() { - _alive = false; - PeerState poison = new PeerState(_context, _transport, new byte[4], 0, null, false, 0); - poison.setTheyRelayToUsAs(POISON_PS); - _peersToACK.offer(poison); - for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) { - try { - Thread.sleep(i * 50); - } catch (InterruptedException ie) {} - } - _peersToACK.clear(); - } - - private static long ackFrequency(long timeSinceACK, long rtt) { - // if we are actively pumping lots of data to them, we can depend upon - // the unsentACKThreshold to figure out when to send an ACK instead of - // using the timer, so we can set the timeout/frequency higher - if (timeSinceACK < 2*1000) - return Math.min(rtt/2, ACK_FREQUENCY); - else - return ACK_FREQUENCY; - } - - public void run() { - try { - run2(); - } finally { - // prevent OOM on thread death - if (_alive) { - _alive = false; - _log.error("ACK Sender died"); - } - } - } - - private void run2() { - // we use a Set to strip out dups that come in on the Queue - Set notYet = new HashSet(); - while (_alive) { - PeerState peer = null; - long now = 0; - long remaining = -1; - long wanted = 0; - - while (_alive) { - // Pull from the queue until we find one ready to ack - // Any that are not ready we will put back on the queue - PeerState cur = null; - try { - if (notYet.isEmpty()) - // wait forever - cur = _peersToACK.take(); - else - // Don't wait if nothing there, just put everybody back and sleep below - cur = _peersToACK.poll(); - } catch (InterruptedException ie) {} - - if (cur != null) { - if (cur.getTheyRelayToUsAs() == POISON_PS) - return; - wanted = cur.getWantedACKSendSince(); - now = _context.clock().now(); - long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now; - if (wanted <= 0) { - // it got acked by somebody - discard, remove any dups, and go around again - notYet.remove(cur); - } else if ( (delta <= 0) || (cur.unsentACKThresholdReached()) ) { - // found one to ack - peer = cur; - notYet.remove(cur); // in case a dup - try { - // bulk operations may throw an exception - _peersToACK.addAll(notYet); - } catch (NoSuchElementException nsee) {} - notYet.clear(); - break; - } else { - // not yet, go around again - // moving from the Queue to the Set and then back removes duplicates - boolean added = notYet.add(cur); - if (added && _log.shouldLog(Log.DEBUG)) - _log.debug("Pending ACK (delta = " + delta + ") for " + cur); - } - } else if (!notYet.isEmpty()) { - // put them all back and wait a while - try { - // bulk operations may throw an exception - _peersToACK.addAll(notYet); - } catch (RuntimeException e) {} - if (_log.shouldLog(Log.DEBUG)) - _log.debug("sleeping, pending size = " + notYet.size()); - notYet.clear(); - try { - // sleep a little longer than the divided frequency, - // so it will be ready after we circle around a few times - Thread.sleep(5 + (ACK_FREQUENCY / 3)); - } catch (InterruptedException ie) {} - } // else go around again where we will wait at take() - } // inner while() - - if (peer != null) { - long lastSend = peer.getLastACKSend(); - // set above before the break - //long wanted = peer.getWantedACKSendSince(); - List ackBitfields = peer.retrieveACKBitfields(false); - - if (wanted < 0) { - if (_log.shouldLog(Log.WARN)) - _log.warn("why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields); - continue; - } - - if (!ackBitfields.isEmpty()) { - _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size()); - if (remaining > 0) - _context.statManager().addRateData("udp.sendACKRemaining", remaining); - // set above before the break - //now = _context.clock().now(); - if (lastSend < 0) - lastSend = now - 1; - _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted); - //_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size()); - UDPPacket ack = _builder.buildACK(peer, ackBitfields); - ack.markType(1); - ack.setFragmentCount(-1); - ack.setMessageType(PacketBuilder.TYPE_ACK); - - if (_log.shouldLog(Log.INFO)) - _log.info("Sending " + ackBitfields + " to " + peer); - // locking issues, we ignore the result, and acks are small, - // so don't even bother allocating - //peer.allocateSendingBytes(ack.getPacket().getLength(), true); - // ignore whether its ok or not, its a bloody ack. this should be fixed, probably. - _transport.send(ack); - - if ( (wanted > 0) && (wanted <= peer.getWantedACKSendSince()) ) { - // still full packets left to be ACKed, since wanted time - // is reset by retrieveACKBitfields when all of the IDs are - // removed - if (_log.shouldInfo()) - _log.info("Precautionary rerequest ACK for peer " + peer); - ackPeer(peer); - } - } else { - _context.statManager().addRateData("udp.abortACK", 1); - } - } - } - } -} diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 8360c4011..381423356 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -25,7 +25,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ private DecayingBloomFilter _recentlyCompletedMessages; private final OutboundMessageFragments _outbound; private final UDPTransport _transport; - private final ACKSender _ackSender; private final MessageReceiver _messageReceiver; private volatile boolean _alive; @@ -38,7 +37,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ //_inboundMessages = new HashMap(64); _outbound = outbound; _transport = transport; - _ackSender = new ACKSender(_context, _transport); _messageReceiver = new MessageReceiver(_context, _transport); _context.statManager().createRateStat("udp.receivedCompleteTime", "How long it takes to receive a full message", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES); @@ -55,7 +53,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ // array size (currently its tuned for 10 minute rates for the // messageValidator) _recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF"); - _ackSender.startup(); _messageReceiver.startup(); } @@ -64,7 +61,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (_recentlyCompletedMessages != null) _recentlyCompletedMessages.stopDecaying(); _recentlyCompletedMessages = null; - _ackSender.shutdown(); _messageReceiver.shutdown(); } @@ -127,7 +123,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (data.readMessageFragmentNum(i) == 0) { _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1); from.messageFullyReceived(messageId, -1); - _ackSender.ackPeer(from); if (_log.shouldLog(Log.INFO)) _log.info("Message received is a dup: " + mid + " dups: " + _recentlyCompletedMessages.getCurrentDuplicateCount() + " out of " @@ -174,7 +169,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (messageComplete) { _recentlyCompletedMessages.add(mid); from.messageFullyReceived(messageId, state.getCompleteSize()); - _ackSender.ackPeer(from); if (_log.shouldLog(Log.DEBUG)) _log.debug("Message received completely! " + state); @@ -196,7 +190,6 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ if (_log.shouldLog(Log.DEBUG)) _log.debug("Queueing up a partial ACK for peer: " + from + " for " + state); from.messagePartiallyReceived(); - _ackSender.ackPeer(from); } // TODO: Why give up on other fragments if one is bad? diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 5eff2780c..717cbe404 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -502,6 +502,8 @@ class OutboundMessageFragments { int sent = rv.size(); peer.packetsTransmitted(sent); + if (newFullAckCount <= 0) + peer.clearWantedACKSendSince(); if (_log.shouldDebug()) _log.debug("Sent " + fragmentsToSend + " fragments of " + states.size() + " messages in " + sent + " packets to " + peer); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 6b3aa142f..0c56fcb68 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -25,8 +25,9 @@ import net.i2p.router.util.CachedIteratorCollection; import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.router.util.PriBlockingQueue; import net.i2p.util.BandwidthEstimator; -import net.i2p.util.Log; import net.i2p.util.ConcurrentHashSet; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; /** * Contain all of the state about a UDP connection to a peer. @@ -301,7 +302,9 @@ public class PeerState { private static final int INIT_RTO = 1000; private static final int INIT_RTT = 0; private static final int MAX_RTO = 60*1000; - private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3; + /** how frequently do we want to send ACKs to a peer? */ + private static final int ACK_FREQUENCY = 150; + private static final int CLOCK_SKEW_FUDGE = (ACK_FREQUENCY * 2) / 3; /** * The max number of acks we save to send as duplicates @@ -712,15 +715,26 @@ public class PeerState { _receiveBytes = 0; _receivePeriodBegin = now; } - - if (_wantACKSendSince <= 0) - _wantACKSendSince = now; _currentACKs.add(messageId); + messagePartiallyReceived(now); } + /** + * We received a partial message, or we want to send some acks. + */ void messagePartiallyReceived() { - if (_wantACKSendSince <= 0) - _wantACKSendSince = _context.clock().now(); + messagePartiallyReceived(_context.clock().now()); + } + + /** + * We received a partial message, or we want to send some acks. + * @since 0.9.52 + */ + private synchronized void messagePartiallyReceived(long now) { + if (_wantACKSendSince <= 0) { + _wantACKSendSince = now; + new ACKTimer(); + } } /** @@ -874,28 +888,18 @@ public class PeerState { } /** - * grab a list of ACKBitfield instances, some of which may fully - * ACK a message while others may only partially ACK a message. - * the values returned are limited in size so that they will fit within - * the peer's current MTU as an ACK - as such, not all messages may be - * ACKed with this call. Be sure to check getWantedACKSendSince() which - * will be unchanged if there are ACKs remaining. - * - * @return non-null, possibly empty - * @deprecated unused - */ - @Deprecated - List retrieveACKBitfields() { return retrieveACKBitfields(true); } - - /** - * See above. Only called by ACKSender with alwaysIncludeRetransmissions = false. + * Only called by ACKTimer with alwaysIncludeRetransmissions = false. * So this is only for ACK-only packets, so all the size limiting is useless. * FIXME. - * Side effect - sets _lastACKSend if rv is non-empty + * + * Caller should sync on this. + * + * Side effect - sets _lastACKSend to now if rv is non-empty. + * Side effect - sets _wantACKSendSince to 0 if _currentACKs is now empty. * * @return non-null, possibly empty */ - List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { + private List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { int bytesRemaining = countMaxACKData(); // Limit the overhead of all the resent acks when using small MTU @@ -924,7 +928,7 @@ public class PeerState { bytesRemaining -= 4; } if (_currentACKs.isEmpty()) - _wantACKSendSince = -1; + _wantACKSendSince = 0; if (alwaysIncludeRetransmissions || !rv.isEmpty()) { List randomResends = getCurrentResendACKs(); // now repeat by putting in some old ACKs @@ -1278,11 +1282,15 @@ public class PeerState { /** when did we last send an ACK to the peer? */ public long getLastACKSend() { return _lastACKSend; } - /** @deprecated unused */ - @Deprecated - public void setLastACKSend(long when) { _lastACKSend = when; } - - public long getWantedACKSendSince() { return _wantACKSendSince; } + /** + * All acks have been sent. + * @since 0.9.52 + */ + synchronized void clearWantedACKSendSince() { + // race prevention + if (_currentACKs.isEmpty()) + _wantACKSendSince = 0; + } /** * Are we out of room to send all the current unsent acks in a single packet? @@ -1372,7 +1380,7 @@ public class PeerState { } // so the ACKSender will drop this peer from its queue - _wantACKSendSince = -1; + _wantACKSendSince = 0; } /** @@ -2087,6 +2095,67 @@ public class PeerState { } } + /** + * A timer to send an ack-only packet. + * @since 0.9.52 + */ + private class ACKTimer extends SimpleTimer2.TimedEvent { + public ACKTimer() { + super(_context.simpleTimer2()); + long delta = Math.min(_rtt/2, ACK_FREQUENCY); + if (_log.shouldDebug()) + _log.debug("Sending delayed ack in " + delta + ": " + PeerState.this); + schedule(delta); + } + + /** + * Send an ack-only packet, unless acks were already sent + * as indicated by _wantACKSendSince == 0. + * Will not requeue unless the acks don't all fit (unlikely). + */ + public void timeReached() { + synchronized(PeerState.this) { + long wanted = _wantACKSendSince; + if (wanted <= 0) { + if (_log.shouldDebug()) + _log.debug("Already acked:" + PeerState.this); + return; + } + List ackBitfields = retrieveACKBitfields(false); + + if (!ackBitfields.isEmpty()) { + PacketBuilder builder = new PacketBuilder(_context, _transport); + UDPPacket ack = builder.buildACK(PeerState.this, ackBitfields); + ack.markType(1); + ack.setFragmentCount(-1); + ack.setMessageType(PacketBuilder.TYPE_ACK); + + if (_log.shouldDebug()) { + //_log.debug("Sending " + ackBitfields + " to " + PeerState.this); + _log.debug("Sending " + ackBitfields.size() + " acks to " + PeerState.this); + } + // locking issues, we ignore the result, and acks are small, + // so don't even bother allocating + //peer.allocateSendingBytes(ack.getPacket().getLength(), true); + // ignore whether its ok or not, its a bloody ack. this should be fixed, probably. + _transport.send(ack); + + if (_wantACKSendSince > 0) { + // still full packets left to be ACKed, since wanted time + // is reset by retrieveACKBitfields when all of the IDs are + // removed + if (_log.shouldInfo()) + _log.info("Requeueing more ACKs for " + PeerState.this); + reschedule(25); + } + } else { + if (_log.shouldDebug()) + _log.debug("No more acks:" + PeerState.this); + } + } + } + } + // why removed? Some risk of dups in OutboundMessageFragments._activePeers ??? /*