From 40d981df252e0026115baa4f504220aabed407f8 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Mon, 29 Oct 2012 22:21:50 +0000 Subject: [PATCH] * OutNetMessage: Properly clean up when dropped by codel (but unused for now since codel is disabled for ONM) * Tunnels: Implement per-client outbound tunnel message priority (ticket #719) * ClientTunnelSettings cleanup --- .../net/i2p/router/ClientTunnelSettings.java | 10 +++---- .../src/net/i2p/router/OutNetMessage.java | 27 ++++++++++++++----- .../net/i2p/router/TunnelPoolSettings.java | 15 +++++++++++ .../i2p/router/tunnel/OutboundReceiver.java | 4 ++- .../router/tunnel/OutboundTunnelEndpoint.java | 3 ++- .../router/tunnel/TunnelCreatorConfig.java | 16 +++++++++++ .../router/tunnel/TunnelGatewayZeroHop.java | 3 ++- .../i2p/router/tunnel/pool/TunnelPool.java | 4 ++- .../util/CoDelPriorityBlockingQueue.java | 4 ++- 9 files changed, 69 insertions(+), 17 deletions(-) diff --git a/router/java/src/net/i2p/router/ClientTunnelSettings.java b/router/java/src/net/i2p/router/ClientTunnelSettings.java index efed48a9d7..13a0b88806 100644 --- a/router/java/src/net/i2p/router/ClientTunnelSettings.java +++ b/router/java/src/net/i2p/router/ClientTunnelSettings.java @@ -16,8 +16,8 @@ import java.util.Properties; * */ public class ClientTunnelSettings { - private TunnelPoolSettings _inboundSettings; - private TunnelPoolSettings _outboundSettings; + private final TunnelPoolSettings _inboundSettings; + private final TunnelPoolSettings _outboundSettings; public ClientTunnelSettings() { _inboundSettings = new TunnelPoolSettings(false, true); @@ -25,16 +25,16 @@ public class ClientTunnelSettings { } public TunnelPoolSettings getInboundSettings() { return _inboundSettings; } - public void setInboundSettings(TunnelPoolSettings settings) { _inboundSettings = settings; } + //public void setInboundSettings(TunnelPoolSettings settings) { _inboundSettings = settings; } public TunnelPoolSettings getOutboundSettings() { return _outboundSettings; } - public void setOutboundSettings(TunnelPoolSettings settings) { _outboundSettings = settings; } + //public void setOutboundSettings(TunnelPoolSettings settings) { _outboundSettings = settings; } public void readFromProperties(Properties props) { _inboundSettings.readFromProperties("inbound.", props); _outboundSettings.readFromProperties("outbound.", props); } - public void writeToProperties(Properties props) { + private void writeToProperties(Properties props) { if (props == null) return; _inboundSettings.writeToProperties("inbound.", props); _outboundSettings.writeToProperties("outbound.", props); diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 9016d88dfc..60a47243e6 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -66,8 +66,9 @@ public class OutNetMessage implements CDPQEntry { public static final int PRIORITY_HIGHEST = 1000; public static final int PRIORITY_MY_BUILD_REQUEST = 500; public static final int PRIORITY_MY_NETDB_LOOKUP = 500; - public static final int PRIORITY_MY_NETDB_STORE = 400; - public static final int PRIORITY_MY_DATA = 400; + public static final int PRIORITY_MY_NETDB_STORE = 460; + /** may be adjusted +/- 25 for outbound traffic */ + public static final int PRIORITY_MY_DATA = 425; public static final int PRIORITY_MY_NETDB_STORE_LOW = 300; public static final int PRIORITY_HIS_BUILD_REQUEST = 300; public static final int PRIORITY_BUILD_REPLY = 300; @@ -308,6 +309,18 @@ public class OutNetMessage implements CDPQEntry { * @since 0.9.3 */ public void drop() { + // This is essentially what TransportImpl.afterSend(this, false) does + // but we don't have a ref to the Transport. + // No requeue with other transport allowed. + if (_onFailedSend != null) + _context.jobQueue().addJob(_onFailedSend); + if (_onFailedReply != null) + _context.jobQueue().addJob(_onFailedReply); + if (_replySelector != null) + _context.messageRegistry().unregisterPending(this); + discardData(); + // we want this stat to reflect the lag + _context.statManager().addRateData("transport.sendProcessingTime", _context.clock().now() - _enqueueTime); } /** @@ -333,11 +346,11 @@ public class OutNetMessage implements CDPQEntry { public void discardData() { if ( (_message != null) && (_messageSize <= 0) ) _messageSize = _message.getMessageSize(); - if (_log.shouldLog(Log.DEBUG)) { - long timeToDiscard = _context.clock().now() - _created; - _log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after " - + timeToDiscard); - } + //if (_log.shouldLog(Log.DEBUG)) { + // long timeToDiscard = _context.clock().now() - _created; + // _log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after " + // + timeToDiscard); + //} _message = null; //_context.statManager().addRateData("outNetMessage.timeToDiscard", timeToDiscard, timeToDiscard); //_context.messageStateMonitor().outboundMessageDiscarded(); diff --git a/router/java/src/net/i2p/router/TunnelPoolSettings.java b/router/java/src/net/i2p/router/TunnelPoolSettings.java index ff85978822..7be825939a 100644 --- a/router/java/src/net/i2p/router/TunnelPoolSettings.java +++ b/router/java/src/net/i2p/router/TunnelPoolSettings.java @@ -27,6 +27,7 @@ public class TunnelPoolSettings { private int _IPRestriction; private final Properties _unknownOptions; private final Hash _randomKey; + private int _priority; /** prefix used to override the router's defaults for clients */ public static final String PREFIX_DEFAULT = "router.defaultPool."; @@ -44,6 +45,7 @@ public class TunnelPoolSettings { public static final String PROP_LENGTH_VARIANCE = "lengthVariance"; public static final String PROP_ALLOW_ZERO_HOP = "allowZeroHop"; public static final String PROP_IP_RESTRICTION = "IPRestriction"; + public static final String PROP_PRIORITY = "priority"; public static final int DEFAULT_QUANTITY = 2; public static final int DEFAULT_BACKUP_QUANTITY = 0; @@ -53,6 +55,8 @@ public class TunnelPoolSettings { public static final int DEFAULT_LENGTH_VARIANCE = 0; public static final boolean DEFAULT_ALLOW_ZERO_HOP = true; public static final int DEFAULT_IP_RESTRICTION = 2; // class B (/16) + private static final int MIN_PRIORITY = -25; + private static final int MAX_PRIORITY = 25; public TunnelPoolSettings(boolean isExploratory, boolean isInbound) { _isExploratory = isExploratory; @@ -160,6 +164,13 @@ public class TunnelPoolSettings { public int getIPRestriction() { int r = _IPRestriction; if (r>4) r=4; else if (r<0) r=0; return r;} public void setIPRestriction(int b) { _IPRestriction = b; } + /** + * Outbound message priority - for outbound tunnels only + * @return -25 to +25, default 0 + * @since 0.9.4 + */ + public int getPriority() { return _priority; } + public Properties getUnknownOptions() { return _unknownOptions; } public void readFromProperties(String prefix, Map<Object, Object> props) { @@ -185,6 +196,8 @@ public class TunnelPoolSettings { _destinationNickname = value; else if (name.equalsIgnoreCase(prefix + PROP_IP_RESTRICTION)) _IPRestriction = getInt(value, DEFAULT_IP_RESTRICTION); + else if ((!_isInbound) && name.equalsIgnoreCase(prefix + PROP_PRIORITY)) + _IPRestriction = Math.min(MAX_PRIORITY, Math.max(MIN_PRIORITY, getInt(value, 0))); else _unknownOptions.setProperty(name.substring((prefix != null ? prefix.length() : 0)), value); } @@ -203,6 +216,8 @@ public class TunnelPoolSettings { props.setProperty(prefix + PROP_QUANTITY, ""+_quantity); // props.setProperty(prefix + PROP_REBUILD_PERIOD, ""+_rebuildPeriod); props.setProperty(prefix + PROP_IP_RESTRICTION, ""+_IPRestriction); + if (!_isInbound) + props.setProperty(prefix + PROP_PRIORITY, Integer.toString(_priority)); for (Map.Entry e : _unknownOptions.entrySet()) { String name = (String) e.getKey(); String val = (String) e.getValue(); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java index d67dcc39f3..db40ff00ad 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java @@ -20,6 +20,7 @@ class OutboundReceiver implements TunnelGateway.Receiver { private final Log _log; private final TunnelCreatorConfig _config; private RouterInfo _nextHopCache; + private final int _priority; private static final long MAX_LOOKUP_TIME = 15*1000; private static final int PRIORITY = OutNetMessage.PRIORITY_MY_DATA; @@ -29,6 +30,7 @@ class OutboundReceiver implements TunnelGateway.Receiver { _log = ctx.logManager().getLog(OutboundReceiver.class); _config = cfg; _nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1)); + _priority = PRIORITY + cfg.getPriority(); // all createRateStat() in TunnelDispatcher } @@ -73,7 +75,7 @@ class OutboundReceiver implements TunnelGateway.Receiver { m.setMessage(msg); m.setExpiration(msg.getMessageExpiration()); m.setTarget(ri); - m.setPriority(PRIORITY); + m.setPriority(_priority); _context.outNetMessagePool().add(m); _config.incrementProcessedMessages(); } diff --git a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java index 7f81424450..a294238f4f 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java @@ -4,6 +4,7 @@ import net.i2p.data.Hash; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.TunnelDataMessage; +import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -26,7 +27,7 @@ class OutboundTunnelEndpoint { _config = config; _processor = processor; _handler = new RouterFragmentHandler(ctx, new DefragmentedHandler()); - _outDistributor = new OutboundMessageDistributor(ctx, 200); + _outDistributor = new OutboundMessageDistributor(ctx, OutNetMessage.PRIORITY_PARTICIPATING); } public void dispatch(TunnelDataMessage msg, Hash recvFrom) { _config.incrementProcessedMessages(); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index 5306179a35..f8ff4ed4a4 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -34,10 +34,12 @@ public class TunnelCreatorConfig implements TunnelInfo { private boolean _failed; private int _failures; private boolean _reused; + private int _priority; public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound) { this(ctx, length, isInbound, null); } + public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) { _context = ctx; if (length <= 0) @@ -204,6 +206,20 @@ public class TunnelCreatorConfig implements TunnelInfo { */ public void setReused() { _reused = true; } + /** + * Outbound message priority - for outbound tunnels only + * @return -25 to +25, default 0 + * @since 0.9.4 + */ + public int getPriority() { return _priority; } + + /** + * Outbound message priority - for outbound tunnels only + * @param priority -25 to +25, default 0 + * @since 0.9.4 + */ + public void setPriority(int priority) { _priority = priority; } + @Override public String toString() { // H0:1235-->H1:2345-->H2:2345 diff --git a/router/java/src/net/i2p/router/tunnel/TunnelGatewayZeroHop.java b/router/java/src/net/i2p/router/tunnel/TunnelGatewayZeroHop.java index a4c048eb95..818c88bd10 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelGatewayZeroHop.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelGatewayZeroHop.java @@ -6,6 +6,7 @@ import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessageException; import net.i2p.data.i2np.TunnelGatewayMessage; import net.i2p.data.i2np.UnknownI2NPMessage; +import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -27,7 +28,7 @@ class TunnelGatewayZeroHop extends TunnelGateway { if (config.isInbound()) _inDistributor = new InboundMessageDistributor(context, config.getDestination()); else - _outDistributor = new OutboundMessageDistributor(context, 400); + _outDistributor = new OutboundMessageDistributor(context, OutNetMessage.PRIORITY_MY_DATA); } /** diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 8304a1c709..d14c32420d 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -1128,7 +1128,9 @@ public class TunnelPool { // don't need to worry about prev/next hop } cfg.setExpiration(expiration); - + if (!settings.isInbound()) + cfg.setPriority(settings.getPriority()); + if (_log.shouldLog(Log.DEBUG)) _log.debug("Config contains " + peers + ": " + cfg); synchronized (_inProgress) { diff --git a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java index c384f5e7bc..b4da749014 100644 --- a/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java +++ b/router/java/src/net/i2p/router/util/CoDelPriorityBlockingQueue.java @@ -295,7 +295,9 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking private void drop(E entry) { long delay = _context.clock().now() - entry.getEnqueueTime(); - _context.statManager().addRateData(STAT_DROP + entry.getPriority(), delay); + // round down for the stat + int priority = entry.getPriority() / 100 * 100; + _context.statManager().addRateData(STAT_DROP + priority, delay); if (_log.shouldLog(Log.WARN)) _log.warn("CDPQ #" + _id + ' ' + _name + " dropped item with delay " + delay + ", priority " + entry.getPriority() + ", seq " + -- GitLab