diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index cb0646cf9bfd395865f5431dbad4c3285b52e79a..6c1f0c3f1bbc800b416ad43dfb4cd562951f207a 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -12,6 +12,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.Adler32; import net.i2p.data.Base64; @@ -87,7 +88,7 @@ class NTCPConnection { private volatile EstablishState _establishState; private final NTCPTransport _transport; private final boolean _isInbound; - private volatile boolean _closed; + private final AtomicBoolean _closed = new AtomicBoolean(false); private final RouterAddress _remAddr; private RouterIdentity _remotePeer; private long _clockSkew; // in seconds @@ -335,11 +336,17 @@ class NTCPConnection { return ++_consecutiveZeroReads; } - public boolean isClosed() { return _closed; } + public boolean isClosed() { return _closed.get(); } public void close() { close(false); } public void close(boolean allowRequeue) { + if (!_closed.compareAndSet(false,true)) { + _log.logCloseLoop("NTCPConnection", this); + return; + } + if (_log.shouldLog(Log.INFO)) + _log.info("Closing connection " + toString(), new Exception("cause")); NTCPConnection toClose = locked_close(allowRequeue); if (toClose != null && toClose != this) { if (_log.shouldLog(Log.WARN)) @@ -350,9 +357,6 @@ class NTCPConnection { } private synchronized NTCPConnection locked_close(boolean allowRequeue) { - if (_log.shouldLog(Log.INFO)) - _log.info("Closing connection " + toString(), new Exception("cause")); - _closed = true; if (_chan != null) try { _chan.close(); } catch (IOException ioe) { } if (_conKey != null) _conKey.cancel(); _establishState = EstablishState.VERIFIED; @@ -712,7 +716,7 @@ class NTCPConnection { * */ private void prepareNextWriteFast() { - if (_closed) + if (_closed.get()) return; //if (_log.shouldLog(Log.DEBUG)) // _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); @@ -928,7 +932,7 @@ class NTCPConnection { public void complete(FIFOBandwidthLimiter.Request req) { removeIBRequest(req); ByteBuffer buf = (ByteBuffer)req.attachment(); - if (_closed) { + if (_closed.get()) { EventPumper.releaseBuf(buf); return; } @@ -950,7 +954,7 @@ class NTCPConnection { public void complete(FIFOBandwidthLimiter.Request req) { removeOBRequest(req); ByteBuffer buf = (ByteBuffer)req.attachment(); - if (!_closed) { + if (!_closed.get()) { _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); write(buf); } @@ -1161,7 +1165,7 @@ class NTCPConnection { } } - while (buf.hasRemaining() && !_closed) { + while (buf.hasRemaining() && !_closed.get()) { int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex); if (want > 0) { buf.get(_curReadBlock, _curReadBlockIndex, want); @@ -1209,7 +1213,7 @@ class NTCPConnection { int end = pos + buf.remaining(); boolean first = true; - for ( ; pos < end && !_closed; pos += BLOCK_SIZE) { + for ( ; pos < end && !_closed.get(); pos += BLOCK_SIZE) { _context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0); if (first) { // XOR with _prevReadBlock the first time...