diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 5cc606b037fa70b944aef7b9e1b2650d22d7fb7c..8770467167a434ed2a9365ce3525d17c7440780c 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -425,7 +425,13 @@ class OutboundMessageFragments { int queued = state.push(toSend); // per-state stats if (queued > 0 && state.getMaxSends() > 1) { - peer.messageRetransmitted(queued); + int maxPktSz = state.fragmentSize(0); + if (peer.getVersion() == 1) + maxPktSz += (peer.isIPv6() ? PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_DATA_PACKET_OVERHEAD); + else + maxPktSz += SSU2Payload.BLOCK_HEADER_SIZE + + (peer.isIPv6() ? PacketBuilder2.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder2.MIN_DATA_PACKET_OVERHEAD); + peer.messageRetransmitted(queued, maxPktSz); // _packetsRetransmitted += toSend; // lifetime for the transport _context.statManager().addRateData("udp.peerPacketsRetransmitted", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted()); _context.statManager().addRateData("udp.packetsRetransmitted", state.getLifetime(), peer.getPacketsTransmitted()); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index d6c82a6d867a638473951da0f77f0f651d2bb2d6..e2636ef42656b4338784bbd7ffcbf0df5ccce781 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -158,6 +158,7 @@ public class PeerState { private int _mtuReceive; /** what is the largest packet we will ever send to the peer? */ private int _largeMTU; + private final int _minMTU; /* how many consecutive packets at or under the min MTU have been received */ private long _consecutiveSmall; private int _mtuIncreases; @@ -293,6 +294,10 @@ public class PeerState { */ public static final int MAX_MTU = Math.max(LARGE_MTU, MAX_IPV6_MTU); + // amount to adjust up or down in adjustMTU() + // should be multiple of 16, at least for SSU 1 + private static final int MTU_STEP = 64; + private static final int MIN_RTO = 1000; private static final int INIT_RTO = 1000; private static final int INIT_RTT = 0; @@ -320,6 +325,8 @@ public class PeerState { private static final int FAST_RTX_ACKS = 3; /** + * SSU 1 only. + * * @param rtt from the EstablishState, or 0 if not available */ public PeerState(RouterContext ctx, UDPTransport transport, @@ -341,10 +348,12 @@ public class PeerState { _mtu = DEFAULT_MTU; _mtuReceive = DEFAULT_MTU; _largeMTU = transport.getMTU(false); + _minMTU = MIN_MTU; } else { _mtu = MIN_IPV6_MTU; _mtuReceive = MIN_IPV6_MTU; _largeMTU = transport.getMTU(true); + _minMTU = MIN_IPV6_MTU; } // RFC 5681 sec. 3.1 if (_mtu > 1095) @@ -400,6 +409,7 @@ public class PeerState { } else { _largeMTU = transport.getMTU(true); } + _minMTU = PeerState2.MIN_MTU; // RFC 5681 sec. 3.1 _sendWindowBytes = 3 * _mtu; _sendWindowBytesRemaining = _sendWindowBytes; @@ -1114,7 +1124,7 @@ public class PeerState { * We sent a message which was ACKed containing the given # of bytes. * Caller should synch on this */ - private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { + private void locked_messageACKed(int bytesACKed, int maxPktSz, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { _consecutiveFailedSends = 0; // _lastFailedSendPeriod = -1; if (numSends < 2) { @@ -1157,7 +1167,7 @@ public class PeerState { if (numSends < 2) { // caller synchs recalculateTimeouts(lifetime); - adjustMTU(); + adjustMTU(maxPktSz, true); } if (!anyPending) { @@ -1180,9 +1190,9 @@ public class PeerState { /** * We sent a message which was ACKed containing the given # of bytes. */ - protected void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { + private void messageACKed(int bytesACKed, int maxPktSz, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { synchronized(this) { - locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued); + locked_messageACKed(bytesACKed, maxPktSz, lifetime, numSends, anyPending, anyQueued); } _bwEstimator.addSample(bytesACKed); if (numSends >= 2 && _log.shouldDebug()) @@ -1214,26 +1224,40 @@ public class PeerState { } /** + * Adjust upward if a large packet was successfully sent without retransmission. + * Adjust downward if a packet was retransmitted. + * * Caller should synch on this - */ - private void adjustMTU() { - double retransPct = 0; - if (_packetsTransmitted > 10) { - retransPct = (double)_packetsRetransmitted/(double)_packetsTransmitted; - boolean wantLarge = retransPct < .30d; // heuristic to allow fairly lossy links to use large MTUs - if (wantLarge && _mtu != _largeMTU) { - if (_context.random().nextLong(_mtuDecreases) <= 0) { - _mtu = _largeMTU; + * + * @param maxPktSz the largest packet that was sent + * @param success was it sent successfully? + */ + private void adjustMTU(int maxPktSz, boolean success) { + if (_packetsTransmitted > 0) { + // heuristic to allow fairly lossy links to use large MTUs + boolean wantLarge = success && + (float)_packetsRetransmitted / (float)_packetsTransmitted < 0.10f; + // we only increase if the size was close to the limit + if (wantLarge) { + if (_mtu < _largeMTU && maxPktSz > _mtu - (MTU_STEP * 2) && + (_mtuDecreases <= 1 || _context.random().nextInt(_mtuDecreases) <= 0)){ + _mtu = Math.min(_mtu + MTU_STEP, _largeMTU); _mtuIncreases++; + _mtuDecreases = 0; _context.statManager().addRateData("udp.mtuIncrease", _mtuIncreases); - } - } else if (!wantLarge && _mtu == _largeMTU) { - _mtu = getVersion() == 2 ? PeerState2.MIN_MTU : (_remoteIP.length == 4 ? MIN_MTU : MIN_IPV6_MTU); - _mtuDecreases++; - _context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases); - } - } else { - _mtu = getVersion() == 2 ? PeerState2.MIN_MTU : (_remoteIP.length == 4 ? MIN_MTU : MIN_IPV6_MTU); + if (_log.shouldDebug()) + _log.debug("Increased MTU after " + maxPktSz + " byte packet acked on " + this); + } + } else { + if (_mtu > _minMTU) { + _mtu = Math.max(_mtu - MTU_STEP, _minMTU); + _mtuDecreases++; + _mtuIncreases = 0; + _context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases); + if (_log.shouldDebug()) + _log.debug("Decreased MTU after " + maxPktSz + " byte packet retx on " + this); + } + } } } @@ -1241,9 +1265,7 @@ public class PeerState { * @since 0.9.2 */ synchronized void setHisMTU(int mtu) { - if (mtu <= MIN_MTU || mtu >= _largeMTU || - (_remoteIP.length == 16 && mtu <= MIN_IPV6_MTU) || - (getVersion() == 2 && mtu <= PeerState2.MIN_MTU)) + if (mtu <= _minMTU || mtu >= _largeMTU) return; _largeMTU = mtu; if (mtu < _mtu) @@ -1251,12 +1273,12 @@ public class PeerState { } /** we are resending a packet, so lets jack up the rto */ - synchronized void messageRetransmitted(int packets) { + synchronized void messageRetransmitted(int packets, int maxPktSz) { _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes); _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); _packetsRetransmitted += packets; congestionOccurred(); - adjustMTU(); + adjustMTU(maxPktSz, false); } synchronized void packetsTransmitted(int packets) { @@ -1306,18 +1328,15 @@ public class PeerState { */ synchronized void packetReceived(int size) { _packetsReceived++; - int minMTU; if (_remoteIP.length == 4) { size += OVERHEAD_SIZE; - minMTU = getVersion() == 2 ? PeerState2.MIN_MTU : MIN_MTU; } else { size += IPV6_OVERHEAD_SIZE; - minMTU = getVersion() == 2 ? PeerState2.MIN_MTU : MIN_IPV6_MTU; } - if (size <= minMTU) { + if (size <= _minMTU) { _consecutiveSmall++; if (_consecutiveSmall >= MTU_RCV_DISPLAY_THRESHOLD) - _mtuReceive = minMTU; + _mtuReceive = _minMTU; } else { _consecutiveSmall = 0; if (size > _mtuReceive) @@ -1856,7 +1875,9 @@ public class PeerState { _ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn)); } // this adjusts the rtt/rto/window/etc - messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued); + int maxPktSz = state.fragmentSize(0) + + (isIPv6() ? PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_DATA_PACKET_OVERHEAD); + messageACKed(state.getUnackedSize(), maxPktSz, lifetime, numSends, anyPending, anyQueued); } else { // dupack, likely Long seq; @@ -1958,7 +1979,7 @@ public class PeerState { } } // this adjusts the rtt/rto/window/etc - messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued); + messageACKed(ackedSize, 0, lifetime, numSends, anyPending, anyQueued); } // we do this even if only partial long sn = state.getSeqNum(); @@ -2079,7 +2100,10 @@ public class PeerState { } } // this adjusts the rtt/rto/window/etc - messageACKed(ackedSize, lifetime, numSends, anyPending, anyQueued); + int maxPktSz = state.fragmentSize(0) + + SSU2Payload.BLOCK_HEADER_SIZE + + (isIPv6() ? PacketBuilder2.MIN_IPV6_DATA_PACKET_OVERHEAD : PacketBuilder2.MIN_DATA_PACKET_OVERHEAD); + messageACKed(ackedSize, maxPktSz, lifetime, numSends, anyPending, anyQueued); return true; } @@ -2383,6 +2407,8 @@ public class PeerState { buf.append(" lifetime: ").append(DataHelper.formatDuration(now - _keyEstablishedTime)); buf.append(" RTT: ").append(_rtt); buf.append(" RTO: ").append(_rto); + buf.append(" MTU: ").append(_mtu); + buf.append(" LMTU: ").append(_largeMTU); buf.append(" cwin: ").append(_sendWindowBytes); buf.append(" acwin: ").append(_sendWindowBytesRemaining); buf.append(" SST: ").append(_slowStartThreshold);