diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index 4179edd389..ff564b2656 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -20,28 +20,34 @@ import org.xlattice.crypto.filters.BloomSHA1; * Further analysis and tweaking for the tunnel IVV may be required. */ public class DecayingBloomFilter { - private I2PAppContext _context; - private Log _log; + protected final I2PAppContext _context; + protected final Log _log; private BloomSHA1 _current; private BloomSHA1 _previous; - private int _durationMs; - private int _entryBytes; + protected final int _durationMs; + protected final int _entryBytes; private byte _extenders[][]; private byte _extended[]; private byte _longToEntry[]; private long _longToEntryMask; protected long _currentDuplicates; - private boolean _keepDecaying; - private DecayEvent _decayEvent; + protected volatile boolean _keepDecaying; + protected SimpleTimer.TimedEvent _decayEvent; /** just for logging */ - private String _name; + protected final String _name; private static final int DEFAULT_M = 23; private static final int DEFAULT_K = 11; private static final boolean ALWAYS_MISS = false; - /** noop for DHS */ - public DecayingBloomFilter() {} + /** only for extension by DHS (DecayingHashSet); sets just the five final fields and leaves _current, _previous and the byte buffers unassigned */ + protected DecayingBloomFilter(int durationMs, int entryBytes, String name, I2PAppContext context) { + _context = context; + _log = context.logManager().getLog(getClass()); + _entryBytes = entryBytes; + _name = name; + _durationMs = durationMs; + } /** * Create a bloom filter that will decay its entries over time. 
@@ -87,7 +93,6 @@ public class DecayingBloomFilter { _longToEntry = new byte[_entryBytes]; _longToEntryMask = (1l << (_entryBytes * 8l)) -1; } - _currentDuplicates = 0; _decayEvent = new DecayEvent(); _keepDecaying = true; SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs); @@ -105,11 +110,13 @@ public class DecayingBloomFilter { } public long getCurrentDuplicateCount() { return _currentDuplicates; } + public int getInsertedCount() { synchronized (this) { return _current.size() + _previous.size(); } } + public double getFalsePositiveRate() { synchronized (this) { return _current.falsePositives(); @@ -117,12 +124,15 @@ public class DecayingBloomFilter { } /** - * return true if the entry added is a duplicate - * + * @return true if the entry added is a duplicate */ public boolean add(byte entry[]) { return add(entry, 0, entry.length); } + + /** + * @return true if the entry added is a duplicate; always false while ALWAYS_MISS is set + */ public boolean add(byte entry[], int off, int len) { if (ALWAYS_MISS) return false; if (entry == null) @@ -131,55 +141,52 @@ public class DecayingBloomFilter { throw new IllegalArgumentException("Bad entry [" + len + ", expected " + _entryBytes + "]"); synchronized (this) { - return locked_add(entry, off, len); + return locked_add(entry, off, len, true); } } /** - * return true if the entry added is a duplicate. the number of low order + * @return true if the entry added is a duplicate. the number of low order * bits used is determined by the entryBytes parameter used on creation of the * filter. 
* */ public boolean add(long entry) { if (ALWAYS_MISS) return false; synchronized (this) { if (_entryBytes <= 7) entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask); //entry &= _longToEntryMask; if (entry < 0) { DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry); _longToEntry[0] |= (1 << 7); } else { DataHelper.toLong(_longToEntry, 0, _entryBytes, entry); } - return locked_add(_longToEntry, 0, _longToEntry.length); + return locked_add(_longToEntry, 0, _longToEntry.length, true); /* _longToEntry is a shared scratch buffer, so it is filled inside this synchronized block */ } } /** - * return true if the entry is already known. this does NOT add the + * @return true if the entry is already known. this does NOT add the * entry however. 
* */ public boolean isKnown(long entry) { if (ALWAYS_MISS) return false; - synchronized (this) { + synchronized (this) { /* _longToEntry is a shared scratch buffer: fill it and look it up while holding the lock */ if (_entryBytes <= 7) entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask); if (entry < 0) { DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry); _longToEntry[0] |= (1 << 7); } else { DataHelper.toLong(_longToEntry, 0, _entryBytes, entry); } return locked_add(_longToEntry, 0, _longToEntry.length, false); } } - private boolean locked_add(byte entry[], int offset, int len) { - return locked_add(entry, offset, len, true); - } private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) { if (_extended != null) { // extend the entry to 32 bytes @@ -195,7 +202,6 @@ public class DecayingBloomFilter { } else { if (addIfNew) { _current.locked_insert(_extended); - _previous.locked_insert(_extended); } return false; } @@ -208,7 +214,6 @@ public class DecayingBloomFilter { } else { if (addIfNew) { _current.locked_insert(entry, offset, len); - _previous.locked_insert(entry, offset, len); } return false; } diff --git a/core/java/src/net/i2p/util/DecayingHashSet.java b/core/java/src/net/i2p/util/DecayingHashSet.java index a72b6b9e2e..f090cf727b 100644 --- a/core/java/src/net/i2p/util/DecayingHashSet.java +++ b/core/java/src/net/i2p/util/DecayingHashSet.java @@ -17,12 +17,15 @@ import net.i2p.data.DataHelper; * * ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java: * 32 bytes, peak 10 entries in 1m + * (320 peak entries seen on fast router) * * ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java: * 4 bytes, peak 150 entries in 10s + * (1600 peak entries seen on fast router) * * 
./router/java/src/net/i2p/router/MessageValidator.java: * 8 bytes, peak 1K entries in 2m + * (36K peak entries seen on fast router) * * ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java: * 16 bytes, peak 15K entries in 10m @@ -57,19 +60,10 @@ import net.i2p.data.DataHelper; * @author zzz */ public class DecayingHashSet extends DecayingBloomFilter { - private final I2PAppContext _context; - private final Log _log; private ConcurrentHashSet _current; private ConcurrentHashSet _previous; - private int _durationMs; - private int _entryBytes; - private volatile boolean _keepDecaying; - private final DecayEvent _decayEvent; - /** just for logging */ - private final String _name; /** synchronize against this lock when switching double buffers */ private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true); - /** * Create a double-buffered hash set that will decay its entries over time. @@ -83,16 +77,11 @@ public class DecayingHashSet extends DecayingBloomFilter { /** @param name just for logging / debugging / stats */ public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) { + super(durationMs, entryBytes, name, context); /* note: context is the last argument here but the first parameter of this constructor */ if (entryBytes <= 0 || entryBytes > 32) throw new IllegalArgumentException("Bad size"); - _context = context; - _log = context.logManager().getLog(DecayingHashSet.class); - _entryBytes = entryBytes; - _name = name; _current = new ConcurrentHashSet(128); _previous = new ConcurrentHashSet(128); - _durationMs = durationMs; - _currentDuplicates = 0; _decayEvent = new DecayEvent(); _keepDecaying = true; SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs); @@ -111,6 +100,7 @@ public class DecayingHashSet extends DecayingBloomFilter { public int getInsertedCount() { return _current.size() + _previous.size(); } + /** pointless, only used for logging elsewhere */ @Override public double getFalsePositiveRate() { @@ -121,7 +111,6 @@ public class DecayingHashSet extends 
DecayingBloomFilter { /** * @return true if the entry added is a duplicate - * */ @Override public boolean add(byte entry[], int off, int len) { @@ -130,9 +119,10 @@ public class DecayingHashSet extends DecayingBloomFilter { if (len != _entryBytes) throw new IllegalArgumentException("Bad entry [" + len + ", expected " + _entryBytes + "]"); + ArrayWrapper w = new ArrayWrapper(entry, off, len); getReadLock(); try { - return locked_add(entry, off, len, true); + return locked_add(w, true); } finally { releaseReadLock(); } } @@ -158,35 +148,30 @@ public class DecayingHashSet extends DecayingBloomFilter { } private boolean add(long entry, boolean addIfNew) { - int len = Math.min(8, _entryBytes); - byte[] b = toLong(len, entry); + ArrayWrapper w = new ArrayWrapper(entry); /* NOTE(review): the removed code kept only the low min(8, _entryBytes) bytes of entry; this keeps all 64 bits - confirm callers never pass wider values */ getReadLock(); try { - return locked_add(b, 0, len, addIfNew); + return locked_add(w, addIfNew); } finally { releaseReadLock(); } } - /** from DataHelper, except negative values ok */ - private static byte[] toLong(int numBytes, long value) { - byte target[] = new byte[numBytes]; - for (int i = 0; i < numBytes; i++) - target[numBytes-i-1] = (byte)(value >>> (i*8)); - return target; - } - - /** so many questions... */ - private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) { - ArrayWrapper w = new ArrayWrapper(entry, offset, len); - boolean seen = _current.contains(w); - seen = seen || _previous.contains(w); + /** + * @param addIfNew if true, add the element to current if it is not already there; + * if false, only check + * @return true if the element was already in either the current or previous set + */ + private boolean locked_add(ArrayWrapper w, boolean addIfNew) { + boolean seen; + // only access _current once. This adds to _current even if seen in _previous. + if (addIfNew) + seen = !_current.add(w); + else + seen = _current.contains(w); + if (!seen) + seen = _previous.contains(w); if (seen) { - // why increment if addIfNew == false? - // why not add to current if only in previous? 
+ // why increment if addIfNew == false? Only used for stats... _currentDuplicates++; - } else if (addIfNew) { - _current.add(w); - // why add to previous? - _previous.add(w); } return seen; } @@ -270,14 +255,22 @@ public class DecayingHashSet extends DecayingBloomFilter { * the maximum entropy given the length of the data. */ private static class ArrayWrapper { - private long _longhashcode; + private final long _longhashcode; + public ArrayWrapper(byte[] b, int offset, int len) { int idx = offset; int shift = Math.min(8, 64 / len); + long lhc = 0; for (int i = 0; i < len; i++) { // xor better than + in tests - _longhashcode ^= (((long) b[idx++]) << (i * shift)); + lhc ^= (((long) b[idx++]) << (i * shift)); } + _longhashcode = lhc; + } + + /** faster version for when storing <= 8 bytes. NOTE(review): stores b unchanged, while the byte[] constructor XORs shifted bytes together, so the two constructors give different _longhashcode values for the same entry */ + public ArrayWrapper(long b) { + _longhashcode = b; } public int hashCode() {