From 5ba5d537b5a69e772ca17d29f33756cba88795b7 Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 5 Aug 2012 14:24:14 +0000 Subject: [PATCH] * UDP: - Limit PacketHandler threads to 1 (ticket #660) - Limit queue sizes between UDPReceiver and PacketHandler, and between PacketHandler and MessageReceiver, to prevent OOMs and/or excessive queue delays - Increase UDPPacket cache size based on max mem - Remove more stats --- history.txt | 11 ++++++++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../udp/InboundMessageFragments.java | 15 +++++----- .../router/transport/udp/MessageReceiver.java | 28 ++++++++++++++----- .../router/transport/udp/PacketHandler.java | 4 +-- .../i2p/router/transport/udp/UDPPacket.java | 14 +++++++--- .../i2p/router/transport/udp/UDPReceiver.java | 22 +++++++++++++-- 7 files changed, 72 insertions(+), 24 deletions(-) diff --git a/history.txt b/history.txt index b90c8a453..10c14cb79 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,14 @@ +2012-08-05 zzz + * I2PSessionImpl: One more volatile (ticket #659) + * i2ptunnel, I2CP, EepGet: Buffer socket input streams (ticket #666) + * UDP: + - Limit PacketHandler threads to 1 (ticket #660) + - Limit queue sizes between UDPReceiver and PacketHandler, + and between PacketHandler and MessageReceiver, to prevent OOMs + and/or excessive queue delays + - Increase UDPPacket cache size based on max mem + - Remove more stats + 2012-08-03 zzz * build.xml: Add buildI2PTunnelJar target for Android * i2psnark: Finish migration to I2P logging to reduce object churn (ticket #673) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b42..0725033fa 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index fdf6cabc7..e527d88c0 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -43,8 +43,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ _context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivedACKs", "How many messages were ACKed at a time", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePiggyback", "How many acks were included in a packet with data fragments (time == # data fragments)", "udp", UDPTransport.RATES); } @@ -71,15 +71,16 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ * Pull the fragments and ACKs out of the authenticated data packet */ public void receiveData(PeerState from, UDPPacketReader.DataReader data) { - long beforeMsgs = _context.clock().now(); + //long beforeMsgs = _context.clock().now(); int fragmentsIncluded = receiveMessages(from, data); - long afterMsgs = _context.clock().now(); + //long afterMsgs = _context.clock().now(); int acksIncluded = receiveACKs(from, data); - long afterACKs = _context.clock().now(); + //long afterACKs = _context.clock().now(); from.packetReceived(data.getPacketSize()); - _context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs); - _context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs); + // each of these was less than 0.1 ms + //_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs); + //_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs); if ( (fragmentsIncluded > 0) && (acksIncluded > 0) ) _context.statManager().addRateData("udp.receivePiggyback", acksIncluded, fragmentsIncluded); } diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 15d8b7310..28fb5b968 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -28,8 +28,11 @@ class MessageReceiver { private final BlockingQueue _completeMessages; private boolean _alive; //private ByteCache _cache; + private static final int MIN_THREADS = 2; // unless < 32MB private static final int MAX_THREADS = 5; + private static final int MIN_QUEUE_SIZE = 32; // unless < 32MB + private static final int MAX_QUEUE_SIZE = 128; private final int _threadCount; private static final long POISON_IMS = -99999999999l; @@ -37,17 +40,22 @@ class MessageReceiver { _context = ctx; _log = ctx.logManager().getLog(MessageReceiver.class); _transport = transport; - _completeMessages = new LinkedBlockingQueue(); long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; - if (maxMemory < 32*1024*1024) + int qsize; + if (maxMemory < 32*1024*1024) { _threadCount = 1; - else if (maxMemory < 64*1024*1024) + qsize = 16; + } else if (maxMemory < 64*1024*1024) { _threadCount = 2; - else + qsize = 32; + } else { _threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20)); + qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); + } + _completeMessages = new LinkedBlockingQueue(qsize); // the runners run forever, no need to have a cache //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); @@ -56,7 +64,7 @@ class MessageReceiver { _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES); _alive = true; } @@ -93,12 +101,18 @@ class MessageReceiver { /** * This queues the message for processing. * Processing will call state.releaseResources(), do not access state after calling this. + * BLOCKING if queue is full. */ public void receiveMessage(InboundMessageState state) { //int total = 0; //long lag = -1; - if (_alive) - _completeMessages.offer(state); + if (_alive) { + try { + _completeMessages.put(state); + } catch (InterruptedException ie) { + _alive = false; + } + } //total = _completeMessages.size(); //if (total > 1) // lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime(); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index d2039559b..448f25313 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -31,8 +31,8 @@ class PacketHandler { private volatile boolean _keepReading; private final Handler[] _handlers; - private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB - private static final int MAX_NUM_HANDLERS = 5; + private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB + private static final int MAX_NUM_HANDLERS = 1; /** let packets be up to 30s slow */ private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index d20af4338..0effadf34 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -42,12 +42,18 @@ class UDPPacket { // Warning - this mixes contexts in a multi-router JVM private static final Queue _packetCache; private static final boolean CACHE = true; - private static final int CACHE_SIZE = 64; + private static final int MIN_CACHE_SIZE = 64; + private static final int MAX_CACHE_SIZE = 256; static { - if (CACHE) - _packetCache = new LinkedBlockingQueue(CACHE_SIZE); - else + if (CACHE) { + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory == Long.MAX_VALUE) + maxMemory = 96*1024*1024l; + int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024))); + _packetCache = new LinkedBlockingQueue(csize); + } else { _packetCache = null; + } } /** diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 46db0aca3..44363f2b5 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -30,14 +30,21 @@ class UDPReceiver { private final UDPTransport _transport; private static int __id; private final int _id; + private static final int TYPE_POISON = -99999; + private static final int MIN_QUEUE_SIZE = 16; + private static final int MAX_QUEUE_SIZE = 192; public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPReceiver.class); _id = ++__id; _name = name; - _inboundQueue = new LinkedBlockingQueue(); + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory == Long.MAX_VALUE) + maxMemory = 96*1024*1024l; + int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); + _inboundQueue = new LinkedBlockingQueue(qsize); _socket = socket; _transport = transport; _runner = new Runner(); @@ -138,7 +145,11 @@ class UDPReceiver { return doReceive(packet); } - /** @return zero (was queue size) */ + /** + * BLOCKING if queue between here and PacketHandler is full. + * + * @return zero (was queue size) + */ private final int doReceive(UDPPacket packet) { if (!_keepRunning) return 0; @@ -168,7 +179,12 @@ class UDPReceiver { } } if (!rejected) { - _inboundQueue.offer(packet); + try { + _inboundQueue.put(packet); + } catch (InterruptedException ie) { + packet.release(); + _keepRunning = false; + } //return queueSize + 1; return 0; }