diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index d2c5f7ead2f91fe0a4a25af69671c78b4ee49191..aba7921933fd2c4d32188aa615705a39de52701e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -59,7 +58,7 @@ class Connection { private final boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ - private final SortedMap<Long, PacketLocal> _outboundPackets; + private final TreeMap<Long, PacketLocal> _outboundPackets; private final PacketQueue _outboundQueue; private final ConnectionPacketHandler _handler; private ConnectionOptions _options; @@ -1481,19 +1480,19 @@ class Connection { congestionOccurred(); // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6) - final long now = _context.clock().now(); pushBackRTO(_options.doubleRTO()); // 2. cut ssthresh to bandwidth estimate, window to 1 List<PacketLocal> toResend = null; synchronized(_outboundPackets) { - if (_outboundPackets.isEmpty()) { + Map.Entry<Long, PacketLocal> e = _outboundPackets.firstEntry(); + if (e == null) { if (_log.shouldLog(Log.WARN)) _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??"); return; } - PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey()); + PacketLocal oldest = e.getValue(); if (oldest.getNumSends() == 1) { if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " cutting ssthresh and window"); @@ -1574,7 +1573,7 @@ class Connection { } if (sentAny) { - _lastSendTime = now; + _lastSendTime = _context.clock().now(); resetActivityTimer(); windowAdjusted(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index ba78a215bed78a0ea48154a7950c8d04456039d8..398590d24684d590b10f1859bc08d5a9e652796c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -135,7 +135,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_TAG_THRESHOLD = "crypto.lowTagThreshold"; - private static final int TREND_COUNT = 3; + //private static final int TREND_COUNT = 3; /** RFC 5681 sec. 3.1 */ static final int INITIAL_WINDOW_SIZE = 3; static final int DEFAULT_MAX_SENDS = 8; @@ -166,7 +166,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ private static final boolean DEFAULT_ENFORCE_PROTO = true; - private final int _trend[] = new int[TREND_COUNT]; + //private final int _trend[] = new int[TREND_COUNT]; /** * OK, here is the calculation on the message size to fit in a single @@ -632,6 +632,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * not public, use updateRTT() */ private void setRTT(int ms) { +/* synchronized (_trend) { _trend[0] = _trend[1]; _trend[1] = _trend[2]; @@ -642,6 +643,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { else _trend[2] = 0; } +*/ synchronized(this) { _rtt = ms; @@ -711,9 +713,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have * 3 consecutive rtt decreases, we are trending downwards (-1), else we're stable. * + * @deprecated unused as of 0.9.51 * @return positive/flat/negative trend in round trip time */ + @Deprecated public int getRTTTrend() { +/* synchronized (_trend) { for (int i = 0; i < TREND_COUNT - 1; i++) { if (_trend[i] != _trend[i+1]) @@ -721,6 +726,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } return _trend[0]; } +*/ + return 0; } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 4a39ef8b7f3303fc8b5f3b40479c2332c07615d6..8e1ed8275f545dc89433169174c29d17bb2627e9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -41,7 +41,7 @@ class ConnectionPacketHandler { _context.statManager().createRateStat("stream.con.packetsAckedPerMessageReceived", "Avg number of acks in a message", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.sendsBeforeAck", "How many times a message was sent before it was ACKed?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.resetReceived", "How many messages had we sent successfully before receiving a RESET?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); - _context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); + //_context.statManager().createRateStat("stream.trend", "What direction the RTT is trending in (with period = windowsize)", "Stream", new long[] { 60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.initialRTT.in", "What is the actual RTT for the first packet of an inbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.con.initialRTT.out", "What is the actual RTT for the first packet of an outbound conn?", "Stream", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createFrequencyStat("stream.ack.dup.immediate","How often duplicate packets get acked immediately","Stream",new long[] { 10*60*1000, 60*60*1000 }); @@ -453,9 +453,9 @@ class ConnectionPacketHandler { int oldWindow = con.getOptions().getWindowSize(); int newWindowSize = oldWindow; - int trend = con.getOptions().getRTTTrend(); + //int trend = con.getOptions().getRTTTrend(); - _context.statManager().addRateData("stream.trend", trend, newWindowSize); + //_context.statManager().addRateData("stream.trend", trend, newWindowSize); if ( (!congested) && (acked > 0) ) { int ssthresh = con.getSSThresh(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index 2ff924081e226d3b0df4dfd754544f7d2e968b2a..9c1cae9981c3737b98e5c2cbe277b47c5b9bd116 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -153,6 +153,7 @@ class MessageInputStream extends InputStream { if (_log.shouldWarn()) _log.warn("Dropping message " + messageId + ", inbound buffer exceeded: available = " + available); + _dataLock.notifyAll(); return false; } // following code screws up if available < 0 @@ -161,12 +162,14 @@ class MessageInputStream extends InputStream { if (_log.shouldWarn()) _log.warn("Dropping message " + messageId + ", inbound buffer exceeded: " + _highestReadyBlockId + '/' + (_highestReadyBlockId + allowedBlocks) + '/' + available); + _dataLock.notifyAll(); return false; } // This prevents us from getting DoSed by accepting unlimited in-order small messages if (_readyDataBlocks.size() >= 4 * _maxWindowSize) { if (_log.shouldWarn()) _log.warn("Dropping message " + messageId + ", too many ready blocks"); + _dataLock.notifyAll(); return false; } } @@ -327,7 +330,6 @@ class MessageInputStream extends InputStream { if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.INFO)) _log.info("ignoring dup message " + messageId); - _dataLock.notifyAll(); return false; // already received } if (messageId > _highestBlockId) @@ -353,6 +355,7 @@ class MessageInputStream extends InputStream { cur++; _highestReadyBlockId++; } + _dataLock.notifyAll(); } else { // _notYetReadyBlocks size is limited in canAccept() if (_locallyClosed) { @@ -366,7 +369,6 @@ class MessageInputStream extends InputStream { _notYetReadyBlocks.put(Long.valueOf(messageId), payload); } } - _dataLock.notifyAll(); } return true; }