Transports: Add new inbound connection throttler

To limit rate increase of inbound conns
Add exemption system so inbound tunnel builds bypass the throttler
This commit is contained in:
zzz
2023-02-16 10:26:14 -05:00
parent 5918613ff4
commit b44cb59a48
9 changed files with 321 additions and 94 deletions

View File

@@ -1,8 +1,29 @@
2023-02-16 zzz
* Transports: Add new inbound connection throttler
2023-02-15 zzz
* Router:
- Disable Sybil analysis when in test mode
- Store feed blocklist in main array for efficiency
* Tunnels: Re-enable using U routers in some expl. tunnels
2023-02-13 zzz
* Profiles:
- Change the new-router bonus to a penalty
- Remove unused failing peers map
2023-02-12 zzz
* Console: Reduce max age of displayed profiles
* NetDB:
- Don't create profile unless peer is reachable
- Store handler updates
* Profiles: Limit storage and memory usage
* Profiles:
- Adjust capacity for send success/failure
- Don't create new profile on message or lookup failures
- Downrate capacity of slow/unreachable peers
- Limit storage and memory usage
* Router: Increase min version for tunnels and netdb to 0.9.51
* Tunnels: Don't build through U routers
2023-02-11 zzz
* NetDB:

View File

@@ -722,6 +722,7 @@ public class Blocklist {
*
* @param ip IPv4 or IPv6
* @param source for logging only, may be null
* @return true if added
* @since 0.9.57
*/
public void add(byte ip[], String source) {
@@ -738,6 +739,8 @@ public class Blocklist {
}
}
rv = add(toInt(ip));
if (rv)
_context.commSystem().removeExemption(Addresses.toString(ip));
} else if (ip.length == 16) {
if (!_haveIPv6)
return;
@@ -752,6 +755,8 @@ public class Blocklist {
}
}
rv = add(new BigInteger(1, ip));
if (rv)
_context.commSystem().removeExemption(Addresses.toCanonicalString(ip));
} else {
return;
}

View File

@@ -225,6 +225,27 @@ public abstract class CommSystemFacade implements Service {
*/
public void initGeoIP() {}
/**
* Exempt this router hash from any incoming throttles or rejections
*
* @since 0.9.58
*/
public void exemptIncoming(Hash peer) {}
/**
* Is this IP exempt from any incoming throttles or rejections
*
* @since 0.9.58
*/
public boolean isExemptIncoming(String ip) { return false; }
/**
* Remove this IP from the exemptions
*
* @since 0.9.58
*/
public void removeExemption(String ip) {}
/*
* Reachability status codes
*

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Git";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 8;
public final static long BUILD = 9;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -12,6 +12,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -32,7 +33,9 @@ import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.router.util.EventLog;
import net.i2p.util.Addresses;
import net.i2p.util.AddressType;
import net.i2p.util.ArraySet;
import net.i2p.util.I2PThread;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
@@ -44,6 +47,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
private final RouterContext _context;
private final TransportManager _manager;
private final GeoIP _geoIP;
private final Map<String, Object> _exemptIncoming;
private volatile boolean _netMonitorStatus;
private boolean _wasStarted;
@@ -55,6 +59,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
private static final String BUNDLE_NAME = "net.i2p.router.web.messages";
private static final String COUNTRY_BUNDLE_NAME = "net.i2p.router.countries.messages";
private static final Object DUMMY = Integer.valueOf(0);
public CommSystemFacadeImpl(RouterContext context) {
_context = context;
@@ -63,6 +68,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
_netMonitorStatus = true;
_geoIP = new GeoIP(_context);
_manager = new TransportManager(_context);
_exemptIncoming = new LHMCache<String, Object>(128);
}
public synchronized void startup() {
@@ -359,6 +365,62 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
_manager.externalAddressRemoved(Transport.AddressSource.SOURCE_SSU, ipv6);
}
/**
* Exempt this router hash from any incoming throttles or rejections
*
* @since 0.9.58
*/
@Override
public void exemptIncoming(Hash peer) {
if (_manager.isEstablished(peer))
return;
RouterInfo ri = (RouterInfo) _context.netDb().lookupLocallyWithoutValidation(peer);
if (ri == null)
return;
Collection<RouterAddress> addrs = ri.getAddresses();
ArraySet<String> ips = new ArraySet<String>(addrs.size());
for (RouterAddress addr : addrs) {
String ip = addr.getHost();
if (ip == null)
continue;
// Add IPv6 even if we don't have an address, not worth the check
ips.add(Addresses.toCanonicalString(ip));
}
int sz = ips.size();
if (sz > 0) {
synchronized(_exemptIncoming) {
for (int i = 0; i < sz; i++) {
_exemptIncoming.put(ips.get(i), DUMMY);
}
}
}
}
/**
* Is this IP exempt from any incoming throttles or rejections
*
* @param ip canonical string
* @since 0.9.58
*/
@Override
public boolean isExemptIncoming(String ip) {
synchronized(_exemptIncoming) {
return _exemptIncoming.containsKey(ip);
}
}
/**
* Remove this IP from the exemptions
*
* @param ip canonical string
* @since 0.9.58
*/
public void removeExemption(String ip) {
synchronized(_exemptIncoming) {
_exemptIncoming.remove(ip);
}
}
/**
* Pluggable transports. Not for NTCP or SSU.
*

View File

@@ -24,12 +24,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.router.RouterAddress;
import net.i2p.data.router.RouterIdentity;
import net.i2p.router.CommSystemFacade.Status;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.stat.Rate;
import net.i2p.stat.RateAverages;
import net.i2p.stat.RateStat;
import net.i2p.util.TryCache;
import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet;
@@ -54,7 +56,7 @@ class EventPumper implements Runnable {
private final Queue<ServerSocketChannel> _wantsRegister = new ConcurrentLinkedQueue<ServerSocketChannel>();
private final Queue<NTCPConnection> _wantsConRegister = new ConcurrentLinkedQueue<NTCPConnection>();
private final NTCPTransport _transport;
private final ObjectCounter<ByteArray> _blockedIPs;
private final ObjectCounter<String> _blockedIPs;
private long _expireIdleWriteTime;
private static final boolean _useDirect = false;
private final boolean _nodelay;
@@ -124,13 +126,14 @@ class EventPumper implements Runnable {
_log = ctx.logManager().getLog(getClass());
_transport = transport;
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
_blockedIPs = new ObjectCounter<ByteArray>();
_blockedIPs = new ObjectCounter<String>();
_context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} );
//_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.dropInboundNoMessage", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRequiredRateStat("ntcp.inboundConn", "Inbound NTCP Connection", "ntcp", new long[] { 60*1000L } );
_nodelay = ctx.getBooleanPropertyDefaultTrue(PROP_NODELAY);
}
@@ -492,31 +495,39 @@ class EventPumper implements Runnable {
return;
chan.configureBlocking(false);
if (!_transport.allowConnection()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request but at connection limit: " + chan.socket().getInetAddress());
try { chan.close(); } catch (IOException ioe) { }
return;
}
byte[] ip = chan.socket().getInetAddress().getAddress();
String ba = Addresses.toString(ip);
if (_context.blocklist().isBlocklisted(ip)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from blocklisted IP: " + chan.socket().getInetAddress());
_log.warn("Receive session request from blocklisted IP: " + ba);
try { chan.close(); } catch (IOException ioe) { }
return;
}
if (!_context.commSystem().isExemptIncoming(Addresses.toCanonicalString(ba))) {
if (!_transport.allowConnection()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request but at connection limit: " + ba);
try { chan.close(); } catch (IOException ioe) { }
return;
}
int count = _blockedIPs.count(ba);
if (count > 0) {
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking accept of IP with count " + count + ": " + ba);
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
try { chan.close(); } catch (IOException ioe) { }
return;
}
if (!shouldAllowInboundEstablishment()) {
try { chan.close(); } catch (IOException ioe) { }
return;
}
}
ByteArray ba = new ByteArray(ip);
int count = _blockedIPs.count(ba);
if (count > 0) {
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking accept of IP with count " + count + ": " + Addresses.toString(ip));
_context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
try { chan.close(); } catch (IOException ioe) { }
return;
}
_context.statManager().addRateData("ntcp.inboundConn", 1);
if (shouldSetKeepAlive(chan))
chan.socket().setKeepAlive(true);
@@ -531,6 +542,60 @@ class EventPumper implements Runnable {
_log.error("Error accepting", ioe);
}
}
/**
* Should we allow another inbound establishment?
* Used to throttle outbound hole punches.
* @since 0.9.2
*/
private boolean shouldAllowInboundEstablishment() {
RateStat rs = _context.statManager().getRate("ntcp.inboundConn");
if (rs == null)
return true;
Rate r = rs.getRate(60*1000);
if (r == null)
return true;
int last;
long periodStart;
RateAverages ra = RateAverages.getTemp();
synchronized(r) {
last = (int) r.getLastEventCount();
periodStart = r.getLastCoalesceDate();
r.computeAverages(ra, true);
}
// compare incoming conns per ms, min of 1 per second or 60/minute
if (last < 15)
last = 15;
int total = (int) ra.getTotalEventCount();
int current = total - last;
if (current <= 0)
return true;
// getLastEventCount() is normalized to the rate, so we use the canonical period
int lastPeriod = 60*1000;
double avg = ra.getAverage();
int currentTime = (int) (_context.clock().now() - periodStart);
if (currentTime <= 5*1000)
return true;
// compare incoming conns per ms
// both of these are scaled by actual period in coalesce
float lastRate = last / (float) lastPeriod;
float currentRate = (float) (current / (double) currentTime);
float factor = _transport.haveCapacity(95) ? 1.05f : 0.95f;
float minThresh = factor * lastRate;
if (currentRate > minThresh) {
// chance in 128
// max out at about 25% over the last rate
int probAccept = Math.max(1, ((int) (4 * 128 * currentRate / minThresh)) - 512);
if (probAccept >= 128 || _context.random().nextInt(128) < probAccept) {
if (_log.shouldWarn())
_log.warn("Probabalistic drop incoming (p=" + probAccept +
"/128 last rate " + last + "/min current rate " +
(int) (currentRate * 60*1000));
return false;
}
}
return true;
}
private void processConnect(SelectionKey key) {
final NTCPConnection con = (NTCPConnection)key.attachment();
@@ -605,10 +670,10 @@ class EventPumper implements Runnable {
int count;
if (addr != null) {
byte[] ip = addr.getAddress();
ByteArray ba = new ByteArray(ip);
String ba = Addresses.toString(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
_log.warn("EOF on inbound before receiving any, blocking IP " + ba + " with count " + count + ": " + con);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
@@ -684,10 +749,10 @@ class EventPumper implements Runnable {
byte[] ip = con.getRemoteIP();
int count;
if (ip != null) {
ByteArray ba = new ByteArray(ip);
String ba = Addresses.toString(ip);
count = _blockedIPs.increment(ba);
if (_log.shouldLog(Log.WARN))
_log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con, ioe);
_log.warn("Blocking IP " + ba + " with count " + count + ": " + con, ioe);
} else {
count = 1;
if (_log.shouldLog(Log.WARN))
@@ -921,7 +986,7 @@ class EventPumper implements Runnable {
public void blockIP(byte[] ip) {
if (ip == null)
return;
ByteArray ba = new ByteArray(ip);
String ba = Addresses.toString(ip);
_blockedIPs.increment(ba);
}

View File

@@ -48,6 +48,7 @@ import static net.i2p.router.transport.udp.SSU2Util.*;
import net.i2p.router.util.DecayingHashSet;
import net.i2p.router.util.DecayingBloomFilter;
import net.i2p.stat.Rate;
import net.i2p.stat.RateAverages;
import net.i2p.stat.RateStat;
import net.i2p.util.Addresses;
import net.i2p.util.HexDump;
@@ -248,6 +249,7 @@ class EstablishmentManager {
_context.statManager().createRateStat("udp.dupDHX", "Session request replay", "udp", new long[] { 24*60*60*1000L } );
if (_enableSSU2)
_context.statManager().createRequiredRateStat("udp.inboundTokenLifetime", "SSU2 token lifetime (ms)", "udp", new long[] { 5*60*1000L } );
_context.statManager().createRequiredRateStat("udp.inboundConn", "Inbound UDP Connection", "udp", new long[] { 60*1000L } );
}
public synchronized void startup() {
@@ -575,11 +577,58 @@ class EstablishmentManager {
/**
* Should we allow another inbound establishment?
* Used to throttle outbound hole punches.
*
* @since 0.9.2
*/
public boolean shouldAllowInboundEstablishment() {
return _inboundStates.size() < getMaxInboundEstablishers();
if (_inboundStates.size() >= getMaxInboundEstablishers())
return false;
RateStat rs = _context.statManager().getRate("udp.inboundConn");
if (rs == null)
return true;
Rate r = rs.getRate(60*1000);
if (r == null)
return true;
int last;
long periodStart;
RateAverages ra = RateAverages.getTemp();
synchronized(r) {
last = (int) r.getLastEventCount();
periodStart = r.getLastCoalesceDate();
r.computeAverages(ra, true);
}
// compare incoming conns per ms, min of 1 per second or 60/minute
if (last < 15)
last = 15;
int total = (int) ra.getTotalEventCount();
int current = total - last;
if (current <= 0)
return true;
// getLastEventCount() is normalized to the rate, so we use the canonical period
int lastPeriod = 60*1000;
double avg = ra.getAverage();
int currentTime = (int) (_context.clock().now() - periodStart);
if (currentTime <= 5*1000)
return true;
// compare incoming conns per ms
// both of these are scaled by actual period in coalesce
float lastRate = last / (float) lastPeriod;
float currentRate = (float) (current / (double) currentTime);
float factor = _transport.haveCapacity(95) ? 1.05f : 0.95f;
float minThresh = factor * lastRate;
if (currentRate > minThresh) {
// chance in 128
// max out at about 25% over the last rate
int probAccept = Math.max(1, ((int) (4 * 128 * currentRate / minThresh)) - 512);
if (probAccept >= 128 || _context.random().nextInt(128) < probAccept) {
if (_log.shouldWarn())
_log.warn("Probabalistic drop incoming (p=" + probAccept +
"/128 last rate " + last + "/min current rate " +
(int) (currentRate * 60*1000));
return false;
}
}
return true;
}
/**
@@ -590,7 +639,8 @@ class EstablishmentManager {
* @param state as looked up in PacketHandler, but probably null unless retransmitted
*/
void receiveSessionRequest(RemoteHostId from, InboundEstablishState state, UDPPacketReader reader) {
if (!TransportUtil.isValidPort(from.getPort()) || !_transport.isValid(from.getIP())) {
byte[] fromIP = from.getIP();
if (!TransportUtil.isValidPort(from.getPort()) || !_transport.isValid(fromIP)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from invalid: " + from);
return;
@@ -601,46 +651,36 @@ class EstablishmentManager {
if (state == null)
state = _inboundStates.get(from);
if (state == null) {
// TODO this is insufficient to prevent DoSing, especially if
// IP spoofing is used. For further study.
if (!shouldAllowInboundEstablishment()) {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
if (_log.shouldDebug()) {
StringBuilder buf = new StringBuilder(4096);
buf.append("Active: ").append(_inboundStates.size()).append('\n');
for (InboundEstablishState ies : _inboundStates.values()) {
buf.append(ies.toString()).append('\n');
}
_log.debug(buf.toString());
}
}
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
if (_context.blocklist().isBlocklisted(from.getIP())) {
if (_context.blocklist().isBlocklisted(fromIP)) {
if (_log.shouldInfo())
_log.info("Receive session request from blocklisted IP: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
return; // drop the packet
}
synchronized (_inboundBans) {
Long exp = _inboundBans.get(from);
if (exp != null) {
if (exp.longValue() >= _context.clock().now()) {
if (_log.shouldInfo())
_log.info("SSU 1 session request from temp. blocked peer: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
return; // drop the packet
}
// expired
_inboundBans.remove(from);
if (!_context.commSystem().isExemptIncoming(Addresses.toString(fromIP))) {
if (!shouldAllowInboundEstablishment()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound establish");
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
synchronized (_inboundBans) {
Long exp = _inboundBans.get(from);
if (exp != null) {
if (exp.longValue() >= _context.clock().now()) {
if (_log.shouldInfo())
_log.info("SSU 1 session request from temp. blocked peer: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
return; // drop the packet
}
// expired
_inboundBans.remove(from);
}
}
if (!_transport.allowConnection())
return; // drop the packet
}
if (!_transport.allowConnection())
return; // drop the packet
byte[] fromIP = from.getIP();
state = new InboundEstablishState(_context, fromIP, from.getPort(),
_transport.getExternalPort(fromIP.length == 16),
_transport.getDHBuilder(),
@@ -653,6 +693,8 @@ class EstablishmentManager {
return; // drop the packet
}
_context.statManager().addRateData("udp.inboundConn", 1);
InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state);
isNew = oldState == null;
if (!isNew)
@@ -693,23 +735,15 @@ class EstablishmentManager {
* @since 0.9.54
*/
void receiveSessionOrTokenRequest(RemoteHostId from, InboundEstablishState2 state, UDPPacket packet) {
if (!TransportUtil.isValidPort(from.getPort()) || !_transport.isValid(from.getIP())) {
byte[] fromIP = from.getIP();
if (!TransportUtil.isValidPort(from.getPort()) || !_transport.isValid(fromIP)) {
if (_log.shouldWarn())
_log.warn("Receive session request from invalid: " + from);
return;
}
boolean isNew = false;
if (state == null) {
// TODO this is insufficient to prevent DoSing, especially if
// IP spoofing is used. For further study.
if (!shouldAllowInboundEstablishment()) {
if (_log.shouldWarn())
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
_context.statManager().addRateData("udp.establishDropped", 1);
sendTerminationPacket(from, packet, REASON_LIMITS);
return;
}
if (_context.blocklist().isBlocklisted(from.getIP())) {
if (_context.blocklist().isBlocklisted(fromIP)) {
if (_log.shouldInfo())
_log.info("Receive session request from blocklisted IP: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
@@ -718,25 +752,36 @@ class EstablishmentManager {
// else drop the packet
return;
}
synchronized (_inboundBans) {
Long exp = _inboundBans.get(from);
if (exp != null) {
if (exp.longValue() >= _context.clock().now()) {
// this is common, finally get a packet after the IES2 timeout
if (_log.shouldInfo())
_log.info("SSU 2 session request from temp. blocked peer: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
// use this code for a temp ban
sendTerminationPacket(from, packet, REASON_MSG1);
return;
}
// expired
_inboundBans.remove(from);
if (!_context.commSystem().isExemptIncoming(Addresses.toString(fromIP))) {
// TODO this is insufficient to prevent DoSing, especially if
// IP spoofing is used. For further study.
if (!shouldAllowInboundEstablishment()) {
if (_log.shouldWarn())
_log.warn("Dropping inbound establish");
_context.statManager().addRateData("udp.establishDropped", 1);
sendTerminationPacket(from, packet, REASON_LIMITS);
return;
}
synchronized (_inboundBans) {
Long exp = _inboundBans.get(from);
if (exp != null) {
if (exp.longValue() >= _context.clock().now()) {
// this is common, finally get a packet after the IES2 timeout
if (_log.shouldInfo())
_log.info("SSU 2 session request from temp. blocked peer: " + from);
_context.statManager().addRateData("udp.establishBadIP", 1);
// use this code for a temp ban
sendTerminationPacket(from, packet, REASON_MSG1);
return;
}
// expired
_inboundBans.remove(from);
}
}
if (!_transport.allowConnection()) {
sendTerminationPacket(from, packet, REASON_LIMITS);
return;
}
}
if (!_transport.allowConnection()) {
sendTerminationPacket(from, packet, REASON_LIMITS);
return;
}
try {
state = new InboundEstablishState2(_context, _transport, packet);
@@ -747,6 +792,8 @@ class EstablishmentManager {
return;
}
_context.statManager().addRateData("udp.inboundConn", 1);
/****
// A token request or session request with a bad token is
// inexpensive to reply to.

View File

@@ -67,7 +67,7 @@ class ClientPeerSelector extends TunnelPeerSelector {
return selectExplicit(settings, length);
Set<Hash> exclude = getExclude(isInbound, false);
Set<Hash> matches = new ArraySet<Hash>(length);
ArraySet<Hash> matches = new ArraySet<Hash>(length);
if (length == 1) {
// closest-hop restrictions
if (checkClosestHop)
@@ -190,6 +190,7 @@ class ClientPeerSelector extends TunnelPeerSelector {
log.warn("CPS SANFP hidden OBEP no active peers found, returning null");
return null;
}
ctx.commSystem().exemptIncoming(matches.get(0));
} else {
ctx.profileOrganizer().selectFastPeers(1, lastHopExclude, matches, randomKey, length == 2 ? SLICE_0_1 : SLICE_0, ipRestriction, ipSet);
}
@@ -267,6 +268,8 @@ class ClientPeerSelector extends TunnelPeerSelector {
if (!checkTunnel(isInbound, false, rv))
rv = null;
}
if (isInbound && rv != null && rv.size() > 1)
ctx.commSystem().exemptIncoming(rv.get(1));
return rv;
}

View File

@@ -166,6 +166,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector {
if (!furthest.isEmpty()) {
furthestHop = furthest.get(0);
exclude.add(furthestHop);
ctx.commSystem().exemptIncoming(furthestHop);
length--;
}
}
@@ -224,6 +225,8 @@ class ExploratoryPeerSelector extends TunnelPeerSelector {
if (!checkTunnel(isInbound, true, rv))
rv = null;
}
if (isInbound && rv != null && rv.size() > 1)
ctx.commSystem().exemptIncoming(rv.get(1));
return rv;
}