More lock changes

add read lock, make chan and key volatile
after testing by drzed.
This commit is contained in:
zzz
2021-11-26 10:08:59 -05:00
parent 748714f857
commit 03ae919b56

View File

@@ -63,8 +63,8 @@ import net.i2p.util.VersionComparator;
public class NTCPConnection implements Closeable { public class NTCPConnection implements Closeable {
private final RouterContext _context; private final RouterContext _context;
private final Log _log; private final Log _log;
private SocketChannel _chan; private volatile SocketChannel _chan;
private SelectionKey _conKey; private volatile SelectionKey _conKey;
private final FIFOBandwidthLimiter.CompleteListener _inboundListener; private final FIFOBandwidthLimiter.CompleteListener _inboundListener;
private final FIFOBandwidthLimiter.CompleteListener _outboundListener; private final FIFOBandwidthLimiter.CompleteListener _outboundListener;
/** /**
@@ -111,6 +111,7 @@ public class NTCPConnection implements Closeable {
// prevent sending meta before established // prevent sending meta before established
private long _nextMetaTime = Long.MAX_VALUE; private long _nextMetaTime = Long.MAX_VALUE;
private final AtomicInteger _consecutiveZeroReads = new AtomicInteger(); private final AtomicInteger _consecutiveZeroReads = new AtomicInteger();
private final Object _readLock = new Object();
private final Object _writeLock = new Object(); private final Object _writeLock = new Object();
private final Object _statLock = new Object(); private final Object _statLock = new Object();
@@ -269,14 +270,14 @@ public class NTCPConnection implements Closeable {
/** /**
* Valid for inbound; valid for outbound shortly after creation * Valid for inbound; valid for outbound shortly after creation
*/ */
public synchronized SocketChannel getChannel() { return _chan; } public SocketChannel getChannel() { return _chan; }
/** /**
* Valid for inbound; valid for outbound shortly after creation * Valid for inbound; valid for outbound shortly after creation
*/ */
public synchronized SelectionKey getKey() { return _conKey; } public SelectionKey getKey() { return _conKey; }
public synchronized void setChannel(SocketChannel chan) { _chan = chan; } public void setChannel(SocketChannel chan) { _chan = chan; }
public synchronized void setKey(SelectionKey key) { _conKey = key; } public void setKey(SelectionKey key) { _conKey = key; }
public boolean isInbound() { return _isInbound; } public boolean isInbound() { return _isInbound; }
public boolean isEstablished() { return _establishState.isComplete(); } public boolean isEstablished() { return _establishState.isComplete(); }
@@ -509,16 +510,10 @@ public class NTCPConnection implements Closeable {
} }
_bwOutRequests.clear(); _bwOutRequests.clear();
_writeBufs.clear();
ByteBuffer bb;
while ((bb = _readBufs.poll()) != null) {
EventPumper.releaseBuf(bb);
}
List<OutNetMessage> pending = new ArrayList<OutNetMessage>(); List<OutNetMessage> pending = new ArrayList<OutNetMessage>();
//_outbound.drainAllTo(pending);
_outbound.drainTo(pending);
synchronized(_writeLock) { synchronized(_writeLock) {
_writeBufs.clear();
_outbound.drainTo(pending);
if (!_currentOutbound.isEmpty()) if (!_currentOutbound.isEmpty())
pending.addAll(_currentOutbound); pending.addAll(_currentOutbound);
_currentOutbound.clear(); _currentOutbound.clear();
@@ -526,21 +521,27 @@ public class NTCPConnection implements Closeable {
_sender.destroy(); _sender.destroy();
_sender = null; _sender = null;
} }
}
for (OutNetMessage msg : pending) {
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
// zero out everything we can // zero out everything we can
if (_curReadState != null) {
_curReadState.destroy();
_curReadState = null;
}
_sendSipk1 = 0; _sendSipk1 = 0;
_sendSipk2 = 0; _sendSipk2 = 0;
if (_sendSipIV != null) { if (_sendSipIV != null) {
Arrays.fill(_sendSipIV, (byte) 0); Arrays.fill(_sendSipIV, (byte) 0);
_sendSipIV = null; _sendSipIV = null;
} }
}
for (OutNetMessage msg : pending) {
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
synchronized(_readLock) {
ByteBuffer bb;
while ((bb = _readBufs.poll()) != null) {
EventPumper.releaseBuf(bb);
}
if (_curReadState != null) {
_curReadState.destroy();
_curReadState = null;
}
}
return old; return old;
} }
@@ -850,7 +851,7 @@ public class NTCPConnection implements Closeable {
*/ */
void sendTerminationAndClose() { void sendTerminationAndClose() {
ReadState rs = null; ReadState rs = null;
synchronized (this) { synchronized (_readLock) {
if (_version == 2 && isEstablished()) if (_version == 2 && isEstablished())
rs = _curReadState; rs = _curReadState;
} }
@@ -1243,11 +1244,13 @@ public class NTCPConnection implements Closeable {
* *
* This is the entry point as called from Reader.processRead() * This is the entry point as called from Reader.processRead()
*/ */
synchronized void recvEncryptedI2NP(ByteBuffer buf) { void recvEncryptedI2NP(ByteBuffer buf) {
synchronized(_readLock) {
if (_curReadState == null) if (_curReadState == null)
throw new IllegalStateException("not established"); throw new IllegalStateException("not established");
_curReadState.receive(buf); _curReadState.receive(buf);
} }
}
/** /**
* Handle a received timestamp, NTCP 1 or 2. * Handle a received timestamp, NTCP 1 or 2.
@@ -1718,8 +1721,10 @@ public class NTCPConnection implements Closeable {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("delayed close, AEAD failure after " + validFramesRcvd + _log.warn("delayed close, AEAD failure after " + validFramesRcvd +
" good frames, to read: " + toRead + " on " + this, new Exception("I did it")); " good frames, to read: " + toRead + " on " + this, new Exception("I did it"));
synchronized(_readLock) {
_curReadState = new NTCP2FailState(toRead, validFramesRcvd); _curReadState = new NTCP2FailState(toRead, validFramesRcvd);
_curReadState.receive(buf); _curReadState.receive(buf);
}
} else { } else {
if (_log.shouldWarn()) if (_log.shouldWarn())
_log.warn("immediate close, AEAD failure after " + validFramesRcvd + _log.warn("immediate close, AEAD failure after " + validFramesRcvd +