diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 312f8d348e764aa9f555b837054610559760ff3b..f1aedda5ae49a90369dc5e605b683e1c4a75eeb3 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -28,7 +28,7 @@ class ACKSender implements Runnable { private static final long POISON_PS = -9999999999l; /** how frequently do we want to send ACKs to a peer? */ - static final int ACK_FREQUENCY = 500; + static final int ACK_FREQUENCY = 350; public ACKSender(RouterContext ctx, UDPTransport transport) { _context = ctx; @@ -73,12 +73,12 @@ class ACKSender implements Runnable { _peersToACK.clear(); } - private long ackFrequency(long timeSinceACK, long rtt) { + private static long ackFrequency(long timeSinceACK, long rtt) { // if we are actively pumping lots of data to them, we can depend upon // the unsentACKThreshold to figure out when to send an ACK instead of // using the timer, so we can set the timeout/frequency higher if (timeSinceACK < 2*1000) - return Math.max(rtt/2, 500); + return Math.max(rtt/2, ACK_FREQUENCY); else return ACK_FREQUENCY; } @@ -162,15 +162,15 @@ class ACKSender implements Runnable { } if (!ackBitfields.isEmpty()) { - _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0); + _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size()); if (remaining > 0) - _context.statManager().addRateData("udp.sendACKRemaining", remaining, 0); + _context.statManager().addRateData("udp.sendACKRemaining", remaining); // set above before the break //now = _context.clock().now(); if (lastSend < 0) lastSend = now - 1; _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted); - //_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0); + //_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size()); UDPPacket ack = _builder.buildACK(peer, ackBitfields); ack.markType(1); ack.setFragmentCount(-1); @@ -193,7 +193,7 @@ class ACKSender implements Runnable { ackPeer(peer); } } else { - _context.statManager().addRateData("udp.abortACK", 1, 0); + _context.statManager().addRateData("udp.abortACK", 1); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index 5a36fd83898260cb9230149faba2c3e5396a0958..c5eef5f29eadbb38e3b5ab766991f7abce152e81 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -185,14 +185,15 @@ class IntroductionManager { */ public void pingIntroducers() { // Try to keep the connection up for two hours after we made anybody an introducer - long pingCutoff = _context.clock().now() - (105 * 60 * 1000); - long inactivityCutoff = _context.clock().now() - UDPTransport.MIN_EXPIRE_TIMEOUT; + long now = _context.clock().now(); + long pingCutoff = now - (105 * 60 * 1000); + long inactivityCutoff = now - UDPTransport.MIN_EXPIRE_TIMEOUT; for (PeerState cur : _inbound) { if (cur.getIntroducerTime() > pingCutoff && cur.getLastSendTime() < inactivityCutoff) { if (_log.shouldLog(Log.INFO)) _log.info("Pinging introducer: " + cur); - cur.setLastSendTime(_context.clock().now()); + cur.setLastSendTime(now); _transport.send(_builder.buildPing(cur)); } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index d1377a666438933ff9570529e82ff482c2193597..db77b6af8835ad1f893b41e76668d30eaebe83c8 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -406,13 +406,20 @@ class PeerState { public long getLastReceiveTime() { return _lastReceiveTime; } /** how many seconds have we sent packets without any ACKs received? */ public int getConsecutiveFailedSends() { return _consecutiveFailedSends; } - /** have we received a packet with the ECN bit set in the current second? */ + + /** + * have we received a packet with the ECN bit set in the current second? + * @return false always + * @deprecated unused, ECNs are never sent, always returns false + */ public boolean getCurrentSecondECNReceived() { return _currentSecondECNReceived; } + /** * have all of the packets received in the current second requested that * the previous second's ACKs be sent? */ //public boolean getRemoteWantsPreviousACKs() { return _remoteWantsPreviousACKs; } + /** how many bytes should we send to the peer in a second */ public int getSendWindowBytes() { return _sendWindowBytes; } /** how many bytes can we send to the peer in the current second */ @@ -548,10 +555,12 @@ class PeerState { public void setLastPingTime(long when) { _lastPingTime = when; } /** - * Latest of last sent and last ping + * Latest of last sent, last ACK, last ping * @since 0.9.3 */ - public long getLastSendOrPingTime() { return Math.max(_lastSendTime, _lastPingTime); } + public long getLastSendOrPingTime() { + return Math.max(Math.max(_lastSendTime, _lastACKSend), _lastPingTime); + } /** return the smoothed send transfer rate */ public int getSendBps() { return _sendBps; } @@ -754,7 +763,8 @@ class PeerState { /** * either they told us to back off, or we had to resend to get * the data through. - * + * Caller should synch on this + * @return true if window shrunk, but nobody uses the return value */ private boolean congestionOccurred() { long now = _context.clock().now(); @@ -762,8 +772,6 @@ class PeerState { return false; // only shrink once every few seconds _lastCongestionOccurred = now; - _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); - int congestionAt = _sendWindowBytes; //if (true) // _sendWindowBytes -= 10000; @@ -819,7 +827,10 @@ class PeerState { return randomResends; } - /** the ack was sent */ + /** + * The ack was sent. + * Side effect - sets _lastACKSend + */ public void removeACKMessage(Long messageId) { boolean removed = _currentACKs.remove(messageId); if (removed) { @@ -840,15 +851,15 @@ class PeerState { /** * The max number of acks we save to send as duplicates */ - private static final int MAX_RESEND_ACKS = 16; + private static final int MAX_RESEND_ACKS = 64; /** * The max number of duplicate acks sent in each ack-only messge. * Doesn't really matter, we have plenty of room... * @since 0.7.13 */ - private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS; + private static final int MAX_RESEND_ACKS_LARGE = MAX_RESEND_ACKS / 3; /** for small MTU */ - private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS; + private static final int MAX_RESEND_ACKS_SMALL = MAX_RESEND_ACKS / 5; /** * grab a list of ACKBitfield instances, some of which may fully @@ -867,11 +878,11 @@ class PeerState { * See above. Only called by ACKSender with alwaysIncludeRetransmissions = false. * So this is only for ACK-only packets, so all the size limiting is useless. * FIXME. + * Side effect - sets _lastACKSend if rv is non-empty * * @return non-null, possibly empty */ public List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { - List<ACKBitfield> rv = new ArrayList(MAX_RESEND_ACKS); int bytesRemaining = countMaxACKData(); // Limit the overhead of all the resent acks when using small MTU @@ -883,8 +894,10 @@ class PeerState { maxResendAcks = MAX_RESEND_ACKS_SMALL; else maxResendAcks = MAX_RESEND_ACKS_LARGE; - List<Long> randomResends = new ArrayList(_currentACKsResend); + List<ACKBitfield> rv = new ArrayList(maxResendAcks); + // save to add to currentACKsResend later so we don't include twice + List<Long> currentACKsRemoved = new ArrayList(_currentACKs.size()); // As explained above, we include the acks in any order // since we are unlikely to get backed up - // just take them using the Set iterator. @@ -894,12 +907,13 @@ class PeerState { iter.remove(); long id = val.longValue(); rv.add(new FullACKBitfield(id)); - _currentACKsResend.offer(val); + currentACKsRemoved.add(val); bytesRemaining -= 4; } if (_currentACKs.isEmpty()) _wantACKSendSince = -1; if (alwaysIncludeRetransmissions || !rv.isEmpty()) { + List<Long> randomResends = new ArrayList(_currentACKsResend); // now repeat by putting in some old ACKs // randomly selected from the Resend queue. // Maybe we should only resend each one a certain number of times... @@ -920,6 +934,9 @@ class PeerState { bytesRemaining -= 4; //} } + for (Long val : currentACKsRemoved) { + _currentACKsResend.offer(val); + } } // trim down the resends while (_currentACKsResend.size() > MAX_RESEND_ACKS) @@ -947,7 +964,8 @@ class PeerState { } } - _lastACKSend = _context.clock().now(); + if (!rv.isEmpty()) + _lastACKSend = _context.clock().now(); //if (rv == null) // rv = Collections.EMPTY_LIST; if (partialIncluded > 0) @@ -1015,8 +1033,11 @@ class PeerState { public String toString() { return "Full ACK of " + _msgId; } } - /** we sent a message which was ACKed containing the given # of bytes */ - public void messageACKed(int bytesACKed, long lifetime, int numSends) { + /** + * We sent a message which was ACKed containing the given # of bytes. + * Caller should synch on this + */ + private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) { _concurrentMessagesActive--; if (_concurrentMessagesActive < 0) _concurrentMessagesActive = 0; @@ -1030,15 +1051,15 @@ class PeerState { if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; } else { - if (false) { - _sendWindowBytes += 16; // why 16? - } else { + //if (false) { + // _sendWindowBytes += 16; // why 16? + //} else { float prob = ((float)bytesACKed) / ((float)(_sendWindowBytes<<1)); float v = _context.random().nextFloat(); if (v < 0) v = 0-v; if (v <= prob) _sendWindowBytes += bytesACKed; //512; // bytesACKed; - } + //} } } else { int allow = _concurrentMessagesAllowed - 1; @@ -1051,21 +1072,31 @@ class PeerState { _lastReceiveTime = _context.clock().now(); _lastSendFullyTime = _lastReceiveTime; - if (true) { + //if (true) { if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes) _sendWindowBytesRemaining += bytesACKed; else _sendWindowBytesRemaining = _sendWindowBytes; - } + //} _messagesSent++; if (numSends < 2) { - synchronized (this) { + // caller synchs + //synchronized (this) { recalculateTimeouts(lifetime); adjustMTU(); - } + //} + } + } + + /** + * We sent a message which was ACKed containing the given # of bytes. + */ + private void messageACKed(int bytesACKed, long lifetime, int numSends) { + synchronized(this) { + locked_messageACKed(bytesACKed, lifetime, numSends); } - else if (_log.shouldLog(Log.INFO)) + if (numSends >= 2 && _log.shouldLog(Log.INFO)) _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); @@ -1144,9 +1175,10 @@ class PeerState { _packetsRetransmitted = packets; } *****/ - congestionOccurred(); + _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); synchronized (this) { + congestionOccurred(); adjustMTU(); } //_rto *= 2; @@ -1208,10 +1240,14 @@ class PeerState { } /** - * we received a backoff request, so cut our send window + * We received a backoff request, so cut our send window. + * NOTE: ECN sending is unimplemented, this is never called. */ public void ECNReceived() { - congestionOccurred(); + synchronized(this) { + congestionOccurred(); + } + _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); _currentSecondECNReceived = true; _lastReceiveTime = _context.clock().now(); } @@ -1222,17 +1258,33 @@ class PeerState { /** when did we last send an ACK to the peer? */ public long getLastACKSend() { return _lastACKSend; } + + /** @deprecated unused */ public void setLastACKSend(long when) { _lastACKSend = when; } + public long getWantedACKSendSince() { return _wantACKSendSince; } + /** + * Are we out of room to send all the current unsent acks in a single packet? + * This is a huge threshold (134 for small MTU and 255 for large MTU) + * that is rarely if ever exceeded in practice. + * So just use a fixed threshold of half the resend acks, so that if the + * packet is lost the acks have a decent chance of getting retransmitted. + * Used only by ACKSender. + */ public boolean unsentACKThresholdReached() { - int threshold = countMaxACKData() / 4; - return _currentACKs.size() >= threshold; + //int threshold = countMaxACKData() / 4; + //return _currentACKs.size() >= threshold; + return _currentACKs.size() >= MAX_RESEND_ACKS / 2; } - /** @return MTU - 83 */ + /** + * @return how many bytes available for acks in an ack-only packet, == MTU - 83 + * Max of 1020 + */ private int countMaxACKData() { - return _mtu + return Math.min(PacketBuilder.ABSOLUTE_MAX_ACKS * 4, + _mtu - PacketBuilder.IP_HEADER_SIZE - PacketBuilder.UDP_HEADER_SIZE - UDPPacket.IV_SIZE @@ -1241,7 +1293,7 @@ class PeerState { - 4 // timestamp - 1 // data flag - 1 // # ACKs - - 16; // padding safety + - 16); // padding safety } private int minRTO() {