From 4ce11a174aa3a45b39761dcabb5791a33a1e8bb1 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Tue, 30 Oct 2012 18:16:37 +0000 Subject: [PATCH] * SSU: - Adjust RTT/RTO calculations - Better bandwidth tracking - Cleanup of OutboundMessageState - Stat tweaks * Transports: Increase min peer port to 1024 --- history.txt | 27 +++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../i2p/router/transport/TransportImpl.java | 10 +- .../router/transport/ntcp/NTCPTransport.java | 4 +- .../transport/udp/EstablishmentManager.java | 4 +- .../udp/OutboundMessageFragments.java | 27 ++- .../transport/udp/OutboundMessageState.java | 162 +++++++++--------- .../i2p/router/transport/udp/PeerState.java | 75 ++++---- .../i2p/router/transport/udp/UDPReceiver.java | 14 +- .../router/transport/udp/UDPTransport.java | 17 +- 10 files changed, 183 insertions(+), 159 deletions(-) diff --git a/history.txt b/history.txt index 2e6a34f9ea..149db427c7 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,30 @@ +2012-10-30 zzz + * i2psnark: + - Add kbucket debugging + - Eliminate redundant explore keys + - Add more limits to DHT tracker + - Delay expiration at startup + - Only enable updates for dev builds and 1% of release builds + * i2ptunnel: + - Create backup privkey files (ticket #752) + - Fix NPE in Android startup + - Fix disabling proxy authorization + * Installer: Drop news.xml and old certs + * logs.jsp: + - Don't display dup message if last + - Spacing tweaks + * OutNetMessage: Properly clean up when dropped by codel (but unused for now + since codel is disabled for ONM) + * SSU: + - Adjust RTT/RTO calculations + - Better bandwidth tracking + - Cleanup of OutboundMessageState + - Stat tweaks + * StatisticsManager: Publish stats less often + * Transports: Increase min peer port to 1024 + * Tunnels: Implement per-client outbound tunnel message priority (ticket #719) + * Update Manager: Warn on dup registration + 2012-10-30 sponge * cleanups as requested diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 87e5bffca0..282c18b422 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 1; + public final static long BUILD = 2; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 0b155d559c..14ee0f3237 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -269,7 +269,7 @@ public abstract class TransportImpl implements Transport { + " to " + msg.getTarget().getIdentity().getHash().toBase64() + " (details: " + msg + ')'); if (msg.getExpiration() < _context.clock().now()) - _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime); + _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime); if (allowRequeue) { if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) @@ -343,7 +343,7 @@ public abstract class TransportImpl implements Transport { if (sendSuccessful) { // TODO fix this stat for SSU ticket #698 - _context.statManager().addRateData("transport.sendProcessingTime", lifetime, lifetime); + _context.statManager().addRateData("transport.sendProcessingTime", lifetime); // object churn. 33 ms for NTCP and 788 for SSU, but meaningless due to // differences in how it's computed (immediate vs. round trip) //_context.statManager().addRateData("transport.sendProcessingTime." + getStyle(), lifetime, 0); @@ -351,7 +351,7 @@ public abstract class TransportImpl implements Transport { _context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime); } else { _context.profileManager().messageFailed(msg.getTarget().getIdentity().getHash(), getStyle()); - _context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime, lifetime); + _context.statManager().addRateData("transport.sendMessageFailureLifetime", lifetime); } } @@ -433,9 +433,9 @@ public abstract class TransportImpl implements Transport { _context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive); } - _context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive); + _context.statManager().addRateData("transport.receiveMessageTime", msToReceive); if (msToReceive > 1000) { - _context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive); + _context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive); } //// this functionality is built into the InNetMessagePool diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index ab57ccd25b..9877b698ee 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -70,11 +70,11 @@ public class NTCPTransport extends TransportImpl { private static final long[] RATES = { 10*60*1000 }; /** - * To prevent trouble. To be raised to 1024 in 0.9.4. + * To prevent trouble. 1024 as of 0.9.4. * * @since 0.9.3 */ - private static final int MIN_PEER_PORT = 500; + private static final int MIN_PEER_PORT = 1024; // Opera doesn't have the char, TODO check UA //private static final String THINSP = " / "; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 860b758942..6cfa3cdc33 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -154,8 +154,8 @@ class EstablishmentManager { _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index d90c24cf12..d4c8be8336 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -80,8 +80,8 @@ class OutboundMessageFragments { _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendSparse", "How many fragments were partially ACKed and hence not resent (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendPiggyback", "How many acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendPiggybackPartial", "How many partial acks were piggybacked on a data packet (time == message lifetime)", "udp", UDPTransport.RATES); @@ -152,25 +152,18 @@ class OutboundMessageFragments { public void add(OutNetMessage msg) { I2NPMessage msgBody = msg.getMessage(); RouterInfo target = msg.getTarget(); - if ( (msgBody == null) || (target == null) ) + if (target == null) return; - // todo: make sure the outNetMessage is initialzed once and only once - OutboundMessageState state = new OutboundMessageState(_context); - boolean ok = state.initialize(msg, msgBody); - if (ok) { - PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash()); - if (peer == null) { - _transport.failed(msg, "Peer disconnected quickly"); - state.releaseResources(); - return; - } + PeerState peer = _transport.getPeerState(target.getIdentity().calculateHash()); + try { + // will throw IAE if peer == null + OutboundMessageState state = new OutboundMessageState(_context, msg, peer); peer.add(state); add(peer); - //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error initializing " + msg); + } catch (IllegalArgumentException iae) { + _transport.failed(msg, "Peer disconnected quickly"); + return; } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 5e8d9a35b4..276ec13c30 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -19,17 +19,19 @@ class OutboundMessageState implements CDPQEntry { private final I2PAppContext _context; private final Log _log; /** may be null if we are part of the establishment */ - private OutNetMessage _message; - private long _messageId; + private final OutNetMessage _message; + private final long _messageId; /** will be null, unless we are part of the establishment */ - private PeerState _peer; - private long _expiration; + private final PeerState _peer; + private final long _expiration; private ByteArray _messageBuf; /** fixed fragment size across the message */ private int _fragmentSize; + /** size of the I2NP message */ + private final int _totalSize; /** sends[i] is how many times the fragment has been sent, or -1 if ACKed */ private short _fragmentSends[]; - private long _startedOn; + private final long _startedOn; private long _nextSendTime; private int _pushCount; private short _maxSends; @@ -59,43 +61,15 @@ class OutboundMessageState implements CDPQEntry { private static final long EXPIRATION = 10*1000; - public OutboundMessageState(I2PAppContext context) { - _context = context; - _log = _context.logManager().getLog(OutboundMessageState.class); - } - -/**** - public boolean initialize(OutNetMessage msg) { - if (msg == null) return false; - try { - return initialize(msg, msg.getMessage(), null); - } catch (OutOfMemoryError oom) { - throw oom; - } catch (Exception e) { - _log.log(Log.CRIT, "Error initializing " + msg, e); - return false; - } - } -****/ - + /** * Called from UDPTransport * TODO make two constructors, remove this, and make more things final * @return success * @throws IAE if too big */ - public boolean initialize(I2NPMessage msg, PeerState peer) { - if (msg == null) - return false; - - try { - return initialize(null, msg, peer); - } catch (OutOfMemoryError oom) { - throw oom; - } catch (Exception e) { - _log.log(Log.CRIT, "Error initializing " + msg, e); - return false; - } + public OutboundMessageState(I2PAppContext context, I2NPMessage msg, PeerState peer) { + this(context, null, msg, peer); } /** @@ -104,18 +78,8 @@ class OutboundMessageState implements CDPQEntry { * @return success * @throws IAE if too big */ - public boolean initialize(OutNetMessage m, I2NPMessage msg) { - if ( (m == null) || (msg == null) ) - return false; - - try { - return initialize(m, msg, null); - } catch (OutOfMemoryError oom) { - throw oom; - } catch (Exception e) { - _log.log(Log.CRIT, "Error initializing " + msg, e); - return false; - } + public OutboundMessageState(I2PAppContext context, OutNetMessage m, PeerState peer) { + this(context, m, m.getMessage(), peer); } /** @@ -124,28 +88,26 @@ class OutboundMessageState implements CDPQEntry { * @return success * @throws IAE if too big */ - private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { + private OutboundMessageState(I2PAppContext context, OutNetMessage m, I2NPMessage msg, PeerState peer) { + if (msg == null || peer == null) + throw new IllegalArgumentException(); + _context = context; + _log = _context.logManager().getLog(OutboundMessageState.class); _message = m; _peer = peer; int size = msg.getRawMessageSize(); acquireBuf(size); - try { - int len = msg.toRawByteArray(_messageBuf.getData()); - _messageBuf.setValid(len); - _messageId = msg.getUniqueId(); - - _startedOn = _context.clock().now(); - _nextSendTime = _startedOn; - _expiration = _startedOn + EXPIRATION; - //_expiration = msg.getExpiration(); - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); - return true; - } catch (IllegalStateException ise) { - releaseBuf(); - return false; - } + _totalSize = msg.toRawByteArray(_messageBuf.getData()); + _messageBuf.setValid(_totalSize); + _messageId = msg.getUniqueId(); + + _startedOn = _context.clock().now(); + _nextSendTime = _startedOn; + _expiration = _startedOn + EXPIRATION; + //_expiration = msg.getExpiration(); + + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); } /** @@ -203,7 +165,6 @@ class OutboundMessageState implements CDPQEntry { public OutNetMessage getMessage() { return _message; } public long getMessageId() { return _messageId; } public PeerState getPeer() { return _peer; } - public void setPeer(PeerState peer) { _peer = peer; } public boolean isExpired() { return _expiration < _context.clock().now(); @@ -224,8 +185,7 @@ class OutboundMessageState implements CDPQEntry { ByteArray messageBuf = _messageBuf; int rv = 0; if ( (messageBuf != null) && (fragmentSends != null) ) { - int totalSize = messageBuf.getValid(); - int lastSize = totalSize % _fragmentSize; + int lastSize = _totalSize % _fragmentSize; if (lastSize == 0) lastSize = _fragmentSize; for (int i = 0; i < fragmentSends.length; i++) { @@ -286,11 +246,22 @@ class OutboundMessageState implements CDPQEntry { public long getNextSendTime() { return _nextSendTime; } public void setNextSendTime(long when) { _nextSendTime = when; } + + /** + * The max number of sends for any fragment, which is the + * same as the push count, at least as it's coded now. + */ public int getMaxSends() { return _maxSends; } + + /** + * The number of times we've pushed some fragments, which is the + * same as the max sends, at least as it's coded now. + */ public int getPushCount() { return _pushCount; } /** note that we have pushed the message fragments */ public void push() { + // these will never be different... _pushCount++; if (_pushCount > _maxSends) _maxSends = (short)_pushCount; @@ -301,23 +272,35 @@ class OutboundMessageState implements CDPQEntry { } + /** + * Whether fragment() has been called. + * NOT whether it has more than one fragment. + * + * Caller should synchronize + * + * @return true iff fragment() has been called previously + */ public boolean isFragmented() { return _fragmentSends != null; } /** * Prepare the message for fragmented delivery, using no more than * fragmentSize bytes per fragment. * + * Caller should synchronize + * + * @throws IllegalStateException if called more than once */ public void fragment(int fragmentSize) { - int totalSize = _messageBuf.getValid(); - int numFragments = totalSize / fragmentSize; - if (numFragments * fragmentSize < totalSize) + if (_fragmentSends != null) + throw new IllegalStateException(); + int numFragments = _totalSize / fragmentSize; + if (numFragments * fragmentSize < _totalSize) numFragments++; // This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB if (numFragments > InboundMessageState.MAX_FRAGMENTS) - throw new IllegalArgumentException("Fragmenting a " + totalSize + " message into " + numFragments + " fragments - too many!"); + throw new IllegalArgumentException("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments - too many!"); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Fragmenting a " + totalSize + " message into " + numFragments + " fragments"); + _log.debug("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments"); //_fragmentEnd = new int[numFragments]; _fragmentSends = new short[numFragments]; @@ -327,7 +310,13 @@ class OutboundMessageState implements CDPQEntry { _fragmentSize = fragmentSize; } - /** how many fragments in the message */ + /** + * How many fragments in the message. + * Only valid after fragment() has been called. + * Returns -1 before then. + * + * Caller should synchronize + */ public int getFragmentCount() { if (_fragmentSends == null) return -1; @@ -335,15 +324,26 @@ class OutboundMessageState implements CDPQEntry { return _fragmentSends.length; } - public int getFragmentSize() { return _fragmentSize; } + /** + * The size of the I2NP message. Does not include any SSU overhead. + * + * Caller should synchronize + */ + public int getMessageSize() { return _totalSize; } - /** should we continue sending this fragment? */ + /** + * Should we continue sending this fragment? + * Only valid after fragment() has been called. + * Throws NPE before then. + * + * Caller should synchronize + */ public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } public int fragmentSize(int fragmentNum) { if (_messageBuf == null) return -1; if (fragmentNum + 1 == _fragmentSends.length) { - int valid = _messageBuf.getValid(); + int valid = _totalSize; if (valid <= _fragmentSize) return valid; // bugfix 0.8.12 @@ -406,7 +406,7 @@ class OutboundMessageState implements CDPQEntry { System.arraycopy(_messageBuf.getData(), start, out, outOffset, toSend); if (_log.shouldLog(Log.DEBUG)) _log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId - + "[" + start + "-" + (start+toSend) + "/" + _messageBuf.getValid() + "/" + _fragmentSize + "]: " + + "[" + start + "-" + (start+toSend) + "/" + _totalSize + "/" + _fragmentSize + "]: " + Base64.encode(out, outOffset, toSend)); return toSend; } else { @@ -467,13 +467,11 @@ class OutboundMessageState implements CDPQEntry { @Override public String toString() { short sends[] = _fragmentSends; - ByteArray messageBuf = _messageBuf; StringBuilder buf = new StringBuilder(256); buf.append("OB Message ").append(_messageId); if (sends != null) buf.append(" with ").append(sends.length).append(" fragments"); - if (messageBuf != null) - buf.append(" of size ").append(messageBuf.getValid()); + buf.append(" of size ").append(_totalSize); buf.append(" volleys: ").append(_maxSends); buf.append(" lifetime: ").append(getLifetime()); if (sends != null) { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 99e11a4fac..3f0ca45fcb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -289,7 +289,6 @@ class PeerState { */ public static final int LARGE_MTU = 1484; - /** 600 */ private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY; private static final int INIT_RTO = 3*1000; public static final int INIT_RTT = INIT_RTO / 2; @@ -714,7 +713,7 @@ class PeerState { //_receiveACKBytes = 0; _receiveBytes = 0; _receivePeriodBegin = now; - _context.statManager().addRateData("udp.receiveBps", _receiveBps, 0); + _context.statManager().addRateData("udp.receiveBps", _receiveBps); } if (_wantACKSendSince <= 0) @@ -1102,31 +1101,32 @@ class PeerState { if (numSends >= 2 && _log.shouldLog(Log.INFO)) _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); - _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); + _context.statManager().addRateData("udp.sendBps", _sendBps); } + /** This is the value specified in RFC 2988 */ + private static final float RTT_DAMPENING = 0.125f; + /** * Adjust the tcp-esque timeouts. * Caller should synch on this */ private void recalculateTimeouts(long lifetime) { + // the rttDev calculation matches that recommended in RFC 2988 (beta = 1/4) _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); + float scale = RTT_DAMPENING; // the faster we are going, the slower we want to reduce the rtt - float scale = 0.1f; - if (_sendBps > 0) - scale = lifetime / ((float)lifetime + (float)_sendBps); - if (scale < 0.001f) scale = 0.001f; + //if (_sendBps > 0) + // scale = lifetime / ((float)lifetime + (float)_sendBps); + //if (scale < 0.001f) scale = 0.001f; _rtt = (int)(_rtt*(1.0f-scale) + (scale)*lifetime); - _rto = _rtt + (_rttDeviation<<2); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt - + " rttDev=" + _rttDeviation + " rto=" + _rto); - if (_rto < minRTO()) - _rto = minRTO(); - else if (_rto > MAX_RTO) - _rto = MAX_RTO; + // K = 4 + _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rtt + (_rttDeviation<<2))); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt + // + " rttDev=" + _rttDeviation + " rto=" + _rto); } /** @@ -1141,12 +1141,12 @@ class PeerState { if (_context.random().nextLong(_mtuDecreases) <= 0) { _mtu = _largeMTU; _mtuIncreases++; - _context.statManager().addRateData("udp.mtuIncrease", _mtuIncreases, _mtuDecreases); + _context.statManager().addRateData("udp.mtuIncrease", _mtuIncreases); } } else if (!wantLarge && _mtu == _largeMTU) { _mtu = MIN_MTU; _mtuDecreases++; - _context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases, _mtuIncreases); + _context.statManager().addRateData("udp.mtuDecrease", _mtuDecreases); } } else { _mtu = DEFAULT_MTU; @@ -1178,7 +1178,7 @@ class PeerState { _packetsRetransmitted = packets; } *****/ - _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); + _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes); _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); synchronized (this) { congestionOccurred(); @@ -1250,7 +1250,7 @@ class PeerState { synchronized(this) { congestionOccurred(); } - _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); + _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes); _currentSecondECNReceived = true; _lastReceiveTime = _context.clock().now(); } @@ -1323,7 +1323,12 @@ class PeerState { _transport.failed(state, false); return; } - state.setPeer(this); + if (state.getPeer() != this) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Not for me!", new Exception("I did it")); + _transport.failed(state, false); + return; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId()); int rv = 0; @@ -1452,14 +1457,14 @@ class PeerState { iter.remove(); if (_retransmitter == state) _retransmitter = null; - _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); + _context.statManager().addRateData("udp.sendFailed", state.getPushCount()); if (failed == null) failed = new ArrayList(4); failed.add(state); } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { iter.remove(); if (state == _retransmitter) _retransmitter = null; - _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); + _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount()); if (failed == null) failed = new ArrayList(4); failed.add(state); } // end (pushCount > maxVolleys) @@ -1624,7 +1629,7 @@ class PeerState { /** * how much payload data can we shove in there? - * @return MTU - 87, i.e. 521 or 1401 + * @return MTU - 87, i.e. 533 or 1397 */ private static final int fragmentSize(int mtu) { // 46 + 20 + 8 + 13 = 74 + 13 = 87 @@ -1659,7 +1664,7 @@ class PeerState { if ( (retrans != null) && (retrans != state) ) { // choke it, since there's already another message retransmitting to this // peer. - _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted); + _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted); int max = state.getMaxSends(); if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) { //if (state.getMessage() != null) @@ -1695,7 +1700,7 @@ class PeerState { // _throttle.unchoke(peer.getRemotePeer()); return ShouldSend.YES; } else { - _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); + _context.statManager().addRateData("udp.sendRejected", state.getPushCount()); //if (state.getMessage() != null) // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); if (_log.shouldLog(Log.INFO)) @@ -1760,15 +1765,14 @@ class PeerState { if (_log.shouldLog(Log.INFO)) _log.info("Received ack of " + messageId + " by " + _remotePeer + " after " + state.getLifetime() + " and " + numSends + " sends"); - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); + _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime()); if (state.getFragmentCount() > 1) - _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - if (numSends > 1) - _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); + _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount()); + _context.statManager().addRateData("udp.sendConfirmVolley", numSends); _transport.succeeded(state); int numFragments = state.getFragmentCount(); // this adjusts the rtt/rto/window/etc - messageACKed(numFragments*state.getFragmentSize(), state.getLifetime(), numSends); + messageACKed(state.getMessageSize(), state.getLifetime(), numSends); //if (getSendWindowBytesRemaining() > 0) // _throttle.unchoke(peer.getRemotePeer()); @@ -1828,7 +1832,7 @@ class PeerState { if (bitfield.received(i)) numACKed++; - _context.statManager().addRateData("udp.partialACKReceived", numACKed, state.getLifetime()); + _context.statManager().addRateData("udp.partialACKReceived", numACKed); if (_log.shouldLog(Log.INFO)) _log.info("Received partial ack of " + state.getMessageId() + " by " + _remotePeer @@ -1836,17 +1840,16 @@ class PeerState { + isComplete + ": " + state); if (isComplete) { - _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); + _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime()); if (state.getFragmentCount() > 1) - _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); - if (numSends > 1) - _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); + _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount()); + _context.statManager().addRateData("udp.sendConfirmVolley", numSends); //if (state.getMessage() != null) // state.getMessage().timestamp("partial ack to complete after " + numSends); _transport.succeeded(state); // this adjusts the rtt/rto/window/etc - messageACKed(state.getFragmentCount()*state.getFragmentSize(), state.getLifetime(), 0); + messageACKed(state.getMessageSize(), state.getLifetime(), numSends); //if (state.getPeer().getSendWindowBytesRemaining() > 0) // _throttle.unchoke(state.getPeer().getRemotePeer()); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 72637fef86..b7b06bb8d0 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -52,9 +52,9 @@ class UDPReceiver { _socket = socket; _transport = transport; _runner = new Runner(); - _context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receiveHolePunch", "How often we receive a NAT hole punch", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.ignorePacketFromDroplist", "Packet lifetime for those dropped on the drop list", "udp", UDPTransport.RATES); } @@ -165,7 +165,7 @@ class UDPReceiver { if (_transport.isInDropList(from)) { if (_log.shouldLog(Log.INFO)) _log.info("Ignoring packet from the drop-listed peer: " + from); - _context.statManager().addRateData("udp.ignorePacketFromDroplist", packet.getLifetime(), 0); + _context.statManager().addRateData("udp.ignorePacketFromDroplist", packet.getLifetime()); packet.release(); return 0; } @@ -296,10 +296,10 @@ class UDPReceiver { while (req.getPendingRequested() > 0) req.waitForNextAllocation(); - int queued = receive(packet); - _context.statManager().addRateData("udp.receivePacketSize", size, queued); + receive(packet); + //_context.statManager().addRateData("udp.receivePacketSize", size); } else { - _context.statManager().addRateData("udp.receiveHolePunch", 1, 0); + _context.statManager().addRateData("udp.receiveHolePunch", 1); // nat hole punch packets are 0 bytes if (_log.shouldLog(Log.INFO)) _log.info("Received a 0 byte udp packet from " + packet.getPacket().getAddress() + ":" + packet.getPacket().getPort()); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index bad73980f0..366095d931 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -113,11 +113,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public static final int DEFAULT_INTERNAL_PORT = 8887; /** - * To prevent trouble. To be raised to 1024 in 0.9.4. + * To prevent trouble. 1024 as of 0.9.4. * * @since 0.9.3 */ - static final int MIN_PEER_PORT = 500; + static final int MIN_PEER_PORT = 1024; /** Limits on port told to us by others, * We should have an exception if it matches the existing low port. @@ -1419,12 +1419,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * "injected" message from the EstablishmentManager */ void send(I2NPMessage msg, PeerState peer) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Injecting a data message to a new peer: " + peer); - OutboundMessageState state = new OutboundMessageState(_context); - boolean ok = state.initialize(msg, peer); - if (ok) + try { + OutboundMessageState state = new OutboundMessageState(_context, msg, peer); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Injecting a data message to a new peer: " + peer); _fragments.add(state); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Shouldnt happen", new Exception("I did it")); + } } // we don't need the following, since we have our own queueing -- GitLab