diff --git a/core/java/src/net/i2p/data/DataHelper.java b/core/java/src/net/i2p/data/DataHelper.java index c9cca505c..dfbb5c718 100644 --- a/core/java/src/net/i2p/data/DataHelper.java +++ b/core/java/src/net/i2p/data/DataHelper.java @@ -1078,6 +1078,7 @@ public class DataHelper { ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire(); in.initialize(new ByteArrayInputStream(orig, offset, length)); + // don't make this a static field, or else I2PAppContext gets initialized too early ByteCache cache = ByteCache.getInstance(8, MAX_UNCOMPRESSED); ByteArray outBuf = cache.acquire(); int written = 0; diff --git a/core/java/src/net/i2p/util/ByteCache.java b/core/java/src/net/i2p/util/ByteCache.java index 992d4e5e6..38df7a039 100644 --- a/core/java/src/net/i2p/util/ByteCache.java +++ b/core/java/src/net/i2p/util/ByteCache.java @@ -13,27 +13,98 @@ import net.i2p.data.ByteArray; * Cache the objects frequently used to reduce memory churn. The ByteArray * should be held onto as long as the data referenced in it is needed. * + * Heap size control - survey of usage (April 2010) : + * + * + Size Max MaxMem From + + 16 16 256 CryptixAESEngine + 16 32 512 BloomFilterIVValidator + 16 64 1K UDP PacketBuilder + 16 128 2K tunnel HopProcessor + 16 128 2K tunnel TrivialPreprocessor + 16 128 2K tunnel InboundEndpointProcessor + 16 128 2K tunnel OutboundGatewayProcessor + + 32 64 2K UDP PacketBuilder + 32 128 4K tunnel TrivialPreprocessor + + 1K 32 32K tunnel TrivialPreprocessor + 1K 512 512K tunnel FragmentHandler + 1K 512 512K I2NP TunnelDataMessage + 1K 512 512K tunnel FragmentedMessage + + 1730 128 216K streaming MessageOutputStream + + 2K 64 128K UDP IMS + + 4K 32 128K I2PTunnelRunner + + 8K 8 64K I2PTunnel HTTPResponseOutputStream + + 32K 4 128K SAM StreamSession + 32K 10 320K SAM v2StreamSession + 32K 64 2M UDP OMS + 32K 128 4M streaming MessageInputStream + + 36K 64 2.25M streaming PacketQueue + + 40K 8 320K DataHelper decompress + + 64K 64 4M UDP MessageReceiver - disabled in 0.7.14 + * + * */ public final class ByteCache { - private final static Map _caches = new HashMap(16); + + private static final Map _caches = new HashMap(16); + + /** + * max size in bytes of each cache + * Set to max memory / 128, with a min of 128KB and a max of 4MB + * + * @since 0.7.14 + */ + private static final int MAX_CACHE; + static { + long maxMemory = Runtime.getRuntime().maxMemory(); + MAX_CACHE = (int) Math.min(4*1024*1024l, Math.max(128*1024l, maxMemory / 128)); + } + /** * Get a cache responsible for objects of the given size * * @param cacheSize how large we want the cache to grow before using on * demand allocation + * Since 0.7.14, a limit of 1MB / size is enforced + * for the typical 128MB max memory JVM * @param size how large should the objects cached be? */ public static ByteCache getInstance(int cacheSize, int size) { + if (cacheSize * size > MAX_CACHE) + cacheSize = MAX_CACHE / size; Integer sz = Integer.valueOf(size); ByteCache cache = null; synchronized (_caches) { if (!_caches.containsKey(sz)) _caches.put(sz, new ByteCache(cacheSize, size)); - cache = (ByteCache)_caches.get(sz); + cache = _caches.get(sz); } cache.resize(cacheSize); + //I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("ByteCache size: " + size + " max: " + cacheSize, new Exception("from")); return cache; } + + /** + * Clear everything (memory pressure) + * @since 0.7.14 + */ + public static void clearAll() { + for (ByteCache bc : _caches.values()) + bc.clear(); + I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("WARNING: Low memory, clearing byte caches"); + } + private Log _log; /** list of available and available entries */ private Queue _available; @@ -57,13 +128,14 @@ public final class ByteCache { _lastOverflow = -1; SimpleScheduler.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY); _log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class); + I2PAppContext.getGlobalContext().statManager().createRateStat("byteCache.memory." + entrySize, "Memory usage (B)", "Router", new long[] { 60*1000 }); } private void resize(int maxCachedEntries) { if (_maxCached >= maxCachedEntries) return; _maxCached = maxCachedEntries; // make a bigger one, move the cached items over - Queue newLBQ = new LinkedBlockingQueue(maxCachedEntries); + Queue newLBQ = new LinkedBlockingQueue(maxCachedEntries); ByteArray ba; while ((ba = _available.poll()) != null) newLBQ.offer(ba); @@ -109,8 +181,17 @@ public final class ByteCache { } } + /** + * Clear everything (memory pressure) + * @since 0.7.14 + */ + private void clear() { + _available.clear(); + } + private class Cleanup implements SimpleTimer.TimedEvent { public void timeReached() { + I2PAppContext.getGlobalContext().statManager().addRateData("byteCache.memory." + _entrySize, _entrySize * _available.size(), 0); if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) { // we haven't exceeded the cache size in a few minutes, so lets // shrink the cache diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index b730d6d4a..ee54f6974 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -41,6 +41,7 @@ import net.i2p.router.transport.udp.UDPTransport; import net.i2p.stat.Rate; import net.i2p.stat.RateStat; import net.i2p.stat.StatManager; +import net.i2p.util.ByteCache; import net.i2p.util.FileUtil; import net.i2p.util.I2PAppThread; import net.i2p.util.I2PThread; @@ -224,6 +225,7 @@ public class Router { _killVMOnEnd = true; _oomListener = new I2PThread.OOMEventListener() { public void outOfMemory(OutOfMemoryError oom) { + ByteCache.clearAll(); _log.log(Log.CRIT, "Thread ran out of memory", oom); for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs try { @@ -252,6 +254,8 @@ public class Router { * */ public void setKillVMOnEnd(boolean shouldDie) { _killVMOnEnd = shouldDie; } + + /** @deprecated unused */ public boolean getKillVMOnEnd() { return _killVMOnEnd; } public String getConfigFilename() { return _configFilename; } @@ -923,7 +927,7 @@ public class Router { private static final boolean ALLOW_DYNAMIC_KEYS = false; private void finalShutdown(int exitCode) { - _log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete", new Exception("Shutdown")); + _log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ ); try { _context.logManager().shutdown(); } catch (Throwable t) { } if (ALLOW_DYNAMIC_KEYS) { if (Boolean.valueOf(_context.getProperty(PROP_DYNAMIC_KEYS)).booleanValue()) @@ -1357,12 +1361,16 @@ public class Router { /* following classes are now private static inner classes, didn't bother to reindent */ +private static final long LOW_MEMORY_THRESHOLD = 5 * 1024 * 1024; + /** * coalesce the stats framework every minute * */ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent { private RouterContext _ctx; + private long _maxMemory; + public CoalesceStatsEvent(RouterContext ctx) { _ctx = ctx; ctx.statManager().createRateStat("bw.receiveBps", "How fast we receive data (in KBps)", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); @@ -1373,8 +1381,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent { ctx.statManager().createRateStat("router.activeSendPeers", "How many peers we've sent to this minute", "Throttle", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("router.highCapacityPeers", "How many high capacity peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 }); ctx.statManager().createRateStat("router.fastPeers", "How many fast peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 }); - long max = Runtime.getRuntime().maxMemory() / (1024*1024); - ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + max + "MB", "Router", new long[] { 60*1000 }); + _maxMemory = Runtime.getRuntime().maxMemory(); + ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + (_maxMemory / (1024*1024)) + "MB", "Router", new long[] { 60*1000 }); } private RouterContext getContext() { return _ctx; } public void timeReached() { @@ -1395,6 +1403,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent { long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); getContext().statManager().addRateData("router.memoryUsed", used, 0); + if (_maxMemory - used < LOW_MEMORY_THRESHOLD) + ByteCache.clearAll(); getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME); diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index 39b4163e7..1a95c227f 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -10,7 +10,7 @@ import net.i2p.data.i2np.I2NPMessageException; import net.i2p.data.i2np.I2NPMessageHandler; import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.router.RouterContext; -import net.i2p.util.ByteCache; +//import net.i2p.util.ByteCache; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -26,7 +26,7 @@ public class MessageReceiver { /** list of messages (InboundMessageState) fully received but not interpreted yet */ private final BlockingQueue _completeMessages; private boolean _alive; - private ByteCache _cache; + //private ByteCache _cache; private static final int THREADS = 5; private static final long POISON_IMS = -99999999999l; @@ -35,7 +35,8 @@ public class MessageReceiver { _log = ctx.logManager().getLog(MessageReceiver.class); _transport = transport; _completeMessages = new LinkedBlockingQueue(); - _cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); + // the runners run forever, no need to have a cache + //_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES); @@ -91,7 +92,8 @@ public class MessageReceiver { public void loop(I2NPMessageHandler handler) { InboundMessageState message = null; - ByteArray buf = _cache.acquire(); + //ByteArray buf = _cache.acquire(); + ByteArray buf = new ByteArray(new byte[I2NPMessage.MAX_SIZE]); while (_alive) { int expired = 0; long expiredLifetime = 0; @@ -142,7 +144,7 @@ public class MessageReceiver { } // no need to zero it out, as these buffers are only used with an explicit getCompleteSize - _cache.release(buf, false); + //_cache.release(buf, false); } private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {