From 593d4dc50813a9558bc43d7e47d3e9bd7f031679 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Wed, 26 Aug 2009 22:22:47 +0000
Subject: [PATCH]     * DecayingBloomFilter:       - Replace with new
 DecayingHashSet for 3 of 4 uses,         and also in the 4th if the router is
 low-bandwidth.         Saves 8 MB heap.

---
 .../src/net/i2p/util/DecayingBloomFilter.java |  68 +++-
 .../src/net/i2p/util/DecayingHashSet.java     | 380 ++++++++++++++++++
 .../src/net/i2p/router/MessageValidator.java  |   3 +-
 .../udp/InboundMessageFragments.java          |   3 +-
 .../router/tunnel/BloomFilterIVValidator.java |  24 +-
 .../router/tunnel/BuildMessageProcessor.java  |   3 +-
 6 files changed, 467 insertions(+), 14 deletions(-)
 create mode 100644 core/java/src/net/i2p/util/DecayingHashSet.java

diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java
index 95da0a03bd..a1029ae1be 100644
--- a/core/java/src/net/i2p/util/DecayingBloomFilter.java
+++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java
@@ -14,6 +14,10 @@ import org.xlattice.crypto.filters.BloomSHA1;
  * entries per second with virtually no false positive rate.  Down the line, 
  * this may be refactored to allow tighter control of the size necessary for the
  * contained bloom filters, but a fixed 2MB overhead isn't that bad.
+ *
+ * NOTE: At 1MBps, the tunnel IVV will see an unacceptable false positive rate
+ * of almost 0.1% with the current m and k values; however using DHS instead will use 30MB.
+ * Further analysis and tweaking for the tunnel IVV may be required.
  */
 public class DecayingBloomFilter {
     private I2PAppContext _context;
@@ -26,13 +30,18 @@ public class DecayingBloomFilter {
     private byte _extended[];
     private byte _longToEntry[];
     private long _longToEntryMask;
-    private long _currentDuplicates;
+    protected long _currentDuplicates;
     private boolean _keepDecaying;
     private DecayEvent _decayEvent;
+    /** just for logging */
+    private String _name;
     
     private static final int DEFAULT_M = 23;
     private static final boolean ALWAYS_MISS = false;
    
+    /** noop for DHS */
+    public DecayingBloomFilter() {}
+
     /**
      * Create a bloom filter that will decay its entries over time.  
      *
@@ -42,9 +51,15 @@ public class DecayingBloomFilter {
      *                   against with sufficient random values.
      */
     public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes) {
+        this(context, durationMs, entryBytes, "DBF");
+    }
+
+    /** @param name just for logging / debugging / stats */
+    public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name) {
         _context = context;
         _log = context.logManager().getLog(DecayingBloomFilter.class);
         _entryBytes = entryBytes;
+        _name = name;
         // this is instantiated in four different places, they may have different
         // requirements, but for now use this as a gross method of memory reduction.
         // m == 23 => 1MB each BloomSHA1 (4 pairs = 8MB total)
@@ -67,6 +82,17 @@ public class DecayingBloomFilter {
         _decayEvent = new DecayEvent();
         _keepDecaying = true;
         SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
+        if (_log.shouldLog(Log.WARN))
+           _log.warn("New DBF " + name + " m = " + m + " entryBytes = " + entryBytes +
+                     " numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
+        // try to get a handle on memory usage vs. false positives
+        context.statManager().createRateStat("router.decayingBloomFilter." + name + ".size",
+             "Size", "Router", new long[] { Math.max(60*1000, durationMs) });
+        context.statManager().createRateStat("router.decayingBloomFilter." + name + ".dups",
+             "1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
+        context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)",
+             "log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)",
+             "Router", new long[] { Math.max(60*1000, durationMs) });
     }
     
     public long getCurrentDuplicateCount() { return _currentDuplicates; }
@@ -196,9 +222,12 @@ public class DecayingBloomFilter {
     private void decay() {
         int currentCount = 0;
         long dups = 0;
+        double fpr = 0d;
         synchronized (this) {
             BloomSHA1 tmp = _previous;
             currentCount = _current.size();
+            if (_log.shouldLog(Log.DEBUG) && currentCount > 0)
+                fpr = _current.falsePositives();
             _previous = _current;
             _current = tmp;
             _current.clear();
@@ -206,8 +235,19 @@ public class DecayingBloomFilter {
             _currentDuplicates = 0;
         }
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Decaying the filter after inserting " + currentCount 
-                       + " elements and " + dups + " false positives");
+            _log.debug("Decaying the filter " + _name + " after inserting " + currentCount 
+                       + " elements and " + dups + " false positives with FPR = " + fpr);
+        _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".size",
+                                           currentCount, 0);
+        if (currentCount > 0)
+            _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".dups",
+                                               1000l*1000*dups/currentCount, 0);
+        if (fpr > 0d) {
+            // only if log.shouldLog(Log.DEBUG) ...
+            long exponent = (long) Math.log10(fpr);
+            _context.statManager().addRateData("router.decayingBloomFilter." + _name + ".log10(falsePos)",
+                                               exponent, 0);
+        }
     }
     
     private class DecayEvent implements SimpleTimer.TimedEvent {
@@ -219,18 +259,27 @@ public class DecayingBloomFilter {
         }
     }
     
+    /**
+     *  Theoretical false positive rate for   16 KBps: 1.17E-21
+     *  Theoretical false positive rate for   24 KBps: 9.81E-20
+     *  Theoretical false positive rate for   32 KBps: 2.24E-18
+     *  Theoretical false positive rate for  256 KBps: 7.45E-9
+     *  Theoretical false positive rate for  512 KBps: 5.32E-6
+     *  Theoretical false positive rate for 1024 KBps: 1.48E-3
+     */
     public static void main(String args[]) {
         int kbps = 256;
-        int iterations = 100;
+        int iterations = 10;
         testByLong(kbps, iterations);
         testByBytes(kbps, iterations);
     }
-    public static void testByLong(int kbps, int numRuns) {
+    private static void testByLong(int kbps, int numRuns) {
         int messages = 60 * 10 * kbps;
         Random r = new Random();
         DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8);
         int falsePositives = 0;
         long totalTime = 0;
+        double fpr = 0d;
         for (int j = 0; j < numRuns; j++) {
             long start = System.currentTimeMillis();
             for (int i = 0; i < messages; i++) {
@@ -240,15 +289,17 @@ public class DecayingBloomFilter {
                 }
             }
             totalTime += System.currentTimeMillis() - start;
+            fpr = filter.getFalsePositiveRate();
             filter.clear();
         }
         filter.stopDecaying();
+        System.out.println("False postive rate should be " + fpr);
         System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
                            + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
                            + falsePositives + " false positives");
 
     }
-    public static void testByBytes(int kbps, int numRuns) {
+    private static void testByBytes(int kbps, int numRuns) {
         byte iv[][] = new byte[60*10*kbps][16];
         Random r = new Random();
         for (int i = 0; i < iv.length; i++)
@@ -257,18 +308,21 @@ public class DecayingBloomFilter {
         DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16);
         int falsePositives = 0;
         long totalTime = 0;
+        double fpr = 0d;
         for (int j = 0; j < numRuns; j++) {
             long start = System.currentTimeMillis();
             for (int i = 0; i < iv.length; i++) {
                 if (filter.add(iv[i])) {
                     falsePositives++;
-                    System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
+                    System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
                 }
             }
             totalTime += System.currentTimeMillis() - start;
+            fpr = filter.getFalsePositiveRate();
             filter.clear();
         }
         filter.stopDecaying();
+        System.out.println("False postive rate should be " + fpr);
         System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
                            + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
                            + falsePositives + " false positives");
diff --git a/core/java/src/net/i2p/util/DecayingHashSet.java b/core/java/src/net/i2p/util/DecayingHashSet.java
new file mode 100644
index 0000000000..a72b6b9e2e
--- /dev/null
+++ b/core/java/src/net/i2p/util/DecayingHashSet.java
@@ -0,0 +1,380 @@
+package net.i2p.util;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.Random;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+
+
+/**
+ * Double buffered hash set.
+ * Since DecayingBloomFilter was instantiated 4 times for a total memory usage  
+ * of 8MB, it seemed like we could do a lot better, given these usage stats
+ * on a class L router:
+ *
+ *      ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java:
+ *           32 bytes, peak 10 entries in 1m
+ *
+ *      ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java:
+ *           4 bytes, peak 150 entries in 10s
+ *
+ *      ./router/java/src/net/i2p/router/MessageValidator.java:
+ *           8 bytes, peak 1K entries in 2m
+ *
+ *      ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java:
+ *           16 bytes, peak 15K entries in 10m
+ *
+ * If the ArrayWrapper object in the HashSet is 50 bytes, and BloomSHA1(23, 11) is 1MB,
+ * then for less than 20K entries this is smaller.
+ * And this uses space proportional to traffiic, so it doesn't penalize small routers
+ * with a fixed 8MB.
+ * So let's try it for the first 2 or 3, for now.
+ *
+ * Also, DBF is syncrhonized, and uses SimpleTimer.
+ * Here we use a read/write lock, with synchronization only
+ * when switching double buffers, and we use SimpleScheduler.
+ *
+ * Yes, we could stare at stats all day, and try to calculate an acceptable
+ * false-positive rate for each of the above uses, then estimate the DBF size
+ * required to meet that rate for a given usage. Or even start adjusting the
+ * Bloom filter m and k values on a per-DBF basis. But it's a whole lot easier
+ * to implement something with a zero false positive rate, and uses less memory
+ * for almost all bandwidth classes.
+ *
+ * This has a strictly zero false positive rate for <= 8 byte keys.
+ * For larger keys, it is 1 / (2**64) ~= 5E-20, which is better than
+ * DBF for any entry count greater than about 14K.
+ *
+ * DBF has a zero false negative rate over the period
+ * 2 * durationMs. And a 100% false negative rate beyond that period.
+ * This has the same properties.
+ *
+ * This performs about twice as fast as DBF in the test below.
+ *
+ * @author zzz
+ */
+public class DecayingHashSet extends DecayingBloomFilter {
+    private final I2PAppContext _context;
+    private final Log _log;
+    private ConcurrentHashSet<ArrayWrapper> _current;
+    private ConcurrentHashSet<ArrayWrapper> _previous;
+    private int _durationMs;
+    private int _entryBytes;
+    private volatile boolean _keepDecaying;
+    private final DecayEvent _decayEvent;
+    /** just for logging */
+    private final String _name;
+    /** synchronize against this lock when switching double buffers */
+    private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true);
+    
+   
+    /**
+     * Create a double-buffered hash set that will decay its entries over time.  
+     *
+     * @param durationMs entries last for at least this long, but no more than twice this long
+     * @param entryBytes how large are the entries to be added?  1 to 32 bytes
+     */
+    public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes) {
+        this(context, durationMs, entryBytes, "DHS");
+    }
+
+    /** @param name just for logging / debugging / stats */
+    public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) {
+        if (entryBytes <= 0 || entryBytes > 32)
+            throw new IllegalArgumentException("Bad size");
+        _context = context;
+        _log = context.logManager().getLog(DecayingHashSet.class);
+        _entryBytes = entryBytes;
+        _name = name;
+        _current = new ConcurrentHashSet(128);
+        _previous = new ConcurrentHashSet(128);
+        _durationMs = durationMs;
+        _currentDuplicates = 0;
+        _decayEvent = new DecayEvent();
+        _keepDecaying = true;
+        SimpleScheduler.getInstance().addEvent(_decayEvent, _durationMs);
+        if (_log.shouldLog(Log.WARN))
+           _log.warn("New DHS " + name + " entryBytes = " + entryBytes +
+                     " cycle (s) = " + (durationMs / 1000));
+        // try to get a handle on memory usage vs. false positives
+        context.statManager().createRateStat("router.decayingHashSet." + name + ".size",
+             "Size", "Router", new long[] { Math.max(60*1000, durationMs) });
+        context.statManager().createRateStat("router.decayingHashSet." + name + ".dups",
+             "1000000 * Duplicates/Size", "Router", new long[] { Math.max(60*1000, durationMs) });
+    }
+    
+    /** unsynchronized but only used for logging elsewhere */
+    @Override
+    public int getInsertedCount() { 
+        return _current.size() + _previous.size(); 
+    }
+    /** pointless, only used for logging elsewhere */
+    @Override
+    public double getFalsePositiveRate() { 
+        if (_entryBytes <= 8)
+            return 0d; 
+        return 1d / Math.pow(2d, 64d);  // 5.4E-20
+    }
+    
+    /** 
+     * @return true if the entry added is a duplicate
+     *
+     */
+    @Override
+    public boolean add(byte entry[], int off, int len) {
+        if (entry == null) 
+            throw new IllegalArgumentException("Null entry");
+        if (len != _entryBytes) 
+            throw new IllegalArgumentException("Bad entry [" + len + ", expected " 
+                                               + _entryBytes + "]");
+        getReadLock();
+        try {
+            return locked_add(entry, off, len, true);
+        } finally { releaseReadLock(); }
+    }
+
+    /** 
+     * @return true if the entry added is a duplicate.  the number of low order 
+     * bits used is determined by the entryBytes parameter used on creation of the
+     * filter.
+     *
+     */
+    @Override
+    public boolean add(long entry) {
+        return add(entry, true);
+    }
+    
+    /** 
+     * @return true if the entry is already known.  this does NOT add the
+     * entry however.
+     *
+     */
+    @Override
+    public boolean isKnown(long entry) {
+        return add(entry, false);
+    }
+
+    private boolean add(long entry, boolean addIfNew) {
+        int len = Math.min(8, _entryBytes);
+        byte[] b = toLong(len, entry);
+        getReadLock();
+        try {
+            return locked_add(b, 0, len, addIfNew);
+        } finally { releaseReadLock(); }
+    }
+    
+    /** from DataHelper, except negative values ok */
+    private static byte[] toLong(int numBytes, long value) {
+        byte target[] = new byte[numBytes];
+        for (int i = 0; i < numBytes; i++)
+            target[numBytes-i-1] = (byte)(value >>> (i*8));
+        return target;
+    }
+    
+    /** so many questions... */
+    private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
+        ArrayWrapper w = new ArrayWrapper(entry, offset, len);
+        boolean seen = _current.contains(w);
+        seen = seen || _previous.contains(w);
+        if (seen) {
+            // why increment if addIfNew == false?
+            // why not add to current if only in previous?
+            _currentDuplicates++;
+        } else if (addIfNew) {
+            _current.add(w);
+            // why add to previous?
+            _previous.add(w);
+        }
+        return seen;
+    }
+    
+    @Override
+    public void clear() {
+        _current.clear();
+        _previous.clear();
+        _currentDuplicates = 0;
+    }
+    
+    /** super doesn't call clear, but neither do the users, so it seems like we should here */
+    @Override
+    public void stopDecaying() {
+        _keepDecaying = false;
+        clear();
+    }
+    
+    private void decay() {
+        int currentCount = 0;
+        long dups = 0;
+        if (!getWriteLock())
+            return;
+        try {
+            ConcurrentHashSet<ArrayWrapper> tmp = _previous;
+            currentCount = _current.size();
+            _previous = _current;
+            _current = tmp;
+            _current.clear();
+            dups = _currentDuplicates;
+            _currentDuplicates = 0;
+        } finally { releaseWriteLock(); }
+
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Decaying the filter " + _name + " after inserting " + currentCount 
+                       + " elements and " + dups + " false positives");
+        _context.statManager().addRateData("router.decayingHashSet." + _name + ".size",
+                                           currentCount, 0);
+        if (currentCount > 0)
+            _context.statManager().addRateData("router.decayingHashSet." + _name + ".dups",
+                                               1000l*1000*dups/currentCount, 0);
+    }
+    
+    /** if decay() ever blows up, we won't reschedule, and will grow unbounded, but it seems unlikely */
+    private class DecayEvent implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            if (_keepDecaying) {
+                decay();
+                SimpleScheduler.getInstance().addEvent(DecayEvent.this, _durationMs);
+            }
+        }
+    }
+
+    private void getReadLock() {
+        _reorganizeLock.readLock().lock();
+    }
+
+    private void releaseReadLock() {
+        _reorganizeLock.readLock().unlock();
+    }
+
+    /** @return true if the lock was acquired */
+    private boolean getWriteLock() {
+        try {
+            boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
+            if (!rv)
+                _log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
+            return rv;
+        } catch (InterruptedException ie) {}
+        return false;
+    }
+
+    private void releaseWriteLock() {
+        _reorganizeLock.writeLock().unlock();
+    }
+
+    /**
+     *  This saves the data as-is if the length is <= 8 bytes,
+     *  otherwise it stores an 8-byte hash.
+     *  Hash function is from DataHelper, modded to get
+     *  the maximum entropy given the length of the data.
+     */
+    private static class ArrayWrapper {
+        private long _longhashcode;
+        public ArrayWrapper(byte[] b, int offset, int len) {
+            int idx = offset;
+            int shift = Math.min(8, 64 / len);
+            for (int i = 0; i < len; i++) {
+                // xor better than + in tests
+                _longhashcode ^= (((long) b[idx++]) << (i * shift));
+            }
+        }
+
+        public int hashCode() {
+             return (int) _longhashcode;
+        }
+
+        public long longHashCode() {
+             return _longhashcode;
+        }
+
+        public boolean equals(Object o) {
+             if (o == null || !(o instanceof ArrayWrapper))
+                 return false;
+             return ((ArrayWrapper) o).longHashCode() == _longhashcode;
+        }
+    }
+
+    /**
+     *  vs. DBF, this measures 1.93x faster for testByLong and 2.46x faster for testByBytes.
+     */
+    public static void main(String args[]) {
+        /** KBytes per sec, 1 message per KByte */
+        int kbps = 256;
+        int iterations = 10;
+        //testSize();
+        testByLong(kbps, iterations);
+        testByBytes(kbps, iterations);
+    }
+
+    /** and the answer is: 49.9 bytes. The ArrayWrapper alone measured 16, so that's 34 for the HashSet entry. */
+/*****
+    private static void testSize() {
+        int qty = 256*1024;
+        byte b[] = new byte[8];
+        Random r = new Random();
+        long old = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        ConcurrentHashSet foo = new ConcurrentHashSet(qty);
+        for (int i = 0; i < qty; i++) {
+            r.nextBytes(b);
+            foo.add(new ArrayWrapper(b, 0, 8));
+        }
+        long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        System.out.println("Memory per ArrayWrapper: " + (((double) (used - old)) / qty));
+    }
+*****/
+
+    /** 8 bytes, simulate the router message validator */
+    private static void testByLong(int kbps, int numRuns) {
+        int messages = 60 * 10 * kbps;
+        Random r = new Random();
+        DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 8);
+        int falsePositives = 0;
+        long totalTime = 0;
+        for (int j = 0; j < numRuns; j++) {
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < messages; i++) {
+                if (filter.add(r.nextLong())) {
+                    falsePositives++;
+                    System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
+                }
+            }
+            totalTime += System.currentTimeMillis() - start;
+            filter.clear();
+        }
+        System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
+        filter.stopDecaying();
+        System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
+                           + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+                           + falsePositives + " false positives");
+
+    }
+
+    /** 16 bytes, simulate the tunnel IV validator */
+    private static void testByBytes(int kbps, int numRuns) {
+        byte iv[][] = new byte[60*10*kbps][16];
+        Random r = new Random();
+        for (int i = 0; i < iv.length; i++)
+            r.nextBytes(iv[i]);
+
+        DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 16);
+        int falsePositives = 0;
+        long totalTime = 0;
+        for (int j = 0; j < numRuns; j++) {
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < iv.length; i++) {
+                if (filter.add(iv[i])) {
+                    falsePositives++;
+                    System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
+                }
+            }
+            totalTime += System.currentTimeMillis() - start;
+            filter.clear();
+        }
+        System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
+        filter.stopDecaying();
+        System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
+                           + DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+                           + falsePositives + " false positives");
+    }
+}
diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java
index a5c0a82175..b673702a1f 100644
--- a/router/java/src/net/i2p/router/MessageValidator.java
+++ b/router/java/src/net/i2p/router/MessageValidator.java
@@ -1,6 +1,7 @@
 package net.i2p.router;
 
 import net.i2p.util.DecayingBloomFilter;
+import net.i2p.util.DecayingHashSet;
 import net.i2p.util.Log;
 
 /**
@@ -95,7 +96,7 @@ public class MessageValidator {
     }
     
     public void startup() {
-        _filter = new DecayingBloomFilter(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8);
+        _filter = new DecayingHashSet(_context, (int)Router.CLOCK_FUDGE_FACTOR * 2, 8, "RouterMV");
     }
     
     void shutdown() {
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
index 9315a9ff03..04c2a3184f 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
@@ -5,6 +5,7 @@ import java.util.Map;
 import net.i2p.data.Hash;
 import net.i2p.router.RouterContext;
 import net.i2p.util.DecayingBloomFilter;
+import net.i2p.util.DecayingHashSet;
 import net.i2p.util.Log;
 
 /**
@@ -52,7 +53,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
         // may want to extend the DecayingBloomFilter so we can use a smaller 
         // array size (currently its tuned for 10 minute rates for the 
         // messageValidator)
-        _recentlyCompletedMessages = new DecayingBloomFilter(_context, DECAY_PERIOD, 4);
+        _recentlyCompletedMessages = new DecayingHashSet(_context, DECAY_PERIOD, 4, "UDPIMF");
         _ackSender.startup();
         _messageReceiver.startup();
     }
diff --git a/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java b/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java
index 5248944e68..2b05d0d200 100644
--- a/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java
+++ b/router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java
@@ -1,10 +1,11 @@
 package net.i2p.router.tunnel;
 
-import net.i2p.I2PAppContext;
 import net.i2p.data.ByteArray;
 import net.i2p.data.DataHelper;
+import net.i2p.router.RouterContext;
 import net.i2p.util.ByteCache;
 import net.i2p.util.DecayingBloomFilter;
+import net.i2p.util.DecayingHashSet;
 
 /**
  * Manage the IV validation for all of the router's tunnels by way of a big
@@ -12,7 +13,7 @@ import net.i2p.util.DecayingBloomFilter;
  *
  */
 public class BloomFilterIVValidator implements IVValidator {
-    private I2PAppContext _context;
+    private RouterContext _context;
     private DecayingBloomFilter _filter;
     private ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
     
@@ -23,9 +24,17 @@ public class BloomFilterIVValidator implements IVValidator {
      *
      */
     private static final int HALFLIFE_MS = 10*60*1000;
-    public BloomFilterIVValidator(I2PAppContext ctx, int KBps) {
+    private static final int MIN_SHARE_KBPS_TO_USE_BLOOM = 64;
+
+    public BloomFilterIVValidator(RouterContext ctx, int KBps) {
         _context = ctx;
-        _filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16);
+        // Select the filter based on share bandwidth.
+        // Note that at rates approaching 1MB, we need to do something else,
+        // as the Bloom filter false positive rates approach 0.1%. FIXME
+        if (getShareBandwidth(ctx) < MIN_SHARE_KBPS_TO_USE_BLOOM)
+            _filter = new DecayingHashSet(ctx, HALFLIFE_MS, 16, "TunnelIVV"); // appx. 4MB max
+        else
+            _filter = new DecayingBloomFilter(ctx, HALFLIFE_MS, 16, "TunnelIVV");  // 2MB fixed
         ctx.statManager().createRateStat("tunnel.duplicateIV", "Note that a duplicate IV was received", "Tunnels", 
                                          new long[] { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
     }
@@ -39,4 +48,11 @@ public class BloomFilterIVValidator implements IVValidator {
         return !dup; // return true if it is OK, false if it isn't
     }
     public void destroy() { _filter.stopDecaying(); }
+
+    private static int getShareBandwidth(RouterContext ctx) {
+        int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond();
+        int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond();
+        double pct = ctx.router().getSharePercentage();
+        return (int) (pct * Math.min(irateKBps, orateKBps));
+    }
 }
diff --git a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
index a578ee08ac..ab6aa4cab3 100644
--- a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
+++ b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java
@@ -10,6 +10,7 @@ import net.i2p.data.SessionKey;
 import net.i2p.data.i2np.BuildRequestRecord;
 import net.i2p.data.i2np.TunnelBuildMessage;
 import net.i2p.util.DecayingBloomFilter;
+import net.i2p.util.DecayingHashSet;
 import net.i2p.util.Log;
 
 /**
@@ -22,7 +23,7 @@ public class BuildMessageProcessor {
     private DecayingBloomFilter _filter;
     
     public BuildMessageProcessor(I2PAppContext ctx) {
-        _filter = new DecayingBloomFilter(ctx, 60*1000, 32);
+        _filter = new DecayingHashSet(ctx, 60*1000, 32, "TunnelBMP");
         ctx.statManager().createRateStat("tunnel.buildRequestDup", "How frequently we get dup build request messages", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     }
     /**
-- 
GitLab