forked from I2P_Developers/i2p.i2p
NetDB: Queue RI file deletion and delete in a single thread
Do both writes and removes in the writer thread As suggested by jogger http://zzz.i2p/topics/3082 log tweaks
This commit is contained in:
@@ -23,6 +23,7 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@@ -34,6 +35,7 @@ import net.i2p.data.router.RouterInfo;
|
||||
import net.i2p.router.JobImpl;
|
||||
import net.i2p.router.Router;
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.util.ConcurrentHashSet;
|
||||
import net.i2p.util.FileSuffixFilter;
|
||||
import net.i2p.util.FileUtil;
|
||||
import net.i2p.util.I2PThread;
|
||||
@@ -144,7 +146,6 @@ public class PersistentDataStore extends TransientDataStore {
|
||||
public DatabaseEntry remove(Hash key, boolean persist) {
|
||||
if (persist) {
|
||||
_writer.remove(key);
|
||||
_context.jobQueue().addJob(new RemoveJob(key));
|
||||
}
|
||||
return super.remove(key);
|
||||
}
|
||||
@@ -168,24 +169,6 @@ public class PersistentDataStore extends TransientDataStore {
|
||||
return rv;
|
||||
}
|
||||
|
||||
private class RemoveJob extends JobImpl {
|
||||
private final Hash _key;
|
||||
public RemoveJob(Hash key) {
|
||||
super(PersistentDataStore.this._context);
|
||||
_key = key;
|
||||
}
|
||||
public String getName() { return "Delete RI file"; }
|
||||
public void runJob() {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Removing key " + _key /* , getAddedBy() */);
|
||||
try {
|
||||
removeFile(_key, _dbDir);
|
||||
} catch (IOException ioe) {
|
||||
_log.error("Error removing key " + _key, ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** How many files to write every 10 minutes. Doesn't make sense to limit it,
|
||||
* they just back up in the queue hogging memory.
|
||||
*/
|
||||
@@ -202,16 +185,19 @@ public class PersistentDataStore extends TransientDataStore {
|
||||
*/
|
||||
private class Writer implements Runnable, Flushable {
|
||||
private final Map<Hash, DatabaseEntry>_keys;
|
||||
private final Set<Hash> _keysToRemove;
|
||||
private final Object _waitLock;
|
||||
private volatile boolean _quit;
|
||||
|
||||
public Writer() {
|
||||
_keys = new ConcurrentHashMap<Hash, DatabaseEntry>(64);
|
||||
_keysToRemove = new ConcurrentHashSet<Hash>();
|
||||
_waitLock = new Object();
|
||||
}
|
||||
|
||||
public void queue(Hash key, DatabaseEntry data) {
|
||||
int pending = _keys.size();
|
||||
_keysToRemove.remove(key);
|
||||
boolean exists = (null != _keys.put(key, data));
|
||||
if (exists)
|
||||
_context.statManager().addRateData("netDb.writeClobber", pending);
|
||||
@@ -220,6 +206,25 @@ public class PersistentDataStore extends TransientDataStore {
|
||||
|
||||
public void remove(Hash key) {
|
||||
_keys.remove(key);
|
||||
_keysToRemove.add(key);
|
||||
}
|
||||
|
||||
/*
|
||||
* @since 0.9.50 was in separate RemoveJob
|
||||
*/
|
||||
private void removeQueued() {
|
||||
if (_keysToRemove.isEmpty())
|
||||
return;
|
||||
for (Iterator<Hash> iter = _keysToRemove.iterator(); iter.hasNext(); ) {
|
||||
Hash key = iter.next();
|
||||
iter.remove();
|
||||
try {
|
||||
removeFile(key, _dbDir);
|
||||
} catch (IOException ioe) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Error removing key " + key, ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void run() {
|
||||
@@ -260,6 +265,7 @@ public class PersistentDataStore extends TransientDataStore {
|
||||
if (count >= WRITE_LIMIT)
|
||||
count = 0;
|
||||
if (count == 0) {
|
||||
removeQueued();
|
||||
if (lastCount > 0) {
|
||||
long time = _context.clock().now() - startTime;
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
|
||||
@@ -136,8 +136,8 @@ class TransientDataStore implements DataStore {
|
||||
_log.info("Almost clobbered an old router! " + key + ": [old published on " + new Date(ori.getPublished()) +
|
||||
" new on " + new Date(ri.getPublished()) + ']');
|
||||
} else if (ri.getPublished() == ori.getPublished()) {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Duplicate " + key);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Duplicate " + key);
|
||||
} else {
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Updated the old router for " + key + ": [old published on " + new Date(ori.getPublished()) +
|
||||
|
||||
Reference in New Issue
Block a user