diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index aa826f7999cb9a82b8c340ecd380cd6743892a4b..d9dd9b48869c7fce1e8ebcd41ea15932aefbf9ce 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -152,6 +152,7 @@ public class Connection { * Block until there is an open outbound packet slot or the write timeout * expires. * + * @param timeoutMs PacketLocal is the only caller, often with -1?????? * @return true if the packet should be sent */ boolean packetSendChoke(long timeoutMs) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index ab7c9374a3bdbbd51cf767ce33379df28f6a8b30..de0a7422834d4e47605c7b507034b81949b2eb03 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -16,8 +16,8 @@ import net.i2p.util.SimpleTimer2; * to the data receiver's needs. */ public class MessageOutputStream extends OutputStream { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; private byte _buf[]; private int _valid; private final Object _dataLock; @@ -27,7 +27,7 @@ public class MessageOutputStream extends OutputStream { private long _written; private int _writeTimeout; private ByteCache _dataCache; - private Flusher _flusher; + private final Flusher _flusher; private long _lastFlushed; private long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ @@ -128,6 +128,8 @@ public class MessageOutputStream extends OutputStream { throwAnyError(); return; } + if (_log.shouldLog(Log.INFO)) + _log.info("write() direct valid = " + _valid); ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; @@ -138,26 +140,31 @@ public class MessageOutputStream extends OutputStream { } } if (ws != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws); + if (_log.shouldLog(Log.INFO)) + _log.info("Waiting " + _writeTimeout + "ms for accept of " + ws); // ok, we've actually added a new packet - lets wait until // its accepted into the queue before moving on (so that we // dont fill our buffer instantly) ws.waitForAccept(_writeTimeout); if (!ws.writeAccepted()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Write not accepted of " + ws); if (_writeTimeout > 0) throw new InterruptedIOException("Write not accepted within timeout: " + ws); else throw new IOException("Write not accepted into the queue: " + ws); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("After waitForAccept of " + ws); } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Queued " + len + " without sending to the receiver"); + if (_log.shouldLog(Log.INFO)) + _log.info("Queued " + len + " without sending to the receiver"); } } long elapsed = _context.clock().now() - begin; - if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) ) - _log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); + if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) ) + _log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); throwAnyError(); updateBps(len); } @@ -215,8 +222,8 @@ public class MessageOutputStream extends OutputStream { // We've seen the queue blow up before, maybe it was this before the rewrite... // So perhaps it IS wise to be "overly worried" ... forceReschedule(_passiveFlushDelay); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out"); + if (_log.shouldLog(Log.INFO)) + _log.info("Enqueueing the flusher for " + _passiveFlushDelay + "ms out"); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("NOT enqueing the flusher"); @@ -243,8 +250,8 @@ public class MessageOutputStream extends OutputStream { synchronized (_dataLock) { long flushTime = _lastBuffered + _passiveFlushDelay; if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("doFlush() valid = " + _valid); + if (_log.shouldLog(Log.INFO)) + _log.info("doFlush() valid = " + _valid); if ( (_buf != null) && (_dataReceiver != null) ) { ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; @@ -255,13 +262,13 @@ public class MessageOutputStream extends OutputStream { sent = true; } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("doFlush() rejected... valid = " + _valid); + if (_log.shouldLog(Log.INFO) && _valid > 0) + _log.info("doFlush() rejected... valid = " + _valid); } } // ignore the ws - if (sent && _log.shouldLog(Log.DEBUG)) - _log.debug("Passive flush of " + ws); + if (sent && _log.shouldLog(Log.INFO)) + _log.info("Passive flush of " + ws); } } @@ -278,6 +285,8 @@ public class MessageOutputStream extends OutputStream { */ long begin = _context.clock().now(); WriteStatus ws = null; + if (_log.shouldLog(Log.INFO) && _valid > 0) + _log.info("flush() valid = " + _valid); synchronized (_dataLock) { if (_buf == null) { _dataLock.notifyAll(); @@ -352,6 +361,8 @@ public class MessageOutputStream extends OutputStream { private void clearData(boolean shouldFlush) { ByteArray ba = null; + if (_log.shouldLog(Log.INFO) && _valid > 0) + _log.info("clearData() valid = " + _valid); synchronized (_dataLock) { // flush any data, but don't wait for it if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush) @@ -398,6 +409,8 @@ public class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target, boolean blocking) throws IOException { WriteStatus ws = null; long before = System.currentTimeMillis(); + if (_log.shouldLog(Log.INFO) && _valid > 0) + _log.info("flushAvailable() valid = " + _valid); synchronized (_dataLock) { // _buf may be null, but the data receiver can handle that just fine, // deciding whether or not to send a packet @@ -449,6 +462,7 @@ public class MessageOutputStream extends OutputStream { /** * wait until the data written is accepted into the outbound pool, * which we throttle rather than accept arbitrary data and queue + * @param maxWaitMs -1 = forever ? */ public void waitForAccept(int maxWaitMs); /** the write was accepted. aka did the socket not close? */ diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index b1438a033ad63ec6c95e5a8483ce3014ddf92dbf..d4de041065850b75388754986de5cd60c04d3bd8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -193,6 +193,9 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat return buf; } + /** + * @param timeoutMs MessageOutputStream is the only caller, often with -1 ?????? + */ public void waitForAccept(int maxWaitMs) { if (_connection == null) throw new IllegalStateException("Cannot wait for accept with no connection");