From fae7262d311353e168b44d2a67ceac5b5045280e Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Mon, 18 May 2009 18:21:56 +0000 Subject: [PATCH] * DataStore: - Adjust interface to have persistent and non-persistent methods, to prepare for partial storage in RAM * PersistentDataStore: - Cleanup, simplify, and concurrentify - Tweak stats - Remove write limit - Flush to disk on shutdown - Don't write out what we just read in --- .../router/networkdb/kademlia/DataStore.java | 5 +- .../KademliaNetworkDatabaseFacade.java | 14 +- .../kademlia/PersistentDataStore.java | 207 +++++++++++++----- .../kademlia/TransientDataStore.java | 21 +- 4 files changed, 178 insertions(+), 69 deletions(-) diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataStore.java index c6807183bd..75329f8963 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/DataStore.java @@ -16,10 +16,13 @@ import net.i2p.data.Hash; public interface DataStore { public boolean isKnown(Hash key); public DataStructure get(Hash key); + public DataStructure get(Hash key, boolean persist); public void put(Hash key, DataStructure data); + public void put(Hash key, DataStructure data, boolean persist); public DataStructure remove(Hash key); - public DataStructure removeLease(Hash key); + public DataStructure remove(Hash key, boolean persist); public Set getKeys(); + public void stop(); public void restart(); public int countLeaseSets(); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index c8a0685af1..436fc526e3 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -177,6 +177,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { public void shutdown() { _initialized = false; _kb = null; + _ds.stop(); _ds = null; _exploreKeys.clear(); // hope this doesn't cause an explosion, it shouldn't. // _exploreKeys = null; @@ -702,9 +703,13 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { * @throws IllegalArgumentException if the routerInfo is not valid */ public RouterInfo store(Hash key, RouterInfo routerInfo) throws IllegalArgumentException { + return store(key, routerInfo, true); + } + + public RouterInfo store(Hash key, RouterInfo routerInfo, boolean persist) throws IllegalArgumentException { if (!_initialized) return null; - RouterInfo rv = (RouterInfo)_ds.get(key); + RouterInfo rv = (RouterInfo)_ds.get(key, persist); if ( (rv != null) && (rv.equals(routerInfo)) ) { // no need to validate @@ -721,7 +726,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { + new Date(routerInfo.getPublished())); _context.peerManager().setCapabilities(key, routerInfo.getCapabilities()); - _ds.put(key, routerInfo); + _ds.put(key, routerInfo, persist); if (rv == null) _kb.add(key); return rv; @@ -752,10 +757,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { // if we dont know the key, lets make sure it isn't a now-dead peer } - if (isRouterInfo) - _ds.remove(dbEntry); - else - _ds.removeLease(dbEntry); + _ds.remove(dbEntry, isRouterInfo); } /** don't use directly - see F.N.D.F. override */ diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java index 0186bd9225..df6a61237c 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/PersistentDataStore.java @@ -13,10 +13,10 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.DataFormatException; import net.i2p.data.DataStructure; @@ -48,44 +48,86 @@ class PersistentDataStore extends TransientDataStore { _dbDir = dbDir; _facade = facade; _context.jobQueue().addJob(new ReadJob()); - ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "NetworkDatabase", new long[] { 60*1000, 10*60*1000 }); - ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "NetworkDatabase", new long[] { 60*1000, 10*60*1000 }); + ctx.statManager().createRateStat("netDb.writeClobber", "How often we clobber a pending netDb write", "NetworkDatabase", new long[] { 20*60*1000 }); + ctx.statManager().createRateStat("netDb.writePending", "How many pending writes are there", "NetworkDatabase", new long[] { 60*1000 }); + ctx.statManager().createRateStat("netDb.writeOut", "How many we wrote", "NetworkDatabase", new long[] { 20*60*1000 }); + ctx.statManager().createRateStat("netDb.writeTime", "How long it took", "NetworkDatabase", new long[] { 20*60*1000 }); _writer = new Writer(); I2PThread writer = new I2PThread(_writer, "DBWriter"); - writer.setDaemon(true); + // stop() must be called to flush data to disk + //writer.setDaemon(true); writer.start(); } + @Override + public void stop() { + super.stop(); + _writer.flush(); + } + @Override public void restart() { + super.restart(); _dbDir = _facade.getDbDir(); } + @Override + public DataStructure get(Hash key) { + return get(key, true); + } + + /** + * Prepare for having only a partial set in memory and the rest on disk + * @param persist if false, call super only, don't access disk + */ + @Override + public DataStructure get(Hash key, boolean persist) { + DataStructure rv = super.get(key); +/***** + if (rv != null || !persist) + return rv; + rv = _writer.get(key); + if (rv != null) + return rv; + Job rrj = new ReadRouterJob(getRouterInfoName(key), key)); + run in same thread + rrj.runJob(); +*******/ + return rv; + } + @Override public DataStructure remove(Hash key) { - _context.jobQueue().addJob(new RemoveJob(key)); + return remove(key, true); + } + + /* + * @param persist if false, call super only, don't access disk + */ + @Override + public DataStructure remove(Hash key, boolean persist) { + if (persist) + _context.jobQueue().addJob(new RemoveJob(key)); return super.remove(key); } @Override public void put(Hash key, DataStructure data) { + put(key, data, true); + } + + /* + * @param persist if false, call super only, don't access disk + */ + @Override + public void put(Hash key, DataStructure data, boolean persist) { if ( (data == null) || (key == null) ) return; super.put(key, data); // Don't bother writing LeaseSets to disk - if (data instanceof RouterInfo) + if (persist && data instanceof RouterInfo) _writer.queue(key, data); } - private void accept(LeaseSet ls) { - super.put(ls.getDestination().calculateHash(), ls); - } - private void accept(RouterInfo ri) { - Hash key = ri.getIdentity().getHash(); - super.put(key, ri); - // add recently loaded routers to the routing table - _facade.getKBuckets().add(key); - } - private class RemoveJob extends JobImpl { private Hash _key; public RemoveJob(Hash key) { @@ -104,56 +146,100 @@ class PersistentDataStore extends TransientDataStore { } } + /** How many files to write every 10 minutes. Doesn't make sense to limit it, + * they just back up in the queue hogging memory. + */ + private static final int WRITE_LIMIT = 10000; + private static final long WRITE_DELAY = 10*60*1000; + /* - * Queue up writes, write up to 600 files every 10 minutes + * Queue up writes, write unlimited files every 10 minutes. + * Since we write all we have, don't save the write order. + * We store a reference to the data here too, + * rather than simply pull it from super.get(), because + * we will soon have to implement a scheme for keeping only + * a subset of all DataStructures in memory and keeping the rest on disk. */ private class Writer implements Runnable { - private final Map _keys; - private List _keyOrder; + private final Map<Hash, DataStructure>_keys; + private Object _waitLock; + private volatile boolean _quit; + public Writer() { - _keys = new HashMap(64); - _keyOrder = new ArrayList(64); + _keys = new ConcurrentHashMap(64); + _waitLock = new Object(); } + public void queue(Hash key, DataStructure data) { - boolean exists = false; - int pending = 0; - synchronized (_keys) { - pending = _keys.size(); - exists = (null != _keys.put(key, data)); - if (!exists) - _keyOrder.add(key); - _keys.notifyAll(); - } + int pending = _keys.size(); + boolean exists = (null != _keys.put(key, data)); if (exists) _context.statManager().addRateData("netDb.writeClobber", pending, 0); _context.statManager().addRateData("netDb.writePending", pending, 0); } + + /** check to see if it's in the write queue */ + public DataStructure get(Hash key) { + return _keys.get(key); + } + public void run() { + _quit = false; Hash key = null; DataStructure data = null; int count = 0; - while (true) { // hmm, probably want a shutdown handle... though this is a daemon thread + int lastCount = 0; + long startTime = 0; + while (true) { + // get a new iterator every time to get a random entry without + // having concurrency issues or copying to a List or Array + Iterator<Hash> iter = _keys.keySet().iterator(); try { - synchronized (_keys) { - if (_keyOrder.size() <= 0) { - count = 0; - _keys.wait(); - } else { - count++; - key = (Hash)_keyOrder.remove(0); - data = (DataStructure)_keys.remove(key); - } + key = iter.next(); + iter.remove(); + count++; + } catch (NoSuchElementException nsee) { + lastCount = count; + count = 0; + } catch (IllegalStateException ise) { + lastCount = count; + count = 0; + } + + if (key != null) { + data = _keys.get(key); + if (data != null) { + write(key, data); + data = null; } - } catch (InterruptedException ie) {} - - if ( (key != null) && (data != null) ) - write(key, data); - key = null; - data = null; - if (count >= 600) + key = null; + } + if (count >= WRITE_LIMIT) count = 0; - if (count == 0) - try { Thread.sleep(10*60*1000); } catch (InterruptedException ie) {} + if (count == 0) { + if (lastCount > 0) { + long time = _context.clock().now() - startTime; + if (_log.shouldLog(Log.WARN)) + _log.warn("Wrote " + lastCount + " entries to disk in " + time); + _context.statManager().addRateData("netDb.writeOut", lastCount, 0); + _context.statManager().addRateData("netDb.writeTime", time, 0); + } + if (_quit) + break; + synchronized (_waitLock) { + try { + _waitLock.wait(WRITE_DELAY); + } catch (InterruptedException ie) {} + } + startTime = _context.clock().now(); + } + } + } + + public void flush() { + synchronized(_waitLock) { + _quit = true; + _waitLock.notifyAll(); } } } @@ -261,7 +347,8 @@ class PersistentDataStore extends TransientDataStore { public String getName() { return "Read RouterInfo"; } private boolean shouldRead() { - DataStructure data = get(_key); + // persist = false to call only super.get() + DataStructure data = get(_key, false); if (data == null) return true; if (data instanceof RouterInfo) { long knownDate = ((RouterInfo)data).getPublished(); @@ -292,7 +379,8 @@ class PersistentDataStore extends TransientDataStore { + " is from a different network"); } else { try { - _facade.store(ri.getIdentity().getHash(), ri); + // persist = false so we don't write what we just read + _facade.store(ri.getIdentity().getHash(), ri, false); } catch (IllegalArgumentException iae) { _log.info("Refused locally loaded routerInfo - deleting"); corrupt = true; @@ -335,22 +423,22 @@ class PersistentDataStore extends TransientDataStore { private final static String ROUTERINFO_PREFIX = "routerInfo-"; private final static String ROUTERINFO_SUFFIX = ".dat"; - private String getLeaseSetName(Hash hash) { + private static String getLeaseSetName(Hash hash) { return LEASESET_PREFIX + hash.toBase64() + LEASESET_SUFFIX; } - private String getRouterInfoName(Hash hash) { + private static String getRouterInfoName(Hash hash) { return ROUTERINFO_PREFIX + hash.toBase64() + ROUTERINFO_SUFFIX; } - private Hash getLeaseSetHash(String filename) { + private static Hash getLeaseSetHash(String filename) { return getHash(filename, LEASESET_PREFIX, LEASESET_SUFFIX); } - private Hash getRouterInfoHash(String filename) { + private static Hash getRouterInfoHash(String filename) { return getHash(filename, ROUTERINFO_PREFIX, ROUTERINFO_SUFFIX); } - private Hash getHash(String filename, String prefix, String suffix) { + private static Hash getHash(String filename, String prefix, String suffix) { try { String key = filename.substring(prefix.length()); key = key.substring(0, key.length() - suffix.length()); @@ -358,7 +446,8 @@ class PersistentDataStore extends TransientDataStore { h.fromBase64(key); return h; } catch (Exception e) { - _log.warn("Unable to fetch the key from [" + filename + "]", e); + // static + //_log.warn("Unable to fetch the key from [" + filename + "]", e); return null; } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/TransientDataStore.java b/router/java/src/net/i2p/router/networkdb/kademlia/TransientDataStore.java index 5fd28cebff..5028b5ea12 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/TransientDataStore.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/TransientDataStore.java @@ -35,14 +35,23 @@ class TransientDataStore implements DataStore { _log.info("Data Store initialized"); } - public void restart() { + public void stop() { _data.clear(); } + public void restart() { + stop(); + } + public Set getKeys() { return new HashSet(_data.keySet()); } + /** for PersistentDataStore only - don't use here */ + public DataStructure get(Hash key, boolean persist) { + throw new IllegalArgumentException("no"); + } + public DataStructure get(Hash key) { return _data.get(key); } @@ -65,6 +74,11 @@ class TransientDataStore implements DataStore { /** don't accept tunnels set to expire more than 3 hours in the future, which is insane */ private final static long MAX_FUTURE_EXPIRATION_DATE = KademliaNetworkDatabaseFacade.MAX_LEASE_FUTURE; + /** for PersistentDataStore only - don't use here */ + public void put(Hash key, DataStructure data, boolean persist) { + throw new IllegalArgumentException("no"); + } + public void put(Hash key, DataStructure data) { if (data == null) return; if (_log.shouldLog(Log.DEBUG)) @@ -140,8 +154,9 @@ class TransientDataStore implements DataStore { return buf.toString(); } - public DataStructure removeLease(Hash key) { - return remove(key); + /** for PersistentDataStore only - don't use here */ + public DataStructure remove(Hash key, boolean persist) { + throw new IllegalArgumentException("no"); } public DataStructure remove(Hash key) { -- GitLab