DBF/DHS cleanups and speedups

zzz
2011-01-19 14:42:14 +00:00
parent 7a7889b59a
commit 7538766c88
2 changed files with 77 additions and 79 deletions

DecayingBloomFilter.java

@@ -20,28 +20,34 @@ import org.xlattice.crypto.filters.BloomSHA1;
* Further analysis and tweaking for the tunnel IVV may be required.
*/
public class DecayingBloomFilter {
private I2PAppContext _context;
private Log _log;
protected final I2PAppContext _context;
protected final Log _log;
private BloomSHA1 _current;
private BloomSHA1 _previous;
private int _durationMs;
private int _entryBytes;
protected final int _durationMs;
protected final int _entryBytes;
private byte _extenders[][];
private byte _extended[];
private byte _longToEntry[];
private long _longToEntryMask;
protected long _currentDuplicates;
private boolean _keepDecaying;
private DecayEvent _decayEvent;
protected volatile boolean _keepDecaying;
protected SimpleTimer.TimedEvent _decayEvent;
/** just for logging */
private String _name;
protected final String _name;
private static final int DEFAULT_M = 23;
private static final int DEFAULT_K = 11;
private static final boolean ALWAYS_MISS = false;
/** noop for DHS */
public DecayingBloomFilter() {}
/** only for extension by DHS */
protected DecayingBloomFilter(int durationMs, int entryBytes, String name, I2PAppContext context) {
_context = context;
_log = context.logManager().getLog(getClass());
_entryBytes = entryBytes;
_name = name;
_durationMs = durationMs;
}
/**
* Create a bloom filter that will decay its entries over time.
@@ -87,7 +93,6 @@ public class DecayingBloomFilter {
_longToEntry = new byte[_entryBytes];
_longToEntryMask = (1l << (_entryBytes * 8l)) -1;
}
_currentDuplicates = 0;
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
@@ -105,11 +110,13 @@ public class DecayingBloomFilter {
}
public long getCurrentDuplicateCount() { return _currentDuplicates; }
public int getInsertedCount() {
synchronized (this) {
return _current.size() + _previous.size();
}
}
public double getFalsePositiveRate() {
synchronized (this) {
return _current.falsePositives();
@@ -117,12 +124,15 @@ public class DecayingBloomFilter {
}
/**
* return true if the entry added is a duplicate
*
* @return true if the entry added is a duplicate
*/
public boolean add(byte entry[]) {
return add(entry, 0, entry.length);
}
/**
* @return true if the entry added is a duplicate
*/
public boolean add(byte entry[], int off, int len) {
if (ALWAYS_MISS) return false;
if (entry == null)
@@ -131,55 +141,52 @@ public class DecayingBloomFilter {
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
+ _entryBytes + "]");
synchronized (this) {
return locked_add(entry, off, len);
return locked_add(entry, off, len, true);
}
}
/**
* return true if the entry added is a duplicate. the number of low order
* @return true if the entry added is a duplicate. the number of low order
* bits used is determined by the entryBytes parameter used on creation of the
* filter.
*
*/
public boolean add(long entry) {
if (ALWAYS_MISS) return false;
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
//entry &= _longToEntryMask;
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
synchronized (this) {
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
//entry &= _longToEntryMask;
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
return locked_add(_longToEntry, 0, _longToEntry.length);
return locked_add(_longToEntry, 0, _longToEntry.length, true);
}
}
/**
* return true if the entry is already known. this does NOT add the
* @return true if the entry is already known. this does NOT add the
* entry however.
*
*/
public boolean isKnown(long entry) {
if (ALWAYS_MISS) return false;
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
synchronized (this) {
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
return locked_add(_longToEntry, 0, _longToEntry.length, false);
}
}
private boolean locked_add(byte entry[], int offset, int len) {
return locked_add(entry, offset, len, true);
}
private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
if (_extended != null) {
// extend the entry to 32 bytes
@@ -195,7 +202,6 @@ public class DecayingBloomFilter {
} else {
if (addIfNew) {
_current.locked_insert(_extended);
_previous.locked_insert(_extended);
}
return false;
}
@@ -208,7 +214,6 @@ public class DecayingBloomFilter {
} else {
if (addIfNew) {
_current.locked_insert(entry, offset, len);
_previous.locked_insert(entry, offset, len);
}
return false;
}

DecayingHashSet.java

@@ -17,12 +17,15 @@ import net.i2p.data.DataHelper;
*
* ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java:
* 32 bytes, peak 10 entries in 1m
* (320 peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java:
* 4 bytes, peak 150 entries in 10s
* (1600 peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/MessageValidator.java:
* 8 bytes, peak 1K entries in 2m
* (36K peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java:
* 16 bytes, peak 15K entries in 10m
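
Those figures give a sense of typical configurations. As a hypothetical usage sketch (not part of this commit), a caller like the tunnel IV validator would construct and query the set through the four-argument constructor shown further down; the name "TunnelIVV" here is illustrative:

// Hypothetical caller, for illustration only; the 16-byte / 10-minute
// parameters follow the BloomFilterIVValidator figures quoted above.
I2PAppContext ctx = I2PAppContext.getGlobalContext();
DecayingBloomFilter filter = new DecayingHashSet(ctx, 10*60*1000, 16, "TunnelIVV");

byte[] iv = new byte[16];            // the IV to validate
boolean duplicate = filter.add(iv);  // true => likely replay, reject it
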
@@ -57,19 +60,10 @@ import net.i2p.data.DataHelper;
* @author zzz
*/
public class DecayingHashSet extends DecayingBloomFilter {
private final I2PAppContext _context;
private final Log _log;
private ConcurrentHashSet<ArrayWrapper> _current;
private ConcurrentHashSet<ArrayWrapper> _previous;
private int _durationMs;
private int _entryBytes;
private volatile boolean _keepDecaying;
private final DecayEvent _decayEvent;
/** just for logging */
private final String _name;
/** synchronize against this lock when switching double buffers */
private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);
/**
* Create a double-buffered hash set that will decay its entries over time.
@@ -83,16 +77,11 @@ public class DecayingHashSet extends DecayingBloomFilter {
/** @param name just for logging / debugging / stats */
public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) {
super(durationMs, entryBytes, name, context);
if (entryBytes <= 0 || entryBytes > 32)
throw new IllegalArgumentException("Bad size");
_context = context;
_log = context.logManager().getLog(DecayingHashSet.class);
_entryBytes = entryBytes;
_name = name;
_current = new ConcurrentHashSet(128);
_previous = new ConcurrentHashSet(128);
_durationMs = durationMs;
_currentDuplicates = 0;
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs);
@@ -111,6 +100,7 @@ public class DecayingHashSet extends DecayingBloomFilter {
public int getInsertedCount() {
return _current.size() + _previous.size();
}
/** pointless, only used for logging elsewhere */
@Override
public double getFalsePositiveRate() {
@@ -121,7 +111,6 @@ public class DecayingHashSet extends DecayingBloomFilter {
/**
* @return true if the entry added is a duplicate
*
*/
@Override
public boolean add(byte entry[], int off, int len) {
@@ -130,9 +119,10 @@ public class DecayingHashSet extends DecayingBloomFilter {
if (len != _entryBytes)
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
+ _entryBytes + "]");
ArrayWrapper w = new ArrayWrapper(entry, off, len);
getReadLock();
try {
return locked_add(entry, off, len, true);
return locked_add(w, true);
} finally { releaseReadLock(); }
}
@@ -158,35 +148,30 @@ public class DecayingHashSet extends DecayingBloomFilter {
}
private boolean add(long entry, boolean addIfNew) {
int len = Math.min(8, _entryBytes);
byte[] b = toLong(len, entry);
ArrayWrapper w = new ArrayWrapper(entry);
getReadLock();
try {
return locked_add(b, 0, len, addIfNew);
return locked_add(w, addIfNew);
} finally { releaseReadLock(); }
}
/** from DataHelper, except negative values ok */
private static byte[] toLong(int numBytes, long value) {
byte target[] = new byte[numBytes];
for (int i = 0; i < numBytes; i++)
target[numBytes-i-1] = (byte)(value >>> (i*8));
return target;
}
/** so many questions... */
private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
ArrayWrapper w = new ArrayWrapper(entry, offset, len);
boolean seen = _current.contains(w);
seen = seen || _previous.contains(w);
/**
* @param addIfNew if true, add the element to current if it is not already there;
* if false, only check
* @return if the element is in either the current or previous set
*/
private boolean locked_add(ArrayWrapper w, boolean addIfNew) {
boolean seen;
// only access _current once. This adds to _current even if seen in _previous.
if (addIfNew)
seen = !_current.add(w);
else
seen = _current.contains(w);
if (!seen)
seen = _previous.contains(w);
if (seen) {
// why increment if addIfNew == false?
// why not add to current if only in previous?
// why increment if addIfNew == false? Only used for stats...
_currentDuplicates++;
} else if (addIfNew) {
_current.add(w);
// why add to previous?
_previous.add(w);
}
return seen;
}
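
The _reorganizeLock comment above refers to the buffer switch done by the decay task, which falls outside this hunk. A hedged sketch of how such a swap looks under the write lock (method name and body assumed, not taken from this diff):

// Sketch only: the double-buffer swap guarded by the fair read/write lock.
private void decay() {
    _reorganizeLock.writeLock().lock();    // excludes all add()/isKnown() readers
    try {
        ConcurrentHashSet<ArrayWrapper> tmp = _previous;
        _previous = _current;              // current entries live one more period
        _current = tmp;
        _current.clear();                  // drop the oldest generation
        _currentDuplicates = 0;
    } finally {
        _reorganizeLock.writeLock().unlock();
    }
}
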
@@ -270,14 +255,22 @@ public class DecayingHashSet extends DecayingBloomFilter {
* the maximum entropy given the length of the data.
*/
private static class ArrayWrapper {
private long _longhashcode;
private final long _longhashcode;
public ArrayWrapper(byte[] b, int offset, int len) {
int idx = offset;
int shift = Math.min(8, 64 / len);
long lhc = 0;
for (int i = 0; i < len; i++) {
// xor better than + in tests
_longhashcode ^= (((long) b[idx++]) << (i * shift));
lhc ^= (((long) b[idx++]) << (i * shift));
}
_longhashcode = lhc;
}
/** faster version for when storing <= 8 bytes */
public ArrayWrapper(long b) {
_longhashcode = b;
}
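// Worked example (annotation added here, not part of the original source):
// for a 16-byte entry, shift = min(8, 64/16) = 4, so byte i is XORed in
// at bit offset 4*i and the whole array folds into a single long; for an
// entry of 8 bytes or fewer, shift = 8 and successive bytes are XORed in
// 8 bits apart. The ArrayWrapper(long) constructor above skips that loop
// and stores the value directly.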
public int hashCode() {