diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index d73ae8c43..6161723d3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -311,16 +311,20 @@ public class Connection { } List ackPackets(long ackThrough, long nacks[]) { - if (nacks == null) { - _highestAckedThrough = ackThrough; + if (ackThrough < _highestAckedThrough) { + // dupack which won't tell us anything } else { - long lowest = -1; - for (int i = 0; i < nacks.length; i++) { - if ( (lowest < 0) || (nacks[i] < lowest) ) - lowest = nacks[i]; + if (nacks == null) { + _highestAckedThrough = ackThrough; + } else { + long lowest = -1; + for (int i = 0; i < nacks.length; i++) { + if ( (lowest < 0) || (nacks[i] < lowest) ) + lowest = nacks[i]; + } + if (lowest - 1 > _highestAckedThrough) + _highestAckedThrough = lowest - 1; } - if (lowest - 1 > _highestAckedThrough) - _highestAckedThrough = lowest - 1; } List acked = null; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 2e845e8f1..6a319fbdb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -100,10 +100,12 @@ public class ConnectionPacketHandler { (packet.getReceiveStreamId() <= 0) ) ) allowAck = false; - if (allowAck) + if (allowAck) { isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); - else - isNew = con.getInputStream().messageReceived(con.getInputStream().getHighestReadyBockId(), null); + } else { + con.getInputStream().notifyActivity(); + isNew = false; + } if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { if (_log.shouldLog(Log.DEBUG)) @@ -166,10 +168,16 @@ public class ConnectionPacketHandler { } con.eventOccurred(); if (fastAck) { - if (con.getLastSendTime() + 2000 < _context.clock().now()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Fast ack for dup " + packet); - con.ackImmediately(); + if (!isNew) { + // if we're congested (fastAck) but this is also a new packet, + // we've already scheduled an ack above, so there is no need to schedule + // a fast ack (we can wait a few ms) + } else { + if (con.getLastSendTime() + 2000 < _context.clock().now()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Fast ack for dup " + packet); + con.ackImmediately(); + } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 1b608005d..c894f98e3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -193,6 +193,8 @@ public class MessageInputStream extends InputStream { } } + public void notifyActivity() { synchronized (_dataLock) { _dataLock.notifyAll(); } } + /** * A new message has arrived - toss it on the appropriate queue (moving * previously pending messages to the ready queue if it fills the gap, etc). diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 06418beae..16e0c56c2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -222,7 +222,7 @@ public class Packet { return (_payload == null ? 0 : _payload.getValid()); } public void releasePayload() { - _payload = null; + //_payload = null; } public ByteArray acquirePayload() { ByteArray old = _payload; diff --git a/history.txt b/history.txt index 6fad8e1fb..ed5f3de10 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.273 2005/09/30 12:51:32 ragnarok Exp $ +$Id: history.txt,v 1.274 2005/09/30 15:29:19 jrandom Exp $ + +2005-09-30 jrandom + * Killed three more streaming lib bugs, one of which caused excess packets + to be transmitted (dupacking dupacks), one that was the root of many of + the old hung streams (shrinking highest received), and another that was + releasing data too soon. 2005-09-30 jrandom * Only allow autodetection of our IP address if we haven't received an diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 097a49946..c96469510 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.251 $ $Date: 2005/09/30 02:17:57 $"; + public final static String ID = "$Revision: 1.252 $ $Date: 2005/09/30 15:29:19 $"; public final static String VERSION = "0.6.1"; - public final static long BUILD = 2; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);