diff --git a/history.txt b/history.txt index 61fe18f05986dac2bb54c2d3ff610040fae282ad..4d040c69dbff3c85f8f32f111d46e864aeb095db 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,12 @@ -$Id: history.txt,v 1.439 2006/03/25 18:50:51 jrandom Exp $ +$Id: history.txt,v 1.440 2006/03/26 18:23:54 jrandom Exp $ + +2006-03-30 jrandom + * Substantially reduced the lock contention in the message registry (a + major hotspot that can choke most threads). Also reworked the locking + so we don't need per-message timer events + * No need to have additional per-peer message clearing, as they are + either unregistered individually or expired. + * Include some of the more transient tunnel throttling * 2006-03-26 0.6.1.13 released diff --git a/router/java/src/net/i2p/router/RouterThrottleImpl.java b/router/java/src/net/i2p/router/RouterThrottleImpl.java index 705adf83d86d6d4eb5955d45f4dbe8838cf184fe..365c33f9ff94518c50d5971762cb87d120749d58 100644 --- a/router/java/src/net/i2p/router/RouterThrottleImpl.java +++ b/router/java/src/net/i2p/router/RouterThrottleImpl.java @@ -248,7 +248,7 @@ class RouterThrottleImpl implements RouterThrottle { */ private boolean allowTunnel(double bytesAllocated, int numTunnels) { int maxKBps = Math.min(_context.bandwidthLimiter().getOutboundKBytesPerSecond(), _context.bandwidthLimiter().getInboundKBytesPerSecond()); - int used1s = 0; //get1sRate(_context); // dont throttle on the 1s rate, its too volatile + int used1s = get1sRate(_context); // dont throttle on the 1s rate, its too volatile int used1m = get1mRate(_context); int used5m = 0; //get5mRate(_context); // don't throttle on the 5m rate, as that'd hide available bandwidth int used = Math.max(Math.max(used1s, used1m), used5m); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 6ca96e39bfdbdb480032c27b04ce27f1289e6e88..6c598171f74f255a5adbe9b981c758043febc8f9 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.380 $ $Date: 2006/03/25 18:50:48 $"; + public final static String ID = "$Revision: 1.381 $ $Date: 2006/03/26 18:23:52 $"; public final static String VERSION = "0.6.1.13"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/Shitlist.java b/router/java/src/net/i2p/router/Shitlist.java index 54162eb6d1ea4841c4e81c25c9fe26f7cf3f7617..b230ab8a559cfe4ac46bd2c34087bd82c315ae6a 100644 --- a/router/java/src/net/i2p/router/Shitlist.java +++ b/router/java/src/net/i2p/router/Shitlist.java @@ -84,7 +84,7 @@ public class Shitlist { _context.netDb().fail(peer); //_context.tunnelManager().peerFailed(peer); - _context.messageRegistry().peerFailed(peer); + //_context.messageRegistry().peerFailed(peer); if (!wasAlready) _context.messageHistory().shitlist(peer, reason); return wasAlready; diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java index 219057598986e34074af371b334556cab45afb7b..01b81e339fdb46d2576e00d126733130601caedf 100644 --- a/router/java/src/net/i2p/router/transport/GetBidsJob.java +++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java @@ -47,7 +47,7 @@ public class GetBidsJob extends JobImpl { if (context.shitlist().isShitlisted(to)) { if (log.shouldLog(Log.WARN)) log.warn("Attempt to send a message to a shitlisted peer - " + to); - context.messageRegistry().peerFailed(to); + //context.messageRegistry().peerFailed(to); fail(context, msg); return; } diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java index 281227c55988c5079a24fe3c7508acc3d4272ab3..aaf6b13e0320f82851d7f2f40bf9daace353c418 100644 --- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java +++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java @@ -10,14 +10,8 @@ package net.i2p.router.transport; import java.io.IOException; import java.io.Writer; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; -import net.i2p.data.Hash; import net.i2p.data.i2np.I2NPMessage; import net.i2p.router.Job; import net.i2p.router.MessageSelector; @@ -29,29 +23,25 @@ import net.i2p.util.SimpleTimer; public class OutboundMessageRegistry { private Log _log; - /** Expiration date (Long) to OutNetMessage */ - private TreeMap _pendingMessages; + /** list of currently active MessageSelector instances */ + private List _selectors; + /** map of active MessageSelector to the OutNetMessage causing it (for quick removal) */ + private Map _selectorToMessage; + /** set of active OutNetMessage (for quick removal and selector fetching) */ + private Set _activeMessages; + private CleanupTask _cleanupTask; private RouterContext _context; - private final static long CLEANUP_DELAY = 1000*5; // how often to expire pending unreplied messages - public OutboundMessageRegistry(RouterContext context) { _context = context; _log = _context.logManager().getLog(OutboundMessageRegistry.class); - _pendingMessages = new TreeMap(); - //_context.jobQueue().addJob(new CleanupPendingMessagesJob()); + _selectors = new ArrayList(64); + _selectorToMessage = new HashMap(64); + _activeMessages = new HashSet(64); + _cleanupTask = new CleanupTask(); } - public void shutdown() { - if (_log.shouldLog(Log.WARN)) { - StringBuffer buf = new StringBuffer(1024); - buf.append("Pending messages: ").append(_pendingMessages.size()).append("\n"); - for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) { - buf.append(iter.next().toString()).append("\n\t"); - } - _log.log(Log.WARN, buf.toString()); - } - } + public void shutdown() {} /** * Retrieve all messages that are waiting for the specified message. In @@ -65,103 +55,54 @@ public class OutboundMessageRegistry { * the payload */ public List getOriginalMessages(I2NPMessage message) { - ArrayList matches = new ArrayList(2); + ArrayList matchedSelectors = null; + ArrayList removedSelectors = null; long beforeSync = _context.clock().now(); - - Map messages = null; - long matchTime = 0; - long continueTime = 0; - int numMessages = 0; - long afterSync1 = 0; - long afterSearch = 0; - int matchedRemoveCount = 0; - StringBuffer slow = null; // new StringBuffer(256); - - synchronized (_pendingMessages) { - messages = _pendingMessages; //(Map)_pendingMessages.clone(); - - numMessages = messages.size(); - afterSync1 = _context.clock().now(); - - for (Iterator iter = messages.keySet().iterator(); iter.hasNext(); ) { - Long exp = (Long)iter.next(); - OutNetMessage msg = (OutNetMessage)messages.get(exp); - MessageSelector selector = msg.getReplySelector(); - if (selector != null) { - long before = _context.clock().now(); - boolean isMatch = selector.isMatch(message); - long after = _context.clock().now(); - long diff = after-before; - if (diff > 100) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Matching with selector took too long (" + diff + "ms) : " - + selector.getClass().getName()); - if (slow == null) slow = new StringBuffer(256); - slow.append(selector.getClass().getName()).append(": "); - slow.append(diff).append(" "); + synchronized (_selectors) { + for (int i = 0; i < _selectors.size(); i++) { + MessageSelector sel = (MessageSelector)_selectors.get(i); + boolean isMatch = sel.isMatch(message); + if (isMatch) { + if (matchedSelectors == null) matchedSelectors = new ArrayList(1); + matchedSelectors.add(sel); + if (!sel.continueMatching()) { + if (removedSelectors == null) removedSelectors = new ArrayList(1); + removedSelectors.add(sel); + _selectors.remove(i); + i--; } - matchTime += diff; - - if (isMatch) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Selector matches [" + selector); - if (!matches.contains(msg)) - matches.add(msg); - long beforeCon = _context.clock().now(); - boolean continueMatching = selector.continueMatching(); - long afterCon = _context.clock().now(); - long diffCon = afterCon - beforeCon; - if (diffCon > 100) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Error continueMatching on a match took too long (" - + diffCon + "ms) : " + selector.getClass().getName()); - } - continueTime += diffCon; - - if (continueMatching) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Continue matching"); - // noop - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Stop matching selector " + selector + " for message " - + msg.getMessageType()); - - // i give in mihi, i'll use iter.remove just this once ;) - // (TreeMap supports it, and this synchronized block is a hotspot) - iter.remove(); + } + } + } - matchedRemoveCount++; - } + List rv = null; + if (matchedSelectors != null) { + rv = new ArrayList(matchedSelectors.size()); + for (int i = 0; i < matchedSelectors.size(); i++) { + MessageSelector sel = (MessageSelector)matchedSelectors.get(i); + boolean removed = false; + OutNetMessage msg = null; + synchronized (_selectorToMessage) { + if ( (removedSelectors != null) && (removedSelectors.contains(sel)) ) { + msg = (OutNetMessage)_selectorToMessage.remove(sel); + removed = true; } else { - //_log.debug("Selector does not match [" + selector + "]"); + msg = (OutNetMessage)_selectorToMessage.get(sel); + } + if (msg != null) + rv.add(msg); + } + if (removed && msg != null) { + synchronized (_activeMessages) { + _activeMessages.remove(msg); } } } - afterSearch = _context.clock().now(); - } - - long delay = _context.clock().now() - beforeSync; - long search = afterSearch - afterSync1; - long sync = afterSync1 - beforeSync; - - int level = Log.DEBUG; - if (delay > 1000) - level = Log.ERROR; - if (_log.shouldLog(level)) { - StringBuffer buf = new StringBuffer(1024); - buf.append("getMessages took ").append(delay).append("ms with search time of"); - buf.append(search).append("ms (match: ").append(matchTime).append("ms, continue: "); - buf.append(continueTime).append("ms, #: ").append(numMessages).append(") and sync time of "); - buf.append(sync).append("ms for "); - buf.append(matchedRemoveCount); - buf.append(" removed, ").append(matches.size()).append(" matches: slow = "); - if (slow != null) - buf.append(slow.toString()); - _log.log(level, buf.toString()); + } else { + rv = Collections.EMPTY_LIST; } - return matches; + return rv; } public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) { @@ -174,271 +115,86 @@ public class OutboundMessageRegistry { registerPending(msg, true); return msg; } - - public void registerPending(OutNetMessage msg) { - registerPending(msg, false); - } + public void registerPending(OutNetMessage msg) { registerPending(msg, false); } public void registerPending(OutNetMessage msg, boolean allowEmpty) { - if (msg == null) - throw new IllegalArgumentException("Null OutNetMessage specified? wtf"); - if (!allowEmpty) { - if (msg.getMessage() == null) + if ( (!allowEmpty) && (msg.getMessage() == null) ) throw new IllegalArgumentException("OutNetMessage doesn't contain an I2NPMessage? wtf"); - } - - long beforeSync = _context.clock().now(); - long afterSync1 = 0; - long afterDone = 0; - try { - OutNetMessage oldMsg = null; - long l = msg.getExpiration(); - synchronized (_pendingMessages) { - if (_pendingMessages.containsValue(msg)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Not adding an already pending message: " + msg, - new Exception("Duplicate message registration")); - return; - } + MessageSelector sel = msg.getReplySelector(); + if (sel == null) throw new IllegalArgumentException("No reply selector? wtf"); - while (_pendingMessages.containsKey(new Long(l))) - l++; - _pendingMessages.put(new Long(l), msg); - } - afterSync1 = _context.clock().now(); - - // this may get orphaned if the message is matched or explicitly - // removed, but its cheap enough to do an extra remove on the map - // that to poll the list periodically - SimpleTimer.getInstance().addEvent(new CleanupExpiredTask(l), l - _context.clock().now()); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Register pending: " + msg.getReplySelector().getClass().getName() - + " for " + msg.getMessage() + ": " - + msg.getReplySelector().toString(), new Exception("Register pending")); - afterDone = _context.clock().now(); - } finally { - long delay = _context.clock().now() - beforeSync; - long sync1 = afterSync1 - beforeSync; - long done = afterDone - afterSync1; - String warn = delay + "ms (sync = " + sync1 + "ms, done = " + done + "ms)"; - if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) { - _log.error("Synchronizing in the registry.register took too long! " + warn); - //_context.messageHistory().messageProcessingError(msg.getMessage().getUniqueId(), - // msg.getMessage().getClass().getName(), - // "RegisterPending took too long: " + warn); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Synchronizing in the registry.register was quick: " + warn); - } + boolean alreadyPending = false; + synchronized (_activeMessages) { + if (!_activeMessages.add(msg)) + return; // dont add dups } - //_log.debug("* Register called of " + msg + "\n\nNow pending are: " + renderStatusHTML(), new Exception("who registered a new one?")); + synchronized (_selectorToMessage) { _selectorToMessage.put(sel, msg); } + synchronized (_selectors) { _selectors.add(sel); } + + _cleanupTask.scheduleExpiration(sel); } public void unregisterPending(OutNetMessage msg) { - long beforeSync = _context.clock().now(); - try { - synchronized (_pendingMessages) { - if (_pendingMessages.containsValue(msg)) { - Long found = null; - for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) { - Long exp = (Long)iter.next(); - Object val = _pendingMessages.get(exp); - if (val.equals(msg)) { - found = exp; - break; - } - } - - if (found != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Unregistered message " + msg.getReplySelector() - + ": " + msg, new Exception("Who unregistered?")); - _pendingMessages.remove(found); - } else { - _log.error("Arg, couldn't find the message that we... thought we could find?", - new Exception("WTF")); - } - } - } - } finally { - long delay = _context.clock().now() - beforeSync; - String warn = delay + "ms"; - if ( (delay > 1000) && (_log.shouldLog(Log.WARN)) ) { - _log.warn("Synchronizing in the registry.unRegister took too long! " + warn); - _context.messageHistory().messageProcessingError(msg.getMessageId(), msg.getMessageType(), "Unregister took too long: " + warn); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Synchronizing in the registry.unRegister was quick: " + warn); - } - } + MessageSelector sel = msg.getReplySelector(); + // remember, order matters + synchronized (_selectors) { _selectors.add(sel); } + synchronized (_selectorToMessage) { _selectorToMessage.put(sel, msg); } + synchronized (_activeMessages) { _activeMessages.remove(msg); } } - public void peerFailed(Hash peer) { - List failed = null; - int numFailed = 0; - synchronized (_pendingMessages) { - for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) { - OutNetMessage msg = (OutNetMessage)iter.next(); - if (msg.getTarget() != null) { - Hash to = msg.getTarget().getIdentity().calculateHash(); - if (to.equals(peer)) { - if (failed == null) - failed = new ArrayList(4); - failed.add(msg); - iter.remove(); - numFailed++; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer failed: " + peer.toBase64().substring(0,6) - + " but not killing a message to " - + to.toBase64().substring(0,6)); - } - } - } - } - if (failed != null) { - for (int i = 0; i < failed.size(); i++) { - OutNetMessage msg = (OutNetMessage)failed.get(i); - msg.discardData(); - if (msg.getOnFailedSendJob() != null) - _context.jobQueue().addJob(msg.getOnFailedSendJob()); - } - } - - if (_log.shouldLog(Log.WARN)) - _log.warn("Peer failed: " + peer.toBase64().substring(0,6) + " killing " + numFailed); - } + public void renderStatusHTML(Writer out) throws IOException {} - public void renderStatusHTML(Writer out) throws IOException { - StringBuffer buf = new StringBuffer(8192); - buf.append("<h2>Pending messages</h2>\n"); - Map msgs = null; - synchronized (_pendingMessages) { - msgs = (Map)_pendingMessages.clone(); - } - buf.append("<ul>"); - for (Iterator iter = msgs.keySet().iterator(); iter.hasNext();) { - Long exp = (Long)iter.next(); - OutNetMessage msg = (OutNetMessage)msgs.get(exp); - buf.append("<li>").append(msg.getMessageType()); - buf.append(": expiring on ").append(new Date(exp.longValue())); - if (msg.getTarget() != null) - buf.append(" targetting ").append(msg.getTarget().getIdentity().getHash()); - if (msg.getReplySelector() != null) - buf.append(" with reply selector ").append(msg.getReplySelector().toString()); - else - buf.append(" with NO reply selector? WTF!"); - buf.append("</li>\n"); - } - buf.append("</ul>"); - out.write(buf.toString()); - out.flush(); - } - - private class CleanupExpiredTask implements SimpleTimer.TimedEvent { - private long _expiration; - public CleanupExpiredTask(long expiration) { - _expiration = expiration; + private class CleanupTask implements SimpleTimer.TimedEvent { + private List _removing; + private long _nextExpire; + public CleanupTask() { + _removing = new ArrayList(4); + _nextExpire = -1; } public void timeReached() { - OutNetMessage msg = null; - synchronized (_pendingMessages) { - msg = (OutNetMessage)_pendingMessages.remove(new Long(_expiration)); - } - if (msg != null) { - _context.messageHistory().replyTimedOut(msg); - Job fail = msg.getOnFailedReplyJob(); - if (fail != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing message with selector " + msg.getReplySelector() - + ": " + msg.getMessageType() - + " and firing fail job: " + fail.getClass().getName()); - _context.jobQueue().addJob(fail); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing message with selector " + msg.getReplySelector() - + " and not firing any job"); + long now = _context.clock().now(); + synchronized (_selectors) { + for (int i = 0; i < _selectors.size(); i++) { + MessageSelector sel = (MessageSelector)_selectors.get(i); + long expiration = sel.getExpiration(); + if (expiration <= now) { + _removing.add(sel); + _selectors.remove(i); + i--; + } else if (expiration < _nextExpire || _nextExpire < now) { + _nextExpire = expiration; + } } } - } - } - - /** - * Cleanup any messages that were pending replies but have expired - * - */ - /* - private class CleanupPendingMessagesJob extends JobImpl { - public CleanupPendingMessagesJob() { - super(OutboundMessageRegistry.this._context); - } - - public String getName() { return "Cleanup any messages that timed out"; } - - public void runJob() { - List removed = removeMessages(); - - RouterContext ctx = OutboundMessageRegistry.this._context; - - for (int i = 0; i < removed.size(); i++) { - OutNetMessage msg = (OutNetMessage)removed.get(i); - - if (msg != null) { - _context.messageHistory().replyTimedOut(msg); - Job fail = msg.getOnFailedReplyJob(); - if (fail != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing message with selector " + msg.getReplySelector() - + ": " + msg.getMessageType() - + " and firing fail job: " + fail.getClass().getName()); - _context.jobQueue().addJob(fail); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing message with selector " + msg.getReplySelector() - + " and not firing any job"); + if (_removing.size() > 0) { + for (int i = 0; i < _removing.size(); i++) { + MessageSelector sel = (MessageSelector)_removing.get(i); + OutNetMessage msg = null; + synchronized (_selectorToMessage) { + msg = (OutNetMessage)_selectorToMessage.remove(sel); + } + if (msg != null) { + synchronized (_activeMessages) { + _activeMessages.remove(msg); + } + Job fail = msg.getOnFailedReplyJob(); + if (fail != null) + _context.jobQueue().addJob(fail); } } + _removing.clear(); } - requeue(CLEANUP_DELAY); + if (_nextExpire <= now) + _nextExpire = now + 10*1000; + SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now); } - - /** - * Remove any messages whose expirations are in the past - * - * @return list of OutNetMessage objects that have expired - */ /* - private List removeMessages() { - long now = OutboundMessageRegistry.this._context.clock().now(); - List removedMessages = new ArrayList(2); - List expirationsToRemove = null; - synchronized (_pendingMessages) { - for (Iterator iter = _pendingMessages.keySet().iterator(); iter.hasNext();) { - Long expiration = (Long)iter.next(); - if (expiration.longValue() < now) { - if (expirationsToRemove == null) - expirationsToRemove = new ArrayList(8); - expirationsToRemove.add(expiration); - } else { - // its sorted - break; - } - } - - if (expirationsToRemove != null) { - for (int i = 0; i < expirationsToRemove.size(); i++) { - Long expiration = (Long)expirationsToRemove.get(i); - OutNetMessage msg = (OutNetMessage)_pendingMessages.remove(expiration); - if (msg != null) - removedMessages.add(msg); - } - } + public void scheduleExpiration(MessageSelector sel) { + long now = _context.clock().now(); + if ( (_nextExpire <= now) || (sel.getExpiration() < _nextExpire) ) { + _nextExpire = sel.getExpiration(); + SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now); } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removed " + removedMessages.size() + " messages"); - return removedMessages; } } - */ }