* NTCP Pumper:

- Ensure failsafe pumper code gets run on schedule
    - Don't copy the read buffers
    - Adjust minimum read buffers based on memory
    - New i2np.ntcp.useDirectBuffer option (default false)
    - Mark peer unreachable when read failure is during establishment
    - Change some Reader Lists to Sets to avoid linear search
    - Log tweaks, debugging, new loop stats
This commit is contained in:
zzz
2011-11-21 18:22:13 +00:00
parent dc6c568e9f
commit 1119612684
7 changed files with 208 additions and 82 deletions

View File

@@ -1,3 +1,13 @@
2011-11-21 zzz
* NTCP Pumper:
- Ensure failsafe pumper code gets run on schedule
- Don't copy the read buffers
- Adjust minimum read buffers based on memory
- New i2np.ntcp.useDirectBuffer option (default false)
- Mark peer unreachable when read failure is during establishment
- Change some Reader Lists to Sets to avoid linear search
- Log tweaks, debugging, new loop stats
2011-11-18 zzz
* NTCP:
- First cut at improving EventPumper performance (ticket #551)

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -147,9 +147,12 @@ class EstablishState {
/**
* parse the contents of the buffer as part of the handshake. if the
* handshake is completed and there is more data remaining, the buffer is
* updated so that the next read will be the (still encrypted) remaining
* handshake is completed and there is more data remaining, the data are
* copieed out so that the next read will be the (still encrypted) remaining
* data (available from getExtraBytes)
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*/
public void receive(ByteBuffer src) {
if (_corrupt || _verified)
@@ -176,6 +179,9 @@ class EstablishState {
/**
* we are Bob, so receive these bytes as part of an inbound connection
* This method receives messages 1 and 3, and sends messages 2 and 4.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*/
private void receiveInbound(ByteBuffer src) {
if (_log.shouldLog(Log.DEBUG))
@@ -340,6 +346,9 @@ class EstablishState {
/**
* We are Alice, so receive these bytes as part of an outbound connection.
* This method receives messages 2 and 4, and sends message 3.
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*/
private void receiveOutbound(ByteBuffer src) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"Receive outbound " + src + " received=" + _received);
@@ -684,7 +693,11 @@ class EstablishState {
_transport.getPumper().wantsWrite(_con, _e_bobSig);
}
/** anything left over in the byte buffer after verification is extra */
/** Anything left over in the byte buffer after verification is extra
*
* All data must be copied out of the buffer as Reader.processRead()
* will return it to the pool.
*/
private void prepareExtra(ByteBuffer buf) {
int remaining = buf.remaining();
if (remaining > 0) {

View File

@@ -19,6 +19,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
import net.i2p.router.RouterContext;
@@ -44,12 +45,21 @@ class EventPumper implements Runnable {
private final Queue<NTCPConnection> _wantsConRegister = new ConcurrentLinkedQueue<NTCPConnection>();
private final NTCPTransport _transport;
private long _expireIdleWriteTime;
private boolean _useDirect;
/**
* This probably doesn't need to be bigger than the largest typical
* message, which is a 5-slot VTBM (~2700 bytes).
* The occasional larger message can use multiple buffers.
*/
private static final int BUF_SIZE = 8*1024;
private static final int MAX_CACHE_SIZE = 64;
/**
* Read buffers. (write buffers use wrap())
* Shared if there are multiple routers in the JVM
* Note that if the routers have different PROP_DIRECT settings this will have a mix,
* so don't do that.
*/
private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
@@ -67,11 +77,30 @@ class EventPumper implements Runnable {
private static final long MIN_EXPIRE_IDLE_TIME = 135*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
/**
* Do we use direct buffers for reading? Default false.
* @see java.nio.ByteBuffer
*/
private static final String PROP_DIRECT = "i2np.ntcp.useDirectBuffers";
private static final int MIN_MINB = 4;
private static final int MAX_MINB = 12;
private static final int MIN_BUFS;
static {
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
MIN_BUFS = (int) Math.max(MIN_MINB, Math.min(MAX_MINB, 1 + (maxMemory / (16*1024*1024))));
}
public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_transport = transport;
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
_context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
_context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
}
public synchronized void startPumping() {
@@ -130,33 +159,32 @@ class EventPumper implements Runnable {
* take care to minimize overhead and unnecessary debugging stuff.
*/
public void run() {
int loopCount = 0;
long lastFailsafeIteration = System.currentTimeMillis();
while (_alive && _selector.isOpen()) {
try {
loopCount++;
runDelayedEvents();
int count = 0;
try {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("before select...");
count = _selector.select(SELECTOR_LOOP_DELAY);
int count = _selector.select(SELECTOR_LOOP_DELAY);
if (count > 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("select returned " + count);
Set<SelectionKey> selected = _selector.selectedKeys();
_context.statManager().addRateData("ntcp.pumperKeysPerLoop", selected.size());
processKeys(selected);
// does clear() do anything useful?
selected.clear();
}
} catch (ClosedSelectorException cse) {
continue;
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error selecting", ioe);
}
if (count <= 0)
continue;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("select returned " + count);
Set<SelectionKey> selected;
try {
selected = _selector.selectedKeys();
} catch (ClosedSelectorException cse) {
continue;
}
processKeys(selected);
selected.clear();
if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) {
// in the *cough* unthinkable possibility that there are bugs in
@@ -166,6 +194,9 @@ class EventPumper implements Runnable {
lastFailsafeIteration = System.currentTimeMillis();
try {
Set<SelectionKey> all = _selector.keys();
_context.statManager().addRateData("ntcp.pumperKeySetSize", all.size());
_context.statManager().addRateData("ntcp.pumperLoopsPerSecond", loopCount / (FAILSAFE_ITERATION_FREQ / 1000));
loopCount = 0;
int failsafeWrites = 0;
int failsafeCloses = 0;
@@ -203,7 +234,7 @@ class EventPumper implements Runnable {
(!((SocketChannel)key.channel()).isConnectionPending()) &&
con.getTimeSinceCreated() > 2 * NTCPTransport.ESTABLISH_TIMEOUT) {
if (_log.shouldLog(Log.INFO))
_log.info("Invalid key " + con);
_log.info("Removing invalid key for " + con);
// this will cancel the key, and it will then be removed from the keyset
con.close();
failsafeInvalid++;
@@ -239,6 +270,13 @@ class EventPumper implements Runnable {
continue;
}
}
// Clear the cache if the user changes the setting,
// so we can test the effect.
boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT);
if (_useDirect != newUseDirect) {
_useDirect = newUseDirect;
_bufCache.clear();
}
} catch (RuntimeException re) {
_log.error("Error in the event pumper", re);
}
@@ -310,14 +348,16 @@ class EventPumper implements Runnable {
}
if (read) {
//_context.statManager().addRateData("ntcp.read", 1, 0);
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
processRead(key);
}
if (write) {
//_context.statManager().addRateData("ntcp.write", 1, 0);
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
processWrite(key);
}
//if (!(accept || connect || read || write)) {
// if (_log.shouldLog(Log.INFO))
// _log.info("key wanted nothing? con: " + key.attachment());
//}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("key cancelled");
@@ -365,25 +405,26 @@ class EventPumper implements Runnable {
_wantsRead.offer(con);
_selector.wakeup();
}
private static final int MIN_BUFS = 5;
/**
* How many to keep in reserve.
* Shared if there are multiple routers in the JVM
*/
private static int _numBufs = MIN_BUFS;
private static int __liveBufs = 0;
private static int __consecutiveExtra;
/**
* High-frequency path in thread.
*/
private static ByteBuffer acquireBuf() {
private ByteBuffer acquireBuf() {
ByteBuffer rv = _bufCache.poll();
if (rv == null) {
rv = ByteBuffer.allocate(BUF_SIZE);
_numBufs = ++__liveBufs;
// discard buffer if _useDirect setting changes
if (rv == null || rv.isDirect() != _useDirect) {
if (_useDirect)
rv = ByteBuffer.allocateDirect(BUF_SIZE);
else
rv = ByteBuffer.allocate(BUF_SIZE);
_numBufs++;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);
//_context.statManager().addRateData("ntcp.liveReadBufs", NUM_BUFS, 0);
@@ -395,27 +436,35 @@ class EventPumper implements Runnable {
}
/**
* Return a read buffer to the pool.
* These buffers must be from acquireBuf(), i.e. capacity() == BUF_SIZE.
* High-frequency path in thread.
*/
private static void releaseBuf(ByteBuffer buf) {
public static void releaseBuf(ByteBuffer buf) {
//if (false) return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
// double check
if (buf.capacity() < BUF_SIZE) {
I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception());
return;
}
buf.clear();
int extra = _bufCache.size();
boolean cached = extra < _numBufs;
// TODO always offer if direct?
if (cached) {
_bufCache.offer(buf);
if (extra > 5) {
if (extra > MIN_BUFS) {
__consecutiveExtra++;
if (__consecutiveExtra >= 20) {
_numBufs = Math.max(_numBufs - 1, MIN_BUFS);
if (_numBufs > MIN_BUFS)
_numBufs--;
__consecutiveExtra = 0;
}
}
} else {
__liveBufs--;
}
//if (cached && _log.shouldLog(Log.DEBUG))
// _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
@@ -465,7 +514,7 @@ class EventPumper implements Runnable {
SocketChannel chan = con.getChannel();
boolean connected = chan.finishConnect();
if (_log.shouldLog(Log.DEBUG))
_log.debug("processing connect for " + key + " / " + con + ": connected? " + connected);
_log.debug("processing connect for " + con + ": connected? " + connected);
if (connected) {
// BUGFIX for firewalls. --Sponge
chan.socket().setKeepAlive(true);
@@ -478,18 +527,22 @@ class EventPumper implements Runnable {
_context.statManager().addRateData("ntcp.connectFailedTimeout", 1);
}
} catch (IOException ioe) { // this is the usual failure path for a timeout or connect refused
if (_log.shouldLog(Log.WARN))
_log.warn("Failed outbound connection to " + con.getRemotePeer().calculateHash(), ioe);
if (_log.shouldLog(Log.INFO))
_log.info("Failed outbound " + con, ioe);
con.close();
//_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
_context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1);
} catch (NoConnectionPendingException ncpe) {
// ignore
if (_log.shouldLog(Log.WARN))
_log.warn("error connecting on " + con, ncpe);
}
}
/**
* OP_READ will always be set before this is called.
* This method will disable the interest if no more reads remain because of inbound bandwidth throttling.
* High-frequency path in thread.
*/
private void processRead(SelectionKey key) {
@@ -499,21 +552,18 @@ class EventPumper implements Runnable {
int read = con.getChannel().read(buf);
if (read == -1) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
_context.statManager().addRateData("ntcp.readEOF", 1);
//_context.statManager().addRateData("ntcp.readEOF", 1);
con.close();
releaseBuf(buf);
} else if (read == 0) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("nothing to read for " + con + ", but stay interested");
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
releaseBuf(buf);
} else if (read > 0) {
byte data[] = new byte[read];
// ZERO COPY. The buffer will be returned in Reader.processRead()
buf.flip();
buf.get(data);
releaseBuf(buf);
buf = null;
ByteBuffer rbuf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
if (req.getPendingInboundRequested() > 0) {
// rare since we generally don't throttle inbound
@@ -521,31 +571,49 @@ class EventPumper implements Runnable {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
_context.statManager().addRateData("ntcp.queuedRecv", read);
con.queuedRecv(rbuf, req);
con.queuedRecv(buf, req);
} else {
// fully allocated
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("not bw throttled reading for " + con);
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
con.recv(rbuf);
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_READ);
con.recv(buf);
_context.statManager().addRateData("ntcp.read", read);
}
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke);
releaseBuf(buf);
if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke);
con.close();
_context.statManager().addRateData("ntcp.readError", 1);
if (buf != null) releaseBuf(buf);
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
// common, esp. at outbound connect time
releaseBuf(buf);
if (_log.shouldLog(Log.INFO))
_log.info("error reading on " + con, ioe);
if (con.isEstablished()) {
_context.statManager().addRateData("ntcp.readError", 1);
} else {
// Usually "connection reset by peer", probably a conn limit rejection?
// although it could be a read failure during the DH handshake
// Same stat as in processConnect()
_context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1);
_transport.markUnreachable(con.getRemotePeer().calculateHash());
}
con.close();
_context.statManager().addRateData("ntcp.readError", 1);
if (buf != null) releaseBuf(buf);
} catch (NotYetConnectedException nyce) {
releaseBuf(buf);
// ???
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
if (_log.shouldLog(Log.WARN))
_log.warn("error reading on " + con, nyce);
}
}
/**
* OP_WRITE will always be set before this is called.
* This method will disable the interest if no more writes remain.
* High-frequency path in thread.
*/
private void processWrite(SelectionKey key) {
@@ -573,14 +641,17 @@ class EventPumper implements Runnable {
if (written == 0) {
if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains...");
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
break;
} else if (buf.remaining() > 0) {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining...");
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// stay interested
//key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
break;
} else {
//long beforeRem = System.currentTimeMillis();
@@ -592,17 +663,20 @@ class EventPumper implements Runnable {
//releaseBuf(buf);
//buffers++;
//if (buffer time is too much, add OP_WRITe to the interest ops and break?)
// LOOP
}
} else {
// Nothing more to write
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
break;
}
}
} catch (CancelledKeyException cke) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", cke);
if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, cke);
_context.statManager().addRateData("ntcp.writeError", 1);
con.close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, ioe);
_context.statManager().addRateData("ntcp.writeError", 1);
con.close();
}

View File

@@ -165,7 +165,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_remAddr = remAddr;
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(2);
_bwRequests = new ConcurrentHashSet(8);
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_isInbound = false;
@@ -257,9 +257,17 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) {
iter.next().abort();
// we would like to return read ByteBuffers via EventPumper.releaseBuf(),
// but we can't risk releasing it twice
}
_bwRequests.clear();
_writeBufs.clear();
ByteBuffer bb;
while ((bb = _readBufs.poll()) != null) {
EventPumper.releaseBuf(bb);
}
OutNetMessage msg;
while ((msg = _outbound.poll()) != null) {
Object buf = msg.releasePreparationBuffer();
@@ -789,10 +797,18 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.getWriter().wantsWrite(this, "outbound connected");
}
/**
* The FifoBandwidthLimiter.CompleteListener callback.
* Does the delayed read or write.
*/
public void complete(FIFOBandwidthLimiter.Request req) {
removeRequest(req);
ByteBuffer buf = (ByteBuffer)req.attachment();
if (req.getTotalInboundRequested() > 0) {
if (_closed) {
EventPumper.releaseBuf(buf);
return;
}
_context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()));
recv(buf);
// our reads used to be bw throttled (during which time we were no
@@ -800,7 +816,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
// throttled anymore, so we should resume being interested in reading
_transport.getPumper().wantsRead(this);
//_transport.getReader().wantsRead(this);
} else if (req.getTotalOutboundRequested() > 0) {
} else if (req.getTotalOutboundRequested() > 0 && !_closed) {
_context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
write(buf);
}
@@ -836,7 +852,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/**
* The contents of the buffer have been read and can be processed asap.
* This should not block, and the NTCP connection now owns the buffer
* to do with as it pleases.
* to do with as it pleases BUT it should eventually copy out the data
* and call EventPumper.releaseBuf().
*/
public void recv(ByteBuffer buf) {
_bytesReceived += buf.remaining();
@@ -977,8 +994,11 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* encrypted and encoded I2NP messages. individual i2np messages are
* encoded as "sizeof(data)+data+pad+crc", and those are encrypted
* with the session key and the last 16 bytes of the previous encrypted
* i2np message. the contents of the buffer is owned by the EventPumper,
* so data should be copied out.
* i2np message.
*
* The NTCP connection now owns the buffer
* BUT it must copy out the data
* as reader will call EventPumper.releaseBuf().
*/
synchronized void recvEncryptedI2NP(ByteBuffer buf) {
//if (_log.shouldLog(Log.DEBUG))
@@ -1010,7 +1030,12 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
}
/** _decryptBlockBuf contains another cleartext block of I2NP to parse */
/**
* Append the next 16 bytes of cleartext to the read state.
* _decryptBlockBuf contains another cleartext block of I2NP to parse.
* Caller must synchronize!
* @return success
*/
private boolean recvUnencryptedI2NP() {
_curReadState.receiveBlock(_decryptBlockBuf);
if (_curReadState.getSize() > BUFFER_SIZE) {

View File

@@ -89,7 +89,7 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.connectFailedIOE", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.connectFailedUnresolved", "", "ntcp", RATES);
@@ -124,8 +124,8 @@ public class NTCPTransport extends TransportImpl {
_context.statManager().createRateStat("ntcp.invalidOutboundSkew", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
//_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.readError", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES);
_context.statManager().createRateStat("ntcp.receiveMeta", "", "ntcp", RATES);
@@ -289,7 +289,7 @@ public class NTCPTransport extends TransportImpl {
if (addr == null) {
markUnreachable(peer);
_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
//_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
if (_log.shouldLog(Log.DEBUG))
_log.debug("no bid when trying to send to " + peer.toBase64() + " as they don't have an ntcp address");

View File

@@ -2,7 +2,9 @@ package net.i2p.router.transport.ntcp;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
@@ -19,17 +21,17 @@ class Reader {
private final Log _log;
// TODO change to LBQ ??
private final List<NTCPConnection> _pendingConnections;
private final List<NTCPConnection> _liveReads;
private final List<NTCPConnection> _readAfterLive;
private final Set<NTCPConnection> _liveReads;
private final Set<NTCPConnection> _readAfterLive;
private final List<Runner> _runners;
public Reader(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
_pendingConnections = new ArrayList(16);
_runners = new ArrayList(5);
_liveReads = new ArrayList(5);
_readAfterLive = new ArrayList();
_runners = new ArrayList(8);
_liveReads = new HashSet(8);
_readAfterLive = new HashSet(8);
}
public void startReading(int numReaders) {
@@ -40,6 +42,7 @@ class Reader {
t.start();
}
}
public void stopReading() {
while (!_runners.isEmpty()) {
Runner r = _runners.remove(0);
@@ -55,9 +58,7 @@ class Reader {
boolean already = false;
synchronized (_pendingConnections) {
if (_liveReads.contains(con)) {
if (!_readAfterLive.contains(con)) {
_readAfterLive.add(con);
}
_readAfterLive.add(con);
already = true;
} else if (!_pendingConnections.contains(con)) {
_pendingConnections.add(con);
@@ -78,8 +79,11 @@ class Reader {
private class Runner implements Runnable {
private boolean _stop;
public Runner() { _stop = false; }
public Runner() {}
public void stop() { _stop = true; }
public void run() {
if (_log.shouldLog(Log.INFO)) _log.info("Starting reader");
NTCPConnection con = null;
@@ -118,7 +122,8 @@ class Reader {
}
/**
* process everything read
* Process everything read.
* Return read buffers back to the pool as we process them.
*/
private void processRead(NTCPConnection con) {
if (con.isClosed())
@@ -129,6 +134,7 @@ class Reader {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
if (est == null) {
EventPumper.releaseBuf(buf);
if (!con.isEstablished()) {
// establish state is only removed when the connection is fully established,
// yet if that happens, con.isEstablished() should return true...
@@ -144,9 +150,11 @@ class Reader {
// why is it complete yet !con.isEstablished?
_log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
+ con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
EventPumper.releaseBuf(buf);
break;
}
est.receive(buf);
EventPumper.releaseBuf(buf);
if (est.isCorrupt()) {
if (_log.shouldLog(Log.WARN))
_log.warn("closing connection on establishment because: " +est.getError(), est.getException());
@@ -154,9 +162,6 @@ class Reader {
_context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
con.close();
return;
} else if (buf.remaining() <= 0) {
// not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
}
if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
@@ -169,8 +174,7 @@ class Reader {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
con.recvEncryptedI2NP(buf);
// not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
EventPumper.releaseBuf(buf);
}
}
}