diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 904bde97f..e60528c4b 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -63,8 +63,8 @@ import net.i2p.util.VersionComparator; public class NTCPConnection implements Closeable { private final RouterContext _context; private final Log _log; - private SocketChannel _chan; - private SelectionKey _conKey; + private volatile SocketChannel _chan; + private volatile SelectionKey _conKey; private final FIFOBandwidthLimiter.CompleteListener _inboundListener; private final FIFOBandwidthLimiter.CompleteListener _outboundListener; /** @@ -111,6 +111,7 @@ public class NTCPConnection implements Closeable { // prevent sending meta before established private long _nextMetaTime = Long.MAX_VALUE; private final AtomicInteger _consecutiveZeroReads = new AtomicInteger(); + private final Object _readLock = new Object(); private final Object _writeLock = new Object(); private final Object _statLock = new Object(); @@ -269,14 +270,14 @@ public class NTCPConnection implements Closeable { /** * Valid for inbound; valid for outbound shortly after creation */ - public synchronized SocketChannel getChannel() { return _chan; } + public SocketChannel getChannel() { return _chan; } /** * Valid for inbound; valid for outbound shortly after creation */ - public synchronized SelectionKey getKey() { return _conKey; } - public synchronized void setChannel(SocketChannel chan) { _chan = chan; } - public synchronized void setKey(SelectionKey key) { _conKey = key; } + public SelectionKey getKey() { return _conKey; } + public void setChannel(SocketChannel chan) { _chan = chan; } + public void setKey(SelectionKey key) { _conKey = key; } public boolean isInbound() { return _isInbound; } public boolean isEstablished() { return _establishState.isComplete(); } @@ -509,16 +510,10 @@ public class NTCPConnection implements Closeable { } _bwOutRequests.clear(); - _writeBufs.clear(); - ByteBuffer bb; - while ((bb = _readBufs.poll()) != null) { - EventPumper.releaseBuf(bb); - } - List pending = new ArrayList(); - //_outbound.drainAllTo(pending); - _outbound.drainTo(pending); synchronized(_writeLock) { + _writeBufs.clear(); + _outbound.drainTo(pending); if (!_currentOutbound.isEmpty()) pending.addAll(_currentOutbound); _currentOutbound.clear(); @@ -526,20 +521,26 @@ public class NTCPConnection implements Closeable { _sender.destroy(); _sender = null; } + // zero out everything we can + _sendSipk1 = 0; + _sendSipk2 = 0; + if (_sendSipIV != null) { + Arrays.fill(_sendSipIV, (byte) 0); + _sendSipIV = null; + } } for (OutNetMessage msg : pending) { _transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); } - // zero out everything we can - if (_curReadState != null) { - _curReadState.destroy(); - _curReadState = null; - } - _sendSipk1 = 0; - _sendSipk2 = 0; - if (_sendSipIV != null) { - Arrays.fill(_sendSipIV, (byte) 0); - _sendSipIV = null; + synchronized(_readLock) { + ByteBuffer bb; + while ((bb = _readBufs.poll()) != null) { + EventPumper.releaseBuf(bb); + } + if (_curReadState != null) { + _curReadState.destroy(); + _curReadState = null; + } } return old; } @@ -850,7 +851,7 @@ public class NTCPConnection implements Closeable { */ void sendTerminationAndClose() { ReadState rs = null; - synchronized (this) { + synchronized (_readLock) { if (_version == 2 && isEstablished()) rs = _curReadState; } @@ -1243,10 +1244,12 @@ public class NTCPConnection implements Closeable { * * This is the entry point as called from Reader.processRead() */ - synchronized void recvEncryptedI2NP(ByteBuffer buf) { - if (_curReadState == null) - throw new IllegalStateException("not established"); - _curReadState.receive(buf); + void recvEncryptedI2NP(ByteBuffer buf) { + synchronized(_readLock) { + if (_curReadState == null) + throw new IllegalStateException("not established"); + _curReadState.receive(buf); + } } /** @@ -1718,8 +1721,10 @@ public class NTCPConnection implements Closeable { if (_log.shouldWarn()) _log.warn("delayed close, AEAD failure after " + validFramesRcvd + " good frames, to read: " + toRead + " on " + this, new Exception("I did it")); - _curReadState = new NTCP2FailState(toRead, validFramesRcvd); - _curReadState.receive(buf); + synchronized(_readLock) { + _curReadState = new NTCP2FailState(toRead, validFramesRcvd); + _curReadState.receive(buf); + } } else { if (_log.shouldWarn()) _log.warn("immediate close, AEAD failure after " + validFramesRcvd +