diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 7fe822b103d07a6c92479d84562930e7066bb03a..ca4a804a206afd32a64ed52eb442c08dc0f57cbc 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -121,7 +121,7 @@ class RouterThrottleImpl implements RouterThrottle { if (rs != null) r = rs.getRate(60*1000); double processTime = (r != null ? r.getAverageValue() : 0); - if (processTime > 2000) { + if (processTime > 5000) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Refusing tunnel request with the job lag of " + lag + "since the 1 minute message processing time is too slow (" + processTime + ")"); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 376e901a66e7a0b676be48ad3322399ec24fd130..9f59881e0fa2312a3a6174e64dc2aa142057d2bc 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.346 $ $Date: 2006/02/17 04:07:53 $"; + public final static String ID = "$Revision: 1.347 $ $Date: 2006/02/17 17:29:32 $"; public final static String VERSION = "0.6.1.10"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 05018b68b62867608203e8a11be9d3e63889db82..0d6d2147415d40bcc1ca29fb0d2e7ad205a8599c 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -78,6 +78,8 @@ public class OutboundMessageFragments { _context.statManager().createRateStat("udp.packetsRetransmitted", "Lifetime of packets during their retransmission (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.peerPacketsRetransmitted", "How many packets have been retransmitted to the peer (lifetime) when a burst of packets are retransmitted (period == packets transmitted, lifetime)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.blockedRetransmissions", "How packets have been transmitted to the peer when we blocked a retransmission to them?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.sendCycleTime", "How long it takes to cycle through all of the active messages?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } public void startup() { _alive = true; } @@ -145,6 +147,8 @@ public class OutboundMessageFragments { if (ok) _activeMessages.add(state); active = _activeMessages.size(); + if (active == 1) + _lastCycleTime = System.currentTimeMillis(); _activeMessages.notifyAll(); } msg.timestamp("made active along with: " + active); @@ -158,6 +162,8 @@ public class OutboundMessageFragments { public void add(OutboundMessageState state) { synchronized (_activeMessages) { _activeMessages.add(state); + if (_activeMessages.size() == 1) + _lastCycleTime = System.currentTimeMillis(); _activeMessages.notifyAll(); } } @@ -264,6 +270,8 @@ public class OutboundMessageFragments { private static final long SECOND_MASK = 1023l; + private long _lastCycleTime = System.currentTimeMillis(); + /** * Fetch all the packets for a message volley, blocking until there is a * message which can be fully transmitted (or the transport is shut down). @@ -282,6 +290,14 @@ public class OutboundMessageFragments { synchronized (_activeMessages) { for (int i = 0; i < _activeMessages.size(); i++) { int cur = (i + _nextPacketMessage) % _activeMessages.size(); + if (cur == 0) { + long ts = System.currentTimeMillis(); + long cycleTime = ts - _lastCycleTime; + _lastCycleTime = ts; + _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activeMessages.size()); + if (cycleTime > 1000) + _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activeMessages.size()); + } state = (OutboundMessageState)_activeMessages.get(cur); peer = state.getPeer(); // known if this is immediately after establish if (peer == null) diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 3f9074ce716a98b4de514654974ad377baceb84b..18c08617b7f7b4469434472e5d6adee5fb830794 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -39,7 +39,7 @@ public class PacketPusher implements Runnable { if (packets != null) { for (int i = 0; i < packets.length; i++) { if (packets[i] != null) // null for ACKed fragments - _sender.add(packets[i], 100); // blocks for up to 100ms + _sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms } } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 2c8a008399212370adb0c86a95955317c70dc87e..e16fbe5df58e447f1db86137375de93f034ca282 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -171,6 +171,15 @@ public class PeerState { /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; + + /** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */ + private volatile int _concurrentMessagesAllowed = 8; + /** + * how many outbound messages are currently being transmitted. Not thread safe, as we're not strict + */ + private volatile int _concurrentMessagesActive = 0; + /** how many concurrency rejections have we had in a row */ + private volatile int _consecutiveRejections = 0; private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; @@ -253,6 +262,9 @@ public class PeerState { _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); } private int getDefaultMTU() { @@ -414,6 +426,10 @@ public class PeerState { public int getSendBps() { return _sendBps; } public int getReceiveBps() { return _receiveBps; } public int incrementConsecutiveFailedSends() { + _concurrentMessagesActive--; + if (_concurrentMessagesActive < 0) + _concurrentMessagesActive = 0; + long now = _context.clock().now()/(10*1000); if (_lastFailedSendPeriod >= now) { // ignore... too fast @@ -469,6 +485,17 @@ public class PeerState { } //if (true) return true; if (IGNORE_CWIN || size <= _sendWindowBytesRemaining || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) { + if ( (messagePushCount == 0) && (_concurrentMessagesActive > _concurrentMessagesAllowed) ) { + _consecutiveRejections++; + _context.statManager().addRateData("udp.rejectConcurrentActive", _concurrentMessagesActive, _consecutiveRejections); + return false; + } else if (messagePushCount == 0) { + _context.statManager().addRateData("udp.allowConcurrentActive", _concurrentMessagesActive, _concurrentMessagesAllowed); + _concurrentMessagesActive++; + if (_consecutiveRejections > 0) + _context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _concurrentMessagesActive); + _consecutiveRejections = 0; + } _sendWindowBytesRemaining -= size; _sendBytes += size; _lastSendTime = now; @@ -479,6 +506,7 @@ public class PeerState { return false; } } + /** what IP+port is the peer sending and receiving packets on? */ public void setRemoteAddress(byte ip[], int port) { _remoteIP = ip; @@ -505,6 +533,9 @@ public class PeerState { _mtuLastChecked = _context.clock().now(); } public int getSlowStartThreshold() { return _slowStartThreshold; } + public int getConcurrentSends() { return _concurrentMessagesActive; } + public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; } + public int getConsecutiveSendRejections() { return _consecutiveRejections; } /** we received the message specified completely */ public void messageFullyReceived(Long messageId, int bytes) { messageFullyReceived(messageId, bytes, false); } @@ -745,9 +776,16 @@ public class PeerState { /** we sent a message which was ACKed containing the given # of bytes */ public void messageACKed(int bytesACKed, long lifetime, int numSends) { + _concurrentMessagesActive--; + if (_concurrentMessagesActive < 0) + _concurrentMessagesActive = 0; + _consecutiveFailedSends = 0; _lastFailedSendPeriod = -1; if (numSends < 2) { + if (_context.random().nextInt(_concurrentMessagesAllowed) <= 0) + _concurrentMessagesAllowed++; + if (_sendWindowBytes <= _slowStartThreshold) { _sendWindowBytes += bytesACKed; } else { @@ -761,6 +799,11 @@ public class PeerState { _sendWindowBytes += bytesACKed; //512; // bytesACKed; } } + } else { + int allow = _concurrentMessagesAllowed - 1; + if (allow < 8) + allow = 8; + _concurrentMessagesAllowed = allow; } if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES) _sendWindowBytes = MAX_SEND_WINDOW_BYTES; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 63860dcc9b06ba5904a416047ebd505f5f8f4f05..5fd02cdfcc295fbcadc5ff155793cbf1bf78603b 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -36,6 +36,8 @@ public class UDPSender { _name = name; _context.statManager().createRateStat("udp.pushTime", "How long a UDP packet takes to get pushed out", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.sendQueueSize", "How many packets are queued on the UDP sender", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.sendQueueFailed", "How often it was unable to add a new packet to the queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.sendQueueTrimmed", "How many packets were removed from the queue for being too old (duration == remaining)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.sendPacketSize", "How large packets sent are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.socketSendTime", "How long the actual socket.send took", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.sendBWThrottleTime", "How long the send is blocked by the bandwidth throttle", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -89,12 +91,29 @@ public class UDPSender { long expiration = _context.clock().now() + blockTime; int remaining = -1; long lifetime = -1; + boolean added = false; + int removed = 0; while ( (_keepRunning) && (remaining < 0) ) { try { synchronized (_outboundQueue) { - if (_outboundQueue.size() < MAX_QUEUED) { + // clear out any too-old packets + UDPPacket head = null; + if (_outboundQueue.size() > 0) { + head = (UDPPacket)_outboundQueue.get(0); + while (head.getLifetime() > MAX_HEAD_LIFETIME) { + _outboundQueue.remove(0); + removed++; + if (_outboundQueue.size() > 0) + head = (UDPPacket)_outboundQueue.get(0); + else + break; + } + } + + if (true || (_outboundQueue.size() < MAX_QUEUED)) { lifetime = packet.getLifetime(); _outboundQueue.add(packet); + added = true; remaining = _outboundQueue.size(); _outboundQueue.notifyAll(); } else { @@ -105,16 +124,23 @@ public class UDPSender { remaining = _outboundQueue.size(); _outboundQueue.notifyAll(); } + lifetime = packet.getLifetime(); } } } catch (InterruptedException ie) {} } _context.statManager().addRateData("udp.sendQueueSize", remaining, lifetime); + if (!added) + _context.statManager().addRateData("udp.sendQueueFailed", remaining, lifetime); + if (removed > 0) + _context.statManager().addRateData("udp.sendQueueTrimmed", removed, remaining); if (_log.shouldLog(Log.DEBUG)) _log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime); return remaining; } + private static final int MAX_HEAD_LIFETIME = 1000; + /** * * @return number of packets in the queue @@ -123,13 +149,28 @@ public class UDPSender { if (packet == null) return 0; int size = 0; long lifetime = -1; + int removed = 0; synchronized (_outboundQueue) { lifetime = packet.getLifetime(); + UDPPacket head = null; + if (_outboundQueue.size() > 0) { + head = (UDPPacket)_outboundQueue.get(0); + while (head.getLifetime() > MAX_HEAD_LIFETIME) { + _outboundQueue.remove(0); + removed++; + if (_outboundQueue.size() > 0) + head = (UDPPacket)_outboundQueue.get(0); + else + break; + } + } _outboundQueue.add(packet); size = _outboundQueue.size(); _outboundQueue.notifyAll(); } _context.statManager().addRateData("udp.sendQueueSize", size, lifetime); + if (removed > 0) + _context.statManager().addRateData("udp.sendQueueTrimmed", removed, size); if (_log.shouldLog(Log.DEBUG)) _log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + lifetime); return size; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 0236121b113921b0dda130383553dbcf10042d24..b5acee669a0220145c21e267d37f3cd3ea963319 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -955,6 +955,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority + " to " + msg.getPeer()); if ( (consecutive > MAX_CONSECUTIVE_FAILED) && (msg.getPeer().getInactivityTime() > DROP_INACTIVITY_TIME)) dropPeer(msg.getPeer(), false); + else if (consecutive > 2 * MAX_CONSECUTIVE_FAILED) // they're sending us data, but we cant reply? + dropPeer(msg.getPeer(), false); } noteSend(msg, false); super.afterSend(msg.getMessage(), false); @@ -1198,7 +1200,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("<td valign=\"top\" ><code>"); buf.append(sendWindow/1024); - buf.append("K</code></td>"); + buf.append("K"); + buf.append("/").append(peer.getConcurrentSends()); + buf.append("/").append(peer.getConcurrentSendWindow()); + buf.append("/").append(peer.getConsecutiveSendRejections()); + buf.append("</code></td>"); buf.append("<td valign=\"top\" ><code>"); buf.append(peer.getSlowStartThreshold()/1024); @@ -1329,7 +1335,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority "<b id=\"def.rate\">in/out</b>: the rates show a smoothed inbound and outbound transfer rate (KBytes per second)<br />\n" + "<b id=\"def.up\">up</b>: the uptime is how long ago this session was established<br />\n" + "<b id=\"def.skew\">skew</b>: the skew says how far off the other user's clock is, relative to your own<br />\n" + - "<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send without an acknowledgement<br />\n" + + "<b id=\"def.cwnd\">cwnd</b>: the congestion window is how many bytes in 'in flight' you can send without an acknowledgement / <br />\n" + + " the number of currently active messages being sent /<br />\n the maximum number of concurrent messages to send /<br />\n"+ + " the number of consecutive sends which were blocked due to throws message window size<br />\n" + "<b id=\"def.ssthresh\">ssthresh</b>: the slow start threshold help make sure the cwnd doesn't grow too fast<br />\n" + "<b id=\"def.rtt\">rtt</b>: the round trip time is how long it takes to get an acknowledgement of a packet<br />\n" + "<b id=\"def.dev\">dev</b>: the standard deviation of the round trip time, to help control the retransmit timeout<br />\n" + diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index a10405bc49e24cae2c93c38f3a44802563e9299e..282aa2285c140bdb383d72f7d56460ff2117c63b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -73,8 +73,8 @@ class BuildHandler { if (toHandle > MAX_HANDLE_AT_ONCE) toHandle = MAX_HANDLE_AT_ONCE; handled = new ArrayList(toHandle); - for (int i = 0; i < toHandle; i++) - handled.add(_inboundBuildMessages.remove(0)); + for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?) + handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1)); } } if (handled != null) {