I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 8c2550c3 authored by zzz's avatar zzz
Browse files

* Streaming: MessageOutputStream logging tweaks

parent 1d3f0fe9
No related branches found
No related tags found
No related merge requests found
...@@ -152,6 +152,7 @@ public class Connection { ...@@ -152,6 +152,7 @@ public class Connection {
* Block until there is an open outbound packet slot or the write timeout * Block until there is an open outbound packet slot or the write timeout
* expires. * expires.
* *
* @param timeoutMs PacketLocal is the only caller, often with -1??????
* @return true if the packet should be sent * @return true if the packet should be sent
*/ */
boolean packetSendChoke(long timeoutMs) { boolean packetSendChoke(long timeoutMs) {
......
...@@ -16,8 +16,8 @@ import net.i2p.util.SimpleTimer2; ...@@ -16,8 +16,8 @@ import net.i2p.util.SimpleTimer2;
* to the data receiver's needs. * to the data receiver's needs.
*/ */
public class MessageOutputStream extends OutputStream { public class MessageOutputStream extends OutputStream {
private I2PAppContext _context; private final I2PAppContext _context;
private Log _log; private final Log _log;
private byte _buf[]; private byte _buf[];
private int _valid; private int _valid;
private final Object _dataLock; private final Object _dataLock;
...@@ -27,7 +27,7 @@ public class MessageOutputStream extends OutputStream { ...@@ -27,7 +27,7 @@ public class MessageOutputStream extends OutputStream {
private long _written; private long _written;
private int _writeTimeout; private int _writeTimeout;
private ByteCache _dataCache; private ByteCache _dataCache;
private Flusher _flusher; private final Flusher _flusher;
private long _lastFlushed; private long _lastFlushed;
private long _lastBuffered; private long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */ /** if we enqueue data but don't flush it in this period, flush it passively */
...@@ -128,6 +128,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -128,6 +128,8 @@ public class MessageOutputStream extends OutputStream {
throwAnyError(); throwAnyError();
return; return;
} }
if (_log.shouldLog(Log.INFO))
_log.info("write() direct valid = " + _valid);
ws = _dataReceiver.writeData(_buf, 0, _valid); ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
...@@ -138,26 +140,31 @@ public class MessageOutputStream extends OutputStream { ...@@ -138,26 +140,31 @@ public class MessageOutputStream extends OutputStream {
} }
} }
if (ws != null) { if (ws != null) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("Waiting " + _writeTimeout + "ms for accept of " + ws); _log.info("Waiting " + _writeTimeout + "ms for accept of " + ws);
// ok, we've actually added a new packet - lets wait until // ok, we've actually added a new packet - lets wait until
// its accepted into the queue before moving on (so that we // its accepted into the queue before moving on (so that we
// dont fill our buffer instantly) // dont fill our buffer instantly)
ws.waitForAccept(_writeTimeout); ws.waitForAccept(_writeTimeout);
if (!ws.writeAccepted()) { if (!ws.writeAccepted()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Write not accepted of " + ws);
if (_writeTimeout > 0) if (_writeTimeout > 0)
throw new InterruptedIOException("Write not accepted within timeout: " + ws); throw new InterruptedIOException("Write not accepted within timeout: " + ws);
else else
throw new IOException("Write not accepted into the queue: " + ws); throw new IOException("Write not accepted into the queue: " + ws);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("After waitForAccept of " + ws);
} }
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("Queued " + len + " without sending to the receiver"); _log.info("Queued " + len + " without sending to the receiver");
} }
} }
long elapsed = _context.clock().now() - begin; long elapsed = _context.clock().now() - begin;
if ( (elapsed > 10*1000) && (_log.shouldLog(Log.DEBUG)) ) if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) )
_log.debug("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); _log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo"));
throwAnyError(); throwAnyError();
updateBps(len); updateBps(len);
} }
...@@ -215,8 +222,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -215,8 +222,8 @@ public class MessageOutputStream extends OutputStream {
// We've seen the queue blow up before, maybe it was this before the rewrite... // We've seen the queue blow up before, maybe it was this before the rewrite...
// So perhaps it IS wise to be "overly worried" ... // So perhaps it IS wise to be "overly worried" ...
forceReschedule(_passiveFlushDelay); forceReschedule(_passiveFlushDelay);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out"); _log.info("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("NOT enqueing the flusher"); _log.debug("NOT enqueing the flusher");
...@@ -243,8 +250,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -243,8 +250,8 @@ public class MessageOutputStream extends OutputStream {
synchronized (_dataLock) { synchronized (_dataLock) {
long flushTime = _lastBuffered + _passiveFlushDelay; long flushTime = _lastBuffered + _passiveFlushDelay;
if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO))
_log.debug("doFlush() valid = " + _valid); _log.info("doFlush() valid = " + _valid);
if ( (_buf != null) && (_dataReceiver != null) ) { if ( (_buf != null) && (_dataReceiver != null) ) {
ws = _dataReceiver.writeData(_buf, 0, _valid); ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
...@@ -255,13 +262,13 @@ public class MessageOutputStream extends OutputStream { ...@@ -255,13 +262,13 @@ public class MessageOutputStream extends OutputStream {
sent = true; sent = true;
} }
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.debug("doFlush() rejected... valid = " + _valid); _log.info("doFlush() rejected... valid = " + _valid);
} }
} }
// ignore the ws // ignore the ws
if (sent && _log.shouldLog(Log.DEBUG)) if (sent && _log.shouldLog(Log.INFO))
_log.debug("Passive flush of " + ws); _log.info("Passive flush of " + ws);
} }
} }
...@@ -278,6 +285,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -278,6 +285,8 @@ public class MessageOutputStream extends OutputStream {
*/ */
long begin = _context.clock().now(); long begin = _context.clock().now();
WriteStatus ws = null; WriteStatus ws = null;
if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("flush() valid = " + _valid);
synchronized (_dataLock) { synchronized (_dataLock) {
if (_buf == null) { if (_buf == null) {
_dataLock.notifyAll(); _dataLock.notifyAll();
...@@ -352,6 +361,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -352,6 +361,8 @@ public class MessageOutputStream extends OutputStream {
private void clearData(boolean shouldFlush) { private void clearData(boolean shouldFlush) {
ByteArray ba = null; ByteArray ba = null;
if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("clearData() valid = " + _valid);
synchronized (_dataLock) { synchronized (_dataLock) {
// flush any data, but don't wait for it // flush any data, but don't wait for it
if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush) if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush)
...@@ -398,6 +409,8 @@ public class MessageOutputStream extends OutputStream { ...@@ -398,6 +409,8 @@ public class MessageOutputStream extends OutputStream {
void flushAvailable(DataReceiver target, boolean blocking) throws IOException { void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
WriteStatus ws = null; WriteStatus ws = null;
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
if (_log.shouldLog(Log.INFO) && _valid > 0)
_log.info("flushAvailable() valid = " + _valid);
synchronized (_dataLock) { synchronized (_dataLock) {
// _buf may be null, but the data receiver can handle that just fine, // _buf may be null, but the data receiver can handle that just fine,
// deciding whether or not to send a packet // deciding whether or not to send a packet
...@@ -449,6 +462,7 @@ public class MessageOutputStream extends OutputStream { ...@@ -449,6 +462,7 @@ public class MessageOutputStream extends OutputStream {
/** /**
* wait until the data written is accepted into the outbound pool, * wait until the data written is accepted into the outbound pool,
* which we throttle rather than accept arbitrary data and queue * which we throttle rather than accept arbitrary data and queue
* @param maxWaitMs -1 = forever ?
*/ */
public void waitForAccept(int maxWaitMs); public void waitForAccept(int maxWaitMs);
/** the write was accepted. aka did the socket not close? */ /** the write was accepted. aka did the socket not close? */
......
...@@ -193,6 +193,9 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat ...@@ -193,6 +193,9 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
return buf; return buf;
} }
/**
* @param timeoutMs MessageOutputStream is the only caller, often with -1 ??????
*/
public void waitForAccept(int maxWaitMs) { public void waitForAccept(int maxWaitMs) {
if (_connection == null) if (_connection == null)
throw new IllegalStateException("Cannot wait for accept with no connection"); throw new IllegalStateException("Cannot wait for accept with no connection");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment