diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index fd10d679cec33b72c459ecfeb416747f59f8fefb..ed45972be2d3317860f9fbc35093860909e9dccf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -128,13 +128,15 @@ public class MessageOutputStream extends OutputStream { remaining -= toWrite; cur += toWrite; _valid = _buf.length; - if (_dataReceiver == null) { + // avoid NPE from race with destroy() + DataReceiver rcvr = _dataReceiver; + if (rcvr == null) { throwAnyError(); return; } if (_log.shouldLog(Log.INFO)) _log.info("write() direct valid = " + _valid); - ws = _dataReceiver.writeData(_buf, 0, _valid); + ws = rcvr.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; throwAnyError(); @@ -256,8 +258,10 @@ public class MessageOutputStream extends OutputStream { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if (_log.shouldLog(Log.INFO)) _log.info("doFlush() valid = " + _valid); - if ( (_buf != null) && (_dataReceiver != null) ) { - ws = _dataReceiver.writeData(_buf, 0, _valid); + // avoid NPE from race with destroy() + DataReceiver rcvr = _dataReceiver; + if ( (_buf != null) && (rcvr != null) ) { + ws = rcvr.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _lastFlushed = _context.clock().now(); @@ -309,12 +313,16 @@ public class MessageOutputStream extends OutputStream { WriteStatus ws = null; if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("flush() valid = " + _valid); + + // avoid NPE from race with destroy() + DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { if (_buf == null) { _dataLock.notifyAll(); throw new IOException("closed (buffer went away)"); } - if (_dataReceiver == null) { + + if (rcvr == null) { _dataLock.notifyAll(); throwAnyError(); return; @@ -324,7 +332,7 @@ public class MessageOutputStream extends OutputStream { // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below // (disabled) if (!wait_for_accept_only) { - ws = _dataReceiver.writeData(_buf, 0, _valid); + ws = rcvr.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; locked_updateBufferSize(); @@ -336,7 +344,7 @@ public class MessageOutputStream extends OutputStream { // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 // must do this outside the data lock if (wait_for_accept_only) { - flushAvailable(_dataReceiver, true); + flushAvailable(rcvr, true); return; } @@ -417,10 +425,13 @@ public class MessageOutputStream extends OutputStream { ByteArray ba = null; if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("clearData() valid = " + _valid); + + // avoid NPE from race with destroy() + DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { // flush any data, but don't wait for it - if ( (_dataReceiver != null) && (_valid > 0) && shouldFlush) - _dataReceiver.writeData(_buf, 0, _valid); + if ( (rcvr != null) && (_valid > 0) && shouldFlush) + rcvr.writeData(_buf, 0, _valid); _written += _valid; _valid = 0;