I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 9741d127 authored by zzz's avatar zzz
Browse files

* NTCP:

   - Fix NPE with more syncing (hopefully) (ticket #770)
   - Use ByteCache for 16KB buffers
parent 8efc7e93
No related branches found
No related tags found
No related merge requests found
......@@ -14,6 +14,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo;
......@@ -28,6 +29,7 @@ import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.router.util.PriBlockingQueue;
import net.i2p.util.ByteCache;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
......@@ -142,6 +144,8 @@ class NTCPConnection {
* In the meantime, don't let the transport bid on big messages.
*/
public static final int BUFFER_SIZE = 16*1024;
private static final int MAX_DATA_READ_BUFS = 16;
private static final ByteCache _dataReadBufs = ByteCache.getInstance(MAX_DATA_READ_BUFS, BUFFER_SIZE);
/** 2 bytes for length and 4 for CRC */
public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
......@@ -218,11 +222,12 @@ class NTCPConnection {
public NTCPAddress getRemoteAddress() { return _remAddr; }
public RouterIdentity getRemotePeer() { return _remotePeer; }
public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; }
/**
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute)
*/
public void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
public synchronized void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
_sessionKey = key;
_clockSkew = clockSkew;
_prevWriteEnd = prevWriteEnd;
......@@ -284,8 +289,10 @@ class NTCPConnection {
}
public boolean isClosed() { return _closed; }
public void close() { close(false); }
public void close(boolean allowRequeue) {
public synchronized void close(boolean allowRequeue) {
if (_log.shouldLog(Log.INFO))
_log.info("Closing connection " + toString(), new Exception("cause"));
_closed = true;
......@@ -489,7 +496,7 @@ class NTCPConnection {
* @param clockSkew alice's clock minus bob's clock in seconds (may be negative, obviously, but |val| should
* be under 1 minute)
*/
public void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
public synchronized void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("outbound established (key=" + key + " skew=" + clockSkew + " prevWriteEnd=" + Base64.encode(prevWriteEnd) + ")");
_sessionKey = key;
......@@ -532,8 +539,6 @@ class NTCPConnection {
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
*
* Todo: remove synchronization?
*
*/
synchronized void prepareNextWrite() {
//if (FAST_LARGE)
......@@ -645,14 +650,17 @@ class NTCPConnection {
* prepare the next i2np message for transmission. this should be run from
* the Writer thread pool.
*
* Todo: remove synchronization?
* Caller must synchronize.
*
*/
synchronized void prepareNextWriteFast() {
private void prepareNextWriteFast() {
if (_closed)
return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
if (!_isInbound && !_established) {
if (_establishState == null) {
// shouldn't happen
_establishState = new EstablishState(_context, _transport, this);
_establishState.prepareOutbound();
} else {
......@@ -1290,30 +1298,12 @@ class NTCPConnection {
//public long getReadTime() { return _curReadState.getReadTime(); }
/**
* Just a byte array now (used to have a BAIS in it too,
* but that required an extra copy in the message handler)
*/
private static class DataBuf {
final byte data[];
public DataBuf() {
data = new byte[BUFFER_SIZE];
}
}
private static final int MAX_DATA_READ_BUFS = 16;
private final static LinkedBlockingQueue<DataBuf> _dataReadBufs = new LinkedBlockingQueue(MAX_DATA_READ_BUFS);
private static DataBuf acquireReadBuf() {
DataBuf rv = _dataReadBufs.poll();
if (rv != null)
return rv;
return new DataBuf();
private static ByteArray acquireReadBuf() {
return _dataReadBufs.acquire();
}
private static void releaseReadBuf(DataBuf buf) {
_dataReadBufs.offer(buf);
private static void releaseReadBuf(ByteArray buf) {
_dataReadBufs.release(buf, false);
}
/**
......@@ -1322,7 +1312,6 @@ class NTCPConnection {
*/
static void releaseResources() {
_i2npHandlers.clear();
_dataReadBufs.clear();
_bufs.clear();
}
......@@ -1346,7 +1335,7 @@ class NTCPConnection {
*/
private class ReadState {
private int _size;
private DataBuf _dataBuf;
private ByteArray _dataBuf;
private int _nextWrite;
private long _expectedCrc;
private final Adler32 _crc;
......@@ -1404,7 +1393,7 @@ class NTCPConnection {
} else {
_stateBegin = System.currentTimeMillis();
_dataBuf = acquireReadBuf();
System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2);
System.arraycopy(buf, 2, _dataBuf.getData(), 0, buf.length-2);
_nextWrite += buf.length-2;
_crc.update(buf);
_blocks++;
......@@ -1419,7 +1408,7 @@ class NTCPConnection {
int remaining = _size - _nextWrite;
int blockUsed = Math.min(buf.length, remaining);
if (remaining > 0) {
System.arraycopy(buf, 0, _dataBuf.data, _nextWrite, blockUsed);
System.arraycopy(buf, 0, _dataBuf.getData(), _nextWrite, blockUsed);
_nextWrite += blockUsed;
remaining -= blockUsed;
}
......@@ -1458,7 +1447,7 @@ class NTCPConnection {
// We could extend BAIS to adjust the protected count variable to _size
// so that readBytes() doesn't read too far, but it could still read too far.
// So use the new handler method that limits the size.
h.readMessage(_dataBuf.data, 0, _size);
h.readMessage(_dataBuf.getData(), 0, _size);
I2NPMessage read = h.lastRead();
long timeToRecv = System.currentTimeMillis() - _stateBegin;
releaseHandler(h);
......@@ -1481,8 +1470,8 @@ class NTCPConnection {
} catch (I2NPMessageException ime) {
if (_log.shouldLog(Log.WARN)) {
_log.warn("Error parsing I2NP message", ime);
_log.warn("DUMP:\n" + HexDump.dump(_dataBuf.data, 0, _size));
_log.warn("RAW:\n" + Base64.encode(_dataBuf.data, 0, _size));
_log.warn("DUMP:\n" + HexDump.dump(_dataBuf.getData(), 0, _size));
_log.warn("RAW:\n" + Base64.encode(_dataBuf.getData(), 0, _size));
}
_context.statManager().addRateData("ntcp.corruptI2NPIME", 1);
// Don't close the con, possible attack vector, not necessarily the peer's fault,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment