I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 7be0a932 authored by zab2's avatar zab2
Browse files

Use AtomicBoolean to prevent multiple close()-s

parent 175f4729
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Adler32;
import net.i2p.data.Base64;
......@@ -87,7 +88,7 @@ class NTCPConnection {
private volatile EstablishState _establishState;
private final NTCPTransport _transport;
private final boolean _isInbound;
private volatile boolean _closed;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final RouterAddress _remAddr;
private RouterIdentity _remotePeer;
private long _clockSkew; // in seconds
......@@ -335,11 +336,17 @@ class NTCPConnection {
return ++_consecutiveZeroReads;
}
public boolean isClosed() { return _closed; }
public boolean isClosed() { return _closed.get(); }
public void close() { close(false); }
public void close(boolean allowRequeue) {
if (!_closed.compareAndSet(false,true)) {
_log.logCloseLoop("NTCPConnection", this);
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Closing connection " + toString(), new Exception("cause"));
NTCPConnection toClose = locked_close(allowRequeue);
if (toClose != null && toClose != this) {
if (_log.shouldLog(Log.WARN))
......@@ -350,9 +357,6 @@ class NTCPConnection {
}
private synchronized NTCPConnection locked_close(boolean allowRequeue) {
if (_log.shouldLog(Log.INFO))
_log.info("Closing connection " + toString(), new Exception("cause"));
_closed = true;
if (_chan != null) try { _chan.close(); } catch (IOException ioe) { }
if (_conKey != null) _conKey.cancel();
_establishState = EstablishState.VERIFIED;
......@@ -712,7 +716,7 @@ class NTCPConnection {
*
*/
private void prepareNextWriteFast() {
if (_closed)
if (_closed.get())
return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
......@@ -928,7 +932,7 @@ class NTCPConnection {
public void complete(FIFOBandwidthLimiter.Request req) {
removeIBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (_closed) {
if (_closed.get()) {
EventPumper.releaseBuf(buf);
return;
}
......@@ -950,7 +954,7 @@ class NTCPConnection {
public void complete(FIFOBandwidthLimiter.Request req) {
removeOBRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (!_closed) {
if (!_closed.get()) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
write(buf);
}
......@@ -1161,7 +1165,7 @@ class NTCPConnection {
}
}
while (buf.hasRemaining() && !_closed) {
while (buf.hasRemaining() && !_closed.get()) {
int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex);
if (want > 0) {
buf.get(_curReadBlock, _curReadBlockIndex, want);
......@@ -1209,7 +1213,7 @@ class NTCPConnection {
int end = pos + buf.remaining();
boolean first = true;
for ( ; pos < end && !_closed; pos += BLOCK_SIZE) {
for ( ; pos < end && !_closed.get(); pos += BLOCK_SIZE) {
_context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0);
if (first) {
// XOR with _prevReadBlock the first time...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment