merge of '03068a89c26b0986a8bf2b6f36cb478f565664eb'

and 'c3c31953c884c3aafb142e05c2dbef2809516d9c'
This commit is contained in:
walking
2010-05-03 16:44:06 +00:00
11 changed files with 125 additions and 13 deletions

View File

@@ -131,6 +131,7 @@ public class PluginUpdateChecker extends UpdateHandler {
boolean shouldProxy = Boolean.valueOf(_context.getProperty(ConfigUpdateHandler.PROP_SHOULD_PROXY, ConfigUpdateHandler.DEFAULT_SHOULD_PROXY)).booleanValue();
String proxyHost = _context.getProperty(ConfigUpdateHandler.PROP_PROXY_HOST, ConfigUpdateHandler.DEFAULT_PROXY_HOST);
int proxyPort = _context.getProperty(ConfigUpdateHandler.PROP_PROXY_PORT, ConfigUpdateHandler.DEFAULT_PROXY_PORT_INT);
_baos.reset();
try {
_get = new PartialEepGet(_context, proxyHost, proxyPort, _baos, _xpi2pURL, TrustedUpdate.HEADER_BYTES);
_get.addStatusListener(PluginUpdateCheckerRunner.this);

View File

@@ -1078,6 +1078,7 @@ public class DataHelper {
ReusableGZIPInputStream in = ReusableGZIPInputStream.acquire();
in.initialize(new ByteArrayInputStream(orig, offset, length));
// don't make this a static field, or else I2PAppContext gets initialized too early
ByteCache cache = ByteCache.getInstance(8, MAX_UNCOMPRESSED);
ByteArray outBuf = cache.acquire();
int written = 0;

View File

@@ -13,27 +13,98 @@ import net.i2p.data.ByteArray;
* Cache the objects frequently used to reduce memory churn. The ByteArray
* should be held onto as long as the data referenced in it is needed.
*
* Heap size control - survey of usage (April 2010) :
*
* </pre>
Size Max MaxMem From
16 16 256 CryptixAESEngine
16 32 512 BloomFilterIVValidator
16 64 1K UDP PacketBuilder
16 128 2K tunnel HopProcessor
16 128 2K tunnel TrivialPreprocessor
16 128 2K tunnel InboundEndpointProcessor
16 128 2K tunnel OutboundGatewayProcessor
32 64 2K UDP PacketBuilder
32 128 4K tunnel TrivialPreprocessor
1K 32 32K tunnel TrivialPreprocessor
1K 512 512K tunnel FragmentHandler
1K 512 512K I2NP TunnelDataMessage
1K 512 512K tunnel FragmentedMessage
1730 128 216K streaming MessageOutputStream
2K 64 128K UDP IMS
4K 32 128K I2PTunnelRunner
8K 8 64K I2PTunnel HTTPResponseOutputStream
32K 4 128K SAM StreamSession
32K 10 320K SAM v2StreamSession
32K 64 2M UDP OMS
32K 128 4M streaming MessageInputStream
36K 64 2.25M streaming PacketQueue
40K 8 320K DataHelper decompress
64K 64 4M UDP MessageReceiver - disabled in 0.7.14
* </pre>
*
*/
public final class ByteCache {
private final static Map _caches = new HashMap(16);
private static final Map<Integer, ByteCache> _caches = new HashMap(16);
/**
* max size in bytes of each cache
* Set to max memory / 128, with a min of 128KB and a max of 4MB
*
* @since 0.7.14
*/
private static final int MAX_CACHE;
static {
long maxMemory = Runtime.getRuntime().maxMemory();
MAX_CACHE = (int) Math.min(4*1024*1024l, Math.max(128*1024l, maxMemory / 128));
}
/**
* Get a cache responsible for objects of the given size
*
* @param cacheSize how large we want the cache to grow before using on
* demand allocation
* Since 0.7.14, a limit of 1MB / size is enforced
* for the typical 128MB max memory JVM
* @param size how large should the objects cached be?
*/
public static ByteCache getInstance(int cacheSize, int size) {
if (cacheSize * size > MAX_CACHE)
cacheSize = MAX_CACHE / size;
Integer sz = Integer.valueOf(size);
ByteCache cache = null;
synchronized (_caches) {
if (!_caches.containsKey(sz))
_caches.put(sz, new ByteCache(cacheSize, size));
cache = (ByteCache)_caches.get(sz);
cache = _caches.get(sz);
}
cache.resize(cacheSize);
//I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("ByteCache size: " + size + " max: " + cacheSize, new Exception("from"));
return cache;
}
/**
* Clear everything (memory pressure)
* @since 0.7.14
*/
public static void clearAll() {
for (ByteCache bc : _caches.values())
bc.clear();
I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class).error("WARNING: Low memory, clearing byte caches");
}
private Log _log;
/** list of available and available entries */
private Queue<ByteArray> _available;
@@ -57,13 +128,14 @@ public final class ByteCache {
_lastOverflow = -1;
SimpleScheduler.getInstance().addPeriodicEvent(new Cleanup(), CLEANUP_FREQUENCY);
_log = I2PAppContext.getGlobalContext().logManager().getLog(ByteCache.class);
I2PAppContext.getGlobalContext().statManager().createRateStat("byteCache.memory." + entrySize, "Memory usage (B)", "Router", new long[] { 60*1000 });
}
private void resize(int maxCachedEntries) {
if (_maxCached >= maxCachedEntries) return;
_maxCached = maxCachedEntries;
// make a bigger one, move the cached items over
Queue newLBQ = new LinkedBlockingQueue(maxCachedEntries);
Queue<ByteArray> newLBQ = new LinkedBlockingQueue(maxCachedEntries);
ByteArray ba;
while ((ba = _available.poll()) != null)
newLBQ.offer(ba);
@@ -109,8 +181,17 @@ public final class ByteCache {
}
}
/**
* Clear everything (memory pressure)
* @since 0.7.14
*/
private void clear() {
_available.clear();
}
private class Cleanup implements SimpleTimer.TimedEvent {
public void timeReached() {
I2PAppContext.getGlobalContext().statManager().addRateData("byteCache.memory." + _entrySize, _entrySize * _available.size(), 0);
if (System.currentTimeMillis() - _lastOverflow > EXPIRE_PERIOD) {
// we haven't exceeded the cache size in a few minutes, so lets
// shrink the cache

View File

@@ -211,6 +211,7 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
}
}
/******
public static void main(String args[]) {
for (int i = 129; i < 64*1024; i++) {
if (!test(i)) return;
@@ -279,4 +280,5 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
return false;
}
}
******/
}

View File

@@ -122,6 +122,7 @@ public class ResettableGZIPOutputStream extends DeflaterOutputStream {
super.write(buf, off, len);
}
/******
public static void main(String args[]) {
for (int i = 0; i < 2; i++)
test();
@@ -165,12 +166,13 @@ public class ResettableGZIPOutputStream extends DeflaterOutputStream {
} catch (Exception e) { e.printStackTrace(); }
}
/** just for testing/verification, expose the CRC32 values */
// just for testing/verification, expose the CRC32 values
private static final class SnoopGZIPOutputStream extends GZIPOutputStream {
public SnoopGZIPOutputStream(OutputStream o) throws IOException {
super(o);
}
public CRC32 getCRC() { return crc; }
}
******/
}

View File

@@ -48,6 +48,7 @@ public class ReusableGZIPInputStream extends ResettableGZIPInputStream {
private ReusableGZIPInputStream() { super(); }
/*******
public static void main(String args[]) {
for (int i = 0; i < 2; i++)
test();
@@ -127,5 +128,6 @@ public class ReusableGZIPInputStream extends ResettableGZIPInputStream {
return false;
}
}
******/
}

View File

@@ -65,6 +65,7 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
/** pull the contents of the stream written */
public byte[] getData() { return _buffer.toByteArray(); }
/******
public static void main(String args[]) {
try {
for (int i = 0; i < 2; i++)
@@ -129,5 +130,6 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
return false;
}
}
*****/
}

View File

@@ -1,3 +1,12 @@
2010-05-02 zzz
* ByteCache:
- Add a per-cache stat
- Limit each cache based on max memory
- Disable in UDP MessageReceiver
- Add clearAll() method to be called when under
severe memory pressure; call from Router
* Plugins: Fix version checker bug
2010-04-27 zzz
* i2psnark: Serve downloaded files from the servlet rather
than with a file: link

View File

@@ -41,6 +41,7 @@ import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
import net.i2p.util.FileUtil;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread;
@@ -224,6 +225,7 @@ public class Router {
_killVMOnEnd = true;
_oomListener = new I2PThread.OOMEventListener() {
public void outOfMemory(OutOfMemoryError oom) {
ByteCache.clearAll();
_log.log(Log.CRIT, "Thread ran out of memory", oom);
for (int i = 0; i < 5; i++) { // try this 5 times, in case it OOMs
try {
@@ -252,6 +254,8 @@ public class Router {
*
*/
public void setKillVMOnEnd(boolean shouldDie) { _killVMOnEnd = shouldDie; }
/** @deprecated unused */
public boolean getKillVMOnEnd() { return _killVMOnEnd; }
public String getConfigFilename() { return _configFilename; }
@@ -923,7 +927,7 @@ public class Router {
private static final boolean ALLOW_DYNAMIC_KEYS = false;
private void finalShutdown(int exitCode) {
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete", new Exception("Shutdown"));
_log.log(Log.CRIT, "Shutdown(" + exitCode + ") complete" /* , new Exception("Shutdown") */ );
try { _context.logManager().shutdown(); } catch (Throwable t) { }
if (ALLOW_DYNAMIC_KEYS) {
if (Boolean.valueOf(_context.getProperty(PROP_DYNAMIC_KEYS)).booleanValue())
@@ -1357,12 +1361,16 @@ public class Router {
/* following classes are now private static inner classes, didn't bother to reindent */
private static final long LOW_MEMORY_THRESHOLD = 5 * 1024 * 1024;
/**
* coalesce the stats framework every minute
*
*/
private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
private RouterContext _ctx;
private long _maxMemory;
public CoalesceStatsEvent(RouterContext ctx) {
_ctx = ctx;
ctx.statManager().createRateStat("bw.receiveBps", "How fast we receive data (in KBps)", "Bandwidth", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
@@ -1373,8 +1381,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
ctx.statManager().createRateStat("router.activeSendPeers", "How many peers we've sent to this minute", "Throttle", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("router.highCapacityPeers", "How many high capacity peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 });
ctx.statManager().createRateStat("router.fastPeers", "How many fast peers we know", "Throttle", new long[] { 5*60*1000, 60*60*1000 });
long max = Runtime.getRuntime().maxMemory() / (1024*1024);
ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + max + "MB", "Router", new long[] { 60*1000 });
_maxMemory = Runtime.getRuntime().maxMemory();
ctx.statManager().createRateStat("router.memoryUsed", "(Bytes) Max is " + (_maxMemory / (1024*1024)) + "MB", "Router", new long[] { 60*1000 });
}
private RouterContext getContext() { return _ctx; }
public void timeReached() {
@@ -1395,6 +1403,8 @@ private static class CoalesceStatsEvent implements SimpleTimer.TimedEvent {
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
getContext().statManager().addRateData("router.memoryUsed", used, 0);
if (_maxMemory - used < LOW_MEMORY_THRESHOLD)
ByteCache.clearAll();
getContext().tunnelDispatcher().updateParticipatingStats(COALESCE_TIME);

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 1;
public final static long BUILD = 2;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -10,7 +10,7 @@ import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
//import net.i2p.util.ByteCache;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -26,7 +26,7 @@ public class MessageReceiver {
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive;
private ByteCache _cache;
//private ByteCache _cache;
private static final int THREADS = 5;
private static final long POISON_IMS = -99999999999l;
@@ -35,7 +35,8 @@ public class MessageReceiver {
_log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport;
_completeMessages = new LinkedBlockingQueue();
_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
// the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
@@ -91,7 +92,8 @@ public class MessageReceiver {
public void loop(I2NPMessageHandler handler) {
InboundMessageState message = null;
ByteArray buf = _cache.acquire();
//ByteArray buf = _cache.acquire();
ByteArray buf = new ByteArray(new byte[I2NPMessage.MAX_SIZE]);
while (_alive) {
int expired = 0;
long expiredLifetime = 0;
@@ -142,7 +144,7 @@ public class MessageReceiver {
}
// no need to zero it out, as these buffers are only used with an explicit getCompleteSize
_cache.release(buf, false);
//_cache.release(buf, false);
}
private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {