From 4a029b7853eb88f8d081616fefe305dd524be929 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 18 Nov 2004 19:42:11 +0000 Subject: [PATCH] * if we timeout connecting or otherwise need to cancel tags that we've sent, go one step further and cancel all of the tags we're using for that peer so that we can react to their potential restart / tag loss quicker. * use the minimum resend delay as the base to be exponentiated if our RTT is too low (so we resend less) * dont be such a wuss when flushing a closed stream --- .../net/i2p/client/streaming/Connection.java | 31 +++++++++++++++---- .../client/streaming/MessageOutputStream.java | 5 +-- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 589fe66251..fcd388369e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -336,14 +336,19 @@ public class Connection { _inputStream.close(); } else { doClose(); + boolean tagsCancelled = false; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { PacketLocal pl = (PacketLocal)iter.next(); + if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) + tagsCancelled = true; pl.cancelled(); } _outboundPackets.clear(); _outboundPackets.notifyAll(); } + if (tagsCancelled) + _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); } if (removeFromConMgr) { if (!_disconnectScheduled) { @@ -379,15 +384,21 @@ public class Connection { + toString()); _connectionManager.removeConnection(this); } - + + boolean tagsCancelled = false; synchronized (_outboundPackets) { for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { PacketLocal pl = (PacketLocal)iter.next(); + if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) + tagsCancelled = true; pl.cancelled(); } _outboundPackets.clear(); _outboundPackets.notifyAll(); - } + } + if (tagsCancelled) + _context.sessionKeyManager().failTags(_remotePeer.getPublicKey()); + } private class DisconnectEvent implements SimpleTimer.TimedEvent { @@ -672,7 +683,13 @@ public class Connection { } public void timeReached() { - if (!_connected) return; + if (_packet.getAckTime() > 0) + return; + + if (!_connected) { + _packet.cancelled(); + return; + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resend period reached for " + _packet); @@ -732,12 +749,14 @@ public class Connection { if (numSends > _options.getMaxResends()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Too many resends"); + _packet.cancelled(); disconnect(false); } else { //long timeout = _options.getResendDelay() << numSends; - long timeout = _options.getRTT() << (numSends-1); - if (timeout < MIN_RESEND_DELAY) - timeout = MIN_RESEND_DELAY; + long rtt = _options.getRTT(); + if (rtt < MIN_RESEND_DELAY) + rtt = MIN_RESEND_DELAY; + long timeout = rtt << (numSends-1); if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 55b7684ca4..2afa3be50d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -197,13 +197,14 @@ public class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target, boolean blocking) throws IOException { WriteStatus ws = null; synchronized (_dataLock) { - if (_buf == null) throw new IOException("closed (buffer went away)"); + // _buf may be null, but the data receiver can handle that just fine, + // deciding whether or not to send a packet ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _dataLock.notifyAll(); } - if (blocking) { + if (blocking && ws != null) { ws.waitForAccept(_writeTimeout); if (ws.writeFailed()) throw new IOException("Flush available failed");