From 86759d2f9c63d203488c7e8b34e64be162a16ce9 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Fri, 16 Apr 2004 23:52:11 +0000 Subject: [PATCH] DatabaseLookupMessageHandler: added stat - netDb.lookupsReceived fixed formatting HandleDatabaseLookupMessage: added stat - netDb.lookupsHandled added stat - netDb.lookupsMatched fixed formatting HandleDatabaseStoreMessage: added stat - netDb.storeHandled fixed formatting StoreJob: added stat - netDb.storeSent fixed formatting removed old unused code (we do dbStore through tunnels, not garlics) logging SearchJob: fixed formatting logging HandleTunnelCreateMessageJob: fixed formatting logging PoolingTunnelManagerFacade: added stat - tunnel.participatingTunnels fixed formatting logging TunnelPool: added getParticipatingTunnelCount() fixed formatting logging StatisticsManager: revamped whats published fixed formatting logging fixed formatting --- .../src/net/i2p/router/StatisticsManager.java | 193 ++-- .../DatabaseLookupMessageHandler.java | 10 +- .../HandleDatabaseLookupMessageJob.java | 219 ++--- .../HandleDatabaseStoreMessageJob.java | 56 +- .../router/networkdb/kademlia/SearchJob.java | 555 +++++------ .../router/networkdb/kademlia/StoreJob.java | 858 +++++++----------- .../HandleTunnelCreateMessageJob.java | 187 ++-- .../PoolingTunnelManagerFacade.java | 207 +++-- .../i2p/router/tunnelmanager/TunnelPool.java | 840 ++++++++--------- 9 files changed, 1544 insertions(+), 1581 deletions(-) diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 6753a4c577..018a0b7fcf 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -37,100 +37,134 @@ public class StatisticsManager implements Service { public final static int DEFAULT_MAX_PUBLISHED_PEERS = 20; public StatisticsManager() { - _includePeerRankings = false; + _includePeerRankings = false; } public void shutdown() {} public void startup() { - String val = Router.getInstance().getConfigSetting(PROP_PUBLISH_RANKINGS); - try { - if (val == null) { - _log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + " not set - using default " + DEFAULT_PROP_PUBLISH_RANKINGS); - val = DEFAULT_PROP_PUBLISH_RANKINGS; - } else { - _log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + " set to " + val); - } - boolean v = Boolean.TRUE.toString().equalsIgnoreCase(val); - _includePeerRankings = v; - _log.debug("Setting includePeerRankings = " + v); - } catch (Throwable t) { - _log.error("Error determining whether to publish rankings [" + PROP_PUBLISH_RANKINGS + "=" + val + "], so we're defaulting to FALSE"); - _includePeerRankings = false; - } - val = Router.getInstance().getConfigSetting(PROP_MAX_PUBLISHED_PEERS); - if (val == null) { - _publishedStats = DEFAULT_MAX_PUBLISHED_PEERS; - } else { - try { - int num = Integer.parseInt(val); - _publishedStats = num; - } catch (NumberFormatException nfe) { - _log.error("Invalid max number of peers to publish [" + val + "], defaulting to " + DEFAULT_MAX_PUBLISHED_PEERS, nfe); - _publishedStats = DEFAULT_MAX_PUBLISHED_PEERS; - } - } - } + String val = Router.getInstance().getConfigSetting(PROP_PUBLISH_RANKINGS); + try { + if (val == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + + " not set - using default " + DEFAULT_PROP_PUBLISH_RANKINGS); + val = DEFAULT_PROP_PUBLISH_RANKINGS; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer publishing setting " + PROP_PUBLISH_RANKINGS + + " set to " + val); + } + boolean v = Boolean.TRUE.toString().equalsIgnoreCase(val); + _includePeerRankings = v; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Setting includePeerRankings = " + v); + } catch (Throwable t) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error determining whether to publish rankings [" + + PROP_PUBLISH_RANKINGS + "=" + val + + "], so we're defaulting to FALSE"); + _includePeerRankings = false; + } + val = Router.getInstance().getConfigSetting(PROP_MAX_PUBLISHED_PEERS); + if (val == null) { + _publishedStats = DEFAULT_MAX_PUBLISHED_PEERS; + } else { + try { + int num = Integer.parseInt(val); + _publishedStats = num; + } catch (NumberFormatException nfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Invalid max number of peers to publish [" + val + + "], defaulting to " + DEFAULT_MAX_PUBLISHED_PEERS, nfe); + _publishedStats = DEFAULT_MAX_PUBLISHED_PEERS; + } + } + } /** Retrieve a snapshot of the statistics that should be published */ public Properties publishStatistics() { - Properties stats = new Properties(); - stats.setProperty("router.version", RouterVersion.VERSION); - stats.setProperty("router.id", RouterVersion.ID); - stats.setProperty("coreVersion", CoreVersion.VERSION); - stats.setProperty("core.id", CoreVersion.ID); + Properties stats = new Properties(); + stats.setProperty("router.version", RouterVersion.VERSION); + stats.setProperty("router.id", RouterVersion.ID); + stats.setProperty("coreVersion", CoreVersion.VERSION); + stats.setProperty("core.id", CoreVersion.ID); - if (_includePeerRankings) { - stats.putAll(ProfileManager.getInstance().summarizePeers(_publishedStats)); - - includeRate("transport.sendProcessingTime", stats); - includeRate("tcp.queueSize", stats); - includeRate("jobQueue.jobLag", stats); - includeRate("jobQueue.jobRun", stats); - includeRate("crypto.elGamal.encrypt", stats); - includeRate("jobQueue.readyJobs", stats); - includeRate("jobQueue.droppedJobs", stats); - stats.setProperty("stat_uptime", DataHelper.formatDuration(Router.getInstance().getUptime())); - stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]"); - _log.debug("Publishing peer rankings"); - } else { - _log.debug("Not publishing peer rankings"); - } + if (_includePeerRankings) { + stats.putAll(ProfileManager.getInstance().summarizePeers(_publishedStats)); + + includeRate("transport.sendProcessingTime", stats, new long[] { 60*1000, 60*60*1000 }); + //includeRate("tcp.queueSize", stats); + includeRate("jobQueue.jobLag", stats, new long[] { 60*1000, 60*60*1000 }); + includeRate("jobQueue.jobRun", stats, new long[] { 60*1000, 60*60*1000 }); + includeRate("crypto.elGamal.encrypt", stats, new long[] { 60*1000, 60*60*1000 }); + includeRate("jobQueue.readyJobs", stats, new long[] { 60*1000, 60*60*1000 }); + includeRate("jobQueue.droppedJobs", stats, new long[] { 60*60*1000, 24*60*60*1000 }); + includeRate("inNetPool.dropped", stats, new long[] { 60*60*1000, 24*60*60*1000 }); + includeRate("tunnel.participatingTunnels", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("netDb.lookupsReceived", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("netDb.lookupsHandled", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("netDb.storeSent", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 }); + includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); + includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 }); + stats.setProperty("stat_uptime", DataHelper.formatDuration(Router.getInstance().getUptime())); + stats.setProperty("stat__rateKey", "avg;maxAvg;pctLifetime;[sat;satLim;maxSat;maxSatLim;][num;lifetimeFreq;maxFreq]"); + _log.debug("Publishing peer rankings"); + } else { + _log.debug("Not publishing peer rankings"); + } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Building status: " + stats); - return stats; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Building status: " + stats); + return stats; } private void includeRate(String rateName, Properties stats) { - RateStat rate = StatManager.getInstance().getRate(rateName); - if (rate == null) return; - for (int i = 0; i < rate.getPeriods().length; i++) { - Rate curRate = rate.getRate(rate.getPeriods()[i]); - if (curRate == null) continue; - stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate)); - } + includeRate(rateName, stats, null); + } + private void includeRate(String rateName, Properties stats, long selectedPeriods[]) { + RateStat rate = StatManager.getInstance().getRate(rateName); + if (rate == null) return; + long periods[] = rate.getPeriods(); + for (int i = 0; i < periods.length; i++) { + if (selectedPeriods != null) { + boolean found = false; + for (int j = 0; j < selectedPeriods.length; j++) { + if (selectedPeriods[j] == periods[i]) { + found = true; + break; + } + } + if (!found) continue; + } + + Rate curRate = rate.getRate(periods[i]); + if (curRate == null) continue; + stats.setProperty("stat_" + rateName + '.' + getPeriod(curRate), renderRate(curRate)); + } } private static String renderRate(Rate rate) { - StringBuffer buf = new StringBuffer(255); - buf.append(num(rate.getAverageValue())).append(';'); - buf.append(num(rate.getExtremeAverageValue())).append(';'); - buf.append(pct(rate.getPercentageOfLifetimeValue())).append(';'); - if (rate.getLifetimeTotalEventTime() > 0) { - buf.append(pct(rate.getLastEventSaturation())).append(';'); - buf.append(num(rate.getLastSaturationLimit())).append(';'); - buf.append(pct(rate.getExtremeEventSaturation())).append(';'); - buf.append(num(rate.getExtremeSaturationLimit())).append(';'); - } - buf.append(num(rate.getLastEventCount())).append(';'); - long numPeriods = rate.getLifetimePeriods(); - if (numPeriods > 0) { - double avgFrequency = rate.getLifetimeEventCount() / (double)numPeriods; - double peakFrequency = rate.getExtremeEventCount(); - buf.append(num(avgFrequency)).append(';'); - buf.append(num(rate.getExtremeEventCount())).append(';'); - } - return buf.toString(); + StringBuffer buf = new StringBuffer(255); + buf.append(num(rate.getAverageValue())).append(';'); + buf.append(num(rate.getExtremeAverageValue())).append(';'); + buf.append(pct(rate.getPercentageOfLifetimeValue())).append(';'); + if (rate.getLifetimeTotalEventTime() > 0) { + buf.append(pct(rate.getLastEventSaturation())).append(';'); + buf.append(num(rate.getLastSaturationLimit())).append(';'); + buf.append(pct(rate.getExtremeEventSaturation())).append(';'); + buf.append(num(rate.getExtremeSaturationLimit())).append(';'); + } + buf.append(num(rate.getLastEventCount())).append(';'); + long numPeriods = rate.getLifetimePeriods(); + if (numPeriods > 0) { + double avgFrequency = rate.getLifetimeEventCount() / (double)numPeriods; + double peakFrequency = rate.getExtremeEventCount(); + buf.append(num(avgFrequency)).append(';'); + buf.append(num(rate.getExtremeEventCount())).append(';'); + } + return buf.toString(); } private static String getPeriod(Rate rate) { return DataHelper.formatDuration(rate.getPeriod()); } @@ -142,6 +176,5 @@ public class StatisticsManager implements Service { private final static DecimalFormat _pct = new DecimalFormat("#0.00%", new DecimalFormatSymbols(Locale.UK)); private final static String pct(double num) { synchronized (_pct) { return _pct.format(num); } } - public String renderStatusHTML() { return ""; } } diff --git a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java index 3ea5ecb3ec..50ac040ecb 100644 --- a/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java +++ b/router/java/src/net/i2p/router/networkdb/DatabaseLookupMessageHandler.java @@ -15,14 +15,20 @@ import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.SourceRouteBlock; import net.i2p.router.HandlerJobBuilder; import net.i2p.router.Job; +import net.i2p.stat.StatManager; /** * Build a HandleDatabaseLookupMessageJob whenever a DatabaseLookupMessage arrives * */ public class DatabaseLookupMessageHandler implements HandlerJobBuilder { + static { + StatManager.getInstance().createRateStat("netDb.lookupsReceived", "How many netDb lookups have we received?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + } + public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { - // ignore the reply block for the moment - return new HandleDatabaseLookupMessageJob((DatabaseLookupMessage)receivedMessage, from, fromHash); + StatManager.getInstance().addRateData("netDb.lookupsReceived", 1, 0); + // ignore the reply block for the moment + return new HandleDatabaseLookupMessageJob((DatabaseLookupMessage)receivedMessage, from, fromHash); } } diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 34dbb734f0..bc8007c1f3 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -37,6 +37,7 @@ import net.i2p.router.message.SendMessageDirectJob; import net.i2p.router.message.SendTunnelMessageJob; import net.i2p.util.Clock; import net.i2p.util.Log; +import net.i2p.stat.StatManager; /** * Handle a lookup for a key received from a remote peer. Needs to be implemented @@ -51,138 +52,152 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { private final static int MAX_ROUTERS_RETURNED = 3; private final static int REPLY_TIMEOUT = 60*1000; private final static int MESSAGE_PRIORITY = 300; + + static { + StatManager.getInstance().createRateStat("netDb.lookupsHandled", "How many netDb lookups have we handled?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("netDb.lookupsMatched", "How many netDb lookups did we have the data for?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + } public HandleDatabaseLookupMessageJob(DatabaseLookupMessage receivedMessage, RouterIdentity from, Hash fromHash) { - _message = receivedMessage; - _from = from; - _fromHash = fromHash; + _message = receivedMessage; + _from = from; + _fromHash = fromHash; } public void runJob() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling database lookup message for " + _message.getSearchKey()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handling database lookup message for " + _message.getSearchKey()); - Hash fromKey = _message.getFrom().getIdentity().getHash(); + Hash fromKey = _message.getFrom().getIdentity().getHash(); - if (_message.getReplyTunnel() != null) { - if (_log.shouldLog(Log.INFO)) - _log.info("dbLookup received with replies going to " + fromKey + " (tunnel " + _message.getReplyTunnel() + ")"); - } + if (_message.getReplyTunnel() != null) { + if (_log.shouldLog(Log.INFO)) + _log.info("dbLookup received with replies going to " + fromKey + + " (tunnel " + _message.getReplyTunnel() + ")"); + } - NetworkDatabaseFacade.getInstance().store(fromKey, _message.getFrom()); + NetworkDatabaseFacade.getInstance().store(fromKey, _message.getFrom()); - LeaseSet ls = NetworkDatabaseFacade.getInstance().lookupLeaseSetLocally(_message.getSearchKey()); - if (ls != null) { - // send that lease set to the _message.getFromHash peer - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We do have key " + _message.getSearchKey().toBase64() + " locally as a lease set. sending to " + fromKey.toBase64()); - sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel()); - } else { - RouterInfo info = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(_message.getSearchKey()); - if (info != null) { - // send that routerInfo to the _message.getFromHash peer - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We do have key " + _message.getSearchKey().toBase64() + " locally as a router info. sending to " + fromKey.toBase64()); - sendData(_message.getSearchKey(), info, fromKey, _message.getReplyTunnel()); - } else { - // not found locally - return closest peer routerInfo structs - Set routerInfoSet = NetworkDatabaseFacade.getInstance().findNearestRouters(_message.getSearchKey(), MAX_ROUTERS_RETURNED, _message.getDontIncludePeers()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We do not have key " + _message.getSearchKey().toBase64() + " locally. sending back " + routerInfoSet.size() + " peers to " + fromKey.toBase64()); - sendClosest(_message.getSearchKey(), routerInfoSet, fromKey, _message.getReplyTunnel()); - } - } + LeaseSet ls = NetworkDatabaseFacade.getInstance().lookupLeaseSetLocally(_message.getSearchKey()); + if (ls != null) { + // send that lease set to the _message.getFromHash peer + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We do have key " + _message.getSearchKey().toBase64() + + " locally as a lease set. sending to " + fromKey.toBase64()); + sendData(_message.getSearchKey(), ls, fromKey, _message.getReplyTunnel()); + } else { + RouterInfo info = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(_message.getSearchKey()); + if (info != null) { + // send that routerInfo to the _message.getFromHash peer + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We do have key " + _message.getSearchKey().toBase64() + + " locally as a router info. sending to " + fromKey.toBase64()); + sendData(_message.getSearchKey(), info, fromKey, _message.getReplyTunnel()); + } else { + // not found locally - return closest peer routerInfo structs + Set routerInfoSet = NetworkDatabaseFacade.getInstance().findNearestRouters(_message.getSearchKey(), + MAX_ROUTERS_RETURNED, _message.getDontIncludePeers()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We do not have key " + _message.getSearchKey().toBase64() + + " locally. sending back " + routerInfoSet.size() + " peers to " + fromKey.toBase64()); + sendClosest(_message.getSearchKey(), routerInfoSet, fromKey, _message.getReplyTunnel()); + } + } } private void sendData(Hash key, DataStructure data, Hash toPeer, TunnelId replyTunnel) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending data matching key key " + key.toBase64() + " to peer " + toPeer.toBase64() + " tunnel " + replyTunnel); - DatabaseStoreMessage msg = new DatabaseStoreMessage(); - msg.setKey(key); - if (data instanceof LeaseSet) { - msg.setLeaseSet((LeaseSet)data); - msg.setValueType(DatabaseStoreMessage.KEY_TYPE_LEASESET); - } else if (data instanceof RouterInfo) { - msg.setRouterInfo((RouterInfo)data); - msg.setValueType(DatabaseStoreMessage.KEY_TYPE_ROUTERINFO); - } - sendMessage(msg, toPeer, replyTunnel); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending data matching key key " + key.toBase64() + " to peer " + toPeer.toBase64() + + " tunnel " + replyTunnel); + StatManager.getInstance().addRateData("netDb.lookupsMatched", 1, 0); + DatabaseStoreMessage msg = new DatabaseStoreMessage(); + msg.setKey(key); + if (data instanceof LeaseSet) { + msg.setLeaseSet((LeaseSet)data); + msg.setValueType(DatabaseStoreMessage.KEY_TYPE_LEASESET); + } else if (data instanceof RouterInfo) { + msg.setRouterInfo((RouterInfo)data); + msg.setValueType(DatabaseStoreMessage.KEY_TYPE_ROUTERINFO); + } + sendMessage(msg, toPeer, replyTunnel); } private void sendClosest(Hash key, Set routerInfoSet, Hash toPeer, TunnelId replyTunnel) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending closest routers to key " + key.toBase64() + ": # peers = " + routerInfoSet.size() + " tunnel " + replyTunnel); - DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage(); - msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash()); - msg.setSearchKey(key); - if (routerInfoSet.size() <= 0) { - // always include something, so lets toss ourselves in there - routerInfoSet.add(Router.getInstance().getRouterInfo()); - } - msg.addReplies(routerInfoSet); - sendMessage(msg, toPeer, replyTunnel); // should this go via garlic messages instead? + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending closest routers to key " + key.toBase64() + ": # peers = " + + routerInfoSet.size() + " tunnel " + replyTunnel); + DatabaseSearchReplyMessage msg = new DatabaseSearchReplyMessage(); + msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash()); + msg.setSearchKey(key); + if (routerInfoSet.size() <= 0) { + // always include something, so lets toss ourselves in there + routerInfoSet.add(Router.getInstance().getRouterInfo()); + } + msg.addReplies(routerInfoSet); + sendMessage(msg, toPeer, replyTunnel); // should this go via garlic messages instead? } private void sendMessage(I2NPMessage message, Hash toPeer, TunnelId replyTunnel) { - Job send = null; - if (replyTunnel != null) { - sendThroughTunnel(message, toPeer, replyTunnel); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending reply directly to " + toPeer); - send = new SendMessageDirectJob(message, toPeer, REPLY_TIMEOUT+Clock.getInstance().now(), MESSAGE_PRIORITY); - } - - NetworkDatabaseFacade.getInstance().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); + StatManager.getInstance().addRateData("netDb.lookupsHandled", 1, 0); + Job send = null; + if (replyTunnel != null) { + sendThroughTunnel(message, toPeer, replyTunnel); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending reply directly to " + toPeer); + send = new SendMessageDirectJob(message, toPeer, REPLY_TIMEOUT+Clock.getInstance().now(), MESSAGE_PRIORITY); + } + + NetworkDatabaseFacade.getInstance().lookupRouterInfo(toPeer, send, null, REPLY_TIMEOUT); } private void sendThroughTunnel(I2NPMessage message, Hash toPeer, TunnelId replyTunnel) { - TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(replyTunnel); + TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(replyTunnel); - // the sendTunnelMessageJob can't handle injecting into the tunnel anywhere but the beginning - // (and if we are the beginning, we have the signing key) - if ( (info == null) || (info.getSigningKey() != null)) { - if (_log.shouldLog(Log.INFO)) - _log.info("Sending reply through " + replyTunnel + " on " + toPeer); - JobQueue.getInstance().addJob(new SendTunnelMessageJob(message, replyTunnel, toPeer, null, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY)); - } else { - // its a tunnel we're participating in, but we're NOT the gateway, so - if (_log.shouldLog(Log.INFO)) - _log.info("Want to reply to a db request via a tunnel, but we're a participant in the reply! so send it to the gateway"); + // the sendTunnelMessageJob can't handle injecting into the tunnel anywhere but the beginning + // (and if we are the beginning, we have the signing key) + if ( (info == null) || (info.getSigningKey() != null)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Sending reply through " + replyTunnel + " on " + toPeer); + JobQueue.getInstance().addJob(new SendTunnelMessageJob(message, replyTunnel, toPeer, null, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY)); + } else { + // its a tunnel we're participating in, but we're NOT the gateway, so + if (_log.shouldLog(Log.INFO)) + _log.info("Want to reply to a db request via a tunnel, but we're a participant in the reply! so send it to the gateway"); - if ( (toPeer == null) || (replyTunnel == null) ) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Someone br0ke us. where is this message supposed to go again?", getAddedBy()); - return; - } + if ( (toPeer == null) || (replyTunnel == null) ) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Someone br0ke us. where is this message supposed to go again?", getAddedBy()); + return; + } - long expiration = REPLY_TIMEOUT + Clock.getInstance().now(); + long expiration = REPLY_TIMEOUT + Clock.getInstance().now(); - TunnelMessage msg = new TunnelMessage(); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - message.writeBytes(baos); - msg.setData(baos.toByteArray()); - msg.setTunnelId(replyTunnel); - msg.setMessageExpiration(new Date(expiration)); - JobQueue.getInstance().addJob(new SendMessageDirectJob(msg, toPeer, null, null, null, null, expiration, MESSAGE_PRIORITY)); + TunnelMessage msg = new TunnelMessage(); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + message.writeBytes(baos); + msg.setData(baos.toByteArray()); + msg.setTunnelId(replyTunnel); + msg.setMessageExpiration(new Date(expiration)); + JobQueue.getInstance().addJob(new SendMessageDirectJob(msg, toPeer, null, null, null, null, expiration, MESSAGE_PRIORITY)); - String bodyType = message.getClass().getName(); - MessageHistory.getInstance().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", dfe); - } - return; - } + String bodyType = message.getClass().getName(); + MessageHistory.getInstance().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the tunnel message to send to the tunnel", ioe); + } catch (DataFormatException dfe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the tunnel message to send to the tunnel", dfe); + } + return; + } } public String getName() { return "Handle Database Lookup Message"; } public void dropped() { - MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); + MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); } } diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java index aa40d16144..85be0f9b08 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseStoreMessageJob.java @@ -18,6 +18,7 @@ import net.i2p.router.MessageHistory; import net.i2p.router.NetworkDatabaseFacade; import net.i2p.router.ProfileManager; import net.i2p.util.Log; +import net.i2p.stat.StatManager; /** * Receive DatabaseStoreMessage data and store it in the local net db @@ -28,38 +29,47 @@ public class HandleDatabaseStoreMessageJob extends JobImpl { private DatabaseStoreMessage _message; private RouterIdentity _from; private Hash _fromHash; - + + static { + StatManager.getInstance().createRateStat("netDb.storeHandled", "How many netDb store messages have we handled?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + } + public HandleDatabaseStoreMessageJob(DatabaseStoreMessage receivedMessage, RouterIdentity from, Hash fromHash) { - _message = receivedMessage; - _from = from; - _fromHash = fromHash; + _message = receivedMessage; + _from = from; + _fromHash = fromHash; } public void runJob() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling database store message"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handling database store message"); - boolean wasNew = false; - if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) - wasNew = (null == NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getLeaseSet())); - else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) { - if (_log.shouldLog(Log.INFO)) - _log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of " + new Date(_message.getRouterInfo().getPublished())); - wasNew = (null == NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getRouterInfo())); - ProfileManager.getInstance().heardAbout(_message.getKey()); - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType() + ": " + _message); - } - if (_from != null) - _fromHash = _from.getHash(); - if (_fromHash != null) - ProfileManager.getInstance().dbStoreReceived(_fromHash, wasNew); + boolean wasNew = false; + if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_LEASESET) { + Object match = NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getLeaseSet()); + wasNew = (null == match); + } else if (_message.getValueType() == DatabaseStoreMessage.KEY_TYPE_ROUTERINFO) { + if (_log.shouldLog(Log.INFO)) + _log.info("Handling dbStore of router " + _message.getKey() + " with publishDate of " + + new Date(_message.getRouterInfo().getPublished())); + Object match = NetworkDatabaseFacade.getInstance().store(_message.getKey(), _message.getRouterInfo()); + wasNew = (null == match); + ProfileManager.getInstance().heardAbout(_message.getKey()); + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error("Invalid DatabaseStoreMessage data type - " + _message.getValueType() + + ": " + _message); + } + if (_from != null) + _fromHash = _from.getHash(); + if (_fromHash != null) + ProfileManager.getInstance().dbStoreReceived(_fromHash, wasNew); + StatManager.getInstance().addRateData("netDb.storeHandled", 1, 0); } public String getName() { return "Handle Database Store Message"; } public void dropped() { - MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); + MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 2cb4b3d6e4..189904affb 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -58,10 +58,10 @@ class SearchJob extends JobImpl { private static final long PER_PEER_TIMEOUT = 30*1000; static { - StatManager.getInstance().createRateStat("netDb.successTime", "How long a successful search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); - StatManager.getInstance().createRateStat("netDb.failedTime", "How long a failed search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); - StatManager.getInstance().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); - StatManager.getInstance().createRateStat("netDb.failedPeers", "How many peers are contacted in a failed search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("netDb.successTime", "How long a successful search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("netDb.failedTime", "How long a failed search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("netDb.failedPeers", "How many peers are contacted in a failed search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l }); } /** @@ -69,21 +69,21 @@ class SearchJob extends JobImpl { * */ public SearchJob(KademliaNetworkDatabaseFacade facade, Hash key, Job onSuccess, Job onFailure, long timeoutMs, boolean keepStats, boolean isLease) { - if ( (key == null) || (key.getData() == null) ) throw new IllegalArgumentException("Search for null key? wtf"); - _facade = facade; - _state = new SearchState(key); - _onSuccess = onSuccess; - _onFailure = onFailure; - _timeoutMs = timeoutMs; - _keepStats = keepStats; - _isLease = isLease; - _expiration = Clock.getInstance().now() + timeoutMs; + if ( (key == null) || (key.getData() == null) ) throw new IllegalArgumentException("Search for null key? wtf"); + _facade = facade; + _state = new SearchState(key); + _onSuccess = onSuccess; + _onFailure = onFailure; + _timeoutMs = timeoutMs; + _keepStats = keepStats; + _isLease = isLease; + _expiration = Clock.getInstance().now() + timeoutMs; } public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Searching for " + _state.getTarget()); // , getAddedBy()); - searchNext(); + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Searching for " + _state.getTarget()); // , getAddedBy()); + searchNext(); } protected SearchState getState() { return _state; } @@ -95,26 +95,27 @@ class SearchJob extends JobImpl { * Send the next search, or stop if its completed */ protected void searchNext() { - if (_state.completed()) { - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Already completed"); - return; - } - _log.info(getJobId() + ": Searching: " + _state); - if (isLocal()) { - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Key found locally"); - _state.complete(true); - succeed(); - } else if (isExpired()) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Key search expired"); - _state.complete(true); - fail(); - } else { - //_log.debug("Continuing search"); - continueSearch(); - } + if (_state.completed()) { + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Already completed"); + return; + } + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Searching: " + _state); + if (isLocal()) { + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Key found locally"); + _state.complete(true); + succeed(); + } else if (isExpired()) { + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Key search expired"); + _state.complete(true); + fail(); + } else { + //_log.debug("Continuing search"); + continueSearch(); + } } /** @@ -124,7 +125,7 @@ class SearchJob extends JobImpl { private boolean isLocal() { return _facade.getDataStore().isKnown(_state.getTarget()); } private boolean isExpired() { - return Clock.getInstance().now() >= _expiration; + return Clock.getInstance().now() >= _expiration; } /** @@ -134,58 +135,66 @@ class SearchJob extends JobImpl { * */ protected void continueSearch() { - if (_state.completed()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Search already completed", new Exception("already completed")); - return; - } - int toCheck = SEARCH_BREDTH - _state.getPending().size(); - if (toCheck <= 0) { - // too many already pending - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Too many searches already pending (pending: " + _state.getPending().size() + " max: " + SEARCH_BREDTH + ")", new Exception("too many pending")); - requeuePending(); - return; - } - List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); - if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { - if (_state.getPending().size() <= 0) { - // we tried to find some peers, but there weren't any and no one else is going to answer - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": No peers left, and none pending! Already searched: " + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), new Exception("none left")); - fail(); - } else { - // no more to try, but we might get data or close peers from some outstanding requests - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": No peers left, but some are pending! Pending: " + _state.getPending().size() + " attempted: " + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), new Exception("none left, but pending")); - requeuePending(); - return; - } - } else { - _state.addPending(closestHashes); - for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - DataStructure ds = _facade.getDataStore().get(peer); - if ( (ds == null) || !(ds instanceof RouterInfo) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + peer + " : " + ds); - } else { - sendSearch((RouterInfo)ds); - } - } - } + if (_state.completed()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Search already completed", new Exception("already completed")); + return; + } + int toCheck = SEARCH_BREDTH - _state.getPending().size(); + if (toCheck <= 0) { + // too many already pending + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Too many searches already pending (pending: " + + _state.getPending().size() + " max: " + SEARCH_BREDTH + ")", + new Exception("too many pending")); + requeuePending(); + return; + } + List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); + if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { + if (_state.getPending().size() <= 0) { + // we tried to find some peers, but there weren't any and no one else is going to answer + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": No peers left, and none pending! Already searched: " + + _state.getAttempted().size() + " failed: " + _state.getFailed().size(), + new Exception("none left")); + fail(); + } else { + // no more to try, but we might get data or close peers from some outstanding requests + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": No peers left, but some are pending! Pending: " + + _state.getPending().size() + " attempted: " + _state.getAttempted().size() + + " failed: " + _state.getFailed().size(), + new Exception("none left, but pending")); + requeuePending(); + return; + } + } else { + _state.addPending(closestHashes); + for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + DataStructure ds = _facade.getDataStore().get(peer); + if ( (ds == null) || !(ds instanceof RouterInfo) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + + peer + " : " + ds); + } else { + sendSearch((RouterInfo)ds); + } + } + } } private void requeuePending() { - if (_pendingRequeueJob == null) - _pendingRequeueJob = new JobImpl() { - public String getName() { return "Requeue search with pending"; } - public void runJob() { searchNext(); } - }; - long now = Clock.getInstance().now(); - if (_pendingRequeueJob.getTiming().getStartAfter() < now) - _pendingRequeueJob.getTiming().setStartAfter(now+5*1000); - JobQueue.getInstance().addJob(_pendingRequeueJob); + if (_pendingRequeueJob == null) + _pendingRequeueJob = new JobImpl() { + public String getName() { return "Requeue search with pending"; } + public void runJob() { searchNext(); } + }; + long now = Clock.getInstance().now(); + if (_pendingRequeueJob.getTiming().getStartAfter() < now) + _pendingRequeueJob.getTiming().setStartAfter(now+5*1000); + JobQueue.getInstance().addJob(_pendingRequeueJob); } /** @@ -195,10 +204,10 @@ class SearchJob extends JobImpl { * @return ordered list of Hash objects */ private List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) { - Hash rkey = RoutingKeyGenerator.getInstance().getRoutingKey(key); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Current routing key for " + key + ": " + rkey); - return PeerSelector.getInstance().selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets()); + Hash rkey = RoutingKeyGenerator.getInstance().getRoutingKey(key); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Current routing key for " + key + ": " + rkey); + return PeerSelector.getInstance().selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets()); } /** @@ -206,20 +215,20 @@ class SearchJob extends JobImpl { * */ protected void sendSearch(RouterInfo router) { - if (router.getIdentity().equals(Router.getInstance().getRouterInfo().getIdentity())) { - // don't search ourselves - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Dont send search to ourselves - why did we try?"); - return; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send search to " + router); - } + if (router.getIdentity().equals(Router.getInstance().getRouterInfo().getIdentity())) { + // don't search ourselves + if (_log.shouldLog(Log.ERROR)) + _log.error(getJobId() + ": Dont send search to ourselves - why did we try?"); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Send search to " + router); + } - if (_isLease || false) // moo - sendLeaseSearch(router); - else - sendRouterSearch(router); + if (_isLease || false) // moo + sendLeaseSearch(router); + else + sendRouterSearch(router); } @@ -229,55 +238,63 @@ class SearchJob extends JobImpl { * */ protected void sendLeaseSearch(RouterInfo router) { - TunnelId inTunnelId = getInboundTunnelId(); - if (inTunnelId == null) { - _log.error("No tunnels to get search replies through! wtf!"); - JobQueue.getInstance().addJob(new FailedJob(router)); - return; - } + TunnelId inTunnelId = getInboundTunnelId(); + if (inTunnelId == null) { + _log.error("No tunnels to get search replies through! wtf!"); + JobQueue.getInstance().addJob(new FailedJob(router)); + return; + } - TunnelInfo inTunnel = TunnelManagerFacade.getInstance().getTunnelInfo(inTunnelId); - RouterInfo inGateway = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(inTunnel.getThisHop()); - if (inGateway == null) { - _log.error("We can't find the gateway to our inbound tunnel?! wtf"); - JobQueue.getInstance().addJob(new FailedJob(router)); - return; - } + TunnelInfo inTunnel = TunnelManagerFacade.getInstance().getTunnelInfo(inTunnelId); + RouterInfo inGateway = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(inTunnel.getThisHop()); + if (inGateway == null) { + _log.error("We can't find the gateway to our inbound tunnel?! wtf"); + JobQueue.getInstance().addJob(new FailedJob(router)); + return; + } - long expiration = Clock.getInstance().now() + PER_PEER_TIMEOUT; // getTimeoutMs(); + long expiration = Clock.getInstance().now() + PER_PEER_TIMEOUT; // getTimeoutMs(); - DatabaseLookupMessage msg = buildMessage(inTunnelId, inGateway, expiration); + DatabaseLookupMessage msg = buildMessage(inTunnelId, inGateway, expiration); - TunnelId outTunnelId = getOutboundTunnelId(); - if (outTunnelId == null) { - _log.error("No tunnels to send search out through! wtf!"); - JobQueue.getInstance().addJob(new FailedJob(router)); - return; - } + TunnelId outTunnelId = getOutboundTunnelId(); + if (outTunnelId == null) { + _log.error("No tunnels to send search out through! wtf!"); + JobQueue.getInstance().addJob(new FailedJob(router)); + return; + } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies through [" + msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel [" + msg.getReplyTunnel() + "]"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Sending leaseSet search to " + router.getIdentity().getHash().toBase64() + + " for " + msg.getSearchKey().toBase64() + " w/ replies through [" + + msg.getFrom().getIdentity().getHash().toBase64() + "] via tunnel [" + + msg.getReplyTunnel() + "]"); - SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state); - long timeoutMs = PER_PEER_TIMEOUT; // getTimeoutMs(); - SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this); - SendTunnelMessageJob j = new SendTunnelMessageJob(msg, outTunnelId, router.getIdentity().getHash(), null, null, reply, new FailedJob(router), sel, timeoutMs, SEARCH_PRIORITY); - JobQueue.getInstance().addJob(j); + SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state); + long timeoutMs = PER_PEER_TIMEOUT; // getTimeoutMs(); + SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this); + SendTunnelMessageJob j = new SendTunnelMessageJob(msg, outTunnelId, router.getIdentity().getHash(), + null, null, reply, new FailedJob(router), sel, + timeoutMs, SEARCH_PRIORITY); + JobQueue.getInstance().addJob(j); } /** we're searching for a router, so we can just send direct */ protected void sendRouterSearch(RouterInfo router) { - long expiration = Clock.getInstance().now() + PER_PEER_TIMEOUT; // getTimeoutMs(); + long expiration = Clock.getInstance().now() + PER_PEER_TIMEOUT; // getTimeoutMs(); - DatabaseLookupMessage msg = buildMessage(expiration); - - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" + msg.getFrom().getIdentity().getHash().toBase64() + "]"); - SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state); - long timeoutMs = PER_PEER_TIMEOUT; - SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this); - SendMessageDirectJob j = new SendMessageDirectJob(msg, router.getIdentity().getHash(), reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY); - JobQueue.getInstance().addJob(j); + DatabaseLookupMessage msg = buildMessage(expiration); + + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Sending router search to " + router.getIdentity().getHash().toBase64() + + " for " + msg.getSearchKey().toBase64() + " w/ replies to us [" + + msg.getFrom().getIdentity().getHash().toBase64() + "]"); + SearchMessageSelector sel = new SearchMessageSelector(router, _expiration, _state); + long timeoutMs = PER_PEER_TIMEOUT; + SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(router, _state, _facade, this); + SendMessageDirectJob j = new SendMessageDirectJob(msg, router.getIdentity().getHash(), + reply, new FailedJob(router), sel, expiration, SEARCH_PRIORITY); + JobQueue.getInstance().addJob(j); } /** @@ -286,15 +303,15 @@ class SearchJob extends JobImpl { * @return tunnel id (or null if none are found) */ private TunnelId getOutboundTunnelId() { - TunnelSelectionCriteria crit = new TunnelSelectionCriteria(); - crit.setMaximumTunnelsRequired(1); - crit.setMinimumTunnelsRequired(1); - List tunnelIds = TunnelManagerFacade.getInstance().selectOutboundTunnelIds(crit); - if (tunnelIds.size() <= 0) { - return null; - } + TunnelSelectionCriteria crit = new TunnelSelectionCriteria(); + crit.setMaximumTunnelsRequired(1); + crit.setMinimumTunnelsRequired(1); + List tunnelIds = TunnelManagerFacade.getInstance().selectOutboundTunnelIds(crit); + if (tunnelIds.size() <= 0) { + return null; + } - return (TunnelId)tunnelIds.get(0); + return (TunnelId)tunnelIds.get(0); } /** @@ -303,14 +320,14 @@ class SearchJob extends JobImpl { * @return tunnel id (or null if none are found) */ private TunnelId getInboundTunnelId() { - TunnelSelectionCriteria crit = new TunnelSelectionCriteria(); - crit.setMaximumTunnelsRequired(1); - crit.setMinimumTunnelsRequired(1); - List tunnelIds = TunnelManagerFacade.getInstance().selectInboundTunnelIds(crit); - if (tunnelIds.size() <= 0) { - return null; - } - return (TunnelId)tunnelIds.get(0); + TunnelSelectionCriteria crit = new TunnelSelectionCriteria(); + crit.setMaximumTunnelsRequired(1); + crit.setMinimumTunnelsRequired(1); + List tunnelIds = TunnelManagerFacade.getInstance().selectInboundTunnelIds(crit); + if (tunnelIds.size() <= 0) { + return null; + } + return (TunnelId)tunnelIds.get(0); } /** @@ -321,13 +338,13 @@ class SearchJob extends JobImpl { * @param expiration when the search should stop */ protected DatabaseLookupMessage buildMessage(TunnelId replyTunnelId, RouterInfo replyGateway, long expiration) { - DatabaseLookupMessage msg = new DatabaseLookupMessage(); - msg.setSearchKey(_state.getTarget()); - msg.setFrom(replyGateway); - msg.setDontIncludePeers(_state.getAttempted()); - msg.setMessageExpiration(new Date(expiration)); - msg.setReplyTunnel(replyTunnelId); - return msg; + DatabaseLookupMessage msg = new DatabaseLookupMessage(); + msg.setSearchKey(_state.getTarget()); + msg.setFrom(replyGateway); + msg.setDontIncludePeers(_state.getAttempted()); + msg.setMessageExpiration(new Date(expiration)); + msg.setReplyTunnel(replyTunnelId); + return msg; } /** @@ -336,65 +353,69 @@ class SearchJob extends JobImpl { * */ protected DatabaseLookupMessage buildMessage(long expiration) { - DatabaseLookupMessage msg = new DatabaseLookupMessage(); - msg.setSearchKey(_state.getTarget()); - msg.setFrom(Router.getInstance().getRouterInfo()); - msg.setDontIncludePeers(_state.getAttempted()); - msg.setMessageExpiration(new Date(expiration)); - msg.setReplyTunnel(null); - return msg; + DatabaseLookupMessage msg = new DatabaseLookupMessage(); + msg.setSearchKey(_state.getTarget()); + msg.setFrom(Router.getInstance().getRouterInfo()); + msg.setDontIncludePeers(_state.getAttempted()); + msg.setMessageExpiration(new Date(expiration)); + msg.setReplyTunnel(null); + return msg; } void replyFound(DatabaseSearchReplyMessage message, Hash peer) { - long duration = _state.replyFound(peer); - // this processing can take a while, so split 'er up - JobQueue.getInstance().addJob(new SearchReplyJob((DatabaseSearchReplyMessage)message, peer, duration)); + long duration = _state.replyFound(peer); + // this processing can take a while, so split 'er up + JobQueue.getInstance().addJob(new SearchReplyJob((DatabaseSearchReplyMessage)message, peer, duration)); } private final class SearchReplyJob extends JobImpl { - private DatabaseSearchReplyMessage _msg; - private Hash _peer; - private int _curIndex; - private int _invalidPeers; - private int _seenPeers; - private int _newPeers; - private int _duplicatePeers; - private long _duration; - public SearchReplyJob(DatabaseSearchReplyMessage message, Hash peer, long duration) { - _msg = message; - _peer = peer; - _curIndex = 0; - _invalidPeers = 0; - _seenPeers = 0; - _newPeers = 0; - _duplicatePeers = 0; - } - public String getName() { return "Process Reply for Kademlia Search"; } - public void runJob() { - if (_curIndex >= _msg.getNumReplies()) { - ProfileManager.getInstance().dbLookupReply(_peer, _newPeers, _seenPeers, _invalidPeers, _duplicatePeers, _duration); - } else { - RouterInfo ri = _msg.getReply(_curIndex); - if (ri.isValid()) { - if (_state.wasAttempted(ri.getIdentity().getHash())) { - _duplicatePeers++; - } - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": dbSearchReply received on search containing router " + ri.getIdentity().getHash() + " with publishDate of " + new Date(ri.getPublished())); - _facade.store(ri.getIdentity().getHash(), ri); - if (_facade.getKBuckets().add(ri.getIdentity().getHash())) - _newPeers++; - else - _seenPeers++; - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + ri, new Exception("Invalid peer")); - _invalidPeers++; - } - _curIndex++; - requeue(0); - } - } + private DatabaseSearchReplyMessage _msg; + private Hash _peer; + private int _curIndex; + private int _invalidPeers; + private int _seenPeers; + private int _newPeers; + private int _duplicatePeers; + private long _duration; + public SearchReplyJob(DatabaseSearchReplyMessage message, Hash peer, long duration) { + _msg = message; + _peer = peer; + _curIndex = 0; + _invalidPeers = 0; + _seenPeers = 0; + _newPeers = 0; + _duplicatePeers = 0; + } + public String getName() { return "Process Reply for Kademlia Search"; } + public void runJob() { + if (_curIndex >= _msg.getNumReplies()) { + ProfileManager.getInstance().dbLookupReply(_peer, _newPeers, _seenPeers, + _invalidPeers, _duplicatePeers, _duration); + } else { + RouterInfo ri = _msg.getReply(_curIndex); + if (ri.isValid()) { + if (_state.wasAttempted(ri.getIdentity().getHash())) { + _duplicatePeers++; + } + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": dbSearchReply received on search containing router " + + ri.getIdentity().getHash() + " with publishDate of " + + new Date(ri.getPublished())); + _facade.store(ri.getIdentity().getHash(), ri); + if (_facade.getKBuckets().add(ri.getIdentity().getHash())) + _newPeers++; + else + _seenPeers++; + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error(getJobId() + ": Received an invalid peer from " + _peer + ": " + + ri, new Exception("Invalid peer")); + _invalidPeers++; + } + _curIndex++; + requeue(0); + } + } } /** @@ -403,66 +424,70 @@ class SearchJob extends JobImpl { * */ protected class FailedJob extends JobImpl { - private Hash _peer; - private boolean _penalizePeer; - public FailedJob(RouterInfo peer) { - this(peer, true); - } - /** - * Allow the choice as to whether failed searches should count against - * the peer (such as if we search for a random key) - * - */ - public FailedJob(RouterInfo peer, boolean penalizePeer) { - super(); - _penalizePeer = penalizePeer; - _peer = peer.getIdentity().getHash(); - } - public void runJob() { - _state.replyTimeout(_peer); - if (_penalizePeer) { - _log.warn("Penalizing peer for timeout on search: " + _peer.toBase64()); - ProfileManager.getInstance().dbLookupFailed(_peer); - } else { - _log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64()); - } - searchNext(); - } - public String getName() { return "Kademlia Search Failed"; } + private Hash _peer; + private boolean _penalizePeer; + public FailedJob(RouterInfo peer) { + this(peer, true); + } + /** + * Allow the choice as to whether failed searches should count against + * the peer (such as if we search for a random key) + * + */ + public FailedJob(RouterInfo peer, boolean penalizePeer) { + super(); + _penalizePeer = penalizePeer; + _peer = peer.getIdentity().getHash(); + } + public void runJob() { + _state.replyTimeout(_peer); + if (_penalizePeer) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Penalizing peer for timeout on search: " + _peer.toBase64()); + ProfileManager.getInstance().dbLookupFailed(_peer); + } else { + if (_log.shouldLog(Log.ERROR)) + _log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64()); + } + searchNext(); + } + public String getName() { return "Kademlia Search Failed"; } } - + /** * Search was totally successful */ protected void succeed() { - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget()); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": State of successful search: " + _state); + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Succeeded search for key " + _state.getTarget()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": State of successful search: " + _state); - if (_keepStats) { - long time = Clock.getInstance().now() - _state.getWhenStarted(); - StatManager.getInstance().addRateData("netDb.successTime", time, 0); - StatManager.getInstance().addRateData("netDb.successPeers", _state.getAttempted().size(), time); - } - if (_onSuccess != null) - JobQueue.getInstance().addJob(_onSuccess); + if (_keepStats) { + long time = Clock.getInstance().now() - _state.getWhenStarted(); + StatManager.getInstance().addRateData("netDb.successTime", time, 0); + StatManager.getInstance().addRateData("netDb.successPeers", _state.getAttempted().size(), time); + } + if (_onSuccess != null) + JobQueue.getInstance().addJob(_onSuccess); } /** * Search totally failed */ protected void fail() { - _log.info(getJobId() + ": Failed search for key " + _state.getTarget()); - _log.debug(getJobId() + ": State of failed search: " + _state); - - if (_keepStats) { - long time = Clock.getInstance().now() - _state.getWhenStarted(); - StatManager.getInstance().addRateData("netDb.failedTime", time, 0); - StatManager.getInstance().addRateData("netDb.failedPeers", _state.getAttempted().size(), time); - } - if (_onFailure != null) - JobQueue.getInstance().addJob(_onFailure); + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Failed search for key " + _state.getTarget()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": State of failed search: " + _state); + + if (_keepStats) { + long time = Clock.getInstance().now() - _state.getWhenStarted(); + StatManager.getInstance().addRateData("netDb.failedTime", time, 0); + StatManager.getInstance().addRateData("netDb.failedPeers", _state.getAttempted().size(), time); + } + if (_onFailure != null) + JobQueue.getInstance().addJob(_onFailure); } public String getName() { return "Kademlia NetDb Search"; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index fe427c02db..83b9d2d47e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -48,6 +48,7 @@ import net.i2p.router.message.SendTunnelMessageJob; import net.i2p.util.Clock; import net.i2p.util.Log; import net.i2p.util.RandomSource; +import net.i2p.stat.StatManager; class StoreJob extends JobImpl { private final Log _log = new Log(StoreJob.class); @@ -71,44 +72,50 @@ class StoreJob extends JobImpl { */ private final static int EXPLORATORY_REDUNDANCY = 1; private final static int STORE_PRIORITY = 100; - + + static { + StatManager.getInstance().createRateStat("netDb.storeSent", "How many netDb store messages have we sent?", "Network Database", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + } + /** * Create a new search for the routingKey specified * */ public StoreJob(KademliaNetworkDatabaseFacade facade, Hash key, DataStructure data, Job onSuccess, Job onFailure, long timeoutMs) { - _facade = facade; - _state = new StoreState(key, data); - _onSuccess = onSuccess; - _onFailure = onFailure; - _timeoutMs = timeoutMs; - _expiration = Clock.getInstance().now() + timeoutMs; + _facade = facade; + _state = new StoreState(key, data); + _onSuccess = onSuccess; + _onFailure = onFailure; + _timeoutMs = timeoutMs; + _expiration = Clock.getInstance().now() + timeoutMs; } public String getName() { return "Kademlia NetDb Store";} public void runJob() { - sendNext(); + sendNext(); } protected boolean isExpired() { - return Clock.getInstance().now() >= _expiration; + return Clock.getInstance().now() >= _expiration; } /** * send the key to the next batch of peers */ protected void sendNext() { - if (_state.completed()) { - _log.info("Already completed"); - return; - } - if (isExpired()) { - _state.complete(true); - fail(); - } else { - _log.info("Sending: " + _state); - continueSending(); - } + if (_state.completed()) { + if (_log.shouldLog(Log.INFO)) + _log.info("Already completed"); + return; + } + if (isExpired()) { + _state.complete(true); + fail(); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Sending: " + _state); + continueSending(); + } } /** @@ -118,37 +125,39 @@ class StoreJob extends JobImpl { * */ protected void continueSending() { - if (_state.completed()) return; - int toCheck = PARALLELIZATION - _state.getPending().size(); - if (toCheck <= 0) { - // too many already pending - return; - } - if (toCheck > PARALLELIZATION) - toCheck = PARALLELIZATION; - - List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); - if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { - if (_state.getPending().size() <= 0) { - // we tried to find some peers, but there weren't any and no one else is going to answer - fail(); - } else { - // no more to try, but we might get data or close peers from some outstanding requests - return; - } - } else { - _state.addPending(closestHashes); - _log.info("Continue sending key " + _state.getTarget() + " to " + closestHashes); - for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - DataStructure ds = _facade.getDataStore().get(peer); - if ( (ds == null) || !(ds instanceof RouterInfo) ) { - _log.warn("Error selecting closest hash that wasnt a router! " + peer + " : " + ds); - } else { - sendStore((RouterInfo)ds); - } - } - } + if (_state.completed()) return; + int toCheck = PARALLELIZATION - _state.getPending().size(); + if (toCheck <= 0) { + // too many already pending + return; + } + if (toCheck > PARALLELIZATION) + toCheck = PARALLELIZATION; + + List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); + if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { + if (_state.getPending().size() <= 0) { + // we tried to find some peers, but there weren't any and no one else is going to answer + fail(); + } else { + // no more to try, but we might get data or close peers from some outstanding requests + return; + } + } else { + _state.addPending(closestHashes); + if (_log.shouldLog(Log.INFO)) + _log.info("Continue sending key " + _state.getTarget() + " to " + closestHashes); + for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + DataStructure ds = _facade.getDataStore().get(peer); + if ( (ds == null) || !(ds instanceof RouterInfo) ) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error selecting closest hash that wasnt a router! " + peer + " : " + ds); + } else { + sendStore((RouterInfo)ds); + } + } + } } /** @@ -160,9 +169,10 @@ class StoreJob extends JobImpl { * @return ordered list of Hash objects */ protected List getClosestRouters(Hash key, int numClosest, Set alreadyChecked) { - Hash rkey = RoutingKeyGenerator.getInstance().getRoutingKey(key); - _log.debug("Current routing key for " + key + ": " + rkey); - return PeerSelector.getInstance().selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets()); + Hash rkey = RoutingKeyGenerator.getInstance().getRoutingKey(key); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Current routing key for " + key + ": " + rkey); + return PeerSelector.getInstance().selectNearestExplicit(rkey, numClosest, alreadyChecked, _facade.getKBuckets()); } /** @@ -171,250 +181,78 @@ class StoreJob extends JobImpl { * */ protected void sendStore(RouterInfo router) { - DatabaseStoreMessage msg = new DatabaseStoreMessage(); - msg.setKey(_state.getTarget()); - if (_state.getData() instanceof RouterInfo) - msg.setRouterInfo((RouterInfo)_state.getData()); - else if (_state.getData() instanceof LeaseSet) - msg.setLeaseSet((LeaseSet)_state.getData()); - else - throw new IllegalArgumentException("Storing an unknown data type! " + _state.getData()); - msg.setMessageExpiration(new Date(Clock.getInstance().now() + _timeoutMs)); - - if (router.getIdentity().equals(Router.getInstance().getRouterInfo().getIdentity())) { - // don't send it to ourselves - _log.error("Dont send store to ourselves - why did we try?"); - return; - } else { - _log.debug("Send store to " + router.getIdentity().getHash().toBase64()); - } - - sendStore(msg, router, _expiration); + DatabaseStoreMessage msg = new DatabaseStoreMessage(); + msg.setKey(_state.getTarget()); + if (_state.getData() instanceof RouterInfo) + msg.setRouterInfo((RouterInfo)_state.getData()); + else if (_state.getData() instanceof LeaseSet) + msg.setLeaseSet((LeaseSet)_state.getData()); + else + throw new IllegalArgumentException("Storing an unknown data type! " + _state.getData()); + msg.setMessageExpiration(new Date(Clock.getInstance().now() + _timeoutMs)); + + if (router.getIdentity().equals(Router.getInstance().getRouterInfo().getIdentity())) { + // don't send it to ourselves + if (_log.shouldLog(Log.ERROR)) + _log.error("Dont send store to ourselves - why did we try?"); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send store to " + router.getIdentity().getHash().toBase64()); + } + + sendStore(msg, router, _expiration); } protected void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { - //sendStoreAsGarlic(msg, peer, expiration); - sendStoreThroughTunnel(msg, peer, expiration); + sendStoreThroughTunnel(msg, peer, expiration); } protected void sendStoreThroughTunnel(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { - FailedJob fail = new FailedJob(peer); - Job sent = new OptimisticSendSuccess(peer); - TunnelInfo info = null; - TunnelId outboundTunnelId = selectOutboundTunnel(); - if (outboundTunnelId != null) - info = TunnelManagerFacade.getInstance().getTunnelInfo(outboundTunnelId); - if (info == null) { - _log.error("selectOutboundTunnel didn't find a valid tunnel! outboundTunnelId = " + outboundTunnelId + " is not known by the tunnel manager"); - return; - } - _log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration) + " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info); - // send it out our outboundTunnelId with instructions for our endpoint to forward it - // to the router specified (though no particular tunnelId on the target) - JobQueue.getInstance().addJob(new SendTunnelMessageJob(msg, outboundTunnelId, peer.getIdentity().getHash(), null, sent, null, fail, null, _expiration-Clock.getInstance().now(), STORE_PRIORITY)); + FailedJob fail = new FailedJob(peer); + Job sent = new OptimisticSendSuccess(peer); + TunnelInfo info = null; + TunnelId outboundTunnelId = selectOutboundTunnel(); + if (outboundTunnelId != null) + info = TunnelManagerFacade.getInstance().getTunnelInfo(outboundTunnelId); + if (info == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("selectOutboundTunnel didn't find a valid tunnel! outboundTunnelId = " + + outboundTunnelId + " is not known by the tunnel manager"); + return; + } + if (_log.shouldLog(Log.INFO)) + _log.info("Store for " + _state.getTarget() + " expiring on " + new Date(_expiration) + + " is going to " + peer.getIdentity().getHash() + " via outbound tunnel: " + info); + // send it out our outboundTunnelId with instructions for our endpoint to forward it + // to the router specified (though no particular tunnelId on the target) + Job j = new SendTunnelMessageJob(msg, outboundTunnelId, peer.getIdentity().getHash(), + null, sent, null, fail, null, _expiration-Clock.getInstance().now(), + STORE_PRIORITY); + JobQueue.getInstance().addJob(j); + StatManager.getInstance().addRateData("netDb.storeSent", 1, 0); } private TunnelId selectOutboundTunnel() { - TunnelSelectionCriteria criteria = new TunnelSelectionCriteria(); - criteria.setAnonymityPriority(80); - criteria.setLatencyPriority(50); - criteria.setReliabilityPriority(20); - criteria.setMaximumTunnelsRequired(1); - criteria.setMinimumTunnelsRequired(1); - List tunnelIds = TunnelManagerFacade.getInstance().selectOutboundTunnelIds(criteria); - if (tunnelIds.size() <= 0) { - _log.error("No outbound tunnels?!"); - return null; - } else { - return (TunnelId)tunnelIds.get(0); - } - } - - - /** - * Send the store to the peer by way of a garlic and route an ack back to us - * - */ - protected void sendStoreAsGarlic(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { - long waitingForId = RandomSource.getInstance().nextInt(Integer.MAX_VALUE); - GarlicConfig cfg = buildGarlicConfig(msg, peer, waitingForId, expiration); - FailedJob failedJob = new FailedJob(peer); - long timeoutMs = expiration - Clock.getInstance().now(); - StoreMessageSelector selector = new StoreMessageSelector(peer, waitingForId); - SessionKey sentKey = new SessionKey(); - Set sentTags = new HashSet(32); - PublicKey rcptKey = cfg.getRecipientPublicKey(); - if (rcptKey == null) { - if (cfg.getRecipient() == null) { - throw new IllegalArgumentException("Null recipient specified"); - } else if (cfg.getRecipient().getIdentity() == null) { - throw new IllegalArgumentException("Null recipient.identity specified"); - } else if (cfg.getRecipient().getIdentity().getPublicKey() == null) { - throw new IllegalArgumentException("Null recipient.identity.publicKey specified"); - } else - rcptKey = cfg.getRecipient().getIdentity().getPublicKey(); - } - - JobQueue.getInstance().addJob(new SendGarlicJob(cfg, null, failedJob, new UpdateReplyFoundJob(peer, sentKey, sentTags, rcptKey), failedJob, timeoutMs, STORE_PRIORITY, selector, sentKey, sentTags)); - } - - /** - * Build a garlic containing the data store and an ack to be unwrapped at the - * target, with the data store sent locally and the ack sent back to us through - * a random tunnel as a DeliveryStatusMessage containing the ackId - * - */ - protected GarlicConfig buildGarlicConfig(I2NPMessage msg, RouterInfo target, long ackId, long expiration) { - GarlicConfig config = new GarlicConfig(); - - PayloadGarlicConfig dataClove = buildDataClove(msg, target, expiration); - config.addClove(dataClove); - PayloadGarlicConfig ackClove = buildAckClove(ackId, expiration); - config.addClove(ackClove); - - DeliveryInstructions instructions = new DeliveryInstructions(); - instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_ROUTER); - instructions.setDelayRequested(false); - instructions.setDelaySeconds(0); - instructions.setEncrypted(false); - instructions.setEncryptionKey(null); - instructions.setRouter(target.getIdentity().getHash()); - instructions.setTunnelId(null); - - config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); - config.setDeliveryInstructions(instructions); - config.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE)); - config.setExpiration(_expiration); - config.setRecipientPublicKey(target.getIdentity().getPublicKey()); - config.setRecipient(target); - config.setRequestAck(false); - - return config; + TunnelSelectionCriteria criteria = new TunnelSelectionCriteria(); + criteria.setAnonymityPriority(80); + criteria.setLatencyPriority(50); + criteria.setReliabilityPriority(20); + criteria.setMaximumTunnelsRequired(1); + criteria.setMinimumTunnelsRequired(1); + List tunnelIds = TunnelManagerFacade.getInstance().selectOutboundTunnelIds(criteria); + if (tunnelIds.size() <= 0) { + _log.error("No outbound tunnels?!"); + return null; + } else { + return (TunnelId)tunnelIds.get(0); + } } - - /** - * Build a clove that sends a DeliveryStatusMessage to us after tunneling it - * through a random inbound tunnel - * - */ - protected PayloadGarlicConfig buildAckClove(long ackId, long expiration) { - DeliveryStatusMessage ackMsg = new DeliveryStatusMessage(); - ackMsg.setArrival(new Date(Clock.getInstance().now())); - ackMsg.setMessageId(ackId); - ackMsg.setMessageExpiration(new Date(expiration)); - ackMsg.setUniqueId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE)); - - PayloadGarlicConfig ackClove = new PayloadGarlicConfig(); - - TunnelSelectionCriteria criteria = new TunnelSelectionCriteria(); - criteria.setAnonymityPriority(80); - criteria.setLatencyPriority(20); - criteria.setReliabilityPriority(50); - criteria.setMaximumTunnelsRequired(1); - criteria.setMinimumTunnelsRequired(1); - List tunnelIds = TunnelManagerFacade.getInstance().selectInboundTunnelIds(criteria); - if (tunnelIds.size() <= 0) { - _log.error("No inbound tunnels exist for a db store ack to come through!"); - return null; - } - TunnelId replyToTunnelId = (TunnelId)tunnelIds.get(0); // tunnel id on that gateway - TunnelInfo info = TunnelManagerFacade.getInstance().getTunnelInfo(replyToTunnelId); - RouterInfo replyPeer = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(info.getThisHop()); // inbound tunnel gateway - if (replyPeer == null) { - _log.error("We don't know how to reach the gateway of our own inbound tunnel?! " + info); - return null; - } - Hash replyToTunnelRouter = replyPeer.getIdentity().getHash(); - - DeliveryInstructions ackInstructions = new DeliveryInstructions(); - ackInstructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL); - ackInstructions.setRouter(replyToTunnelRouter); - ackInstructions.setTunnelId(replyToTunnelId); - ackInstructions.setDelayRequested(false); - ackInstructions.setDelaySeconds(0); - ackInstructions.setEncrypted(false); - - ackClove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); - ackClove.setDeliveryInstructions(ackInstructions); - ackClove.setExpiration(_expiration); - ackClove.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE)); - ackClove.setPayload(ackMsg); - ackClove.setRecipient(replyPeer); - ackClove.setRequestAck(false); - - return ackClove; - } - - /** - * Build a clove that sends the data to the target (which is local) - */ - protected PayloadGarlicConfig buildDataClove(I2NPMessage data, RouterInfo target, long expiration) { - PayloadGarlicConfig clove = new PayloadGarlicConfig(); - - DeliveryInstructions instructions = new DeliveryInstructions(); - instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL); - instructions.setRouter(target.getIdentity().getHash()); - instructions.setTunnelId(null); - instructions.setDelayRequested(false); - instructions.setDelaySeconds(0); - instructions.setEncrypted(false); - - clove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); - clove.setDeliveryInstructions(instructions); - clove.setExpiration(expiration); - clove.setId(RandomSource.getInstance().nextInt(Integer.MAX_VALUE)); - clove.setPayload(data); - clove.setRecipientPublicKey(null); - clove.setRequestAck(false); - - return clove; - } - - + /** * Called after a match to a db store is found (match against a deliveryStatusMessage) * */ - protected class UpdateReplyFoundJob extends JobImpl implements ReplyJob { - private I2NPMessage _message; - private Hash _peer; - private SessionKey _sentKey; - private Set _sentTags; - private PublicKey _toKey; - - public UpdateReplyFoundJob(RouterInfo peer, SessionKey sentKey, Set sentTags, PublicKey toKey) { - super(); - _peer = peer.getIdentity().getHash(); - _sentKey = sentKey; - _sentTags = sentTags; - _toKey = toKey; - } - - public String getName() { return "Update Reply Found for Kademlia Store"; } - public void runJob() { - _log.info("Reply from " + _peer + " with message " + _message); - - if (_message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) { - long delay = _state.confirmed(_peer); - ProfileManager.getInstance().dbStoreSent(_peer, delay); - - if ( (_sentKey != null) && (_sentKey.getData() != null) && (_sentTags != null) && (_sentTags.size() > 0) && (_toKey != null) ) { - SessionKeyManager.getInstance().tagsDelivered(_toKey, _sentKey, _sentTags); - _log.info("Delivered tags successfully to " + _peer + "! # tags: " + _sentTags.size()); - } - - if (_state.getSuccessful().size() >= REDUNDANCY) { - succeed(); - } else { - sendNext(); - } - } else { - _log.error("Selector matched to an UpdateReplyFoundJob with a message that isnt a DeliveryStatusMessage! " + _message); - } - } - - public void setMessage(I2NPMessage message) { _message = message; } - } /** * Called after sending a dbStore to a peer successfully without waiting for confirm and @@ -422,25 +260,27 @@ class StoreJob extends JobImpl { * */ protected class OptimisticSendSuccess extends JobImpl { - private Hash _peer; - - public OptimisticSendSuccess(RouterInfo peer) { - super(); - _peer = peer.getIdentity().getHash(); - } - - public String getName() { return "Optimistic Kademlia Store Send Success"; } - public void runJob() { - _log.info("Optimistically marking store of " + _state.getTarget() + " to " + _peer + " successful"); - //long howLong = _state.confirmed(_peer); - //ProfileManager.getInstance().dbStoreSent(_peer, howLong); - - if (_state.getSuccessful().size() >= REDUNDANCY) { - succeed(); - } else { - sendNext(); - } - } + private Hash _peer; + + public OptimisticSendSuccess(RouterInfo peer) { + super(); + _peer = peer.getIdentity().getHash(); + } + + public String getName() { return "Optimistic Kademlia Store Send Success"; } + public void runJob() { + if (_log.shouldLog(Log.INFO)) + _log.info("Optimistically marking store of " + _state.getTarget() + + " to " + _peer + " successful"); + //long howLong = _state.confirmed(_peer); + //ProfileManager.getInstance().dbStoreSent(_peer, howLong); + + if (_state.getSuccessful().size() >= REDUNDANCY) { + succeed(); + } else { + sendNext(); + } + } } /** @@ -449,17 +289,17 @@ class StoreJob extends JobImpl { * */ protected class FailedJob extends JobImpl { - private Hash _peer; - public FailedJob(RouterInfo peer) { - super(); - _peer = peer.getIdentity().getHash(); - } - public void runJob() { - _state.replyTimeout(_peer); - ProfileManager.getInstance().dbStoreFailed(_peer); - sendNext(); - } - public String getName() { return "Kademlia Store Failed"; } + private Hash _peer; + public FailedJob(RouterInfo peer) { + super(); + _peer = peer.getIdentity().getHash(); + } + public void runJob() { + _state.replyTimeout(_peer); + ProfileManager.getInstance().dbStoreFailed(_peer); + sendNext(); + } + public String getName() { return "Kademlia Store Failed"; } } /** @@ -468,211 +308,219 @@ class StoreJob extends JobImpl { * */ protected class StoreMessageSelector implements MessageSelector { - private Hash _peer; - private long _waitingForId; - private boolean _found; - public StoreMessageSelector(RouterInfo peer, long waitingForId) { - _peer = peer.getIdentity().getHash(); - _found = false; - _waitingForId = waitingForId; - } - - public boolean continueMatching() { return !_found; } - public long getExpiration() { return _expiration; } - public boolean isMatch(I2NPMessage message) { - _log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from " + _peer + " wrt " + _state.getTarget() + "]"); - if (message instanceof DeliveryStatusMessage) { - DeliveryStatusMessage msg = (DeliveryStatusMessage)message; - if (msg.getMessageId() == _waitingForId) { - _log.debug("Found match for the key we're waiting for: " + _waitingForId); - _found = true; - return true; - } else { - _log.debug("DeliveryStatusMessage of a key we're not looking for"); - return false; - } - } else { - _log.debug("Not a DeliveryStatusMessage"); - return false; - } - } + private Hash _peer; + private long _waitingForId; + private boolean _found; + public StoreMessageSelector(RouterInfo peer, long waitingForId) { + _peer = peer.getIdentity().getHash(); + _found = false; + _waitingForId = waitingForId; + } + + public boolean continueMatching() { return !_found; } + public long getExpiration() { return _expiration; } + public boolean isMatch(I2NPMessage message) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("isMatch("+message.getClass().getName() + ") [want deliveryStatusMessage from " + + _peer + " wrt " + _state.getTarget() + "]"); + if (message instanceof DeliveryStatusMessage) { + DeliveryStatusMessage msg = (DeliveryStatusMessage)message; + if (msg.getMessageId() == _waitingForId) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Found match for the key we're waiting for: " + _waitingForId); + _found = true; + return true; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("DeliveryStatusMessage of a key we're not looking for"); + return false; + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Not a DeliveryStatusMessage"); + return false; + } + } } /** * Send was totally successful */ protected void succeed() { - _log.info("Succeeded sending key " + _state.getTarget()); - _log.debug("State of successful send: " + _state); - if (_onSuccess != null) - JobQueue.getInstance().addJob(_onSuccess); - _facade.noteKeySent(_state.getTarget()); + if (_log.shouldLog(Log.INFO)) + _log.info("Succeeded sending key " + _state.getTarget()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("State of successful send: " + _state); + if (_onSuccess != null) + JobQueue.getInstance().addJob(_onSuccess); + _facade.noteKeySent(_state.getTarget()); } /** * Send totally failed */ protected void fail() { - _log.info("Failed sending key " + _state.getTarget()); - _log.debug("State of failed send: " + _state, new Exception("Who failed me?")); - if (_onFailure != null) - JobQueue.getInstance().addJob(_onFailure); + if (_log.shouldLog(Log.INFO)) + _log.info("Failed sending key " + _state.getTarget()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("State of failed send: " + _state, new Exception("Who failed me?")); + if (_onFailure != null) + JobQueue.getInstance().addJob(_onFailure); } protected static class StoreState { - private Hash _key; - private DataStructure _data; - private HashSet _pendingPeers; - private HashMap _pendingPeerTimes; - private HashSet _successfulPeers; - private HashSet _successfulExploratoryPeers; - private HashSet _failedPeers; - private HashSet _attemptedPeers; - private volatile long _completed; - private volatile long _started; - - public StoreState(Hash key, DataStructure data) { - _key = key; - _data = data; - _pendingPeers = new HashSet(16); - _pendingPeerTimes = new HashMap(16); - _attemptedPeers = new HashSet(16); - _failedPeers = new HashSet(16); - _successfulPeers = new HashSet(16); - _successfulExploratoryPeers = new HashSet(16); - _completed = -1; - _started = Clock.getInstance().now(); - } - - public Hash getTarget() { return _key; } - public DataStructure getData() { return _data; } - public Set getPending() { - synchronized (_pendingPeers) { - return (Set)_pendingPeers.clone(); - } - } - public Set getAttempted() { - synchronized (_attemptedPeers) { - return (Set)_attemptedPeers.clone(); - } - } - public Set getSuccessful() { - synchronized (_successfulPeers) { - return (Set)_successfulPeers.clone(); - } - } - public Set getSuccessfulExploratory() { - synchronized (_successfulExploratoryPeers) { - return (Set)_successfulExploratoryPeers.clone(); - } - } - public Set getFailed() { - synchronized (_failedPeers) { - return (Set)_failedPeers.clone(); - } - } - public boolean completed() { return _completed != -1; } - public void complete(boolean completed) { - if (completed) - _completed = Clock.getInstance().now(); - } - - public long getWhenStarted() { return _started; } - public long getWhenCompleted() { return _completed; } - - public void addPending(Collection pending) { - synchronized (_pendingPeers) { - _pendingPeers.addAll(pending); - for (Iterator iter = pending.iterator(); iter.hasNext(); ) - _pendingPeerTimes.put(iter.next(), new Long(Clock.getInstance().now())); - } - synchronized (_attemptedPeers) { - _attemptedPeers.addAll(pending); - } - } - - public long confirmed(Hash peer) { - long rv = -1; - synchronized (_pendingPeers) { - _pendingPeers.remove(peer); - Long when = (Long)_pendingPeerTimes.remove(peer); - if (when != null) - rv = Clock.getInstance().now() - when.longValue(); - } - synchronized (_successfulPeers) { - _successfulPeers.add(peer); - } - return rv; - } + private Hash _key; + private DataStructure _data; + private HashSet _pendingPeers; + private HashMap _pendingPeerTimes; + private HashSet _successfulPeers; + private HashSet _successfulExploratoryPeers; + private HashSet _failedPeers; + private HashSet _attemptedPeers; + private volatile long _completed; + private volatile long _started; + + public StoreState(Hash key, DataStructure data) { + _key = key; + _data = data; + _pendingPeers = new HashSet(16); + _pendingPeerTimes = new HashMap(16); + _attemptedPeers = new HashSet(16); + _failedPeers = new HashSet(16); + _successfulPeers = new HashSet(16); + _successfulExploratoryPeers = new HashSet(16); + _completed = -1; + _started = Clock.getInstance().now(); + } + + public Hash getTarget() { return _key; } + public DataStructure getData() { return _data; } + public Set getPending() { + synchronized (_pendingPeers) { + return (Set)_pendingPeers.clone(); + } + } + public Set getAttempted() { + synchronized (_attemptedPeers) { + return (Set)_attemptedPeers.clone(); + } + } + public Set getSuccessful() { + synchronized (_successfulPeers) { + return (Set)_successfulPeers.clone(); + } + } + public Set getSuccessfulExploratory() { + synchronized (_successfulExploratoryPeers) { + return (Set)_successfulExploratoryPeers.clone(); + } + } + public Set getFailed() { + synchronized (_failedPeers) { + return (Set)_failedPeers.clone(); + } + } + public boolean completed() { return _completed != -1; } + public void complete(boolean completed) { + if (completed) + _completed = Clock.getInstance().now(); + } + + public long getWhenStarted() { return _started; } + public long getWhenCompleted() { return _completed; } + + public void addPending(Collection pending) { + synchronized (_pendingPeers) { + _pendingPeers.addAll(pending); + for (Iterator iter = pending.iterator(); iter.hasNext(); ) + _pendingPeerTimes.put(iter.next(), new Long(Clock.getInstance().now())); + } + synchronized (_attemptedPeers) { + _attemptedPeers.addAll(pending); + } + } + + public long confirmed(Hash peer) { + long rv = -1; + synchronized (_pendingPeers) { + _pendingPeers.remove(peer); + Long when = (Long)_pendingPeerTimes.remove(peer); + if (when != null) + rv = Clock.getInstance().now() - when.longValue(); + } + synchronized (_successfulPeers) { + _successfulPeers.add(peer); + } + return rv; + } - public long confirmedExploratory(Hash peer) { - long rv = -1; - synchronized (_pendingPeers) { - _pendingPeers.remove(peer); - Long when = (Long)_pendingPeerTimes.remove(peer); - if (when != null) - rv = Clock.getInstance().now() - when.longValue(); - } - synchronized (_successfulExploratoryPeers) { - _successfulExploratoryPeers.add(peer); - } - return rv; - } - - public void replyTimeout(Hash peer) { - synchronized (_pendingPeers) { - _pendingPeers.remove(peer); - } - synchronized (_failedPeers) { - _failedPeers.add(peer); - } - } - - public String toString() { - StringBuffer buf = new StringBuffer(256); - buf.append("Storing ").append(_key); - buf.append(" "); - if (_completed <= 0) - buf.append(" completed? false "); - else - buf.append(" completed on ").append(new Date(_completed)); - buf.append(" Attempted: "); - synchronized (_attemptedPeers) { - for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - buf.append(peer.toBase64()).append(" "); - } - } - buf.append(" Pending: "); - synchronized (_pendingPeers) { - for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - buf.append(peer.toBase64()).append(" "); - } - } - buf.append(" Failed: "); - synchronized (_failedPeers) { - for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - buf.append(peer.toBase64()).append(" "); - } - } - buf.append(" Successful: "); - synchronized (_successfulPeers) { - for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - buf.append(peer.toBase64()).append(" "); - } - } - buf.append(" Successful Exploratory: "); - synchronized (_successfulExploratoryPeers) { - for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - buf.append(peer.toBase64()).append(" "); - } - } - return buf.toString(); - } + public long confirmedExploratory(Hash peer) { + long rv = -1; + synchronized (_pendingPeers) { + _pendingPeers.remove(peer); + Long when = (Long)_pendingPeerTimes.remove(peer); + if (when != null) + rv = Clock.getInstance().now() - when.longValue(); + } + synchronized (_successfulExploratoryPeers) { + _successfulExploratoryPeers.add(peer); + } + return rv; + } + + public void replyTimeout(Hash peer) { + synchronized (_pendingPeers) { + _pendingPeers.remove(peer); + } + synchronized (_failedPeers) { + _failedPeers.add(peer); + } + } + + public String toString() { + StringBuffer buf = new StringBuffer(256); + buf.append("Storing ").append(_key); + buf.append(" "); + if (_completed <= 0) + buf.append(" completed? false "); + else + buf.append(" completed on ").append(new Date(_completed)); + buf.append(" Attempted: "); + synchronized (_attemptedPeers) { + for (Iterator iter = _attemptedPeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + buf.append(peer.toBase64()).append(" "); + } + } + buf.append(" Pending: "); + synchronized (_pendingPeers) { + for (Iterator iter = _pendingPeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + buf.append(peer.toBase64()).append(" "); + } + } + buf.append(" Failed: "); + synchronized (_failedPeers) { + for (Iterator iter = _failedPeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + buf.append(peer.toBase64()).append(" "); + } + } + buf.append(" Successful: "); + synchronized (_successfulPeers) { + for (Iterator iter = _successfulPeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + buf.append(peer.toBase64()).append(" "); + } + } + buf.append(" Successful Exploratory: "); + synchronized (_successfulExploratoryPeers) { + for (Iterator iter = _successfulExploratoryPeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + buf.append(peer.toBase64()).append(" "); + } + } + return buf.toString(); + } } -} - +} \ No newline at end of file diff --git a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java index 95c9477b5f..27a314bf24 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/HandleTunnelCreateMessageJob.java @@ -41,118 +41,125 @@ public class HandleTunnelCreateMessageJob extends JobImpl { private final static int PRIORITY = 123; HandleTunnelCreateMessageJob(TunnelCreateMessage receivedMessage, RouterIdentity from, Hash fromHash, SourceRouteBlock replyBlock) { - _message = receivedMessage; - _from = from; - _fromHash = fromHash; - _replyBlock = replyBlock; + _message = receivedMessage; + _from = from; + _fromHash = fromHash; + _replyBlock = replyBlock; } public void runJob() { - if (_log.shouldLog(Log.DEBUG)) _log.debug("Handling tunnel create"); - TunnelInfo info = new TunnelInfo(); - info.setConfigurationKey(_message.getConfigurationKey()); - info.setEncryptionKey(_message.getTunnelKey()); - info.setNextHop(_message.getNextRouter()); + if (_log.shouldLog(Log.DEBUG)) _log.debug("Handling tunnel create"); + TunnelInfo info = new TunnelInfo(); + info.setConfigurationKey(_message.getConfigurationKey()); + info.setEncryptionKey(_message.getTunnelKey()); + info.setNextHop(_message.getNextRouter()); - TunnelSettings settings = new TunnelSettings(); - settings.setBytesPerMinuteAverage(_message.getMaxAvgBytesPerMin()); - settings.setBytesPerMinutePeak(_message.getMaxPeakBytesPerMin()); - settings.setMessagesPerMinuteAverage(_message.getMaxAvgMessagesPerMin()); - settings.setMessagesPerMinutePeak(_message.getMaxPeakMessagesPerMin()); - settings.setExpiration(_message.getTunnelDurationSeconds()*1000+Clock.getInstance().now()); - settings.setIncludeDummy(_message.getIncludeDummyTraffic()); - settings.setReorder(_message.getReorderMessages()); - info.setSettings(settings); - - info.setSigningKey(_message.getVerificationPrivateKey()); - info.setThisHop(Router.getInstance().getRouterInfo().getIdentity().getHash()); - info.setTunnelId(_message.getTunnelId()); - info.setVerificationKey(_message.getVerificationPublicKey()); + TunnelSettings settings = new TunnelSettings(); + settings.setBytesPerMinuteAverage(_message.getMaxAvgBytesPerMin()); + settings.setBytesPerMinutePeak(_message.getMaxPeakBytesPerMin()); + settings.setMessagesPerMinuteAverage(_message.getMaxAvgMessagesPerMin()); + settings.setMessagesPerMinutePeak(_message.getMaxPeakMessagesPerMin()); + settings.setExpiration(_message.getTunnelDurationSeconds()*1000+Clock.getInstance().now()); + settings.setIncludeDummy(_message.getIncludeDummyTraffic()); + settings.setReorder(_message.getReorderMessages()); + info.setSettings(settings); + + info.setSigningKey(_message.getVerificationPrivateKey()); + info.setThisHop(Router.getInstance().getRouterInfo().getIdentity().getHash()); + info.setTunnelId(_message.getTunnelId()); + info.setVerificationKey(_message.getVerificationPublicKey()); - info.getTunnelId().setType(TunnelId.TYPE_PARTICIPANT); + info.getTunnelId().setType(TunnelId.TYPE_PARTICIPANT); - if (_message.getNextRouter() == null) { - if (_log.shouldLog(Log.DEBUG)) _log.debug("We're the endpoint, don't test the \"next\" peer [duh]"); - boolean ok = TunnelManagerFacade.getInstance().joinTunnel(info); - sendReply(ok); - } else { - NetworkDatabaseFacade.getInstance().lookupRouterInfo(info.getNextHop(), new TestJob(info), new JoinJob(info, false), TIMEOUT); - } + if (_message.getNextRouter() == null) { + if (_log.shouldLog(Log.DEBUG)) _log.debug("We're the endpoint, don't test the \"next\" peer [duh]"); + boolean ok = TunnelManagerFacade.getInstance().joinTunnel(info); + sendReply(ok); + } else { + NetworkDatabaseFacade.getInstance().lookupRouterInfo(info.getNextHop(), new TestJob(info), new JoinJob(info, false), TIMEOUT); + } } private class TestJob extends JobImpl { - private TunnelInfo _target; - public TestJob(TunnelInfo target) { - _target = target; - } - - public String getName() { return "Run a test for peer reachability"; } - public void runJob() { - RouterInfo info = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(_target.getNextHop()); - if (info == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error - unable to look up peer " + _target.toBase64() + ", even though we were queued up via onSuccess??"); - return; - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Lookup successful for tested peer " + _target.toBase64() + ", now continue with the test"); - JobQueue.getInstance().addJob(new BuildTestMessageJob(info, Router.getInstance().getRouterInfo().getIdentity().getHash(), new JoinJob(_target, true), new JoinJob(_target, false), TIMEOUT, PRIORITY)); - } - } + private TunnelInfo _target; + public TestJob(TunnelInfo target) { + _target = target; + } + + public String getName() { return "Run a test for peer reachability"; } + public void runJob() { + RouterInfo info = NetworkDatabaseFacade.getInstance().lookupRouterInfoLocally(_target.getNextHop()); + if (info == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error - unable to look up peer " + _target.toBase64() + ", even though we were queued up via onSuccess??"); + return; + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Lookup successful for tested peer " + _target.toBase64() + ", now continue with the test"); + Hash peer = Router.getInstance().getRouterInfo().getIdentity().getHash(); + JoinJob success = new JoinJob(_target, true); + JoinJob failure = new JoinJob(_target, false); + BuildTestMessageJob test = new BuildTestMessageJob(info, peer, success, failure, TIMEOUT, PRIORITY); + JobQueue.getInstance().addJob(test); + } + } } private void sendReply(boolean ok) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending reply to a tunnel create of id " + _message.getTunnelId() + " with ok (" + ok + ") to router " + _message.getReplyBlock().getRouter().toBase64()); - - MessageHistory.getInstance().receiveTunnelCreate(_message.getTunnelId(), _message.getNextRouter(), new Date(Clock.getInstance().now() + 1000*_message.getTunnelDurationSeconds()), ok, _message.getReplyBlock().getRouter()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending reply to a tunnel create of id " + _message.getTunnelId() + + " with ok (" + ok + ") to router " + _message.getReplyBlock().getRouter().toBase64()); + + MessageHistory.getInstance().receiveTunnelCreate(_message.getTunnelId(), _message.getNextRouter(), + new Date(Clock.getInstance().now() + 1000*_message.getTunnelDurationSeconds()), + ok, _message.getReplyBlock().getRouter()); - TunnelCreateStatusMessage msg = new TunnelCreateStatusMessage(); - msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash()); - msg.setTunnelId(_message.getTunnelId()); - if (ok) { - msg.setStatus(TunnelCreateStatusMessage.STATUS_SUCCESS); - } else { - // since we don't actually check anything, this is a catch all - msg.setStatus(TunnelCreateStatusMessage.STATUS_FAILED_OVERLOADED); - } - msg.setMessageExpiration(new Date(Clock.getInstance().now()+60*1000)); - SendReplyMessageJob job = new SendReplyMessageJob(_message.getReplyBlock(), msg, PRIORITY); - JobQueue.getInstance().addJob(job); + TunnelCreateStatusMessage msg = new TunnelCreateStatusMessage(); + msg.setFromHash(Router.getInstance().getRouterInfo().getIdentity().getHash()); + msg.setTunnelId(_message.getTunnelId()); + if (ok) { + msg.setStatus(TunnelCreateStatusMessage.STATUS_SUCCESS); + } else { + // since we don't actually check anything, this is a catch all + msg.setStatus(TunnelCreateStatusMessage.STATUS_FAILED_OVERLOADED); + } + msg.setMessageExpiration(new Date(Clock.getInstance().now()+60*1000)); + SendReplyMessageJob job = new SendReplyMessageJob(_message.getReplyBlock(), msg, PRIORITY); + JobQueue.getInstance().addJob(job); } public String getName() { return "Handle Tunnel Create Message"; } private class JoinJob extends JobImpl { - private TunnelInfo _info; - private boolean _isReachable; - public JoinJob(TunnelInfo info, boolean isReachable) { - _info = info; - _isReachable = isReachable; - } + private TunnelInfo _info; + private boolean _isReachable; + public JoinJob(TunnelInfo info, boolean isReachable) { + _info = info; + _isReachable = isReachable; + } - public void runJob() { - if (!_isReachable) { - long before = Clock.getInstance().now(); - sendReply(false); - long after = Clock.getInstance().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("JoinJob .refuse took " + (after-before) + "ms to refuse " + _info); - } else { - long before = Clock.getInstance().now(); - boolean ok = TunnelManagerFacade.getInstance().joinTunnel(_info); - long afterJoin = Clock.getInstance().now(); - sendReply(ok); - long after = Clock.getInstance().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("JoinJob .joinTunnel took " + (afterJoin-before) + "ms and sendReply took " + (after-afterJoin) + "ms"); - } - } - public String getName() { return "Process the tunnel join after testing the nextHop"; } + public void runJob() { + if (!_isReachable) { + long before = Clock.getInstance().now(); + sendReply(false); + long after = Clock.getInstance().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("JoinJob .refuse took " + (after-before) + "ms to refuse " + _info); + } else { + long before = Clock.getInstance().now(); + boolean ok = TunnelManagerFacade.getInstance().joinTunnel(_info); + long afterJoin = Clock.getInstance().now(); + sendReply(ok); + long after = Clock.getInstance().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("JoinJob .joinTunnel took " + (afterJoin-before) + "ms and sendReply took " + (after-afterJoin) + "ms"); + } + } + public String getName() { return "Process the tunnel join after testing the nextHop"; } } public void dropped() { - MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); + MessageHistory.getInstance().messageProcessingError(_message.getUniqueId(), _message.getClass().getName(), "Dropped due to overload"); } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/PoolingTunnelManagerFacade.java b/router/java/src/net/i2p/router/tunnelmanager/PoolingTunnelManagerFacade.java index 0e9ecc5375..357c38fd5e 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/PoolingTunnelManagerFacade.java +++ b/router/java/src/net/i2p/router/tunnelmanager/PoolingTunnelManagerFacade.java @@ -28,26 +28,27 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { private TunnelTestManager _testManager; static { - StatManager.getInstance().createFrequencyStat("tunnel.acceptRequestFrequency", "How often do we accept requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - StatManager.getInstance().createFrequencyStat("tunnel.rejectRequestFrequency", "How often do we reject requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createFrequencyStat("tunnel.acceptRequestFrequency", "How often do we accept requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createFrequencyStat("tunnel.rejectRequestFrequency", "How often do we reject requests to join a tunnel?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("tunnel.participatingTunnels", "How many tunnels are we participating in?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); } public PoolingTunnelManagerFacade() { - super(); - InNetMessagePool.getInstance().registerHandlerJobBuilder(TunnelCreateMessage.MESSAGE_TYPE, new TunnelCreateMessageHandler()); + super(); + InNetMessagePool.getInstance().registerHandlerJobBuilder(TunnelCreateMessage.MESSAGE_TYPE, new TunnelCreateMessageHandler()); } public void startup() { - if (_pool == null) - _pool = new TunnelPool(); - _pool.startup(); - _testManager = new TunnelTestManager(_pool); + if (_pool == null) + _pool = new TunnelPool(); + _pool.startup(); + _testManager = new TunnelTestManager(_pool); } public void shutdown() { - _pool.shutdown(); - _testManager.stopTesting(); - _testManager = null; + _pool.shutdown(); + _testManager.stopTesting(); + _testManager = null; } /** @@ -56,54 +57,62 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { * @return true if the router will accept participation, else false. */ public boolean joinTunnel(TunnelInfo info) { - if (info == null) { - _log.error("Null tunnel", new Exception("Null tunnel")); - StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); - return false; - } - if (info.getSettings() == null) { - _log.error("Null settings!", new Exception("settings are null")); - StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); - return false; - } - if (info.getSettings().getExpiration() == 0) { - _log.info("No expiration for tunnel " + info.getTunnelId().getTunnelId(), new Exception("No expiration")); - StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); - return false; - } else { - if (info.getSettings().getExpiration() < Clock.getInstance().now()) { - _log.warn("Already expired - " + new Date(info.getSettings().getExpiration()), new Exception("Already expired")); - StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); - return false; - } - } - - _log.debug("Joining tunnel: " + info); - boolean ok = _pool.addParticipatingTunnel(info); - if (!ok) - StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); - else - StatManager.getInstance().updateFrequency("tunnel.acceptRequestFrequency"); - return ok; + if (info == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Null tunnel", new Exception("Null tunnel")); + StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); + return false; + } + if (info.getSettings() == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Null settings!", new Exception("settings are null")); + StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); + return false; + } + if (info.getSettings().getExpiration() == 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("No expiration for tunnel " + info.getTunnelId().getTunnelId(), + new Exception("No expiration")); + StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); + return false; + } else { + if (info.getSettings().getExpiration() < Clock.getInstance().now()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Already expired - " + new Date(info.getSettings().getExpiration()), + new Exception("Already expired")); + StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); + return false; + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Joining tunnel: " + info); + boolean ok = _pool.addParticipatingTunnel(info); + if (!ok) + StatManager.getInstance().updateFrequency("tunnel.rejectRequestFrequency"); + else + StatManager.getInstance().updateFrequency("tunnel.acceptRequestFrequency"); + StatManager.getInstance().addRateData("tunnel.participatingTunnels", _pool.getParticipatingTunnelCount(), 0); + return ok; } /** * Retrieve the information related to a particular tunnel * */ public TunnelInfo getTunnelInfo(TunnelId id) { - return _pool.getTunnelInfo(id); + return _pool.getTunnelInfo(id); } /** * Retrieve a set of tunnels from the existing ones for various purposes */ public List selectOutboundTunnelIds(TunnelSelectionCriteria criteria) { - return PoolingTunnelSelector.selectOutboundTunnelIds(_pool, criteria); + return PoolingTunnelSelector.selectOutboundTunnelIds(_pool, criteria); } /** * Retrieve a set of tunnels from the existing ones for various purposes */ public List selectInboundTunnelIds(TunnelSelectionCriteria criteria) { - return PoolingTunnelSelector.selectInboundTunnelIds(_pool, criteria); + return PoolingTunnelSelector.selectInboundTunnelIds(_pool, criteria); } /** @@ -113,12 +122,12 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { * */ public void createTunnels(Destination destination, ClientTunnelSettings clientSettings, long timeoutMs) { - ClientTunnelPool pool = _pool.getClientPool(destination); - if (pool != null) { - pool.setClientSettings(clientSettings); - } else { - _pool.createClientPool(destination, clientSettings); - } + ClientTunnelPool pool = _pool.getClientPool(destination); + if (pool != null) { + pool.setClientSettings(clientSettings); + } else { + _pool.createClientPool(destination, clientSettings); + } } /** @@ -127,32 +136,34 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { * */ public void peerFailed(Hash peer) { - int numFailed = 0; - for (Iterator iter = _pool.getManagedTunnelIds().iterator(); iter.hasNext(); ) { - TunnelId id = (TunnelId)iter.next(); - TunnelInfo info = (TunnelInfo)_pool.getTunnelInfo(id); - if (isParticipant(info, peer)) { - _log.info("Peer " + peer.toBase64() + " failed and they participate in tunnel " + id.getTunnelId() + ". Marking the tunnel as not ready!"); - info.setIsReady(false); - numFailed++; + int numFailed = 0; + for (Iterator iter = _pool.getManagedTunnelIds().iterator(); iter.hasNext(); ) { + TunnelId id = (TunnelId)iter.next(); + TunnelInfo info = (TunnelInfo)_pool.getTunnelInfo(id); + if (isParticipant(info, peer)) { + _log.info("Peer " + peer.toBase64() + " failed and they participate in tunnel " + + id.getTunnelId() + ". Marking the tunnel as not ready!"); + info.setIsReady(false); + numFailed++; - long lifetime = Clock.getInstance().now() - info.getCreated(); - StatManager.getInstance().addRateData("tunnel.failAfterTime", lifetime, lifetime); - } - } - - _log.info("On peer " + peer.toBase64() + " failure, " + numFailed + " tunnels were killed"); + long lifetime = Clock.getInstance().now() - info.getCreated(); + StatManager.getInstance().addRateData("tunnel.failAfterTime", lifetime, lifetime); + } + } + + if (_log.shouldLog(Log.INFO)) + _log.info("On peer " + peer.toBase64() + " failure, " + numFailed + " tunnels were killed"); } private boolean isParticipant(TunnelInfo info, Hash peer) { - if ( (info == null) || (peer == null) ) return false; - TunnelInfo cur = info; - while (cur != null) { - if (peer.equals(cur.getThisHop())) return true; - if (peer.equals(cur.getNextHop())) return true; - cur = cur.getNextHopInfo(); - } - return false; + if ( (info == null) || (peer == null) ) return false; + TunnelInfo cur = info; + while (cur != null) { + if (peer.equals(cur.getThisHop())) return true; + if (peer.equals(cur.getNextHop())) return true; + cur = cur.getNextHopInfo(); + } + return false; } /** @@ -160,32 +171,32 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { * */ public boolean isInUse(Hash peer) { - if (isInUse(peer, _pool.getManagedTunnelIds())) { - if (_log.shouldLog(Log.INFO)) - _log.debug("Peer is in a managed tunnel: " + peer.toBase64()); - return true; - } - if (isInUse(peer, _pool.getPendingTunnels())) { - if (_log.shouldLog(Log.INFO)) - _log.debug("Peer is in a pending tunnel: " + peer.toBase64()); - return true; - } - if (isInUse(peer, _pool.getParticipatingTunnels())) { - if (_log.shouldLog(Log.INFO)) - _log.debug("Peer is in a participating tunnel: " + peer.toBase64()); - return true; - } - return false; + if (isInUse(peer, _pool.getManagedTunnelIds())) { + if (_log.shouldLog(Log.INFO)) + _log.debug("Peer is in a managed tunnel: " + peer.toBase64()); + return true; + } + if (isInUse(peer, _pool.getPendingTunnels())) { + if (_log.shouldLog(Log.INFO)) + _log.debug("Peer is in a pending tunnel: " + peer.toBase64()); + return true; + } + if (isInUse(peer, _pool.getParticipatingTunnels())) { + if (_log.shouldLog(Log.INFO)) + _log.debug("Peer is in a participating tunnel: " + peer.toBase64()); + return true; + } + return false; } private boolean isInUse(Hash peer, Set tunnelIds) { - for (Iterator iter = tunnelIds.iterator(); iter.hasNext(); ) { - TunnelId id = (TunnelId)iter.next(); - TunnelInfo info = _pool.getTunnelInfo(id); - if (isParticipant(info, peer)) - return true; - } - return false; + for (Iterator iter = tunnelIds.iterator(); iter.hasNext(); ) { + TunnelId id = (TunnelId)iter.next(); + TunnelInfo info = _pool.getTunnelInfo(id); + if (isParticipant(info, peer)) + return true; + } + return false; } /** @@ -193,9 +204,9 @@ public class PoolingTunnelManagerFacade extends TunnelManagerFacade { * */ public String renderStatusHTML() { - if (_pool != null) - return _pool.renderStatusHTML(); - else - return "<h2>Tunnel Manager not initialized</h2>\n"; + if (_pool != null) + return _pool.renderStatusHTML(); + else + return "<h2>Tunnel Manager not initialized</h2>\n"; } } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java index 5c3805d3a9..b7c03fe81c 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java @@ -48,8 +48,8 @@ class TunnelPool { /** active or has it been shutdown? */ private boolean _isLive; - /** write out the current state every 15 seconds */ - private final static long WRITE_POOL_DELAY = 15*1000; + /** write out the current state every 60 seconds */ + private final static long WRITE_POOL_DELAY = 60*1000; /** allow the tunnel create timeout to be overridden, default is 60 seconds [but really slow computers should be larger] */ public final static String TUNNEL_CREATION_TIMEOUT_PARAM = "tunnel.creationTimeoutMs"; @@ -59,13 +59,13 @@ class TunnelPool { public final static int TARGET_CLIENTS_DEFAULT = 3; static { - StatManager.getInstance().createFrequencyStat("tunnel.failFrequency", "How often do tunnels prematurely fail (after being successfully built)?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); - StatManager.getInstance().createRateStat("tunnel.failAfterTime", "How long do tunnels that fail prematurely last before failing?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createFrequencyStat("tunnel.failFrequency", "How often do tunnels prematurely fail (after being successfully built)?", "Tunnels", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); + StatManager.getInstance().createRateStat("tunnel.failAfterTime", "How long do tunnels that fail prematurely last before failing?", "Tunnels", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); } public TunnelPool() { - _isLive = true; - _persistenceHelper = new TunnelPoolPersistenceHelper(); + _isLive = true; + _persistenceHelper = new TunnelPoolPersistenceHelper(); } /** @@ -73,45 +73,45 @@ class TunnelPool { * */ public TunnelInfo getTunnelInfo(TunnelId id) { - if (!_isLive) return null; - if (id == null) return null; - boolean typeKnown = id.getType() != TunnelId.TYPE_UNSPECIFIED; - - if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_PARTICIPANT) ) { - synchronized (_participatingTunnels) { - if (_participatingTunnels.containsKey(id)) - return (TunnelInfo)_participatingTunnels.get(id); - } - } - if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_OUTBOUND) ) { - synchronized (_outboundTunnels) { - if (_outboundTunnels.containsKey(id)) - return (TunnelInfo)_outboundTunnels.get(id); - } - } - if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_INBOUND) ) { - synchronized (_freeInboundTunnels) { - if (_freeInboundTunnels.containsKey(id)) - return (TunnelInfo)_freeInboundTunnels.get(id); - } - } - synchronized (_pendingTunnels) { - if (_pendingTunnels.containsKey(id)) - return (TunnelInfo)_pendingTunnels.get(id); - } - - if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_INBOUND) ) { - synchronized (_clientPools) { - for (Iterator iter = _clientPools.values().iterator(); iter.hasNext(); ) { - ClientTunnelPool pool = (ClientTunnelPool)iter.next(); - if (pool.isInboundTunnel(id)) - return pool.getInboundTunnel(id); - else if (pool.isInactiveInboundTunnel(id)) - return pool.getInactiveInboundTunnel(id); - } - } - } - return null; + if (!_isLive) return null; + if (id == null) return null; + boolean typeKnown = id.getType() != TunnelId.TYPE_UNSPECIFIED; + + if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_PARTICIPANT) ) { + synchronized (_participatingTunnels) { + if (_participatingTunnels.containsKey(id)) + return (TunnelInfo)_participatingTunnels.get(id); + } + } + if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_OUTBOUND) ) { + synchronized (_outboundTunnels) { + if (_outboundTunnels.containsKey(id)) + return (TunnelInfo)_outboundTunnels.get(id); + } + } + if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_INBOUND) ) { + synchronized (_freeInboundTunnels) { + if (_freeInboundTunnels.containsKey(id)) + return (TunnelInfo)_freeInboundTunnels.get(id); + } + } + synchronized (_pendingTunnels) { + if (_pendingTunnels.containsKey(id)) + return (TunnelInfo)_pendingTunnels.get(id); + } + + if ( (!typeKnown) || (id.getType() == TunnelId.TYPE_INBOUND) ) { + synchronized (_clientPools) { + for (Iterator iter = _clientPools.values().iterator(); iter.hasNext(); ) { + ClientTunnelPool pool = (ClientTunnelPool)iter.next(); + if (pool.isInboundTunnel(id)) + return pool.getInboundTunnel(id); + else if (pool.isInactiveInboundTunnel(id)) + return pool.getInactiveInboundTunnel(id); + } + } + } + return null; } /** @@ -120,21 +120,21 @@ class TunnelPool { * */ public Set getManagedTunnelIds() { - if (!_isLive) return null; - Set ids = new HashSet(64); - synchronized (_outboundTunnels) { - ids.addAll(_outboundTunnels.keySet()); - } - synchronized (_freeInboundTunnels) { - ids.addAll(_freeInboundTunnels.keySet()); - } - synchronized (_clientPools) { - for (Iterator iter = _clientPools.values().iterator(); iter.hasNext(); ) { - ClientTunnelPool pool = (ClientTunnelPool)iter.next(); - ids.addAll(pool.getInboundTunnelIds()); - } - } - return ids; + if (!_isLive) return null; + Set ids = new HashSet(64); + synchronized (_outboundTunnels) { + ids.addAll(_outboundTunnels.keySet()); + } + synchronized (_freeInboundTunnels) { + ids.addAll(_freeInboundTunnels.keySet()); + } + synchronized (_clientPools) { + for (Iterator iter = _clientPools.values().iterator(); iter.hasNext(); ) { + ClientTunnelPool pool = (ClientTunnelPool)iter.next(); + ids.addAll(pool.getInboundTunnelIds()); + } + } + return ids; } /** @@ -143,156 +143,164 @@ class TunnelPool { * @return true if the tunnel was allocated successfully, false if an error occurred */ public boolean allocateTunnel(TunnelId id, Destination dest) { - if (!_isLive) return false; - ClientTunnelPool pool = getClientPool(dest); - if (pool == null) { - _log.error("Error allocating tunnel " + id + " to " + dest + ": no pool for the client known"); - return false; - } - TunnelInfo tunnel = removeFreeTunnel(id); - if (tunnel == null) { - _log.error("Error allocating tunnel " + id + " to " + dest + ": tunnel is no longer free?"); - return false; - } - - TunnelInfo t = tunnel; - while (t != null) { - t.setDestination(dest); - t = t.getNextHopInfo(); - } - - pool.addInboundTunnel(tunnel); - return true; + if (!_isLive) return false; + ClientTunnelPool pool = getClientPool(dest); + if (pool == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error allocating tunnel " + id + " to " + dest + ": no pool for the client known"); + return false; + } + TunnelInfo tunnel = removeFreeTunnel(id); + if (tunnel == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error allocating tunnel " + id + " to " + dest + ": tunnel is no longer free?"); + return false; + } + + TunnelInfo t = tunnel; + while (t != null) { + t.setDestination(dest); + t = t.getNextHopInfo(); + } + + pool.addInboundTunnel(tunnel); + return true; } /** * Set of tunnelIds for outbound tunnels */ public Set getOutboundTunnels() { - if (!_isLive) return null; - synchronized (_outboundTunnels) { - return new HashSet(_outboundTunnels.keySet()); - } + if (!_isLive) return null; + synchronized (_outboundTunnels) { + return new HashSet(_outboundTunnels.keySet()); + } } public int getOutboundTunnelCount() { - if (!_isLive) return 0; - synchronized (_outboundTunnels) { - return _outboundTunnels.size(); - } + if (!_isLive) return 0; + synchronized (_outboundTunnels) { + return _outboundTunnels.size(); + } } public TunnelInfo getOutboundTunnel(TunnelId id) { - if (!_isLive) return null; - synchronized (_outboundTunnels) { - return (TunnelInfo)_outboundTunnels.get(id); - } + if (!_isLive) return null; + synchronized (_outboundTunnels) { + return (TunnelInfo)_outboundTunnels.get(id); + } } public void addOutboundTunnel(TunnelInfo tunnel) { - if (!_isLive) return; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Add outbound tunnel " + tunnel.getTunnelId()); - MessageHistory.getInstance().tunnelJoined("outbound", tunnel); - synchronized (_outboundTunnels) { - _outboundTunnels.put(tunnel.getTunnelId(), tunnel); - } - synchronized (_pendingTunnels) { - _pendingTunnels.remove(tunnel.getTunnelId()); - } + if (!_isLive) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Add outbound tunnel " + tunnel.getTunnelId()); + MessageHistory.getInstance().tunnelJoined("outbound", tunnel); + synchronized (_outboundTunnels) { + _outboundTunnels.put(tunnel.getTunnelId(), tunnel); + } + synchronized (_pendingTunnels) { + _pendingTunnels.remove(tunnel.getTunnelId()); + } } public void removeOutboundTunnel(TunnelId id) { - if (!_isLive) return; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing outbound tunnel " + id); - int remaining = 0; - synchronized (_outboundTunnels) { - _outboundTunnels.remove(id); - remaining = _outboundTunnels.size(); - } - if (remaining <= 0) { - buildFakeTunnels(); - } + if (!_isLive) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing outbound tunnel " + id); + int remaining = 0; + synchronized (_outboundTunnels) { + _outboundTunnels.remove(id); + remaining = _outboundTunnels.size(); + } + if (remaining <= 0) { + buildFakeTunnels(); + } } /** * Set of tunnelIds that this router has available for consumption */ public Set getFreeTunnels() { - if (!_isLive) return null; - synchronized (_freeInboundTunnels) { - return new HashSet(_freeInboundTunnels.keySet()); - } + if (!_isLive) return null; + synchronized (_freeInboundTunnels) { + return new HashSet(_freeInboundTunnels.keySet()); + } } public int getFreeTunnelCount() { - if (!_isLive) return 0; - synchronized (_freeInboundTunnels) { - return _freeInboundTunnels.size(); - } + if (!_isLive) return 0; + synchronized (_freeInboundTunnels) { + return _freeInboundTunnels.size(); + } } public TunnelInfo getFreeTunnel(TunnelId id) { - if (!_isLive) return null; - synchronized (_freeInboundTunnels) { - return (TunnelInfo)_freeInboundTunnels.get(id); - } + if (!_isLive) return null; + synchronized (_freeInboundTunnels) { + return (TunnelInfo)_freeInboundTunnels.get(id); + } } public void addFreeTunnel(TunnelInfo tunnel) { - if (!_isLive) return; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Add free inbound tunnel " + tunnel.getTunnelId()); - MessageHistory.getInstance().tunnelJoined("free inbound", tunnel); - synchronized (_freeInboundTunnels) { - _freeInboundTunnels.put(tunnel.getTunnelId(), tunnel); - } - synchronized (_pendingTunnels) { - _pendingTunnels.remove(tunnel.getTunnelId()); - } + if (!_isLive) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Add free inbound tunnel " + tunnel.getTunnelId()); + MessageHistory.getInstance().tunnelJoined("free inbound", tunnel); + synchronized (_freeInboundTunnels) { + _freeInboundTunnels.put(tunnel.getTunnelId(), tunnel); + } + synchronized (_pendingTunnels) { + _pendingTunnels.remove(tunnel.getTunnelId()); + } } public TunnelInfo removeFreeTunnel(TunnelId id) { - if (!_isLive) return null; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing free inbound tunnel " + id); - int remaining = 0; - TunnelInfo rv = null; - synchronized (_freeInboundTunnels) { - rv = (TunnelInfo)_freeInboundTunnels.remove(id); - remaining = _freeInboundTunnels.size(); - } - if (remaining <= 0) - buildFakeTunnels(); - return rv; + if (!_isLive) return null; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing free inbound tunnel " + id); + int remaining = 0; + TunnelInfo rv = null; + synchronized (_freeInboundTunnels) { + rv = (TunnelInfo)_freeInboundTunnels.remove(id); + remaining = _freeInboundTunnels.size(); + } + if (remaining <= 0) + buildFakeTunnels(); + return rv; } /** * set of tunnelIds that this router is participating in (but not managing) */ public Set getParticipatingTunnels() { - if (!_isLive) return null; - synchronized (_participatingTunnels) { - return new HashSet(_participatingTunnels.keySet()); - } + if (!_isLive) return null; + synchronized (_participatingTunnels) { + return new HashSet(_participatingTunnels.keySet()); + } + } + public int getParticipatingTunnelCount() { + if (!_isLive) return 0; + synchronized (_participatingTunnels) { + return _participatingTunnels.size(); + } } public TunnelInfo getParticipatingTunnel(TunnelId id) { - if (!_isLive) return null; - synchronized (_participatingTunnels) { - return (TunnelInfo)_participatingTunnels.get(id); - } + if (!_isLive) return null; + synchronized (_participatingTunnels) { + return (TunnelInfo)_participatingTunnels.get(id); + } } public boolean addParticipatingTunnel(TunnelInfo tunnel) { - if (!_isLive) return false; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Add participating tunnel " + tunnel.getTunnelId()); - MessageHistory.getInstance().tunnelJoined("participant", tunnel); - synchronized (_participatingTunnels) { - if (_participatingTunnels.containsKey(tunnel.getTunnelId())) { - return false; - } else { - _participatingTunnels.put(tunnel.getTunnelId(), tunnel); - tunnel.setIsReady(true); - return true; - } - } + if (!_isLive) return false; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Add participating tunnel " + tunnel.getTunnelId()); + MessageHistory.getInstance().tunnelJoined("participant", tunnel); + synchronized (_participatingTunnels) { + if (_participatingTunnels.containsKey(tunnel.getTunnelId())) { + return false; + } else { + _participatingTunnels.put(tunnel.getTunnelId(), tunnel); + tunnel.setIsReady(true); + return true; + } + } } public TunnelInfo removeParticipatingTunnel(TunnelId id) { - if (!_isLive) return null; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing participating tunnel " + id); - synchronized (_participatingTunnels) { - return (TunnelInfo)_participatingTunnels.remove(id); - } + if (!_isLive) return null; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing participating tunnel " + id); + synchronized (_participatingTunnels) { + return (TunnelInfo)_participatingTunnels.remove(id); + } } /** @@ -300,10 +308,10 @@ class TunnelPool { * */ public Set getClientPools() { - if (!_isLive) return null; - synchronized (_clientPools) { - return new HashSet(_clientPools.keySet()); - } + if (!_isLive) return null; + synchronized (_clientPools) { + return new HashSet(_clientPools.keySet()); + } } /** @@ -311,78 +319,78 @@ class TunnelPool { * */ public void createClientPool(Destination dest, ClientTunnelSettings settings) { - if (!_isLive) return; - ClientTunnelPool pool = null; - synchronized (_clientPools) { - if (_clientPools.containsKey(dest)) { - pool = (ClientTunnelPool)_clientPools.get(dest); - if (_log.shouldLog(Log.INFO)) - _log.info("Reusing an existing client tunnel pool for " + dest.calculateHash()); - } else { - pool = new ClientTunnelPool(dest, settings, this); - if (_log.shouldLog(Log.INFO)) - _log.info("New client tunnel pool created for " + dest.calculateHash()); - _clientPools.put(dest, pool); - } - } - pool.startPool(); + if (!_isLive) return; + ClientTunnelPool pool = null; + synchronized (_clientPools) { + if (_clientPools.containsKey(dest)) { + pool = (ClientTunnelPool)_clientPools.get(dest); + if (_log.shouldLog(Log.INFO)) + _log.info("Reusing an existing client tunnel pool for " + dest.calculateHash()); + } else { + pool = new ClientTunnelPool(dest, settings, this); + if (_log.shouldLog(Log.INFO)) + _log.info("New client tunnel pool created for " + dest.calculateHash()); + _clientPools.put(dest, pool); + } + } + pool.startPool(); } ClientTunnelPool addClientPool(ClientTunnelPool pool) { - if (!_isLive) return null; - ClientTunnelPool old = null; - - if (_log.shouldLog(Log.INFO)) - _log.info("Client tunnel pool added for " + pool.getDestination().calculateHash()); - - synchronized (_clientPools) { - old = (ClientTunnelPool)_clientPools.put(pool.getDestination(), pool); - } - return old; + if (!_isLive) return null; + ClientTunnelPool old = null; + + if (_log.shouldLog(Log.INFO)) + _log.info("Client tunnel pool added for " + pool.getDestination().calculateHash()); + + synchronized (_clientPools) { + old = (ClientTunnelPool)_clientPools.put(pool.getDestination(), pool); + } + return old; } public ClientTunnelPool getClientPool(Destination dest) { - if (!_isLive) return null; - synchronized (_clientPools) { - return (ClientTunnelPool)_clientPools.get(dest); - } + if (!_isLive) return null; + synchronized (_clientPools) { + return (ClientTunnelPool)_clientPools.get(dest); + } } public void removeClientPool(Destination dest) { - if (!_isLive) return; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing client tunnel pool for " + dest.calculateHash()); - ClientTunnelPool pool = null; - synchronized (_clientPools) { - pool = (ClientTunnelPool)_clientPools.remove(dest); - } - if (pool != null) - pool.stopPool(); + if (!_isLive) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing client tunnel pool for " + dest.calculateHash()); + ClientTunnelPool pool = null; + synchronized (_clientPools) { + pool = (ClientTunnelPool)_clientPools.remove(dest); + } + if (pool != null) + pool.stopPool(); } public Set getPendingTunnels() { - if (!_isLive) return null; - synchronized (_pendingTunnels) { - return new HashSet(_pendingTunnels.keySet()); - } + if (!_isLive) return null; + synchronized (_pendingTunnels) { + return new HashSet(_pendingTunnels.keySet()); + } } public TunnelInfo getPendingTunnel(TunnelId id) { - if (!_isLive) return null; - synchronized (_pendingTunnels) { - return (TunnelInfo)_pendingTunnels.get(id); - } + if (!_isLive) return null; + synchronized (_pendingTunnels) { + return (TunnelInfo)_pendingTunnels.get(id); + } } public void addPendingTunnel(TunnelInfo info) { - if (!_isLive) return; - MessageHistory.getInstance().tunnelJoined("pending", info); - synchronized (_pendingTunnels) { - _pendingTunnels.put(info.getTunnelId(), info); - } + if (!_isLive) return; + MessageHistory.getInstance().tunnelJoined("pending", info); + synchronized (_pendingTunnels) { + _pendingTunnels.put(info.getTunnelId(), info); + } } public void removePendingTunnel(TunnelId id) { - if (!_isLive) return; - if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing pending tunnel " + id); - synchronized (_pendingTunnels) { - _pendingTunnels.remove(id); - } + if (!_isLive) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing pending tunnel " + id); + synchronized (_pendingTunnels) { + _pendingTunnels.remove(id); + } } /** fetch the settings for the pool (tunnel settings and quantities) */ @@ -399,16 +407,16 @@ class TunnelPool { /** determine the number of hops in the longest tunnel we have */ public int getLongestTunnelLength() { - int max = 0; - synchronized (_freeInboundTunnels) { - for (Iterator iter = _freeInboundTunnels.values().iterator(); iter.hasNext(); ) { - TunnelInfo info = (TunnelInfo)iter.next(); - int len = info.getLength(); - if (len > max) - max = len; - } - } - return max; + int max = 0; + synchronized (_freeInboundTunnels) { + for (Iterator iter = _freeInboundTunnels.values().iterator(); iter.hasNext(); ) { + TunnelInfo info = (TunnelInfo)iter.next(); + int len = info.getLength(); + if (len > max) + max = len; + } + } + return max; } /** @@ -418,225 +426,225 @@ class TunnelPool { * */ public void buildFakeTunnels() { - if (getFreeValidTunnelCount() < 3) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Running low on valid inbound tunnels, building another"); - TunnelInfo inTunnelGateway = TunnelBuilder.getInstance().configureInboundTunnel(null, getPoolSettings(), true); - RequestTunnelJob inReqJob = new RequestTunnelJob(this, inTunnelGateway, true, getTunnelCreationTimeout()); - inReqJob.runJob(); - } - if (getOutboundValidTunnelCount() < 3) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Running low on valid outbound tunnels, building another"); - TunnelInfo outTunnelGateway = TunnelBuilder.getInstance().configureOutboundTunnel(getPoolSettings(), true); - RequestTunnelJob outReqJob = new RequestTunnelJob(this, outTunnelGateway, false, getTunnelCreationTimeout()); - outReqJob.runJob(); - } + if (getFreeValidTunnelCount() < 3) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Running low on valid inbound tunnels, building another"); + TunnelInfo inTunnelGateway = TunnelBuilder.getInstance().configureInboundTunnel(null, getPoolSettings(), true); + RequestTunnelJob inReqJob = new RequestTunnelJob(this, inTunnelGateway, true, getTunnelCreationTimeout()); + inReqJob.runJob(); + } + if (getOutboundValidTunnelCount() < 3) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Running low on valid outbound tunnels, building another"); + TunnelInfo outTunnelGateway = TunnelBuilder.getInstance().configureOutboundTunnel(getPoolSettings(), true); + RequestTunnelJob outReqJob = new RequestTunnelJob(this, outTunnelGateway, false, getTunnelCreationTimeout()); + outReqJob.runJob(); + } } private int getFreeValidTunnelCount() { - int found = 0; - Set ids = getFreeTunnels(); - long mustExpireAfter = Clock.getInstance().now(); - - for (Iterator iter = ids.iterator(); iter.hasNext(); ) { - TunnelId id = (TunnelId)iter.next(); - TunnelInfo info = getFreeTunnel(id); - if ( (info != null) && (info.getIsReady()) ) { - if (info.getSettings().getExpiration() > mustExpireAfter) { - if (info.getDestination() == null) { - found++; - } - } - } - } - return found; + int found = 0; + Set ids = getFreeTunnels(); + long mustExpireAfter = Clock.getInstance().now(); + + for (Iterator iter = ids.iterator(); iter.hasNext(); ) { + TunnelId id = (TunnelId)iter.next(); + TunnelInfo info = getFreeTunnel(id); + if ( (info != null) && (info.getIsReady()) ) { + if (info.getSettings().getExpiration() > mustExpireAfter) { + if (info.getDestination() == null) { + found++; + } + } + } + } + return found; } private int getOutboundValidTunnelCount() { - int found = 0; - Set ids = getOutboundTunnels(); - long mustExpireAfter = Clock.getInstance().now(); - - for (Iterator iter = ids.iterator(); iter.hasNext(); ) { - TunnelId id = (TunnelId)iter.next(); - TunnelInfo info = getOutboundTunnel(id); - if ( (info != null) && (info.getIsReady()) ) { - if (info.getSettings().getExpiration() > mustExpireAfter) { - found++; - } - } - } - return found; + int found = 0; + Set ids = getOutboundTunnels(); + long mustExpireAfter = Clock.getInstance().now(); + + for (Iterator iter = ids.iterator(); iter.hasNext(); ) { + TunnelId id = (TunnelId)iter.next(); + TunnelInfo info = getOutboundTunnel(id); + if ( (info != null) && (info.getIsReady()) ) { + if (info.getSettings().getExpiration() > mustExpireAfter) { + found++; + } + } + } + return found; } public void tunnelFailed(TunnelId id) { - if (!_isLive) return; - if (_log.shouldLog(Log.INFO)) - _log.info("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel")); - TunnelInfo info = getTunnelInfo(id); - if (info == null) - return; - MessageHistory.getInstance().tunnelFailed(info.getTunnelId()); - info.setIsReady(false); - Hash us = Router.getInstance().getRouterInfo().getIdentity().getHash(); - long lifetime = Clock.getInstance().now() - info.getCreated(); - while (info != null) { - if (!info.getThisHop().equals(us)) { - ProfileManager.getInstance().tunnelFailed(info.getThisHop()); - } - info = info.getNextHopInfo(); - } - StatManager.getInstance().addRateData("tunnel.failAfterTime", lifetime, lifetime); - StatManager.getInstance().updateFrequency("tunnel.failFrequency"); - buildFakeTunnels(); + if (!_isLive) return; + if (_log.shouldLog(Log.INFO)) + _log.info("Tunnel " + id + " marked as not ready, since it /failed/", new Exception("Failed tunnel")); + TunnelInfo info = getTunnelInfo(id); + if (info == null) + return; + MessageHistory.getInstance().tunnelFailed(info.getTunnelId()); + info.setIsReady(false); + Hash us = Router.getInstance().getRouterInfo().getIdentity().getHash(); + long lifetime = Clock.getInstance().now() - info.getCreated(); + while (info != null) { + if (!info.getThisHop().equals(us)) { + ProfileManager.getInstance().tunnelFailed(info.getThisHop()); + } + info = info.getNextHopInfo(); + } + StatManager.getInstance().addRateData("tunnel.failAfterTime", lifetime, lifetime); + StatManager.getInstance().updateFrequency("tunnel.failFrequency"); + buildFakeTunnels(); } public void startup() { - if (_log.shouldLog(Log.INFO)) _log.info("Starting up tunnel pool"); - _isLive = true; - _outboundTunnels = new HashMap(8); - _freeInboundTunnels = new HashMap(8); - _clientPools = new HashMap(8); - _participatingTunnels = new HashMap(8); - _pendingTunnels = new HashMap(8); - _poolSettings = createPoolSettings(); - _persistenceHelper.loadPool(this); - _tunnelCreationTimeout = -1; - try { - String str = Router.getInstance().getConfigSetting(TUNNEL_CREATION_TIMEOUT_PARAM); - _tunnelCreationTimeout = Long.parseLong(str); - } catch (Throwable t) { - _tunnelCreationTimeout = TUNNEL_CREATION_TIMEOUT_DEFAULT; - } - _targetClients = TARGET_CLIENTS_DEFAULT; - try { - String str = Router.getInstance().getConfigSetting(TARGET_CLIENTS_PARAM); - _targetClients = Integer.parseInt(str); - } catch (Throwable t) { - _targetClients = TARGET_CLIENTS_DEFAULT; - } - buildFakeTunnels(); - JobQueue.getInstance().addJob(new WritePoolJob()); - JobQueue.getInstance().addJob(new TunnelPoolManagerJob(this)); - JobQueue.getInstance().addJob(new TunnelPoolExpirationJob(this)); + if (_log.shouldLog(Log.INFO)) _log.info("Starting up tunnel pool"); + _isLive = true; + _outboundTunnels = new HashMap(16); + _freeInboundTunnels = new HashMap(16); + _clientPools = new HashMap(8); + _participatingTunnels = new HashMap(64); + _pendingTunnels = new HashMap(8); + _poolSettings = createPoolSettings(); + _persistenceHelper.loadPool(this); + _tunnelCreationTimeout = -1; + try { + String str = Router.getInstance().getConfigSetting(TUNNEL_CREATION_TIMEOUT_PARAM); + _tunnelCreationTimeout = Long.parseLong(str); + } catch (Throwable t) { + _tunnelCreationTimeout = TUNNEL_CREATION_TIMEOUT_DEFAULT; + } + _targetClients = TARGET_CLIENTS_DEFAULT; + try { + String str = Router.getInstance().getConfigSetting(TARGET_CLIENTS_PARAM); + _targetClients = Integer.parseInt(str); + } catch (Throwable t) { + _targetClients = TARGET_CLIENTS_DEFAULT; + } + buildFakeTunnels(); + JobQueue.getInstance().addJob(new WritePoolJob()); + JobQueue.getInstance().addJob(new TunnelPoolManagerJob(this)); + JobQueue.getInstance().addJob(new TunnelPoolExpirationJob(this)); } public void shutdown() { - if (_log.shouldLog(Log.INFO)) _log.info("Shutting down tunnel pool"); - _persistenceHelper.writePool(this); - _isLive = false; // the subjobs [should] check getIsLive() on each run - _outboundTunnels = null; - _freeInboundTunnels = null; - _clientPools = null; - _participatingTunnels = null; - _poolSettings = null; - _persistenceHelper = null; - _tunnelCreationTimeout = -1; + if (_log.shouldLog(Log.INFO)) _log.info("Shutting down tunnel pool"); + _persistenceHelper.writePool(this); + _isLive = false; // the subjobs [should] check getIsLive() on each run + _outboundTunnels = null; + _freeInboundTunnels = null; + _clientPools = null; + _participatingTunnels = null; + _poolSettings = null; + _persistenceHelper = null; + _tunnelCreationTimeout = -1; } public boolean isLive() { return _isLive; } private ClientTunnelSettings createPoolSettings() { - ClientTunnelSettings settings = new ClientTunnelSettings(); - settings.readFromProperties(Router.getInstance().getConfigMap()); - return settings; + ClientTunnelSettings settings = new ClientTunnelSettings(); + settings.readFromProperties(Router.getInstance().getConfigMap()); + return settings; } public String renderStatusHTML() { - if (!_isLive) return ""; - StringBuffer buf = new StringBuffer(); - buf.append("<h2>Tunnel Pool</h2>\n"); - renderTunnels(buf, "Free inbound tunnels", getFreeTunnels()); - renderTunnels(buf, "Outbound tunnels", getOutboundTunnels()); - renderTunnels(buf, "Participating tunnels", getParticipatingTunnels()); - for (Iterator iter = getClientPools().iterator(); iter.hasNext(); ) { - Destination dest = (Destination)iter.next(); - ClientTunnelPool pool = getClientPool(dest); - renderTunnels(buf, "Inbound tunnels for " + dest.calculateHash() + " - (still connected? " + (!pool.isStopped()) + ")", pool.getInboundTunnelIds()); - } - return buf.toString(); + if (!_isLive) return ""; + StringBuffer buf = new StringBuffer(); + buf.append("<h2>Tunnel Pool</h2>\n"); + renderTunnels(buf, "Free inbound tunnels", getFreeTunnels()); + renderTunnels(buf, "Outbound tunnels", getOutboundTunnels()); + renderTunnels(buf, "Participating tunnels", getParticipatingTunnels()); + for (Iterator iter = getClientPools().iterator(); iter.hasNext(); ) { + Destination dest = (Destination)iter.next(); + ClientTunnelPool pool = getClientPool(dest); + renderTunnels(buf, "Inbound tunnels for " + dest.calculateHash() + " - (still connected? " + (!pool.isStopped()) + ")", pool.getInboundTunnelIds()); + } + return buf.toString(); } private void renderTunnels(StringBuffer buf, String msg, Set tunnelIds) { - buf.append("<b>").append(msg).append(":</b> <i>(").append(tunnelIds.size()).append(" tunnels)</i><ul>\n"); - for (Iterator iter = tunnelIds.iterator(); iter.hasNext(); ) { - TunnelId id = (TunnelId)iter.next(); - TunnelInfo tunnel = getTunnelInfo(id); - renderTunnel(buf, id, tunnel); - } - buf.append("</ul>\n"); + buf.append("<b>").append(msg).append(":</b> <i>(").append(tunnelIds.size()).append(" tunnels)</i><ul>\n"); + for (Iterator iter = tunnelIds.iterator(); iter.hasNext(); ) { + TunnelId id = (TunnelId)iter.next(); + TunnelInfo tunnel = getTunnelInfo(id); + renderTunnel(buf, id, tunnel); + } + buf.append("</ul>\n"); } private final static void renderTunnel(StringBuffer buf, TunnelId id, TunnelInfo tunnel) { - if (tunnel == null) { - buf.append("<li>Tunnel: ").append(id.getTunnelId()).append(" is not known</li>\n"); - } else { - buf.append("<li>Tunnel: ").append(tunnel.getTunnelId()).append("</li><pre>"); - buf.append("\n\tStyle: ").append(getStyle(id)); - buf.append("\n\tReady? ").append(tunnel.getIsReady()); - buf.append("\n\tDest? ").append(getDestination(tunnel)); - if (tunnel.getSettings() != null) - buf.append("\n\tExpiration: ").append(new Date(tunnel.getSettings().getExpiration())); - else - buf.append("\n\tExpiration: none"); - - buf.append("\n\tStart router: ").append(tunnel.getThisHop().toBase64()).append("\n"); - TunnelInfo t = tunnel.getNextHopInfo(); - if (t != null) { - int hop = 1; - while (t != null) { - buf.append("\tHop ").append(hop).append(": ").append(t.getThisHop().toBase64()).append("\n"); - t = t.getNextHopInfo(); - hop++; - } - } else { - if (tunnel.getNextHop() != null) - buf.append("\tNext: ").append(tunnel.getNextHop().toBase64()).append("\n"); - } - - buf.append("\n</pre>"); - } + if (tunnel == null) { + buf.append("<li>Tunnel: ").append(id.getTunnelId()).append(" is not known</li>\n"); + } else { + buf.append("<li>Tunnel: ").append(tunnel.getTunnelId()).append("</li><pre>"); + buf.append("\n\tStyle: ").append(getStyle(id)); + buf.append("\n\tReady? ").append(tunnel.getIsReady()); + buf.append("\n\tDest? ").append(getDestination(tunnel)); + if (tunnel.getSettings() != null) + buf.append("\n\tExpiration: ").append(new Date(tunnel.getSettings().getExpiration())); + else + buf.append("\n\tExpiration: none"); + + buf.append("\n\tStart router: ").append(tunnel.getThisHop().toBase64()).append("\n"); + TunnelInfo t = tunnel.getNextHopInfo(); + if (t != null) { + int hop = 1; + while (t != null) { + buf.append("\tHop ").append(hop).append(": ").append(t.getThisHop().toBase64()).append("\n"); + t = t.getNextHopInfo(); + hop++; + } + } else { + if (tunnel.getNextHop() != null) + buf.append("\tNext: ").append(tunnel.getNextHop().toBase64()).append("\n"); + } + + buf.append("\n</pre>"); + } } private final static String getStyle(TunnelId id) { - switch (id.getType()) { - case TunnelId.TYPE_INBOUND: - return "Inbound"; - case TunnelId.TYPE_OUTBOUND: - return "Outbound"; - case TunnelId.TYPE_PARTICIPANT: - return "Participant"; - case TunnelId.TYPE_UNSPECIFIED: - return "Unspecified"; - default: - return "Other! - " + id.getType(); - } + switch (id.getType()) { + case TunnelId.TYPE_INBOUND: + return "Inbound"; + case TunnelId.TYPE_OUTBOUND: + return "Outbound"; + case TunnelId.TYPE_PARTICIPANT: + return "Participant"; + case TunnelId.TYPE_UNSPECIFIED: + return "Unspecified"; + default: + return "Other! - " + id.getType(); + } } private final static String getDestination(TunnelInfo info) { - while (info != null) { - if (info.getDestination() != null) - return info.getDestination().calculateHash().toString(); - else - info = info.getNextHopInfo(); - } - return "none"; + while (info != null) { + if (info.getDestination() != null) + return info.getDestination().calculateHash().toString(); + else + info = info.getNextHopInfo(); + } + return "none"; } /** * This job instructs the troops to invade mars with a spork. */ private class WritePoolJob extends JobImpl { - public WritePoolJob() { - getTiming().setStartAfter(Clock.getInstance().now() + WRITE_POOL_DELAY); - } - public String getName() { return "Write Out Tunnel Pool"; } - public void runJob() { - if (!isLive()) - return; - _persistenceHelper.writePool(TunnelPool.this); - requeue(WRITE_POOL_DELAY); - } + public WritePoolJob() { + getTiming().setStartAfter(Clock.getInstance().now() + WRITE_POOL_DELAY); + } + public String getName() { return "Write Out Tunnel Pool"; } + public void runJob() { + if (!isLive()) + return; + _persistenceHelper.writePool(TunnelPool.this); + requeue(WRITE_POOL_DELAY); + } } } -- GitLab