I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit bec62c1b authored by zab2's avatar zab2
Browse files

Remove unused field

	Use atomics to manage closed state and IOExceptions
parent 7f8efca0
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,8 @@ package net.i2p.client.streaming; ...@@ -3,6 +3,8 @@ package net.i2p.client.streaming;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray; import net.i2p.data.ByteArray;
...@@ -24,13 +26,12 @@ class MessageOutputStream extends OutputStream { ...@@ -24,13 +26,12 @@ class MessageOutputStream extends OutputStream {
private int _valid; private int _valid;
private final Object _dataLock; private final Object _dataLock;
private final DataReceiver _dataReceiver; private final DataReceiver _dataReceiver;
private IOException _streamError; private final AtomicReference<IOException>_streamError = new AtomicReference<IOException>(null);
private volatile boolean _closed; private final AtomicBoolean _closed = new AtomicBoolean(false);
private long _written; private long _written;
private int _writeTimeout; private int _writeTimeout;
private ByteCache _dataCache; private ByteCache _dataCache;
private final Flusher _flusher; private final Flusher _flusher;
private long _lastFlushed;
private volatile long _lastBuffered; private volatile long _lastBuffered;
/** if we enqueue data but don't flush it in this period, flush it passively */ /** if we enqueue data but don't flush it in this period, flush it passively */
private final int _passiveFlushDelay; private final int _passiveFlushDelay;
...@@ -98,7 +99,7 @@ class MessageOutputStream extends OutputStream { ...@@ -98,7 +99,7 @@ class MessageOutputStream extends OutputStream {
@Override @Override
public void write(byte b[], int off, int len) throws IOException { public void write(byte b[], int off, int len) throws IOException {
if (_closed) throw new IOException("Already closed"); if (_closed.get()) throw new IOException("Already closed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("write(b[], " + off + ", " + len + ") "); _log.debug("write(b[], " + off + ", " + len + ") ");
int cur = off; int cur = off;
...@@ -106,7 +107,7 @@ class MessageOutputStream extends OutputStream { ...@@ -106,7 +107,7 @@ class MessageOutputStream extends OutputStream {
long begin = _context.clock().now(); long begin = _context.clock().now();
while (remaining > 0) { while (remaining > 0) {
WriteStatus ws = null; WriteStatus ws = null;
if (_closed) throw new IOException("closed underneath us"); if (_closed.get()) throw new IOException("closed underneath us");
// we do any waiting outside the synchronized() block because we // we do any waiting outside the synchronized() block because we
// want to allow other threads to flushAvailable() whenever they want. // want to allow other threads to flushAvailable() whenever they want.
// this is the only method that *adds* to the _buf, and all // this is the only method that *adds* to the _buf, and all
...@@ -139,7 +140,6 @@ class MessageOutputStream extends OutputStream { ...@@ -139,7 +140,6 @@ class MessageOutputStream extends OutputStream {
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
throwAnyError(); throwAnyError();
_lastFlushed = _context.clock().now();
locked_updateBufferSize(); locked_updateBufferSize();
} }
...@@ -239,7 +239,7 @@ class MessageOutputStream extends OutputStream { ...@@ -239,7 +239,7 @@ class MessageOutputStream extends OutputStream {
_enqueued = true; _enqueued = true;
} }
public void timeReached() { public void timeReached() {
if (_closed) if (_closed.get())
return; return;
_enqueued = false; _enqueued = false;
long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now());
...@@ -265,7 +265,6 @@ class MessageOutputStream extends OutputStream { ...@@ -265,7 +265,6 @@ class MessageOutputStream extends OutputStream {
ws = _dataReceiver.writeData(_buf, 0, _valid); ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
_lastFlushed = _context.clock().now();
locked_updateBufferSize(); locked_updateBufferSize();
_dataLock.notifyAll(); _dataLock.notifyAll();
sent = true; sent = true;
...@@ -330,7 +329,6 @@ class MessageOutputStream extends OutputStream { ...@@ -330,7 +329,6 @@ class MessageOutputStream extends OutputStream {
_written += _valid; _written += _valid;
_valid = 0; _valid = 0;
locked_updateBufferSize(); locked_updateBufferSize();
_lastFlushed = _context.clock().now();
_dataLock.notifyAll(); _dataLock.notifyAll();
} }
} }
...@@ -345,7 +343,7 @@ class MessageOutputStream extends OutputStream { ...@@ -345,7 +343,7 @@ class MessageOutputStream extends OutputStream {
// Wait a loooooong time, until we have the ACK // Wait a loooooong time, until we have the ACK
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
if (_closed && if (_closed.get() &&
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
(_writeTimeout <= 0) ) ) (_writeTimeout <= 0) ) )
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
...@@ -372,12 +370,12 @@ class MessageOutputStream extends OutputStream { ...@@ -372,12 +370,12 @@ class MessageOutputStream extends OutputStream {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (_closed) { if (!_closed.compareAndSet(false,true)) {
synchronized (_dataLock) { _dataLock.notifyAll(); } synchronized (_dataLock) { _dataLock.notifyAll(); }
LogUtil.logCloseLoop(_log, "MOS");
return; return;
} }
// setting _closed before flush() will force flush() to send a CLOSE packet // setting _closed before flush() will force flush() to send a CLOSE packet
_closed = true;
_flusher.cancel(); _flusher.cancel();
// In 0.8.1 we rewrote flush() to only wait for accept into the window, // In 0.8.1 we rewrote flush() to only wait for accept into the window,
...@@ -411,10 +409,12 @@ class MessageOutputStream extends OutputStream { ...@@ -411,10 +409,12 @@ class MessageOutputStream extends OutputStream {
* Only for use inside package * Only for use inside package
*/ */
public void closeInternal() { public void closeInternal() {
_closed = true; if (!_closed.compareAndSet(false,true)) {
LogUtil.logCloseLoop(_log, "close internal");
return;
}
_flusher.cancel(); _flusher.cancel();
if (_streamError == null) _streamError.compareAndSet(null,new IOException("Closed internally"));
_streamError = new IOException("Closed internally");
clearData(true); clearData(true);
} }
...@@ -435,7 +435,6 @@ class MessageOutputStream extends OutputStream { ...@@ -435,7 +435,6 @@ class MessageOutputStream extends OutputStream {
_buf = null; _buf = null;
_valid = 0; _valid = 0;
} }
_lastFlushed = _context.clock().now();
_dataLock.notifyAll(); _dataLock.notifyAll();
} }
if (ba != null) { if (ba != null) {
...@@ -443,12 +442,11 @@ class MessageOutputStream extends OutputStream { ...@@ -443,12 +442,11 @@ class MessageOutputStream extends OutputStream {
} }
} }
public boolean getClosed() { return _closed; } public boolean getClosed() { return _closed.get(); }
private void throwAnyError() throws IOException { private void throwAnyError() throws IOException {
IOException ioe = _streamError; IOException ioe = _streamError.getAndSet(null);
if (ioe != null) { if (ioe != null) {
_streamError = null;
// constructor with cause not until Java 6 // constructor with cause not until Java 6
IOException ioe2 = new IOException("Output stream error"); IOException ioe2 = new IOException("Output stream error");
ioe2.initCause(ioe); ioe2.initCause(ioe);
...@@ -457,8 +455,7 @@ class MessageOutputStream extends OutputStream { ...@@ -457,8 +455,7 @@ class MessageOutputStream extends OutputStream {
} }
void streamErrorOccurred(IOException ioe) { void streamErrorOccurred(IOException ioe) {
if (_streamError == null) _streamError.compareAndSet(null,ioe);
_streamError = ioe;
clearData(false); clearData(false);
} }
...@@ -484,7 +481,6 @@ class MessageOutputStream extends OutputStream { ...@@ -484,7 +481,6 @@ class MessageOutputStream extends OutputStream {
_valid = 0; _valid = 0;
locked_updateBufferSize(); locked_updateBufferSize();
_dataLock.notifyAll(); _dataLock.notifyAll();
_lastFlushed = _context.clock().now();
} }
long afterBuild = System.currentTimeMillis(); long afterBuild = System.currentTimeMillis();
if ( (afterBuild - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) if ( (afterBuild - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
...@@ -504,7 +500,10 @@ class MessageOutputStream extends OutputStream { ...@@ -504,7 +500,10 @@ class MessageOutputStream extends OutputStream {
} }
void destroy() { void destroy() {
_closed = true; if (!_closed.compareAndSet(false,true)) {
LogUtil.logCloseLoop(_log, "destroy()");
return;
}
_flusher.cancel(); _flusher.cancel();
synchronized (_dataLock) { synchronized (_dataLock) {
_dataLock.notifyAll(); _dataLock.notifyAll();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment