diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index db317eab8812d58b77c5cae562dbbedb505875ce..1a6e961f80f3dd459bdde1d227a69cc5210e8384 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -3,6 +3,8 @@ package net.i2p.client.streaming; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; @@ -24,13 +26,12 @@ class MessageOutputStream extends OutputStream { private int _valid; private final Object _dataLock; private final DataReceiver _dataReceiver; - private IOException _streamError; - private volatile boolean _closed; + private final AtomicReference<IOException>_streamError = new AtomicReference<IOException>(null); + private final AtomicBoolean _closed = new AtomicBoolean(false); private long _written; private int _writeTimeout; private ByteCache _dataCache; private final Flusher _flusher; - private long _lastFlushed; private volatile long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ private final int _passiveFlushDelay; @@ -98,7 +99,7 @@ class MessageOutputStream extends OutputStream { @Override public void write(byte b[], int off, int len) throws IOException { - if (_closed) throw new IOException("Already closed"); + if (_closed.get()) throw new IOException("Already closed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("write(b[], " + off + ", " + len + ") "); int cur = off; @@ -106,7 +107,7 @@ class MessageOutputStream extends OutputStream { long begin = _context.clock().now(); while (remaining > 0) { WriteStatus ws = null; - if (_closed) throw new IOException("closed underneath us"); + if (_closed.get()) throw new IOException("closed underneath us"); // we do any waiting outside the synchronized() block because we // want to allow other threads to flushAvailable() whenever they want. // this is the only method that *adds* to the _buf, and all @@ -139,7 +140,6 @@ class MessageOutputStream extends OutputStream { _written += _valid; _valid = 0; throwAnyError(); - _lastFlushed = _context.clock().now(); locked_updateBufferSize(); } @@ -239,7 +239,7 @@ class MessageOutputStream extends OutputStream { _enqueued = true; } public void timeReached() { - if (_closed) + if (_closed.get()) return; _enqueued = false; long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); @@ -265,7 +265,6 @@ class MessageOutputStream extends OutputStream { ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; - _lastFlushed = _context.clock().now(); locked_updateBufferSize(); _dataLock.notifyAll(); sent = true; @@ -330,7 +329,6 @@ class MessageOutputStream extends OutputStream { _written += _valid; _valid = 0; locked_updateBufferSize(); - _lastFlushed = _context.clock().now(); _dataLock.notifyAll(); } } @@ -345,7 +343,7 @@ class MessageOutputStream extends OutputStream { // Wait a loooooong time, until we have the ACK if (_log.shouldLog(Log.DEBUG)) _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); - if (_closed && + if (_closed.get() && ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || (_writeTimeout <= 0) ) ) ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); @@ -372,12 +370,12 @@ class MessageOutputStream extends OutputStream { */ @Override public void close() throws IOException { - if (_closed) { + if (!_closed.compareAndSet(false,true)) { synchronized (_dataLock) { _dataLock.notifyAll(); } + LogUtil.logCloseLoop(_log, "MOS"); return; } // setting _closed before flush() will force flush() to send a CLOSE packet - _closed = true; _flusher.cancel(); // In 0.8.1 we rewrote flush() to only wait for accept into the window, @@ -411,10 +409,12 @@ class MessageOutputStream extends OutputStream { * Only for use inside package */ public void closeInternal() { - _closed = true; + if (!_closed.compareAndSet(false,true)) { + LogUtil.logCloseLoop(_log, "close internal"); + return; + } _flusher.cancel(); - if (_streamError == null) - _streamError = new IOException("Closed internally"); + _streamError.compareAndSet(null,new IOException("Closed internally")); clearData(true); } @@ -435,7 +435,6 @@ class MessageOutputStream extends OutputStream { _buf = null; _valid = 0; } - _lastFlushed = _context.clock().now(); _dataLock.notifyAll(); } if (ba != null) { @@ -443,12 +442,11 @@ class MessageOutputStream extends OutputStream { } } - public boolean getClosed() { return _closed; } + public boolean getClosed() { return _closed.get(); } private void throwAnyError() throws IOException { - IOException ioe = _streamError; + IOException ioe = _streamError.getAndSet(null); if (ioe != null) { - _streamError = null; // constructor with cause not until Java 6 IOException ioe2 = new IOException("Output stream error"); ioe2.initCause(ioe); @@ -457,8 +455,7 @@ class MessageOutputStream extends OutputStream { } void streamErrorOccurred(IOException ioe) { - if (_streamError == null) - _streamError = ioe; + _streamError.compareAndSet(null,ioe); clearData(false); } @@ -484,7 +481,6 @@ class MessageOutputStream extends OutputStream { _valid = 0; locked_updateBufferSize(); _dataLock.notifyAll(); - _lastFlushed = _context.clock().now(); } long afterBuild = System.currentTimeMillis(); if ( (afterBuild - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) @@ -504,7 +500,10 @@ class MessageOutputStream extends OutputStream { } void destroy() { - _closed = true; + if (!_closed.compareAndSet(false,true)) { + LogUtil.logCloseLoop(_log, "destroy()"); + return; + } _flusher.cancel(); synchronized (_dataLock) { _dataLock.notifyAll();