From 646fe207262c998b87f3c253cc7a46ddc0f7e9f9 Mon Sep 17 00:00:00 2001 From: zzz Date: Mon, 19 Feb 2018 14:31:51 +0000 Subject: [PATCH] Streaming: Don't exceed configured tag settings when overriding --- .../i2p/client/streaming/impl/Connection.java | 4 +- .../streaming/impl/ConnectionOptions.java | 37 +++++++++++++ .../client/streaming/impl/PacketQueue.java | 53 ++++++++++++++----- 3 files changed, 79 insertions(+), 15 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index d10325f14..f80878e5e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -920,12 +920,12 @@ class Connection { /** * Retrieve the current ConnectionOptions. - * @return the current ConnectionOptions + * @return the current ConnectionOptions, non-null */ public ConnectionOptions getOptions() { return _options; } /** * Set the ConnectionOptions. - * @param opts ConnectionOptions + * @param opts ConnectionOptions non-null */ public void setOptions(ConnectionOptions opts) { _options = opts; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index b3c7aae5e..dbf12fb43 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -53,6 +53,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _maxConns; private boolean _disableRejectLog; private String _limitAction; + private int _tagsToSend; + private int _tagThreshold; /** state of a connection */ private enum AckInit { @@ -125,6 +127,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_DISABLE_REJ_LOG = "i2p.streaming.disableRejectLogging"; /** @since 0.9.34 reset,drop,http, or custom string, default reset */ public static final String PROP_LIMIT_ACTION = "i2p.streaming.limitAction"; + /** @since 0.9.34 */ + public static final String PROP_TAGS_TO_SEND = "crypto.tagsToSend"; + /** @since 0.9.34 */ + public static final String PROP_TAG_THRESHOLD = "crypto.lowTagThreshold"; private static final int TREND_COUNT = 3; @@ -139,7 +145,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND; private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1; private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1; + /** @since 0.9.34 */ private static final String DEFAULT_LIMIT_ACTION = "reset"; + /** @since 0.9.34 */ + public static final int DEFAULT_TAGS_TO_SEND = 40; + /** @since 0.9.34 */ + public static final int DEFAULT_TAG_THRESHOLD = 30; /** @@ -352,6 +363,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); _maxConns = opts.getMaxConns(); _limitAction = opts.getLimitAction(); + _tagsToSend = opts.getTagsToSend(); + _tagThreshold = opts.getTagThreshold(); } /** @@ -395,6 +408,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _limitAction = DEFAULT_LIMIT_ACTION; _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); + _tagsToSend = getInt(opts, PROP_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND); + _tagsToSend = getInt(opts, PROP_TAG_THRESHOLD, DEFAULT_TAG_THRESHOLD); } /** @@ -464,6 +479,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxConns = getInt(opts, PROP_MAX_STREAMS, 0); if (opts.getProperty(PROP_LIMIT_ACTION) != null) _limitAction = opts.getProperty(PROP_LIMIT_ACTION); + if (opts.getProperty(PROP_TAGS_TO_SEND) != null) + _maxConns = getInt(opts, PROP_TAGS_TO_SEND, DEFAULT_TAGS_TO_SEND); + if (opts.getProperty(PROP_TAG_THRESHOLD) != null) + _maxConns = getInt(opts, PROP_TAG_THRESHOLD, DEFAULT_TAG_THRESHOLD); _rto = getInt(opts, PROP_INITIAL_RTO, INITIAL_RTO); } @@ -791,6 +810,24 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public String getLimitAction() { return _limitAction; } + /** + * This option is mostly handled on the router side, + * but PacketQueue also needs to know, so that when + * it overrides, it doesn't exceed the setting. + * + * @since 0.9.34 + */ + public int getTagsToSend() { return _tagsToSend; } + + /** + * This option is mostly handled on the router side, + * but PacketQueue also needs to know, so that when + * it overrides, it doesn't exceed the setting. + * + * @since 0.9.34 + */ + public int getTagThreshold() { return _tagThreshold; } + private void initLists(ConnectionOptions opts) { _accessList = opts.getAccessList(); _blackList = opts.getBlacklist(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java index 5d807ddc9..50332dc54 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java @@ -125,16 +125,8 @@ class PacketQueue implements SendMessageStatusListener, Closeable { if (expires > 0) options.setDate(expires); boolean listenForStatus = false; - if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { - if (con != null) { - if (con.isInbound()) - options.setSendLeaseSet(false); - else if (ENABLE_STATUS_LISTEN) - listenForStatus = true; - } - options.setTagsToSend(INITIAL_TAGS_TO_SEND); - options.setTagThreshold(MIN_TAG_THRESHOLD); - } else if (packet.isFlagSet(FLAGS_FINAL_TAGS)) { + // FINAL trumps INITIAL, in the case of SYN+CLOSE + if (packet.isFlagSet(FLAGS_FINAL_TAGS)) { if (packet.isFlagSet(Packet.FLAG_ECHO)) { // Send LS for PING, not for PONG if (packet.getSendStreamId() <= 0) // pong @@ -142,16 +134,51 @@ class PacketQueue implements SendMessageStatusListener, Closeable { } else { options.setSendLeaseSet(false); } - options.setTagsToSend(FINAL_TAGS_TO_SEND); - options.setTagThreshold(FINAL_TAG_THRESHOLD); + int sendTags = FINAL_TAGS_TO_SEND; + int tagThresh = FINAL_TAG_THRESHOLD; + if (con != null) { + ConnectionOptions copts = con.getOptions(); + int cSendTags = copts.getTagsToSend(); + int cTagThresh = copts.getTagThreshold(); + if (cSendTags < sendTags) + sendTags = cSendTags; + if (cTagThresh < tagThresh) + tagThresh = cTagThresh; + } + options.setTagsToSend(sendTags); + options.setTagThreshold(tagThresh); + } else if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { + if (con != null) { + if (con.isInbound()) + options.setSendLeaseSet(false); + else if (ENABLE_STATUS_LISTEN) + listenForStatus = true; + } + int sendTags = INITIAL_TAGS_TO_SEND; + int tagThresh = MIN_TAG_THRESHOLD; + if (con != null) { + ConnectionOptions copts = con.getOptions(); + int cSendTags = copts.getTagsToSend(); + int cTagThresh = copts.getTagThreshold(); + if (cSendTags < sendTags) + sendTags = cSendTags; + if (cTagThresh < tagThresh) + tagThresh = cTagThresh; + } + options.setTagsToSend(sendTags); + options.setTagThreshold(tagThresh); } else { if (con != null) { if (con.isInbound() && con.getLifetime() < 2*60*1000) options.setSendLeaseSet(false); // increase threshold with higher window sizes to prevent stalls // after tag delivery failure - int wdw = con.getOptions().getWindowSize(); + ConnectionOptions copts = con.getOptions(); + int wdw = copts.getWindowSize(); int thresh = Math.max(MIN_TAG_THRESHOLD, wdw * TAG_WINDOW_FACTOR); + int cTagThresh = copts.getTagThreshold(); + if (cTagThresh < thresh) + thresh = cTagThresh; options.setTagThreshold(thresh); } }