* OutNetMessage: Properly clean up when dropped by codel (but unused for now

since codel is disabled for ONM)
 * Tunnels: Implement per-client outbound tunnel message priority (ticket #719)
 * ClientTunnelSettings cleanup
This commit is contained in:
zzz
2012-10-29 22:21:50 +00:00
parent f5165cfae5
commit 40d981df25
9 changed files with 69 additions and 17 deletions

View File

@@ -16,8 +16,8 @@ import java.util.Properties;
*
*/
public class ClientTunnelSettings {
private TunnelPoolSettings _inboundSettings;
private TunnelPoolSettings _outboundSettings;
private final TunnelPoolSettings _inboundSettings;
private final TunnelPoolSettings _outboundSettings;
public ClientTunnelSettings() {
_inboundSettings = new TunnelPoolSettings(false, true);
@@ -25,16 +25,16 @@ public class ClientTunnelSettings {
}
public TunnelPoolSettings getInboundSettings() { return _inboundSettings; }
public void setInboundSettings(TunnelPoolSettings settings) { _inboundSettings = settings; }
//public void setInboundSettings(TunnelPoolSettings settings) { _inboundSettings = settings; }
public TunnelPoolSettings getOutboundSettings() { return _outboundSettings; }
public void setOutboundSettings(TunnelPoolSettings settings) { _outboundSettings = settings; }
//public void setOutboundSettings(TunnelPoolSettings settings) { _outboundSettings = settings; }
public void readFromProperties(Properties props) {
_inboundSettings.readFromProperties("inbound.", props);
_outboundSettings.readFromProperties("outbound.", props);
}
public void writeToProperties(Properties props) {
private void writeToProperties(Properties props) {
if (props == null) return;
_inboundSettings.writeToProperties("inbound.", props);
_outboundSettings.writeToProperties("outbound.", props);

View File

@@ -66,8 +66,9 @@ public class OutNetMessage implements CDPQEntry {
public static final int PRIORITY_HIGHEST = 1000;
public static final int PRIORITY_MY_BUILD_REQUEST = 500;
public static final int PRIORITY_MY_NETDB_LOOKUP = 500;
public static final int PRIORITY_MY_NETDB_STORE = 400;
public static final int PRIORITY_MY_DATA = 400;
public static final int PRIORITY_MY_NETDB_STORE = 460;
/** may be adjusted +/- 25 for outbound traffic */
public static final int PRIORITY_MY_DATA = 425;
public static final int PRIORITY_MY_NETDB_STORE_LOW = 300;
public static final int PRIORITY_HIS_BUILD_REQUEST = 300;
public static final int PRIORITY_BUILD_REPLY = 300;
@@ -308,6 +309,18 @@ public class OutNetMessage implements CDPQEntry {
* @since 0.9.3
*/
public void drop() {
// This is essentially what TransportImpl.afterSend(this, false) does
// but we don't have a ref to the Transport.
// No requeue with other transport allowed.
if (_onFailedSend != null)
_context.jobQueue().addJob(_onFailedSend);
if (_onFailedReply != null)
_context.jobQueue().addJob(_onFailedReply);
if (_replySelector != null)
_context.messageRegistry().unregisterPending(this);
discardData();
// we want this stat to reflect the lag
_context.statManager().addRateData("transport.sendProcessingTime", _context.clock().now() - _enqueueTime);
}
/**
@@ -333,11 +346,11 @@ public class OutNetMessage implements CDPQEntry {
public void discardData() {
if ( (_message != null) && (_messageSize <= 0) )
_messageSize = _message.getMessageSize();
if (_log.shouldLog(Log.DEBUG)) {
long timeToDiscard = _context.clock().now() - _created;
_log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after "
+ timeToDiscard);
}
//if (_log.shouldLog(Log.DEBUG)) {
// long timeToDiscard = _context.clock().now() - _created;
// _log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after "
// + timeToDiscard);
//}
_message = null;
//_context.statManager().addRateData("outNetMessage.timeToDiscard", timeToDiscard, timeToDiscard);
//_context.messageStateMonitor().outboundMessageDiscarded();

View File

@@ -27,6 +27,7 @@ public class TunnelPoolSettings {
private int _IPRestriction;
private final Properties _unknownOptions;
private final Hash _randomKey;
private int _priority;
/** prefix used to override the router's defaults for clients */
public static final String PREFIX_DEFAULT = "router.defaultPool.";
@@ -44,6 +45,7 @@ public class TunnelPoolSettings {
public static final String PROP_LENGTH_VARIANCE = "lengthVariance";
public static final String PROP_ALLOW_ZERO_HOP = "allowZeroHop";
public static final String PROP_IP_RESTRICTION = "IPRestriction";
public static final String PROP_PRIORITY = "priority";
public static final int DEFAULT_QUANTITY = 2;
public static final int DEFAULT_BACKUP_QUANTITY = 0;
@@ -53,6 +55,8 @@ public class TunnelPoolSettings {
public static final int DEFAULT_LENGTH_VARIANCE = 0;
public static final boolean DEFAULT_ALLOW_ZERO_HOP = true;
public static final int DEFAULT_IP_RESTRICTION = 2; // class B (/16)
private static final int MIN_PRIORITY = -25;
private static final int MAX_PRIORITY = 25;
public TunnelPoolSettings(boolean isExploratory, boolean isInbound) {
_isExploratory = isExploratory;
@@ -160,6 +164,13 @@ public class TunnelPoolSettings {
public int getIPRestriction() { int r = _IPRestriction; if (r>4) r=4; else if (r<0) r=0; return r;}
public void setIPRestriction(int b) { _IPRestriction = b; }
/**
* Outbound message priority - for outbound tunnels only
* @return -25 to +25, default 0
* @since 0.9.4
*/
public int getPriority() { return _priority; }
public Properties getUnknownOptions() { return _unknownOptions; }
public void readFromProperties(String prefix, Map<Object, Object> props) {
@@ -185,6 +196,8 @@ public class TunnelPoolSettings {
_destinationNickname = value;
else if (name.equalsIgnoreCase(prefix + PROP_IP_RESTRICTION))
_IPRestriction = getInt(value, DEFAULT_IP_RESTRICTION);
else if ((!_isInbound) && name.equalsIgnoreCase(prefix + PROP_PRIORITY))
_IPRestriction = Math.min(MAX_PRIORITY, Math.max(MIN_PRIORITY, getInt(value, 0)));
else
_unknownOptions.setProperty(name.substring((prefix != null ? prefix.length() : 0)), value);
}
@@ -203,6 +216,8 @@ public class TunnelPoolSettings {
props.setProperty(prefix + PROP_QUANTITY, ""+_quantity);
// props.setProperty(prefix + PROP_REBUILD_PERIOD, ""+_rebuildPeriod);
props.setProperty(prefix + PROP_IP_RESTRICTION, ""+_IPRestriction);
if (!_isInbound)
props.setProperty(prefix + PROP_PRIORITY, Integer.toString(_priority));
for (Map.Entry e : _unknownOptions.entrySet()) {
String name = (String) e.getKey();
String val = (String) e.getValue();

View File

@@ -20,6 +20,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
private final Log _log;
private final TunnelCreatorConfig _config;
private RouterInfo _nextHopCache;
private final int _priority;
private static final long MAX_LOOKUP_TIME = 15*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_DATA;
@@ -29,6 +30,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
_log = ctx.logManager().getLog(OutboundReceiver.class);
_config = cfg;
_nextHopCache = _context.netDb().lookupRouterInfoLocally(_config.getPeer(1));
_priority = PRIORITY + cfg.getPriority();
// all createRateStat() in TunnelDispatcher
}
@@ -73,7 +75,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
m.setTarget(ri);
m.setPriority(PRIORITY);
m.setPriority(_priority);
_context.outNetMessagePool().add(m);
_config.incrementProcessedMessages();
}

View File

@@ -4,6 +4,7 @@ import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelDataMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -26,7 +27,7 @@ class OutboundTunnelEndpoint {
_config = config;
_processor = processor;
_handler = new RouterFragmentHandler(ctx, new DefragmentedHandler());
_outDistributor = new OutboundMessageDistributor(ctx, 200);
_outDistributor = new OutboundMessageDistributor(ctx, OutNetMessage.PRIORITY_PARTICIPATING);
}
public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
_config.incrementProcessedMessages();

View File

@@ -34,10 +34,12 @@ public class TunnelCreatorConfig implements TunnelInfo {
private boolean _failed;
private int _failures;
private boolean _reused;
private int _priority;
public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound) {
this(ctx, length, isInbound, null);
}
public TunnelCreatorConfig(RouterContext ctx, int length, boolean isInbound, Hash destination) {
_context = ctx;
if (length <= 0)
@@ -204,6 +206,20 @@ public class TunnelCreatorConfig implements TunnelInfo {
*/
public void setReused() { _reused = true; }
/**
* Outbound message priority - for outbound tunnels only
* @return -25 to +25, default 0
* @since 0.9.4
*/
public int getPriority() { return _priority; }
/**
* Outbound message priority - for outbound tunnels only
* @param priority -25 to +25, default 0
* @since 0.9.4
*/
public void setPriority(int priority) { _priority = priority; }
@Override
public String toString() {
// H0:1235-->H1:2345-->H2:2345

View File

@@ -6,6 +6,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.data.i2np.UnknownI2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -27,7 +28,7 @@ class TunnelGatewayZeroHop extends TunnelGateway {
if (config.isInbound())
_inDistributor = new InboundMessageDistributor(context, config.getDestination());
else
_outDistributor = new OutboundMessageDistributor(context, 400);
_outDistributor = new OutboundMessageDistributor(context, OutNetMessage.PRIORITY_MY_DATA);
}
/**

View File

@@ -1128,7 +1128,9 @@ public class TunnelPool {
// don't need to worry about prev/next hop
}
cfg.setExpiration(expiration);
if (!settings.isInbound())
cfg.setPriority(settings.getPriority());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Config contains " + peers + ": " + cfg);
synchronized (_inProgress) {

View File

@@ -295,7 +295,9 @@ public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlocking
private void drop(E entry) {
long delay = _context.clock().now() - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DROP + entry.getPriority(), delay);
// round down for the stat
int priority = entry.getPriority() / 100 * 100;
_context.statManager().addRateData(STAT_DROP + priority, delay);
if (_log.shouldLog(Log.WARN))
_log.warn("CDPQ #" + _id + ' ' + _name + " dropped item with delay " + delay + ", priority " +
entry.getPriority() + ", seq " +