From 9741d127a92d59eaf898946bf00f6071dd1c8750 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Tue, 13 Nov 2012 20:35:47 +0000 Subject: [PATCH] * NTCP: - Fix NPE with more syncing (hopefully) (ticket #770) - Use ByteCache for 16KB buffers --- .../router/transport/ntcp/NTCPConnection.java | 61 ++++++++----------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index b646c8b1a9..96951cc5f3 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -14,6 +14,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.Adler32; import net.i2p.data.Base64; +import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.data.RouterIdentity; import net.i2p.data.RouterInfo; @@ -28,6 +29,7 @@ import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.util.CoDelPriorityBlockingQueue; import net.i2p.router.util.PriBlockingQueue; +import net.i2p.util.ByteCache; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.HexDump; import net.i2p.util.Log; @@ -142,6 +144,8 @@ class NTCPConnection { * In the meantime, don't let the transport bid on big messages. */ public static final int BUFFER_SIZE = 16*1024; + private static final int MAX_DATA_READ_BUFS = 16; + private static final ByteCache _dataReadBufs = ByteCache.getInstance(MAX_DATA_READ_BUFS, BUFFER_SIZE); /** 2 bytes for length and 4 for CRC */ public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4); @@ -218,11 +222,12 @@ class NTCPConnection { public NTCPAddress getRemoteAddress() { return _remAddr; } public RouterIdentity getRemotePeer() { return _remotePeer; } public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; } + /** * @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should * be under 1 minute) */ - public void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) { + public synchronized void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) { _sessionKey = key; _clockSkew = clockSkew; _prevWriteEnd = prevWriteEnd; @@ -284,8 +289,10 @@ class NTCPConnection { } public boolean isClosed() { return _closed; } + public void close() { close(false); } - public void close(boolean allowRequeue) { + + public synchronized void close(boolean allowRequeue) { if (_log.shouldLog(Log.INFO)) _log.info("Closing connection " + toString(), new Exception("cause")); _closed = true; @@ -489,7 +496,7 @@ class NTCPConnection { * @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should * be under 1 minute) */ - public void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) { + public synchronized void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) { if (_log.shouldLog(Log.DEBUG)) _log.debug("outbound established (key=" + key + " skew=" + clockSkew + " prevWriteEnd=" + Base64.encode(prevWriteEnd) + ")"); _sessionKey = key; @@ -532,8 +539,6 @@ class NTCPConnection { * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. * - * Todo: remove synchronization? - * */ synchronized void prepareNextWrite() { //if (FAST_LARGE) @@ -645,14 +650,17 @@ class NTCPConnection { * prepare the next i2np message for transmission. this should be run from * the Writer thread pool. * - * Todo: remove synchronization? + * Caller must synchronize. * */ - synchronized void prepareNextWriteFast() { + private void prepareNextWriteFast() { + if (_closed) + return; //if (_log.shouldLog(Log.DEBUG)) // _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); if (!_isInbound && !_established) { if (_establishState == null) { + // shouldn't happen _establishState = new EstablishState(_context, _transport, this); _establishState.prepareOutbound(); } else { @@ -1290,30 +1298,12 @@ class NTCPConnection { //public long getReadTime() { return _curReadState.getReadTime(); } - /** - * Just a byte array now (used to have a BAIS in it too, - * but that required an extra copy in the message handler) - */ - private static class DataBuf { - final byte data[]; - - public DataBuf() { - data = new byte[BUFFER_SIZE]; - } - } - - private static final int MAX_DATA_READ_BUFS = 16; - private final static LinkedBlockingQueue<DataBuf> _dataReadBufs = new LinkedBlockingQueue(MAX_DATA_READ_BUFS); - - private static DataBuf acquireReadBuf() { - DataBuf rv = _dataReadBufs.poll(); - if (rv != null) - return rv; - return new DataBuf(); + private static ByteArray acquireReadBuf() { + return _dataReadBufs.acquire(); } - private static void releaseReadBuf(DataBuf buf) { - _dataReadBufs.offer(buf); + private static void releaseReadBuf(ByteArray buf) { + _dataReadBufs.release(buf, false); } /** @@ -1322,7 +1312,6 @@ class NTCPConnection { */ static void releaseResources() { _i2npHandlers.clear(); - _dataReadBufs.clear(); _bufs.clear(); } @@ -1346,7 +1335,7 @@ class NTCPConnection { */ private class ReadState { private int _size; - private DataBuf _dataBuf; + private ByteArray _dataBuf; private int _nextWrite; private long _expectedCrc; private final Adler32 _crc; @@ -1404,7 +1393,7 @@ class NTCPConnection { } else { _stateBegin = System.currentTimeMillis(); _dataBuf = acquireReadBuf(); - System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2); + System.arraycopy(buf, 2, _dataBuf.getData(), 0, buf.length-2); _nextWrite += buf.length-2; _crc.update(buf); _blocks++; @@ -1419,7 +1408,7 @@ class NTCPConnection { int remaining = _size - _nextWrite; int blockUsed = Math.min(buf.length, remaining); if (remaining > 0) { - System.arraycopy(buf, 0, _dataBuf.data, _nextWrite, blockUsed); + System.arraycopy(buf, 0, _dataBuf.getData(), _nextWrite, blockUsed); _nextWrite += blockUsed; remaining -= blockUsed; } @@ -1458,7 +1447,7 @@ class NTCPConnection { // We could extend BAIS to adjust the protected count variable to _size // so that readBytes() doesn't read too far, but it could still read too far. // So use the new handler method that limits the size. - h.readMessage(_dataBuf.data, 0, _size); + h.readMessage(_dataBuf.getData(), 0, _size); I2NPMessage read = h.lastRead(); long timeToRecv = System.currentTimeMillis() - _stateBegin; releaseHandler(h); @@ -1481,8 +1470,8 @@ class NTCPConnection { } catch (I2NPMessageException ime) { if (_log.shouldLog(Log.WARN)) { _log.warn("Error parsing I2NP message", ime); - _log.warn("DUMP:\n" + HexDump.dump(_dataBuf.data, 0, _size)); - _log.warn("RAW:\n" + Base64.encode(_dataBuf.data, 0, _size)); + _log.warn("DUMP:\n" + HexDump.dump(_dataBuf.getData(), 0, _size)); + _log.warn("RAW:\n" + Base64.encode(_dataBuf.getData(), 0, _size)); } _context.statManager().addRateData("ntcp.corruptI2NPIME", 1); // Don't close the con, possible attack vector, not necessarily the peer's fault, -- GitLab