propagate from branch 'i2p.i2p.zzz.test4' (head 7b50b6c3d48da68078a86a53e55e2c18f55685e8)

to branch 'i2p.i2p' (head ebce577e19b70c281daacc5277f98e9bb2bb9630)
zzz
2011-03-08 13:31:57 +00:00
183 changed files with 30058 additions and 11421 deletions

View File

@@ -81,7 +81,6 @@ public class InNetMessagePool implements Service {
_shortCircuitGatewayJob = new SharedShortCircuitGatewayJob(context);
}
_log = _context.logManager().getLog(InNetMessagePool.class);
_alive = false;
_context.statManager().createRateStat("inNetPool.dropped", "How often do we drop a message", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.droppedDeliveryStatusDelay", "How long after a delivery status message is created do we receive it back again (for messages that are too slow to be handled)", "InNetPool", new long[] { 60*60*1000l });
_context.statManager().createRateStat("inNetPool.duplicate", "How often do we receive a duplicate message", "InNetPool", new long[] { 60*60*1000l });
@@ -89,12 +88,20 @@ public class InNetMessagePool implements Service {
_context.statManager().createRateStat("inNetPool.droppedDbLookupResponseMessage", "How often we drop a slow-to-arrive db search response", "InNetPool", new long[] { 60*60*1000l });
}
/**
* @return previous builder for this message type, or null
* @throws AIOOBE if i2npMessageType is greater than MAX_I2NP_MESSAGE_TYPE
*/
public HandlerJobBuilder registerHandlerJobBuilder(int i2npMessageType, HandlerJobBuilder builder) {
HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
_handlerJobBuilders[i2npMessageType] = builder;
return old;
}
/**
* @return previous builder for this message type, or null
* @throws AIOOBE if i2npMessageType is greater than MAX_I2NP_MESSAGE_TYPE
*/
public HandlerJobBuilder unregisterHandlerJobBuilder(int i2npMessageType) {
HandlerJobBuilder old = _handlerJobBuilders[i2npMessageType];
_handlerJobBuilders[i2npMessageType] = null;
@@ -102,12 +109,14 @@ public class InNetMessagePool implements Service {
}
/**
* Add a new message to the pool, returning the number of messages in the
* pool so that the comm system can throttle inbound messages. If there is
* Add a new message to the pool.
* If there is
* a HandlerJobBuilder for the inbound message type, the message is loaded
* into a job created by that builder and queued up for processing instead
* (though if the builder doesn't create a job, it is added to the pool)
*
* @return -1 for some types of errors but not all; 0 otherwise
* (was queue length, long ago)
*/
public int add(I2NPMessage messageBody, RouterIdentity fromRouter, Hash fromRouterHash) {
long exp = messageBody.getMessageExpiration();
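A hedged usage sketch (not part of the patch) of the handler-registration and add() flow documented above; the HandlerJobBuilder callback signature and message-type constant are assumed from context, and the job class created is hypothetical.

    HandlerJobBuilder builder = new HandlerJobBuilder() {
        public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
            // return a Job to process the message, or null to let the pool handle it itself
            return new MyLookupHandlerJob(ctx, receivedMessage, from, fromHash);  // hypothetical job class
        }
    };
    // returns the previously registered builder (or null); AIOOBE if the type is out of range
    HandlerJobBuilder old = ctx.inNetMessagePool().registerHandlerJobBuilder(DatabaseLookupMessage.MESSAGE_TYPE, builder);
    // the transport layer then feeds received messages in; per the new javadoc the
    // return value is 0 or -1, no longer a queue length
    int rv = ctx.inNetMessagePool().add(message, fromRouterIdentity, fromRouterHash);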

View File

@@ -227,10 +227,14 @@ public class JobQueue {
}
public long getMaxLag() {
// first job is the one that has been waiting the longest
Job j = _readyJobs.peek();
if (j == null) return 0;
// first job is the one that has been waiting the longest
long startAfter = j.getTiming().getStartAfter();
JobTiming jt = j.getTiming();
// PoisonJob timing is null, prevent NPE at shutdown
if (jt == null)
return 0;
long startAfter = jt.getStartAfter();
return _context.clock().now() - startAfter;
}
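A hedged caller-side sketch (not from this patch): getMaxLag() is typically used as an overload signal, since it measures how long the oldest ready job has been waiting. The threshold below is an arbitrary illustration.

    long lag = _context.jobQueue().getMaxLag();
    if (lag > 2*1000) {
        // the oldest ready job has waited more than 2 seconds; treat the router
        // as overloaded and defer non-critical work (illustrative policy only)
    }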

View File

@@ -29,8 +29,8 @@ import net.i2p.util.Log;
*
*/
public class OutNetMessage {
private Log _log;
private RouterContext _context;
private final Log _log;
private final RouterContext _context;
private RouterInfo _target;
private I2NPMessage _message;
/** cached message class name, for use after we discard the message */
@@ -50,7 +50,7 @@ public class OutNetMessage {
private long _sendBegin;
private long _transmitBegin;
private Exception _createdBy;
private long _created;
private final long _created;
/** for debugging, contains a mapping of event name to Long (e.g. "begin sending", "handleOutbound", etc) */
private HashMap<String, Long> _timestamps;
/**

View File

@@ -18,24 +18,18 @@ import net.i2p.util.Log;
* that wants to send a message, and the communication subsystem periodically
* retrieves messages for delivery.
*
* Actually, this doesn't 'pool' anything, it calls the comm system directly.
* Nor does it organize by priority. But perhaps it could someday.
*/
public class OutNetMessagePool {
private Log _log;
private RouterContext _context;
private final Log _log;
private final RouterContext _context;
public OutNetMessagePool(RouterContext context) {
_context = context;
_log = _context.logManager().getLog(OutNetMessagePool.class);
}
/**
* Remove the highest priority message, or null if none are available.
*
*/
public OutNetMessage getNext() {
return null;
}
/**
* Add a new message to the pool
*
@@ -47,8 +41,8 @@ public class OutNetMessagePool {
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Adding outbound message to "
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding outbound message to "
+ msg.getTarget().getIdentity().getHash().toBase64().substring(0,6)
+ " with id " + msg.getMessage().getUniqueId()
+ " expiring on " + msg.getMessage().getMessageExpiration()
@@ -70,7 +64,7 @@ public class OutNetMessagePool {
return false;
}
if (msg.getTarget() == null) {
_log.error("No target in the OutNetMessage: " + msg, new Exception("Definitely a fuckup"));
_log.error("No target in the OutNetMessage: " + msg, new Exception());
return false;
}
if (msg.getPriority() < 0) {
@@ -83,38 +77,4 @@ public class OutNetMessagePool {
}
return true;
}
/**
* Clear any messages that have expired, enqueuing any appropriate jobs
*
*/
public void clearExpired() {
// noop
}
/**
* Retrieve the number of messages, regardless of priority.
*
*/
public int getCount() { return 0; }
/**
* Retrieve the number of messages at the given priority. This can be used for
* subsystems that maintain a pool of messages to be sent whenever there is spare time,
* where all of these 'spare' messages are of the same priority.
*
*/
public int getCount(int priority) { return 0; }
public void dumpPoolInfo() { return; }
private static class ReverseIntegerComparator implements Comparator {
public int compare(Object lhs, Object rhs) {
if ( (lhs == null) || (rhs == null) ) return 0; // invalid, but never used
if ( !(lhs instanceof Integer) || !(rhs instanceof Integer)) return 0;
Integer lv = (Integer)lhs;
Integer rv = (Integer)rhs;
return - (lv.compareTo(rv));
}
}
}

View File

@@ -6,6 +6,7 @@ import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.internal.InternalClientManager;
import net.i2p.router.client.ClientManagerFacadeImpl;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
@@ -170,8 +171,20 @@ public class RouterContext extends I2PAppContext {
/** what router is this context working for? */
public Router router() { return _router; }
/** convenience method for querying the router's ident */
public Hash routerHash() { return _router.getRouterInfo().getIdentity().getHash(); }
/**
* Convenience method for getting the router hash.
* Equivalent to context.router().getRouterInfo().getIdentity().getHash()
* @return may be null if called very early
*/
public Hash routerHash() {
if (_router == null)
return null;
RouterInfo ri = _router.getRouterInfo();
if (ri == null)
return null;
return ri.getIdentity().getHash();
}
/**
* How are we coordinating clients for the router?

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 3;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -46,6 +46,7 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner {
void stopRunning() {
super.stopRunning();
queue.close();
// queue = null;
}
/**

View File

@@ -9,7 +9,7 @@ package net.i2p.router.networkdb.kademlia;
*/
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import net.i2p.data.DatabaseEntry;
@@ -27,8 +27,8 @@ import net.i2p.util.Log;
*
*/
class ExpireLeasesJob extends JobImpl {
private Log _log;
private KademliaNetworkDatabaseFacade _facade;
private final Log _log;
private final KademliaNetworkDatabaseFacade _facade;
private final static long RERUN_DELAY_MS = 1*60*1000;
@@ -39,11 +39,11 @@ class ExpireLeasesJob extends JobImpl {
}
public String getName() { return "Expire Lease Sets Job"; }
public void runJob() {
Set toExpire = selectKeysToExpire();
Set<Hash> toExpire = selectKeysToExpire();
_log.info("Leases to expire: " + toExpire);
for (Iterator iter = toExpire.iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
for (Hash key : toExpire) {
_facade.fail(key);
//_log.info("Lease " + key + " is expiring, so lets look for it again", new Exception("Expire and search"));
//_facade.lookupLeaseSet(key, null, null, RERUN_DELAY_MS);
@@ -57,17 +57,15 @@ class ExpireLeasesJob extends JobImpl {
* don't have any leases that haven't yet passed, even with the CLOCK_FUDGE_FACTOR)
*
*/
private Set selectKeysToExpire() {
Set keys = _facade.getDataStore().getKeys();
Set toExpire = new HashSet(128);
for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
DatabaseEntry obj = _facade.getDataStore().get(key);
private Set<Hash> selectKeysToExpire() {
Set<Hash> toExpire = new HashSet(128);
for (Map.Entry<Hash, DatabaseEntry> entry : _facade.getDataStore().getMapEntries()) {
DatabaseEntry obj = entry.getValue();
if (obj.getType() == DatabaseEntry.KEY_TYPE_LEASESET) {
LeaseSet ls = (LeaseSet)obj;
if (!ls.isCurrent(Router.CLOCK_FUDGE_FACTOR))
toExpire.add(key);
else
toExpire.add(entry.getKey());
else if (_log.shouldLog(Log.DEBUG))
_log.debug("Lease " + ls.getDestination().calculateHash() + " is current, no need to expire");
}
}

View File

@@ -9,7 +9,6 @@ package net.i2p.router.networkdb.kademlia;
*/
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import net.i2p.data.Hash;
@@ -28,8 +27,8 @@ import net.i2p.util.Log;
*
*/
class ExpireRoutersJob extends JobImpl {
private Log _log;
private KademliaNetworkDatabaseFacade _facade;
private final Log _log;
private final KademliaNetworkDatabaseFacade _facade;
/** rerun fairly often, so the fails don't queue up too many netdb searches at once */
private final static long RERUN_DELAY_MS = 120*1000;
@@ -41,11 +40,13 @@ class ExpireRoutersJob extends JobImpl {
}
public String getName() { return "Expire Routers Job"; }
public void runJob() {
Set toExpire = selectKeysToExpire();
_log.info("Routers to expire (drop and try to refetch): " + toExpire);
for (Iterator iter = toExpire.iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
// this always returns an empty set (see below)
Set<Hash> toExpire = selectKeysToExpire();
if (_log.shouldLog(Log.INFO))
_log.info("Routers to expire (drop and try to refetch): " + toExpire);
for (Hash key : toExpire) {
_facade.fail(key);
}
_facade.queueForExploration(toExpire);
@@ -61,9 +62,8 @@ class ExpireRoutersJob extends JobImpl {
*
* @return nothing for now
*/
private Set selectKeysToExpire() {
for (Iterator iter = _facade.getAllRouters().iterator(); iter.hasNext(); ) {
Hash key = (Hash)iter.next();
private Set<Hash> selectKeysToExpire() {
for (Hash key : _facade.getAllRouters()) {
// Don't expire anybody we are connected to
if (!getContext().commSystem().isEstablished(key)) {
// This does a _facade.validate() and fail() which is sufficient...

View File

@@ -226,7 +226,7 @@ public class Reseeder {
List<String> URLList = new ArrayList();
String URLs = _context.getProperty(PROP_RESEED_URL);
boolean defaulted = URLs == null;
boolean SSLDisable = _context.getBooleanProperty(PROP_SSL_DISABLE);
boolean SSLDisable = _context.getBooleanPropertyDefaultTrue(PROP_SSL_DISABLE);
if (defaulted) {
if (SSLDisable)
URLs = DEFAULT_SEED_URL;
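The one-character change above flips the default for PROP_SSL_DISABLE when the property is unset. A minimal sketch of the assumed semantics of the two lookups (not from the patch):

    String raw = _context.getProperty(PROP_SSL_DISABLE);
    // getBooleanProperty(key):            false when the key is unset
    boolean sslDisableOld = raw != null && Boolean.parseBoolean(raw);
    // getBooleanPropertyDefaultTrue(key): true when the key is unset
    boolean sslDisableNew = raw == null || Boolean.parseBoolean(raw);
    // so with no configuration, the old code went on to the SSL reseed URLs and
    // the new code falls back to DEFAULT_SEED_URL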

View File

@@ -11,7 +11,7 @@ class CapacityCalculator {
private static final I2PAppContext _context = I2PAppContext.getGlobalContext();
/** used to adjust each period so that we keep trying to expand the peer's capacity */
static long GROWTH_FACTOR = 5;
static final long GROWTH_FACTOR = 5;
/** the calculator estimates over a 1 hour period */
private static long ESTIMATE_PERIOD = 60*60*1000;
@@ -83,37 +83,42 @@ class CapacityCalculator {
*
* Let A = accepts, R = rejects, F = fails
* @return estimated and adjusted accepts per hour, for the given period
* which is, more or less, max(0, 5 + (A * (A / (A + R))) - (4 * F))
* which is, more or less, max(0, 5 + (A * (A / (A + 2R))) - (4 * F))
*/
private static double estimateCapacity(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) {
Rate curAccepted = acceptStat.getRate(period);
Rate curRejected = rejectStat.getRate(period);
Rate curFailed = failedStat.getRate(period);
long eventCount = 0;
if (curAccepted != null)
double eventCount = 0;
if (curAccepted != null) {
eventCount = curAccepted.getCurrentEventCount() + curAccepted.getLastEventCount();
// Punish for rejections.
// We don't want to simply do eventCount -= rejected or we get to zero with 50% rejection,
// and we don't want everybody to be at zero during times of congestion.
if (eventCount > 0) {
long rejected = curRejected.getCurrentEventCount() + curRejected.getLastEventCount();
eventCount = eventCount * eventCount / (eventCount + rejected);
// Punish for rejections.
// We don't want to simply do eventCount -= rejected or we get to zero with 50% rejection,
// and we don't want everybody to be at zero during times of congestion.
if (eventCount > 0 && curRejected != null) {
long rejected = curRejected.getCurrentEventCount() + curRejected.getLastEventCount();
if (rejected > 0)
eventCount *= eventCount / (eventCount + (2 * rejected));
}
}
double stretch = ((double)ESTIMATE_PERIOD) / period;
double val = eventCount * stretch;
long failed = 0;
// Let's say a failure is 4 times worse than a rejection.
// It's actually much worse than that, but with 2-hop tunnels and a 8-peer
// fast pool, for example, you have a 1/7 chance of being falsely blamed.
// We also don't want to drive everybody's capacity to zero, that isn't helpful.
if (curFailed != null)
failed = (long) (0.5 + (4.0 * (curFailed.getCurrentTotalValue() + curFailed.getLastTotalValue()) / 100.0));
if (failed > 0) {
//if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
// return 0.0d; // their tunnels have failed in the last 0-10 minutes
//else
val -= failed * stretch;
if (curFailed != null) {
double failed = curFailed.getCurrentTotalValue() + curFailed.getLastTotalValue();
if (failed > 0) {
//if ( (period <= 10*60*1000) && (curFailed.getCurrentEventCount() > 0) )
// return 0.0d; // their tunnels have failed in the last 0-10 minutes
//else
// .04 = 4.0 / 100.0 adjustment to failed
val -= 0.04 * failed * stretch;
}
}
val += GROWTH_FACTOR;
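To make the adjusted formula concrete, here is a hedged, standalone restatement of the new estimate with the RateStat plumbing stripped out; the inputs and the trailing max(0, ...) clamp are illustrative, following the javadoc above rather than code shown in this hunk.

    // roughly max(0, 5 + A*(A/(A+2R))*stretch - 0.04*F*stretch)
    static double estimate(double accepts, double rejects, double failedTotal,
                           long periodMs, long estimatePeriodMs) {
        double eventCount = accepts;
        if (eventCount > 0 && rejects > 0)
            eventCount *= eventCount / (eventCount + 2 * rejects);   // punish rejections
        double stretch = ((double) estimatePeriodMs) / periodMs;
        double val = eventCount * stretch;
        if (failedTotal > 0)
            val -= 0.04 * failedTotal * stretch;                     // a failure ~4x worse than a rejection
        return Math.max(0, val + 5);                                 // GROWTH_FACTOR = 5
    }
    // Example: 10 accepts, 10 rejects, no failures over the full estimate period
    // gives 5 + 10*(10/30) = about 8.3 accepts per hour.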

View File

@@ -16,6 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
@@ -251,7 +252,11 @@ class GeoIP {
*/
private void updateOurCountry() {
String oldCountry = _context.router().getConfigSetting(PROP_IP_COUNTRY);
String country = _context.commSystem().getCountry(_context.routerHash());
Hash ourHash = _context.routerHash();
// we should always have a RouterInfo by now, but we had one report of an NPE here
if (ourHash == null)
return;
String country = _context.commSystem().getCountry(ourHash);
if (country != null && !country.equals(oldCountry)) {
_context.router().setConfigSetting(PROP_IP_COUNTRY, country);
_context.router().saveConfig();

View File

@@ -459,9 +459,9 @@ public class TransportManager implements TransportEventListener {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2NPMessage received: " + message.getClass().getName(), new Exception("Where did I come from again?"));
try {
int num = _context.inNetMessagePool().add(message, fromRouter, fromRouterHash);
_context.inNetMessagePool().add(message, fromRouter, fromRouterHash);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added to in pool: "+ num);
_log.debug("Added to in pool");
} catch (IllegalArgumentException iae) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error receiving message", iae);

View File

@@ -709,7 +709,8 @@ public class TunnelDispatcher implements Service {
_validator = new BloomFilterIVValidator(_context, getShareBandwidth(_context));
}
private static int getShareBandwidth(RouterContext ctx) {
/** @return in KBps */
public static int getShareBandwidth(RouterContext ctx) {
int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond();
int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond();
double pct = ctx.router().getSharePercentage();

View File

@@ -27,9 +27,9 @@ import net.i2p.util.Log;
*/
class BuildExecutor implements Runnable {
private final ArrayList<Long> _recentBuildIds = new ArrayList(100);
private RouterContext _context;
private Log _log;
private TunnelPoolManager _manager;
private final RouterContext _context;
private final Log _log;
private final TunnelPoolManager _manager;
/** list of TunnelCreatorConfig elements of tunnels currently being built */
private final Object _currentlyBuilding;
/** indexed by ptcc.getReplyMessageId() */
@@ -37,7 +37,7 @@ class BuildExecutor implements Runnable {
/** indexed by ptcc.getReplyMessageId() */
private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
private boolean _isRunning;
private BuildHandler _handler;
private final BuildHandler _handler;
private boolean _repoll;
private static final int MAX_CONCURRENT_BUILDS = 10;
/** accept replies up to a minute after we gave up on them */
@@ -248,8 +248,6 @@ class BuildExecutor implements Runnable {
int pendingRemaining = 0;
//long loopBegin = 0;
//long beforeHandleInboundReplies = 0;
//long afterHandleInboundReplies = 0;
//long afterBuildZeroHop = 0;
long afterBuildReal = 0;
long afterHandleInbound = 0;
@@ -268,10 +266,6 @@ class BuildExecutor implements Runnable {
wanted.add(pool);
}
//beforeHandleInboundReplies = System.currentTimeMillis();
_handler.handleInboundReplies();
//afterHandleInboundReplies = System.currentTimeMillis();
// allowed() also expires timed out requests (for new style requests)
int allowed = allowed();
@@ -327,9 +321,6 @@ class BuildExecutor implements Runnable {
_log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg);
buildTunnel(pool, cfg);
realBuilt++;
// we want replies to go to the top of the queue
_handler.handleInboundReplies();
} else {
i--;
}
@@ -391,10 +382,8 @@ class BuildExecutor implements Runnable {
* This prevents a large number of client pools from starving the exploratory pool.
*
*/
private static class TunnelPoolComparator implements Comparator {
public int compare(Object l, Object r) {
TunnelPool tpl = (TunnelPool) l;
TunnelPool tpr = (TunnelPool) r;
private static class TunnelPoolComparator implements Comparator<TunnelPool> {
public int compare(TunnelPool tpl, TunnelPool tpr) {
if (tpl.getSettings().isExploratory() && !tpr.getSettings().isExploratory())
return -1;
if (tpr.getSettings().isExploratory() && !tpl.getSettings().isExploratory())

View File

@@ -1,7 +1,7 @@
package net.i2p.router.tunnel.pool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@@ -27,43 +27,42 @@ import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.router.tunnel.BuildMessageProcessor;
import net.i2p.router.tunnel.BuildReplyHandler;
import net.i2p.router.tunnel.HopConfig;
import net.i2p.router.tunnel.TunnelDispatcher;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
* Handle the received tunnel build message requests and replies,
* including sending responses to requests, updating the
* lists of our tunnels and participating tunnels,
* and updating stats.
*
* Replies are handled immediately on reception; requests are queued.
*
* Note that 10 minute tunnel expiration is hardcoded in here.
*
*/
class BuildHandler {
private RouterContext _context;
private Log _log;
private BuildExecutor _exec;
private Job _buildMessageHandlerJob;
private Job _buildReplyMessageHandlerJob;
/** list of BuildMessageState, oldest first */
private final List<BuildMessageState> _inboundBuildMessages;
/** list of BuildReplyMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildReplyMessageState> _inboundBuildReplyMessages;
/** list of BuildEndMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
private final List<BuildEndMessageState> _inboundBuildEndMessages;
private BuildMessageProcessor _processor;
private final RouterContext _context;
private final Log _log;
private final BuildExecutor _exec;
private final Job _buildMessageHandlerJob;
private final Job _buildReplyMessageHandlerJob;
private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
private final BuildMessageProcessor _processor;
private final ParticipatingThrottler _throttler;
/** TODO these may be too high, review and adjust */
private static final int MIN_QUEUE = 12;
private static final int MAX_QUEUE = 96;
private static final boolean HANDLE_REPLIES_INLINE = true;
public BuildHandler(RouterContext ctx, BuildExecutor exec) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_exec = exec;
_inboundBuildMessages = new ArrayList(16);
if (HANDLE_REPLIES_INLINE) {
_inboundBuildEndMessages = null;
_inboundBuildReplyMessages = null;
} else {
_inboundBuildEndMessages = new ArrayList(16);
_inboundBuildReplyMessages = new ArrayList(16);
}
// Queue size = 12 * share BW / 48K
int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
_inboundBuildMessages = new LinkedBlockingQueue(sz);
_context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.reject.20", "How often we reject a tunnel because of transient overload", "Tunnels", new long[] { 60*1000, 10*60*1000 });
@@ -93,6 +92,7 @@ class BuildHandler {
_context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l });
_processor = new BuildMessageProcessor(ctx);
_throttler = new ParticipatingThrottler(ctx);
_buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx);
_buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx);
TunnelBuildMessageHandlerJobBuilder tbmhjb = new TunnelBuildMessageHandlerJobBuilder();
@@ -108,122 +108,37 @@ class BuildHandler {
/**
* Blocking call to handle a few of the pending inbound requests, returning how many
* requests remain after this pass
* requests remain after this pass. This is called by BuildExecutor.
*/
int handleInboundRequests() {
int dropExpired = 0;
int remaining = 0;
List handled = null;
long beforeFindHandled = System.currentTimeMillis();
synchronized (_inboundBuildMessages) {
int toHandle = _inboundBuildMessages.size();
if (toHandle > 0) {
if (toHandle > MAX_HANDLE_AT_ONCE)
toHandle = MAX_HANDLE_AT_ONCE;
handled = new ArrayList(toHandle);
//if (false) {
// for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
// handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
//} else {
// drop any expired messages
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
do {
BuildMessageState state = (BuildMessageState)_inboundBuildMessages.get(0);
if (state.recvTime <= dropBefore) {
_inboundBuildMessages.remove(0);
dropExpired++;
if (_log.shouldLog(Log.WARN))
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
+ ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
_context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
} else {
break;
}
} while (!_inboundBuildMessages.isEmpty());
if (dropExpired > 0)
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
for (int i = 0; i < MAX_HANDLE_AT_ONCE; ) {
BuildMessageState state = _inboundBuildMessages.poll();
if (state == null)
return 0;
long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
if (state.recvTime <= dropBefore) {
if (_log.shouldLog(Log.WARN))
_log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
+ ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
_context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
continue;
}
// now pull off the oldest requests first (we're doing a tail-drop
// when adding)
for (int i = 0; i < toHandle && !_inboundBuildMessages.isEmpty(); i++)
handled.add(_inboundBuildMessages.remove(0));
//}
}
remaining = _inboundBuildMessages.size();
}
if (handled != null) {
i++;
long beforeHandle = System.currentTimeMillis();
long actualTime = handleRequest(state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " requests (took " + (System.currentTimeMillis()-beforeFindHandled) + "ms to find them)");
for (int i = 0; i < handled.size(); i++) {
BuildMessageState state = (BuildMessageState)handled.get(i);
long beforeHandle = System.currentTimeMillis();
long actualTime = handleRequest(state);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime + " (" + i + " out of " + handled.size() + " with " + remaining + " remaining)");
}
handled.clear();
_log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime +
" (" + i + " with " + _inboundBuildMessages.size() + " remaining)");
}
if (!HANDLE_REPLIES_INLINE) {
synchronized (_inboundBuildEndMessages) {
int toHandle = _inboundBuildEndMessages.size();
if (toHandle > 0) {
if (handled == null)
handled = new ArrayList(_inboundBuildEndMessages);
else
handled.addAll(_inboundBuildEndMessages);
_inboundBuildEndMessages.clear();
}
}
}
if (handled != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " requests that are actually replies");
// these are inbound build messages that actually contain the full replies, since
// they are for inbound tunnels we have created
for (int i = 0; i < handled.size(); i++) {
BuildEndMessageState state = (BuildEndMessageState)handled.get(i);
handleRequestAsInboundEndpoint(state);
}
}
// anything else?
/*
synchronized (_inboundBuildMessages) {
int remaining = _inboundBuildMessages.size();
return remaining;
}
*/
int remaining = _inboundBuildMessages.size();
if (remaining > 0)
_context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
return remaining;
}
/** Warning - noop if HANDLE_REPLIES_INLINE == true */
void handleInboundReplies() {
if (HANDLE_REPLIES_INLINE)
return;
List handled = null;
synchronized (_inboundBuildReplyMessages) {
int toHandle = _inboundBuildReplyMessages.size();
if (toHandle > 0) {
// always handle all of them - they're replies that we were waiting for!
handled = new ArrayList(_inboundBuildReplyMessages);
_inboundBuildReplyMessages.clear();
}
}
if (handled != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handling " + handled.size() + " replies");
for (int i = 0; i < handled.size(); i++) {
BuildReplyMessageState state = (BuildReplyMessageState)handled.get(i);
handleReply(state);
}
}
}
private void handleReply(BuildReplyMessageState state) {
// search through the tunnels for a reply
long replyMessageId = state.msg.getUniqueId();
@@ -341,7 +256,7 @@ class BuildHandler {
}
}
/** @return handle time or -1 */
/** @return handle time or -1 if it wasn't completely handled */
private long handleRequest(BuildMessageState state) {
long timeSinceReceived = System.currentTimeMillis()-state.recvTime;
if (_log.shouldLog(Log.DEBUG))
@@ -363,7 +278,7 @@ class BuildHandler {
BuildRequestRecord req = _processor.decrypt(_context, state.msg, _context.routerHash(), _context.keyManager().getPrivateKey());
long decryptTime = System.currentTimeMillis() - beforeDecrypt;
_context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime);
if (decryptTime > 500)
if (decryptTime > 500 && _log.shouldLog(Log.WARN))
_log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (req == null) {
// no records matched, or the decryption failed. bah
@@ -377,7 +292,7 @@ class BuildHandler {
long readPeerTime = System.currentTimeMillis()-beforeLookup;
RouterInfo nextPeerInfo = _context.netDb().lookupRouterInfoLocally(nextPeer);
long lookupTime = System.currentTimeMillis()-beforeLookup;
if (lookupTime > 500)
if (lookupTime > 500 && _log.shouldLog(Log.WARN))
_log.warn("Took too long to lookup the request: " + lookupTime + "/" + readPeerTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
if (nextPeerInfo == null) {
if (_log.shouldLog(Log.DEBUG))
@@ -414,9 +329,9 @@ class BuildHandler {
}
private class HandleReq extends JobImpl {
private BuildMessageState _state;
private BuildRequestRecord _req;
private Hash _nextPeer;
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
HandleReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
@@ -437,9 +352,9 @@ class BuildHandler {
}
private static class TimeoutReq extends JobImpl {
private BuildMessageState _state;
private BuildRequestRecord _req;
private Hash _nextPeer;
private final BuildMessageState _state;
private final BuildRequestRecord _req;
private final Hash _nextPeer;
TimeoutReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
super(ctx);
_state = state;
@@ -498,6 +413,26 @@ class BuildHandler {
long nextId = req.readNextTunnelId();
boolean isInGW = req.readIsInboundGateway();
boolean isOutEnd = req.readIsOutboundEndpoint();
// Loop checks
if ((!isOutEnd) && _context.routerHash().equals(nextPeer)) {
// We are 2 hops in a row? Drop it without a reply.
// No way to recognize if we are every other hop, but see below
_log.error("Dropping build request where we are in two consecutive hops");
return;
}
if ((!isOutEnd) && (!isInGW)) {
Hash from = state.fromHash;
if (from == null)
from = state.from.calculateHash();
// Previous and next hop the same? Don't help somebody be evil. Drop it without a reply.
// A-B-C-A is not preventable
if (nextPeer.equals(from)) {
_log.error("Dropping build request with the same previous and next hop");
return;
}
}
// time is in hours, and only for log below - what's the point?
// tunnel-alt-creation.html specifies that this is enforced +/- 1 hour but it is not.
long time = req.readRequestTime();
@@ -531,7 +466,7 @@ class BuildHandler {
_context.statManager().addRateData("tunnel.acceptLoad", recvDelay, recvDelay);
}
}
/*
* Being a IBGW or OBEP generally leads to more connections, so if we are
* approaching our connection limit (i.e. !haveCapacity()),
@@ -550,6 +485,28 @@ class BuildHandler {
_context.throttle().setTunnelStatus(_x("Rejecting tunnels: Connection limit"));
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
// Check participating throttle counters for previous and next hops
// This is at the end as it compares to a percentage of created tunnels.
// We may need another counter above for requests.
if (response == 0 && !isInGW) {
Hash from = state.fromHash;
if (from == null)
from = state.from.calculateHash();
if (from != null && _throttler.shouldThrottle(from)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting tunnel (hop throttle), previous hop: " + from);
// no setTunnelStatus() indication
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
}
if (response == 0 && (!isOutEnd) &&
_throttler.shouldThrottle(nextPeer)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Rejecting tunnel (hop throttle), next hop: " + nextPeer);
// no setTunnelStatus() indication
response = TunnelHistory.TUNNEL_REJECT_BANDWIDTH;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Responding to " + state.msg.getUniqueId() + "/" + ourId
@@ -581,7 +538,7 @@ class BuildHandler {
cfg.setSendTo(null);
cfg.setSendTunnelId(null);
} else {
cfg.setSendTo(req.readNextIdentity());
cfg.setSendTo(nextPeer);
cfg.setSendTunnelId(DataHelper.toLong(4, nextId));
}
@@ -597,7 +554,7 @@ class BuildHandler {
_context.tunnelDispatcher().joinParticipant(cfg);
} else {
_context.statManager().addRateData("tunnel.reject." + response, 1, 1);
_context.messageHistory().tunnelRejected(state.fromHash, new TunnelId(ourId), req.readNextIdentity(),
_context.messageHistory().tunnelRejected(state.fromHash, new TunnelId(ourId), nextPeer,
"rejecting for " + response + ": " +
state.msg.getUniqueId() + "/" + ourId + "/" + req.readNextTunnelId() + " delay " +
recvDelay + " as " +
@@ -687,9 +644,7 @@ class BuildHandler {
}
public int getInboundBuildQueueSize() {
synchronized (_inboundBuildMessages) {
return _inboundBuildMessages.size();
}
}
/**
@@ -712,14 +667,7 @@ class BuildHandler {
_log.error("received it, but its not inbound? " + cfg);
}
BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage);
if (HANDLE_REPLIES_INLINE) {
handleRequestAsInboundEndpoint(state);
} else {
synchronized (_inboundBuildEndMessages) {
_inboundBuildEndMessages.add(state);
}
_exec.repoll();
}
handleRequestAsInboundEndpoint(state);
} else {
if (_exec.wasRecentlyBuilding(reqId)) {
// we are the IBEP but we already gave up?
@@ -727,52 +675,53 @@ class BuildHandler {
_log.warn("Dropping the reply " + reqId + ", as we used to be building that");
_context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
} else {
synchronized (_inboundBuildMessages) {
boolean removed = false;
int dropped = 0;
for (int i = 0; i < _inboundBuildMessages.size(); i++) {
BuildMessageState cur = (BuildMessageState)_inboundBuildMessages.get(i);
long age = System.currentTimeMillis() - cur.recvTime;
if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
_inboundBuildMessages.remove(i);
i--;
dropped++;
_context.statManager().addRateData("tunnel.dropLoad", age, _inboundBuildMessages.size());
}
}
if (dropped > 0) {
int sz = _inboundBuildMessages.size();
BuildMessageState cur = _inboundBuildMessages.peek();
boolean accept = true;
if (cur != null) {
long age = System.currentTimeMillis() - cur.recvTime;
if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
_context.statManager().addRateData("tunnel.dropLoad", age, sz);
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
// if the queue is backlogged, stop adding new messages
_context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size());
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
accept = false;
}
}
if (accept) {
int queueTime = estimateQueueTime(sz);
float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
pDrop = (float)Math.pow(pDrop, 16); // steeeep
float f = _context.random().nextFloat();
//if ( (pDrop > f) && (allowProactiveDrop()) ) {
if (pDrop > f) {
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
_context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
} else {
int queueTime = estimateQueueTime(_inboundBuildMessages.size());
float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
pDrop = (float)Math.pow(pDrop, 16); // steeeep
float f = _context.random().nextFloat();
if ( (pDrop > f) && (allowProactiveDrop()) ) {
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
_context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());
accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash));
if (accept) {
// wake up the Executor to call handleInboundRequests()
_exec.repoll();
} else {
_inboundBuildMessages.add(new BuildMessageState(receivedMessage, from, fromHash));
_context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
_context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
}
}
}
_exec.repoll();
}
}
return _buildMessageHandlerJob;
}
}
/****
private boolean allowProactiveDrop() {
String allow = _context.getProperty("router.allowProactiveDrop", "true");
boolean rv = false;
if ( (allow == null) || (Boolean.valueOf(allow).booleanValue()) )
rv = true;
boolean rv = _context.getBooleanPropertyDefaultTrue("router.allowProactiveDrop");
if (!rv)
_context.statManager().addRateData("tunnel.dropLoadProactiveAbort", 1, 0);
return rv;
}
****/
private int estimateQueueTime(int numPendingMessages) {
int decryptTime = 200;
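The proactive-drop branch above turns the estimated queue time into a drop probability with a very steep power curve. A hedged standalone restatement, with the request timeout taken as a parameter because its value is not shown in this hunk:

    static float dropProbability(int queueTimeMs, long requestTimeoutMs) {
        float pDrop = queueTimeMs / ((float) requestTimeoutMs * 3);
        return (float) Math.pow(pDrop, 16);   // near zero until queue time approaches 3x the timeout
    }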
@@ -801,24 +750,17 @@ class BuildHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive tunnel build reply message " + receivedMessage.getUniqueId() + " from "
+ (fromHash != null ? fromHash.toBase64() : from != null ? from.calculateHash().toBase64() : "a tunnel"));
if (HANDLE_REPLIES_INLINE) {
handleReply(new BuildReplyMessageState(receivedMessage));
} else {
synchronized (_inboundBuildReplyMessages) {
_inboundBuildReplyMessages.add(new BuildReplyMessageState(receivedMessage));
}
_exec.repoll();
}
handleReply(new BuildReplyMessageState(receivedMessage));
return _buildReplyMessageHandlerJob;
}
}
/** normal inbound requests from other people */
private static class BuildMessageState {
TunnelBuildMessage msg;
RouterIdentity from;
Hash fromHash;
long recvTime;
final TunnelBuildMessage msg;
final RouterIdentity from;
final Hash fromHash;
final long recvTime;
public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) {
msg = (TunnelBuildMessage)m;
from = f;
@@ -828,8 +770,8 @@ class BuildHandler {
}
/** replies for outbound tunnels that we have created */
private static class BuildReplyMessageState {
TunnelBuildReplyMessage msg;
long recvTime;
final TunnelBuildReplyMessage msg;
final long recvTime;
public BuildReplyMessageState(I2NPMessage m) {
msg = (TunnelBuildReplyMessage)m;
recvTime = System.currentTimeMillis();
@@ -837,9 +779,9 @@ class BuildHandler {
}
/** replies for inbound tunnels we have created */
private static class BuildEndMessageState {
TunnelBuildMessage msg;
PooledTunnelCreatorConfig cfg;
long recvTime;
final TunnelBuildMessage msg;
final PooledTunnelCreatorConfig cfg;
final long recvTime;
public BuildEndMessageState(PooledTunnelCreatorConfig c, I2NPMessage m) {
cfg = c;
msg = (TunnelBuildMessage)m;
@@ -847,13 +789,14 @@ class BuildHandler {
}
}
// noop
/** noop */
private static class TunnelBuildMessageHandlerJob extends JobImpl {
private TunnelBuildMessageHandlerJob(RouterContext ctx) { super(ctx); }
public void runJob() {}
public String getName() { return "Receive tunnel build message"; }
}
// noop
/** noop */
private static class TunnelBuildReplyMessageHandlerJob extends JobImpl {
private TunnelBuildReplyMessageHandlerJob(RouterContext ctx) { super(ctx); }
public void runJob() {}
@@ -866,7 +809,7 @@ class BuildHandler {
* but it affects capacity calculations
*/
private static class TunnelBuildNextHopFailJob extends JobImpl {
HopConfig _cfg;
final HopConfig _cfg;
private TunnelBuildNextHopFailJob(RouterContext ctx, HopConfig cfg) {
super(ctx);
_cfg = cfg;

View File

@@ -22,9 +22,9 @@ import net.i2p.util.Log;
import net.i2p.util.VersionComparator;
/**
*
* Methods for creating Tunnel Build Messages, i.e. requests
*/
class BuildRequestor {
abstract class BuildRequestor {
private static final List<Integer> ORDER = new ArrayList(TunnelBuildMessage.MAX_RECORD_COUNT);
static {
for (int i = 0; i < TunnelBuildMessage.MAX_RECORD_COUNT; i++)
@@ -37,7 +37,7 @@ class BuildRequestor {
* expl. vs. client, uptime, and network conditions.
* Put the expiration in the PTCC.
*
* Also, perhaps, save the PTCC even after expiration for an extended time,
* Also, we now save the PTCC even after expiration for an extended time,
* so we can use a successfully built tunnel anyway.
*
*/
@@ -49,12 +49,16 @@ class BuildRequestor {
/** some randomization is added on to this */
private static final int BUILD_MSG_TIMEOUT = 60*1000;
/**
* "paired tunnels" means using a client's own inbound tunnel to receive the
* reply for an outbound build request, and using a client's own outbound tunnel
* to send an inbound build request.
* This is more secure than using the router's exploratory tunnels, as it
* makes correlation of multiple clients more difficult.
*/
private static boolean usePairedTunnels(RouterContext ctx) {
String val = ctx.getProperty("router.usePairedTunnels");
if ( (val == null) || (Boolean.valueOf(val).booleanValue()) )
return true;
else
return false;
return true;
//return ctx.getBooleanPropertyDefaultTrue("router.usePairedTunnels");
}
/** new style requests need to fill in the tunnel IDs before hand */
@@ -321,9 +325,9 @@ class BuildRequestor {
* Can't do this for inbound tunnels since the msg goes out an expl. tunnel.
*/
private static class TunnelBuildFirstHopFailJob extends JobImpl {
TunnelPool _pool;
PooledTunnelCreatorConfig _cfg;
BuildExecutor _exec;
final TunnelPool _pool;
final PooledTunnelCreatorConfig _cfg;
final BuildExecutor _exec;
private TunnelBuildFirstHopFailJob(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, BuildExecutor exec) {
super(ctx);
_cfg = cfg;

View File

@@ -5,16 +5,21 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.tunnel.TunnelCreatorConfig;
/**
* This runs twice for each tunnel.
* The first time, remove it from the LeaseSet.
* The second time, stop accepting data for it.
*/
class ExpireJob extends JobImpl {
private TunnelPool _pool;
private TunnelCreatorConfig _cfg;
private final TunnelPool _pool;
private final TunnelCreatorConfig _cfg;
private boolean _leaseUpdated;
private long _dropAfter;
private final long _dropAfter;
public ExpireJob(RouterContext ctx, TunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_pool = pool;
_cfg = cfg;
_leaseUpdated = false;
// we act as if this tunnel expires a random skew before it actually does
// so we rebuild out of sync. otoh, we will honor tunnel messages on it
// up through the full lifetime of the tunnel, plus a clock skew, since
@@ -28,9 +33,11 @@ class ExpireJob extends JobImpl {
cfg.setExpiration(expire);
getTiming().setStartAfter(expire);
}
public String getName() {
return "Expire tunnel";
}
public void runJob() {
if (!_leaseUpdated) {
_pool.removeTunnel(_cfg);
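A hedged sketch of the two-phase expiration the class javadoc describes; the requeue call, the second phase, and the _leaseUpdated toggle shown here are assumptions about the elided remainder of runJob(), not code from this hunk.

    public void runJob() {
        if (!_leaseUpdated) {
            _leaseUpdated = true;
            _pool.removeTunnel(_cfg);                              // phase 1: stop advertising it
            _pool.refreshLeaseSet();
            requeue(_dropAfter - getContext().clock().now());      // come back at the hard deadline
        } else {
            getContext().tunnelDispatcher().remove(_cfg);          // phase 2: stop accepting data
        }
    }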

View File

@@ -0,0 +1,60 @@
package net.i2p.router.tunnel.pool;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Count how often we have accepted a tunnel with the peer
* as the previous or next hop.
* We limit each peer to a percentage of all participating tunnels,
* subject to minimum and maximum values for the limit.
*
* This offers basic protection against simple attacks
* but is not a complete solution, as by design, we don't know
* the originator of a tunnel request.
*
* This also effectively limits the number of tunnels between
* any given pair of routers, which probably isn't a bad thing.
*
* Note that the actual limits will be higher than specified
* by up to 1 / LIFETIME_PORTION because the counter window resets.
*
* Note that the counts are of previous + next hops, so the total will
* be higher than the participating tunnel count, and will also grow
* as the network uses more 3-hop tunnels.
*
* @since 0.8.4
*/
class ParticipatingThrottler {
private final RouterContext context;
private final ObjectCounter<Hash> counter;
/** portion of the tunnel lifetime */
private static final int LIFETIME_PORTION = 3;
private static final int MIN_LIMIT = 18 / LIFETIME_PORTION;
private static final int MAX_LIMIT = 66 / LIFETIME_PORTION;
private static final int PERCENT_LIMIT = 12 / LIFETIME_PORTION;
private static final long CLEAN_TIME = 11*60*1000 / LIFETIME_PORTION;
ParticipatingThrottler(RouterContext ctx) {
this.context = ctx;
this.counter = new ObjectCounter();
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), CLEAN_TIME);
}
/** increments before checking */
boolean shouldThrottle(Hash h) {
int numTunnels = this.context.tunnelManager().getParticipatingCount();
int limit = Math.max(MIN_LIMIT, Math.min(MAX_LIMIT, numTunnels * PERCENT_LIMIT / 100));
return this.counter.increment(h) > limit;
}
private class Cleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
ParticipatingThrottler.this.counter.clear();
}
}
}
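A worked example of the limit computed in shouldThrottle(), using the constants above (LIFETIME_PORTION = 3, so MIN_LIMIT = 6, MAX_LIMIT = 22, PERCENT_LIMIT = 4); the tunnel counts are illustrative:

    //  500 participating tunnels:  500 * 4 / 100 = 20 -> limit 20
    // 1000 participating tunnels: 1000 * 4 / 100 = 40 -> clamped to MAX -> limit 22
    //   50 participating tunnels:   50 * 4 / 100 =  2 -> clamped to MIN -> limit  6
    int limit = Math.max(18 / 3, Math.min(66 / 3, 500 * (12 / 3) / 100));   // == 20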

View File

@@ -22,9 +22,9 @@ import net.i2p.router.message.PayloadGarlicConfig;
import net.i2p.util.Log;
class TestJob extends JobImpl {
private Log _log;
private TunnelPool _pool;
private PooledTunnelCreatorConfig _cfg;
private final Log _log;
private final TunnelPool _pool;
private final PooledTunnelCreatorConfig _cfg;
private boolean _found;
private TunnelInfo _outTunnel;
private TunnelInfo _replyTunnel;
@@ -39,9 +39,10 @@ class TestJob extends JobImpl {
public TestJob(RouterContext ctx, PooledTunnelCreatorConfig cfg, TunnelPool pool) {
super(ctx);
_log = ctx.logManager().getLog(TestJob.class);
_pool = pool;
_cfg = cfg;
if (_pool == null)
if (pool != null)
_pool = pool;
else
_pool = cfg.getTunnelPool();
if ( (_pool == null) && (_log.shouldLog(Log.ERROR)) )
_log.error("Invalid tunnel test configuration: no pool for " + cfg, new Exception("origin"));
@@ -61,7 +62,9 @@ class TestJob extends JobImpl {
ctx.statManager().createRateStat("tunnel.testAborted", "Tunnel test could not occur, since there weren't any tunnels to test with", "Tunnels",
RATES);
}
public String getName() { return "Test tunnel"; }
public void runJob() {
if (_pool == null)
return;
@@ -246,9 +249,10 @@ class TestJob extends JobImpl {
}
private class ReplySelector implements MessageSelector {
private RouterContext _context;
private long _id;
private long _expiration;
private final RouterContext _context;
private final long _id;
private final long _expiration;
public ReplySelector(RouterContext ctx, long id, long expiration) {
_context = ctx;
_id = id;
@@ -257,7 +261,9 @@ class TestJob extends JobImpl {
}
public boolean continueMatching() { return !_found && _context.clock().now() < _expiration; }
public long getExpiration() { return _expiration; }
public boolean isMatch(I2NPMessage message) {
if (message instanceof DeliveryStatusMessage) {
return ((DeliveryStatusMessage)message).getMessageId() == _id;
@@ -280,9 +286,13 @@ class TestJob extends JobImpl {
private class OnTestReply extends JobImpl implements ReplyJob {
private long _successTime;
private OutNetMessage _sentMessage;
public OnTestReply(RouterContext ctx) { super(ctx); }
public String getName() { return "Tunnel test success"; }
public void setSentMessage(OutNetMessage m) { _sentMessage = m; }
public void runJob() {
if (_sentMessage != null)
getContext().messageRegistry().unregisterPending(_sentMessage);
@@ -292,6 +302,7 @@ class TestJob extends JobImpl {
testFailed(_successTime);
_found = true;
}
// who cares about the details...
public void setMessage(I2NPMessage message) {
_successTime = getContext().clock().now() - ((DeliveryStatusMessage)message).getArrival();
@@ -310,12 +321,15 @@ class TestJob extends JobImpl {
* Test failed (boo, hiss)
*/
private class OnTestTimeout extends JobImpl {
private long _started;
private final long _started;
public OnTestTimeout(RouterContext ctx) {
super(ctx);
_started = ctx.clock().now();
}
public String getName() { return "Tunnel test timeout"; }
public void runJob() {
if (_log.shouldLog(Log.WARN))
_log.warn("Timeout: found? " + _found);

View File

@@ -22,21 +22,21 @@ import net.i2p.stat.RateStat;
import net.i2p.util.Log;
/**
*
* A group of tunnels for the router or a particular client, in a single direction.
*/
public class TunnelPool {
private final List _inProgress = new ArrayList();
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
private TunnelPoolSettings _settings;
private final ArrayList<TunnelInfo> _tunnels;
private TunnelPeerSelector _peerSelector;
private TunnelPoolManager _manager;
private final TunnelPeerSelector _peerSelector;
private final TunnelPoolManager _manager;
private boolean _alive;
private long _lifetimeProcessed;
private TunnelInfo _lastSelected;
private long _lastSelectionPeriod;
private int _expireSkew;
private final int _expireSkew;
private long _started;
private long _lastRateUpdate;
private long _lastLifetimeProcessed;
@@ -50,14 +50,9 @@ public class TunnelPool {
_settings = settings;
_tunnels = new ArrayList(settings.getLength() + settings.getBackupQuantity());
_peerSelector = sel;
_alive = false;
_lastSelectionPeriod = 0;
_lastSelected = null;
_lifetimeProcessed = 0;
_expireSkew = _context.random().nextInt(90*1000);
_started = System.currentTimeMillis();
_lastRateUpdate = _started;
_lastLifetimeProcessed = 0;
_rateName = "tunnel.Bps." +
(_settings.isExploratory() ? "exploratory" : _settings.getDestinationNickname()) +
(_settings.isInbound() ? ".in" : ".out");
@@ -412,11 +407,12 @@ public class TunnelPool {
}
}
/** noop for outbound */
void refreshLeaseSet() {
if (_log.shouldLog(Log.DEBUG))
_log.debug(toString() + ": refreshing leaseSet on tunnel expiration (but prior to grace timeout)");
LeaseSet ls = null;
if (_settings.isInbound() && (_settings.getDestination() != null) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(toString() + ": refreshing leaseSet on tunnel expiration (but prior to grace timeout)");
LeaseSet ls = null;
synchronized (_tunnels) {
ls = locked_buildNewLeaseSet();
}
@@ -427,7 +423,7 @@ public class TunnelPool {
}
/**
* Return true if a fallback tunnel is built
* @return true if a fallback tunnel is built
*
*/
boolean buildFallback() {
@@ -851,6 +847,7 @@ public class TunnelPool {
}
PooledTunnelCreatorConfig configureNewTunnel() { return configureNewTunnel(false); }
private PooledTunnelCreatorConfig configureNewTunnel(boolean forceZeroHop) {
TunnelPoolSettings settings = getSettings();
List peers = null;

View File

@@ -37,8 +37,8 @@ import net.i2p.util.SimpleTimer;
*
*/
public class TunnelPoolManager implements TunnelManagerFacade {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
/** Hash (destination) to TunnelPool */
private final Map<Hash, TunnelPool> _clientInboundPools;
/** Hash (destination) to TunnelPool */
@@ -61,7 +61,6 @@ public class TunnelPoolManager implements TunnelManagerFacade {
_clientInboundPools = new ConcurrentHashMap(4);
_clientOutboundPools = new ConcurrentHashMap(4);
_isShutdown = false;
_executor = new BuildExecutor(ctx, this);
I2PThread execThread = new I2PThread(_executor, "BuildExecutor");
execThread.setDaemon(true);