Updates to address review comments

Catch CKE setting interest
javadoc tweaks
log tweaks
This commit is contained in:
zzz
2021-12-18 06:53:40 -05:00
parent 03ae919b56
commit 6204d0afa6
2 changed files with 34 additions and 15 deletions

View File

@@ -770,6 +770,7 @@ class EventPumper implements Runnable {
else
setInterest(key, SelectionKey.OP_WRITE);
}
// catch and close outside the write lock to avoid deadlocks in NTCPCon.locked_close()
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, cke);
_context.statManager().addRateData("ntcp.writeError", 1);
@@ -908,6 +909,9 @@ class EventPumper implements Runnable {
public long getIdleTimeout() { return _expireIdleWriteTime; }
/**
* Warning - caller should catch unchecked CancelledKeyException
*
* @throws CancelledKeyException which is unchecked
* @since 0.9.53
*/
public static void setInterest(SelectionKey key, int op) throws CancelledKeyException {
@@ -919,6 +923,9 @@ class EventPumper implements Runnable {
}
/**
* Warning - caller should catch unchecked CancelledKeyException
*
* @throws CancelledKeyException which is unchecked
* @since 0.9.53
*/
public static void clearInterest(SelectionKey key, int op) throws CancelledKeyException {

View File

@@ -5,6 +5,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.Inet6Address;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
@@ -111,8 +112,14 @@ public class NTCPConnection implements Closeable {
// prevent sending meta before established
private long _nextMetaTime = Long.MAX_VALUE;
private final AtomicInteger _consecutiveZeroReads = new AtomicInteger();
// This lock covers:
// _curReadState
private final Object _readLock = new Object();
// This lock covers:
// _writeBufs, _currentOutbound, _outbound, _sendSipk1, _sendSipk2, _sendSipIV, _sender
private final Object _writeLock = new Object();
// This lock covers:
// _bytesReceived, _bytesSent, _lastBytesReceived, _lastBytesSent, _sendBps, _recvBps
private final Object _statLock = new Object();
private static final int BLOCK_SIZE = 16;
@@ -619,16 +626,14 @@ public class NTCPConnection implements Closeable {
*
*/
void prepareNextWrite(PrepBuffer prep) {
synchronized(this) {
if (_closed.get())
return;
// Must be established or else session key is null and we can't encrypt
// This is normal for OB conns but can happen rarely for IB also.
// wantsWrite() is called at end of OB establishment, and
// enqueueInfoMessage() is called at end of IB establishment.
if (!isEstablished()) {
return;
}
if (_closed.get())
return;
// Must be established or else session key is null and we can't encrypt
// This is normal for OB conns but can happen rarely for IB also.
// wantsWrite() is called at end of OB establishment, and
// enqueueInfoMessage() is called at end of IB establishment.
if (!isEstablished()) {
return;
}
synchronized(_writeLock) {
@@ -851,9 +856,10 @@ public class NTCPConnection implements Closeable {
*/
void sendTerminationAndClose() {
ReadState rs = null;
synchronized (_readLock) {
if (_version == 2 && isEstablished())
if (_version == 2 && isEstablished()) {
synchronized (_readLock) {
rs = _curReadState;
}
}
if (rs != null)
sendTermination(REASON_TIMEOUT, rs.getFramesReceived());
@@ -957,7 +963,12 @@ public class NTCPConnection implements Closeable {
try {_chan.close(); } catch (IOException ignored) {}
return;
}
EventPumper.setInterest(_conKey, SelectionKey.OP_READ);
try {
EventPumper.setInterest(_conKey, SelectionKey.OP_READ);
} catch (CancelledKeyException cke) {
try {_chan.close(); } catch (IOException ignored) {}
return;
}
// schedule up the beginning of our handshaking by calling prepareNextWrite on the
// writer thread pool
_transport.getWriter().wantsWrite(this, "outbound connected");
@@ -1104,8 +1115,8 @@ public class NTCPConnection implements Closeable {
if (_isInbound || isEstablished()) {
// Attempt to write directly
if (!pumper.processWrite(this, getKey())) {
if (_log.shouldWarn())
_log.warn("Async write not completed, pending bufs: " + _writeBufs.size() + " on " + this);
if (_log.shouldDebug())
_log.debug("Async write not completed, pending bufs: " + _writeBufs.size() + " on " + this);
// queue it up
pumper.wantsWrite(this);
}
@@ -1192,6 +1203,7 @@ public class NTCPConnection implements Closeable {
}
}
// following fields covered by _statLock
private long _bytesReceived;
private long _bytesSent;
/** _bytesReceived when we last updated the rate */