diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java index 4d8b85a8177e002b8356a71448f1eed7c7cf6274..d0dc227ec1da09fde27392cfd4caab52ac4ee0a0 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java @@ -9,7 +9,6 @@ import java.util.ArrayList; import java.util.List; import java.util.StringTokenizer; -import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; @@ -124,7 +123,7 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable } if (size == 1) // skip the rand in the most common case return dests.get(0); - int index = I2PAppContext.getGlobalContext().random().nextInt(size); + int index = _context.random().nextInt(size); return dests.get(index); } @@ -182,6 +181,8 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable } outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); + // probably doesn't do much but can't hurt + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("inbound BLOCKED: "+inmsg); @@ -257,6 +258,8 @@ public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable } outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); + // save 250 ms in streaming + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index c2a014020d2df2c74d34c1422bf0d9249afb19fd..0e1c9049f84b8ea5a9c57f314a16bf7b5b150176 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -129,7 +129,14 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr // do NOT flush here, it will block and then onTimeout.run() won't happen on fail. // But if we don't flush, then we have to wait for the connectDelay timer to fire // in i2p socket? To be researched and/or fixed. - //i2pout.flush(); + // + // AS OF 0.8.1, MessageOutputStream.flush() is fixed to only wait for accept, + // not for "completion" (i.e. an ACK from the far end). + // So we now get a fast return from flush(), and can do it here to save 250 ms. + // To make sure we are under the initial window size and don't hang waiting for accept, + // only flush if it fits in one message. + if (initialI2PData.length <= 1730) // ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE + i2pout.flush(); } } if (initialSocketData != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index bc8c923699cf61f225625981264630d36dd046e8..e662ecc60a95174749196a406bf4636b2cfe53aa 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -148,16 +148,21 @@ public class Connection { } /** + * This doesn't "send a choke". Rather, it blocks if the outbound window is full, + * thus choking the sender that calls this. + * * Block until there is an open outbound packet slot or the write timeout * expires. + * PacketLocal is the only caller, generally with -1. * - * @param timeoutMs PacketLocal is the only caller, often with -1?????? - * @return true if the packet should be sent + * @param timeoutMs 0 or negative means wait forever, 5 minutes max + * @return true if the packet should be sent, false for a fatal error + * will return false after 5 minutes even if timeoutMs is <= 0. */ boolean packetSendChoke(long timeoutMs) { // if (false) return true; // <--- what the fuck?? long start = _context.clock().now(); - long writeExpire = start + timeoutMs; + long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 boolean started = false; while (true) { long timeLeft = writeExpire - _context.clock().now(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 1a7f0afbe82e313c6793168c702a087039d3d6d5..fd10d679cec33b72c459ecfeb416747f59f8fefb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -43,6 +43,10 @@ public class MessageOutputStream extends OutputStream { private long _sendPeriodBytes; private int _sendBps; + /** + * Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000, + * we only wait 250 at the start. Guess that's ok, 1000 is too long anyway. + */ private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { @@ -273,8 +277,18 @@ public class MessageOutputStream extends OutputStream { } /** - * Flush the data already queued up, blocking until it has been - * delivered. + * Flush the data already queued up, blocking only if the outbound + * window is full. + * + * Prior to 0.8.1, this blocked until "delivered". + * "Delivered" meant "received an ACK from the far end", + * which is not the commom implementation of flush(), and really hurt the + * performance of i2psnark, which flush()ed frequently. + * Calling flush() would cause a complete window stall. + * + * As of 0.8.1, only wait for accept into the streaming output queue. + * This will speed up snark significantly, and allow us to flush() + * the initial data in I2PTunnelRunner, saving 250 ms. * * @throws IOException if the write fails */ @@ -283,6 +297,14 @@ public class MessageOutputStream extends OutputStream { /* @throws InterruptedIOException if the write times out * Documented here, but doesn't belong in the javadoc. */ + flush(true); + } + + /** + * @param wait_for_accept_only see discussion in close() code + * @@since 0.8.1 + */ + private void flush(boolean wait_for_accept_only) throws IOException { long begin = _context.clock().now(); WriteStatus ws = null; if (_log.shouldLog(Log.INFO) && _valid > 0) @@ -297,14 +319,28 @@ public class MessageOutputStream extends OutputStream { throwAnyError(); return; } - ws = _dataReceiver.writeData(_buf, 0, _valid); - _written += _valid; - _valid = 0; - locked_updateBufferSize(); - _lastFlushed = _context.clock().now(); - _dataLock.notifyAll(); + // if valid == 0 return ??? - no, this could flush a CLOSE packet too. + + // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below + // (disabled) + if (!wait_for_accept_only) { + ws = _dataReceiver.writeData(_buf, 0, _valid); + _written += _valid; + _valid = 0; + locked_updateBufferSize(); + _lastFlushed = _context.clock().now(); + _dataLock.notifyAll(); + } } + // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 + // must do this outside the data lock + if (wait_for_accept_only) { + flushAvailable(_dataReceiver, true); + return; + } + + // Wait a loooooong time, until we have the ACK if (_log.shouldLog(Log.DEBUG)) _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); if (_closed && @@ -328,14 +364,28 @@ public class MessageOutputStream extends OutputStream { throwAnyError(); } + /** + * This does a flush, and BLOCKS until + * the CLOSE packet is acked. + */ @Override public void close() throws IOException { if (_closed) { synchronized (_dataLock) { _dataLock.notifyAll(); } return; } + // setting _closed before flush() will force flush() to send a CLOSE packet _closed = true; - flush(); + + // In 0.8.1 we rewrote flush() to only wait for accept into the window, + // not "completion" (i.e. ack from the far end). + // Unfortunately, that broke close(), at least in i2ptunnel HTTPClient. + // Symptom was premature close, i.e. incomplete pages and images. + // Possible cause - I2PTunnelRunner code? or the code here that follows flush()? + // It seems like we shouldn't have to wait for the far-end ACK for a close packet, + // should we? To be researched further. + // false -> wait for completion, not just accept. + flush(false); _log.debug("Output stream closed after writing " + _written); ByteArray ba = null; synchronized (_dataLock) { @@ -351,7 +401,11 @@ public class MessageOutputStream extends OutputStream { _dataCache.release(ba); } } - /** nonblocking close */ + + /** + * nonblocking close - + * Use outside of this package is deprecated, should be made package local + */ public void closeInternal() { _closed = true; if (_streamError == null) @@ -412,6 +466,8 @@ public class MessageOutputStream extends OutputStream { if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("flushAvailable() valid = " + _valid); synchronized (_dataLock) { + // if valid == 0 return ??? - no, this could flush a CLOSE packet too. + // _buf may be null, but the data receiver can handle that just fine, // deciding whether or not to send a packet ws = target.writeData(_buf, 0, _valid); @@ -457,14 +513,21 @@ public class MessageOutputStream extends OutputStream { /** Define a way to detect the status of a write */ public interface WriteStatus { - /** wait until the data written either fails or succeeds */ + /** + * Wait until the data written either fails or succeeds. + * Success means an ACK FROM THE FAR END. + * @param maxWaitMs -1 = forever + */ public void waitForCompletion(int maxWaitMs); + /** - * wait until the data written is accepted into the outbound pool, + * Wait until the data written is accepted into the outbound pool, + * (i.e. the outbound window is not full) * which we throttle rather than accept arbitrary data and queue - * @param maxWaitMs -1 = forever ? + * @param maxWaitMs -1 = forever */ public void waitForAccept(int maxWaitMs); + /** the write was accepted. aka did the socket not close? */ public boolean writeAccepted(); /** did the write fail? */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 674ff6179c531fcede87686b12d46b18a5f90823..cbe913e0505c028ac271af8399cb8b79e73c7bb5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -194,7 +194,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat } /** - * @param maxWaitMs MessageOutputStream is the only caller, often with -1 ?????? + * Blocks until outbound window is not full. See Connection.packetSendChoke(). + * @param maxWaitMs MessageOutputStream is the only caller, generally with -1 */ public void waitForAccept(int maxWaitMs) { if (_connection == null) @@ -220,6 +221,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat + toString()); } + /** block until the packet is acked from the far end */ public void waitForCompletion(int maxWaitMs) { long expiration = _context.clock().now()+maxWaitMs; while (true) {