From d843646b4f3f39b69739942e9aaa40b6e1051f9b Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Mon, 10 May 2010 14:15:31 +0000
Subject: [PATCH]     * Streaming: Add support for connection throttling

---
 .../i2p/client/streaming/ConnThrottler.java   | 59 +++++++++++++++++++
 .../client/streaming/ConnectionManager.java   | 53 +++++++++++++++--
 .../client/streaming/ConnectionOptions.java   | 46 +++++++++++++++
 3 files changed, 154 insertions(+), 4 deletions(-)
 create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java
new file mode 100644
index 0000000000..07430e7511
--- /dev/null
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java
@@ -0,0 +1,59 @@
+package net.i2p.client.streaming;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import net.i2p.data.Hash;
+import net.i2p.util.ObjectCounter;
+import net.i2p.util.SimpleScheduler;
+import net.i2p.util.SimpleTimer;
+
+/**
+ * Count how often we have received an incoming connection
+ * This offers basic DOS protection but is not a complete solution.
+ *
+ * @since 0.7.14
+ */
+class ConnThrottler {
+    private final ObjectCounter<Hash> counter;
+    private final int _max;
+    private final int _totalMax;
+    private final AtomicInteger _currentTotal;
+
+    /*
+     * @param max per-peer, 0 for unlimited
+     * @param totalMax for all peers, 0 for unlimited
+     * @param period ms
+     */
+    ConnThrottler(int max, int totalMax, long period) {
+        _max = max;
+        _totalMax = totalMax;
+        if (max > 0) {
+            SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), period);
+            this.counter = new ObjectCounter();
+        } else {
+            this.counter = null;
+        }
+        if (totalMax > 0)
+            _currentTotal = new AtomicInteger();
+        else
+            _currentTotal = null;
+    }
+
+    /** increments before checking */
+    boolean shouldThrottle(Hash h) {
+        if (_totalMax > 0 && _currentTotal.incrementAndGet() > _totalMax)
+            return true;
+        if (_max > 0)
+            return this.counter.increment(h) > _max;
+        return false;
+    }
+
+    private class Cleaner implements SimpleTimer.TimedEvent {
+        public void timeReached() {
+            if (_totalMax > 0)
+                _currentTotal.set(0);
+            if (_max > 0)
+                ConnThrottler.this.counter.clear();
+        }
+    }
+}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index 49cc0476ac..630d84422a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -10,6 +10,7 @@ import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
 import net.i2p.client.I2PSession;
 import net.i2p.data.Destination;
+import net.i2p.data.Hash;
 import net.i2p.data.SessionKey;
 import net.i2p.util.Log;
 import net.i2p.util.SimpleTimer;
@@ -35,10 +36,14 @@ public class ConnectionManager {
     /** Ping ID (Long) to PingRequest */
     private final Map<Long, PingRequest> _pendingPings;
     private boolean _allowIncoming;
+    private boolean _throttlersInitialized;
     private int _maxConcurrentStreams;
     private ConnectionOptions _defaultOptions;
     private volatile int _numWaiting;
     private long SoTimeout;
+    private ConnThrottler _minuteThrottler;
+    private ConnThrottler _hourThrottler;
+    private ConnThrottler _dayThrottler;
     
     public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
         _context = context;
@@ -106,7 +111,23 @@ public class ConnectionManager {
 
     public void setAllowIncomingConnections(boolean allow) { 
         _connectionHandler.setActive(allow);
+        if (allow && !_throttlersInitialized) {
+            _throttlersInitialized = true;
+            if (_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) {
+               _context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
+               _minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000);
+            }
+            if (_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) {
+               _context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
+               _hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000);
+            }
+            if (_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) {
+               _context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
+               _dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000);
+            }
+        }
     }
+
     /** @return if we should accept connections */
     public boolean getAllowIncomingConnections() {
         return _connectionHandler.getActive();
@@ -140,8 +161,15 @@ public class ConnectionManager {
                               + _maxConcurrentStreams + " connections");
                 reject = true;
             } else if (shouldRejectConnection(synPacket)) {
-                _log.error("Refusing connection since peer is " +
-                           (_defaultOptions.isAccessListEnabled() ? "not whitelisted: " : "blacklisted: ") +
+                // this may not be right if more than one is enabled
+                String why;
+                if (_defaultOptions.isAccessListEnabled())
+                    why = "not whitelisted: ";
+                else if (_defaultOptions.isBlacklistEnabled())
+                    why = "blacklisted: ";
+                else
+                    why = "throttled: ";
+                _log.error("Refusing connection since peer is " + why +
                            (synPacket.getOptionalFrom() == null ? "null from" : synPacket.getOptionalFrom().calculateHash().toBase64()));
                 reject = true;
             } else { 
@@ -281,11 +309,28 @@ public class ConnectionManager {
         Destination from = syn.getOptionalFrom();
         if (from == null)
             return true;
+        Hash h = from.calculateHash();
+        boolean throttled = false;
+        // always call all 3 to increment all counters
+        if (_minuteThrottler != null && _minuteThrottler.shouldThrottle(h)) {
+            _context.statManager().addRateData("stream.con.throttledMinute", 1, 0);
+            throttled = true;
+        }
+        if (_hourThrottler != null && _hourThrottler.shouldThrottle(h)) {
+            _context.statManager().addRateData("stream.con.throttledHour", 1, 0);
+            throttled = true;
+        }
+        if (_dayThrottler != null && _dayThrottler.shouldThrottle(h)) {
+            _context.statManager().addRateData("stream.con.throttledDay", 1, 0);
+            throttled = true;
+        }
+        if (throttled)
+            return true;
         // if the sig is absent or bad it will be caught later (in CPH)
         if (_defaultOptions.isAccessListEnabled())
-            return !_defaultOptions.getAccessList().contains(from.calculateHash());
+            return !_defaultOptions.getAccessList().contains(h);
         if (_defaultOptions.isBlacklistEnabled())
-            return _defaultOptions.getBlacklist().contains(from.calculateHash());
+            return _defaultOptions.getBlacklist().contains(h);
         return false;
     }
 
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index da3bd48f8e..139d080d41 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -40,6 +40,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
     private boolean _blackListEnabled;
     private Set<Hash> _accessList;
     private Set<Hash> _blackList;
+    private int _maxConnsPerMinute;
+    private int _maxConnsPerHour;
+    private int _maxConnsPerDay;
+    private int _maxTotalConnsPerMinute;
+    private int _maxTotalConnsPerHour;
+    private int _maxTotalConnsPerDay;
 
     public static final int PROFILE_BULK = 1;
     public static final int PROFILE_INTERACTIVE = 2;
@@ -67,9 +73,17 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
     public static final String PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = "i2p.streaming.congestionAvoidanceGrowthRateFactor";
     public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor";
     public static final String PROP_ANSWER_PINGS = "i2p.streaming.answerPings";
+    /** all of these are @since 0.7.13 */
     public static final String PROP_ENABLE_ACCESS_LIST = "i2cp.enableAccessList";
     public static final String PROP_ENABLE_BLACKLIST = "i2cp.enableBlackList";
     public static final String PROP_ACCESS_LIST = "i2cp.accessList";
+    /** all of these are @since 0.7.14 */
+    public static final String PROP_MAX_CONNS_MIN = "i2p.streaming.maxConnsPerMinute";
+    public static final String PROP_MAX_CONNS_HOUR = "i2p.streaming.maxConnsPerHour";
+    public static final String PROP_MAX_CONNS_DAY = "i2p.streaming.maxConnsPerDay";
+    public static final String PROP_MAX_TOTAL_CONNS_MIN = "i2p.streaming.maxTotalConnsPerMinute";
+    public static final String PROP_MAX_TOTAL_CONNS_HOUR = "i2p.streaming.maxTotalConnsPerHour";
+    public static final String PROP_MAX_TOTAL_CONNS_DAY = "i2p.streaming.maxTotalConnsPerDay";
     
     private static final int TREND_COUNT = 3;
     static final int INITIAL_WINDOW_SIZE = 6;
@@ -222,6 +236,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
             setReadTimeout(opts.getReadTimeout());
             setAnswerPings(opts.getAnswerPings());
             initLists(opts);
+            _maxConnsPerMinute = opts.getMaxConnsPerMinute();
+            _maxConnsPerHour = opts.getMaxConnsPerHour();
+            _maxConnsPerDay = opts.getMaxConnsPerDay();
+            _maxTotalConnsPerMinute = opts.getMaxTotalConnsPerMinute();
+            _maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
+            _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
         }
     }
     
@@ -248,6 +268,12 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
         setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
         setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
         initLists(opts);
+        _maxConnsPerMinute = getInt(opts, PROP_MAX_CONNS_MIN, 0);
+        _maxConnsPerHour = getInt(opts, PROP_MAX_CONNS_HOUR, 0);
+        _maxConnsPerDay = getInt(opts, PROP_MAX_CONNS_DAY, 0);
+        _maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
+        _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
+        _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
     }
     
 	@Override
@@ -291,6 +317,18 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
         if (opts.containsKey(PROP_ANSWER_PINGS))
             setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
         initLists(opts);
+        if (opts.containsKey(PROP_MAX_CONNS_MIN))
+            _maxConnsPerMinute = getInt(opts, PROP_MAX_CONNS_MIN, 0);
+        if (opts.containsKey(PROP_MAX_CONNS_HOUR))
+            _maxConnsPerHour = getInt(opts, PROP_MAX_CONNS_HOUR, 0);
+        if (opts.containsKey(PROP_MAX_CONNS_DAY))
+            _maxConnsPerDay = getInt(opts, PROP_MAX_CONNS_DAY, 0);
+        if (opts.containsKey(PROP_MAX_TOTAL_CONNS_MIN))
+            _maxTotalConnsPerMinute = getInt(opts, PROP_MAX_TOTAL_CONNS_MIN, 0);
+        if (opts.containsKey(PROP_MAX_TOTAL_CONNS_HOUR))
+            _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
+        if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY))
+            _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
     }
     
     /** 
@@ -523,6 +561,14 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
     public int getSlowStartGrowthRateFactor() { return _slowStartGrowthRateFactor; }
     public void setSlowStartGrowthRateFactor(int factor) { _slowStartGrowthRateFactor = factor; }
     
+    /** all of these are @since 0.7.14; no public setters */
+    public int getMaxConnsPerMinute() { return _maxConnsPerMinute; }
+    public int getMaxConnsPerHour() { return _maxConnsPerHour; }
+    public int getMaxConnsPerDay() { return _maxConnsPerDay; }
+    public int getMaxTotalConnsPerMinute() { return _maxConnsPerMinute; }
+    public int getMaxTotalConnsPerHour() { return _maxTotalConnsPerHour; }
+    public int getMaxTotalConnsPerDay() { return _maxTotalConnsPerDay; }
+
     public boolean isAccessListEnabled() { return _accessListEnabled; }
     public boolean isBlacklistEnabled() { return _blackListEnabled; }
     public Set<Hash> getAccessList() { return _accessList; }
-- 
GitLab