I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit e3353df8 authored by zzz's avatar zzz
Browse files

* NTCP Transport:

      - Replace lists with concurrent queues in EventPumper
        and NTCPConnection to remove global locks
      - Java 5 cleanup
parent 25285fc0
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ import java.util.ArrayList; ...@@ -17,6 +17,7 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.RouterIdentity; import net.i2p.data.RouterIdentity;
import net.i2p.data.RouterInfo; import net.i2p.data.RouterInfo;
...@@ -33,11 +34,11 @@ public class EventPumper implements Runnable { ...@@ -33,11 +34,11 @@ public class EventPumper implements Runnable {
private Log _log; private Log _log;
private volatile boolean _alive; private volatile boolean _alive;
private Selector _selector; private Selector _selector;
private final List _bufCache; private final LinkedBlockingQueue<ByteBuffer> _bufCache;
private final List _wantsRead = new ArrayList(16); private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue();
private final List _wantsWrite = new ArrayList(4); private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue();
private final List _wantsRegister = new ArrayList(1); private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue();
private final List _wantsConRegister = new ArrayList(4); private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue();
private NTCPTransport _transport; private NTCPTransport _transport;
private long _expireIdleWriteTime; private long _expireIdleWriteTime;
...@@ -54,23 +55,19 @@ public class EventPumper implements Runnable { ...@@ -54,23 +55,19 @@ public class EventPumper implements Runnable {
/** tunnel test is every 30-60s, so this should be longer than, say, 3*45s to allow for drops */ /** tunnel test is every 30-60s, so this should be longer than, say, 3*45s to allow for drops */
private static final long MIN_EXPIRE_IDLE_TIME = 3*60*1000l; private static final long MIN_EXPIRE_IDLE_TIME = 3*60*1000l;
private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l; private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
public EventPumper(RouterContext ctx, NTCPTransport transport) { public EventPumper(RouterContext ctx, NTCPTransport transport) {
_context = ctx; _context = ctx;
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_transport = transport; _transport = transport;
_alive = false; _alive = false;
_bufCache = new ArrayList(MAX_CACHE_SIZE); _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE);
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
} }
public synchronized void startPumping() { public synchronized void startPumping() {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Starting pumper"); _log.info("Starting pumper");
// _wantsRead = new ArrayList(16);
// _wantsWrite = new ArrayList(4);
// _wantsRegister = new ArrayList(1);
// _wantsConRegister = new ArrayList(4);
try { try {
_selector = Selector.open(); _selector = Selector.open();
_alive = true; _alive = true;
...@@ -98,13 +95,13 @@ public class EventPumper implements Runnable { ...@@ -98,13 +95,13 @@ public class EventPumper implements Runnable {
public void register(ServerSocketChannel chan) { public void register(ServerSocketChannel chan) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering server socket channel");
synchronized (_wantsRegister) { _wantsRegister.add(chan); } _wantsRegister.offer(chan);
_selector.wakeup(); _selector.wakeup();
} }
public void registerConnect(NTCPConnection con) { public void registerConnect(NTCPConnection con) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Registering outbound connection");
_context.statManager().addRateData("ntcp.registerConnect", 1, 0); _context.statManager().addRateData("ntcp.registerConnect", 1, 0);
synchronized (_wantsConRegister) { _wantsConRegister.add(con); } _wantsConRegister.offer(con);
_selector.wakeup(); _selector.wakeup();
} }
...@@ -254,10 +251,10 @@ public class EventPumper implements Runnable { ...@@ -254,10 +251,10 @@ public class EventPumper implements Runnable {
} catch (Exception e) { } catch (Exception e) {
_log.error("Error closing keys on pumper shutdown", e); _log.error("Error closing keys on pumper shutdown", e);
} }
synchronized (_wantsConRegister) { _wantsConRegister.clear(); } _wantsConRegister.clear();
synchronized (_wantsRead) { _wantsRead.clear(); } _wantsRead.clear();
synchronized (_wantsRegister) { _wantsRegister.clear(); } _wantsRegister.clear();
synchronized (_wantsWrite) { _wantsWrite.clear(); } _wantsWrite.clear();
} }
private void processKeys(Set selected) { private void processKeys(Set selected) {
...@@ -322,10 +319,8 @@ public class EventPumper implements Runnable { ...@@ -322,10 +319,8 @@ public class EventPumper implements Runnable {
public void wantsWrite(NTCPConnection con) { public void wantsWrite(NTCPConnection con) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Before adding wants to write on " + con); _log.info("Before adding wants to write on " + con);
synchronized (_wantsWrite) { if (!_wantsWrite.contains(con))
if (!_wantsWrite.contains(con)) _wantsWrite.offer(con);
_wantsWrite.add(con);
}
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Wants to write on " + con); _log.info("Wants to write on " + con);
_selector.wakeup(); _selector.wakeup();
...@@ -333,10 +328,8 @@ public class EventPumper implements Runnable { ...@@ -333,10 +328,8 @@ public class EventPumper implements Runnable {
_log.debug("selector awoken for write"); _log.debug("selector awoken for write");
} }
public void wantsRead(NTCPConnection con) { public void wantsRead(NTCPConnection con) {
synchronized (_wantsRead) { if (!_wantsRead.contains(con))
if (!_wantsRead.contains(con)) _wantsRead.offer(con);
_wantsRead.add(con);
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("wants to read on " + con); _log.debug("wants to read on " + con);
_selector.wakeup(); _selector.wakeup();
...@@ -345,16 +338,16 @@ public class EventPumper implements Runnable { ...@@ -345,16 +338,16 @@ public class EventPumper implements Runnable {
} }
private static final int MIN_BUFS = 5; private static final int MIN_BUFS = 5;
/**
* There's only one pumper, so static is fine, unless multi router
* Is there a better way to do this?
*/
private static int NUM_BUFS = 5; private static int NUM_BUFS = 5;
private static int __liveBufs = 0; private static int __liveBufs = 0;
private static int __consecutiveExtra; private static int __consecutiveExtra;
ByteBuffer acquireBuf() { ByteBuffer acquireBuf() {
if (false) return ByteBuffer.allocate(BUF_SIZE); //if (false) return ByteBuffer.allocate(BUF_SIZE);
ByteBuffer rv = null; ByteBuffer rv = (ByteBuffer)_bufCache.poll();
synchronized (_bufCache) {
if (_bufCache.size() > 0)
rv = (ByteBuffer)_bufCache.remove(0);
}
if (rv == null) { if (rv == null) {
rv = ByteBuffer.allocate(BUF_SIZE); rv = ByteBuffer.allocate(BUF_SIZE);
NUM_BUFS = ++__liveBufs; NUM_BUFS = ++__liveBufs;
...@@ -369,27 +362,24 @@ public class EventPumper implements Runnable { ...@@ -369,27 +362,24 @@ public class EventPumper implements Runnable {
} }
void releaseBuf(ByteBuffer buf) { void releaseBuf(ByteBuffer buf) {
if (false) return; //if (false) return;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
buf.clear(); buf.clear();
int extra = 0; int extra = _bufCache.size();
boolean cached = false; boolean cached = extra < NUM_BUFS;
synchronized (_bufCache) {
if (_bufCache.size() < NUM_BUFS) { if (cached) {
extra = _bufCache.size(); _bufCache.offer(buf);
_bufCache.add(buf); if (extra > 5) {
cached = true; __consecutiveExtra++;
if (extra > 5) { if (__consecutiveExtra >= 20) {
__consecutiveExtra++; NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS);
if (__consecutiveExtra >= 20) { __consecutiveExtra = 0;
NUM_BUFS = Math.max(NUM_BUFS - 1, MIN_BUFS);
__consecutiveExtra = 0;
}
} }
} else {
__liveBufs--;
} }
} else {
__liveBufs--;
} }
if (cached && _log.shouldLog(Log.DEBUG)) if (cached && _log.shouldLog(Log.DEBUG))
_log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live"); _log.debug("read buffer " + System.identityHashCode(buf) + " cached with " + __liveBufs + " live");
...@@ -578,14 +568,8 @@ public class EventPumper implements Runnable { ...@@ -578,14 +568,8 @@ public class EventPumper implements Runnable {
} }
private void runDelayedEvents(List buf) { private void runDelayedEvents(List buf) {
synchronized (_wantsRead) { NTCPConnection con;
if (_wantsRead.size() > 0) { while ((con = _wantsRead.poll()) != null) {
buf.addAll(_wantsRead);
_wantsRead.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
SelectionKey key = con.getKey(); SelectionKey key = con.getKey();
try { try {
key.interestOps(key.interestOps() | SelectionKey.OP_READ); key.interestOps(key.interestOps() | SelectionKey.OP_READ);
...@@ -594,14 +578,7 @@ public class EventPumper implements Runnable { ...@@ -594,14 +578,7 @@ public class EventPumper implements Runnable {
} }
} }
synchronized (_wantsWrite) { while ((con = _wantsWrite.poll()) != null) {
if (_wantsWrite.size() > 0) {
buf.addAll(_wantsWrite);
_wantsWrite.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
SelectionKey key = con.getKey(); SelectionKey key = con.getKey();
try { try {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
...@@ -610,14 +587,8 @@ public class EventPumper implements Runnable { ...@@ -610,14 +587,8 @@ public class EventPumper implements Runnable {
} }
} }
synchronized (_wantsRegister) { ServerSocketChannel chan;
if (_wantsRegister.size() > 0) { while ((chan = _wantsRegister.poll()) != null) {
buf.addAll(_wantsRegister);
_wantsRegister.clear();
}
}
while (buf.size() > 0) {
ServerSocketChannel chan = (ServerSocketChannel)buf.remove(0);
try { try {
SelectionKey key = chan.register(_selector, SelectionKey.OP_ACCEPT); SelectionKey key = chan.register(_selector, SelectionKey.OP_ACCEPT);
key.attach(chan); key.attach(chan);
...@@ -626,14 +597,7 @@ public class EventPumper implements Runnable { ...@@ -626,14 +597,7 @@ public class EventPumper implements Runnable {
} }
} }
synchronized (_wantsConRegister) { while ((con = _wantsConRegister.poll()) != null) {
if (_wantsConRegister.size() > 0) {
buf.addAll(_wantsConRegister);
_wantsConRegister.clear();
}
}
while (buf.size() > 0) {
NTCPConnection con = (NTCPConnection)buf.remove(0);
try { try {
SelectionKey key = con.getChannel().register(_selector, SelectionKey.OP_CONNECT); SelectionKey key = con.getChannel().register(_selector, SelectionKey.OP_CONNECT);
key.attach(con); key.attach(con);
......
...@@ -8,6 +8,8 @@ import java.nio.channels.SocketChannel; ...@@ -8,6 +8,8 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32; import java.util.zip.Adler32;
import net.i2p.data.Base64; import net.i2p.data.Base64;
...@@ -23,6 +25,7 @@ import net.i2p.router.OutNetMessage; ...@@ -23,6 +25,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.Router; import net.i2p.router.Router;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter; import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
...@@ -53,13 +56,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -53,13 +56,14 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
private SocketChannel _chan; private SocketChannel _chan;
private SelectionKey _conKey; private SelectionKey _conKey;
/** list of ByteBuffer containing data we have read and are ready to process, oldest first */ /** list of ByteBuffer containing data we have read and are ready to process, oldest first */
private final List _readBufs; private final LinkedBlockingQueue<ByteBuffer> _readBufs;
/** /**
* list of ByteBuffers containing fully populated and encrypted data, ready to write, * list of ByteBuffers containing fully populated and encrypted data, ready to write,
* and already cleared through the bandwidth limiter. * and already cleared through the bandwidth limiter.
*/ */
private final List _writeBufs; private final LinkedBlockingQueue<ByteBuffer> _writeBufs;
private final List _bwRequests; /** Todo: This is only so we can abort() them when we close() ??? */
private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
private boolean _established; private boolean _established;
private long _establishedOn; private long _establishedOn;
private EstablishState _establishState; private EstablishState _establishState;
...@@ -72,8 +76,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -72,8 +76,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/** /**
* pending unprepared OutNetMessage instances * pending unprepared OutNetMessage instances
*/ */
private final List _outbound; private final LinkedBlockingQueue<OutNetMessage> _outbound;
/** current prepared OutNetMessage, or null */ /** current prepared OutNetMessage, or null - synchronize on _outbound to modify */
private OutNetMessage _currentOutbound; private OutNetMessage _currentOutbound;
private SessionKey _sessionKey; private SessionKey _sessionKey;
/** encrypted block of the current I2NP message being read */ /** encrypted block of the current I2NP message being read */
...@@ -124,10 +128,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -124,10 +128,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_lastReceiveTime = _created; _lastReceiveTime = _created;
_transport = transport; _transport = transport;
_chan = chan; _chan = chan;
_readBufs = new ArrayList(4); _readBufs = new LinkedBlockingQueue();
_writeBufs = new ArrayList(4); _writeBufs = new LinkedBlockingQueue();
_bwRequests = new ArrayList(2); _bwRequests = new ConcurrentHashSet(2);
_outbound = new ArrayList(4); _outbound = new LinkedBlockingQueue();
_established = false; _established = false;
_isInbound = true; _isInbound = true;
_closed = false; _closed = false;
...@@ -154,10 +158,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -154,10 +158,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_lastReceiveTime = _created; _lastReceiveTime = _created;
_transport = transport; _transport = transport;
_remAddr = remAddr; _remAddr = remAddr;
_readBufs = new ArrayList(4); _readBufs = new LinkedBlockingQueue();
_writeBufs = new ArrayList(4); _writeBufs = new LinkedBlockingQueue();
_bwRequests = new ArrayList(2); _bwRequests = new ConcurrentHashSet(2);
_outbound = new ArrayList(4); _outbound = new LinkedBlockingQueue();
_established = false; _established = false;
_isInbound = false; _isInbound = false;
_closed = false; _closed = false;
...@@ -210,12 +214,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -210,12 +214,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public long getMessagesSent() { return _messagesWritten; } public long getMessagesSent() { return _messagesWritten; }
public long getMessagesReceived() { return _messagesRead; } public long getMessagesReceived() { return _messagesRead; }
public long getOutboundQueueSize() { public long getOutboundQueueSize() {
synchronized (_outbound) {
int queued = _outbound.size(); int queued = _outbound.size();
if (_currentOutbound != null) if (_currentOutbound != null)
queued++; queued++;
return queued; return queued;
}
} }
public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; } public long getTimeSinceSend() { return System.currentTimeMillis()-_lastSendTime; }
public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; } public long getTimeSinceReceive() { return System.currentTimeMillis()-_lastReceiveTime; }
...@@ -234,29 +236,21 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -234,29 +236,21 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_transport.removeCon(this); _transport.removeCon(this);
_transport.getReader().connectionClosed(this); _transport.getReader().connectionClosed(this);
_transport.getWriter().connectionClosed(this); _transport.getWriter().connectionClosed(this);
List reqs = null;
synchronized (_bwRequests) { for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) {
if (_bwRequests.size() > 0) { iter.next().abort();
reqs = new ArrayList(_bwRequests);
_bwRequests.clear();
}
}
if (reqs != null)
for (Iterator iter = reqs.iterator(); iter.hasNext(); )
((FIFOBandwidthLimiter.Request)iter.next()).abort();
List msgs = null;
synchronized (_outbound) {
msgs = new ArrayList(_outbound);
_outbound.clear();
} }
for (int i = 0; i < msgs.size(); i++) { _bwRequests.clear();
OutNetMessage msg = (OutNetMessage)msgs.get(i);
OutNetMessage msg;
while ((msg = _outbound.poll()) != null) {
Object buf = msg.releasePreparationBuffer(); Object buf = msg.releasePreparationBuffer();
if (buf != null) if (buf != null)
releaseBuf((PrepBuffer)buf); releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime()); _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
} }
OutNetMessage msg = _currentOutbound;
msg = _currentOutbound;
if (msg != null) { if (msg != null) {
Object buf = msg.releasePreparationBuffer(); Object buf = msg.releasePreparationBuffer();
if (buf != null) if (buf != null)
...@@ -277,10 +271,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -277,10 +271,10 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (_consecutiveBacklog > 10) { // waaay too backlogged if (_consecutiveBacklog > 10) { // waaay too backlogged
boolean wantsWrite = false; boolean wantsWrite = false;
try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {} try { wantsWrite = ( (_conKey.interestOps() & SelectionKey.OP_WRITE) != 0); } catch (Exception e) {}
int blocks = 0; if (_log.shouldLog(Log.WARN)) {
synchronized (_writeBufs) { blocks = _writeBufs.size(); } int blocks = _writeBufs.size();
if (_log.shouldLog(Log.WARN))
_log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64()); _log.warn("Too backlogged for too long (" + _consecutiveBacklog + " messages for " + DataHelper.formatDuration(queueTime()) + ", sched? " + wantsWrite + ", blocks: " + blocks + ") sending to " + _remotePeer.calculateHash().toBase64());
}
_context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime()); _context.statManager().addRateData("ntcp.closeOnBacklog", getUptime(), getUptime());
close(); close();
} }
...@@ -292,40 +286,29 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -292,40 +286,29 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (FAST_LARGE) if (FAST_LARGE)
bufferedPrepare(msg); bufferedPrepare(msg);
boolean noOutbound = false; boolean noOutbound = false;
synchronized (_outbound) { _outbound.offer(msg);
_outbound.add(msg); enqueued = _outbound.size();
enqueued = _outbound.size(); msg.setQueueSize(enqueued);
msg.setQueueSize(enqueued); noOutbound = (_currentOutbound == null);
noOutbound = (_currentOutbound == null);
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound) if (_established && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued"); _transport.getWriter().wantsWrite(this, "enqueued");
} }
private long queueTime() { private long queueTime() {
long queueTime = 0; OutNetMessage msg = _currentOutbound;
int size = 0; if (msg == null) {
synchronized (_outbound) { msg = _outbound.peek();
OutNetMessage msg = _currentOutbound;
size = _outbound.size();
if ( (msg == null) && (size > 0) )
msg = (OutNetMessage)_outbound.get(0);
if (msg == null) if (msg == null)
return 0; return 0;
queueTime = msg.getSendTime(); // does not include any of the pre-send(...) preparation
} }
return queueTime; return msg.getSendTime(); // does not include any of the pre-send(...) preparation
} }
public boolean tooBacklogged() { public boolean tooBacklogged() {
long queueTime = queueTime(); long queueTime = queueTime();
if (queueTime <= 0) return false; if (queueTime <= 0) return false;
int size = 0; boolean currentOutboundSet = _currentOutbound != null;
boolean currentOutboundSet = false;
synchronized (_outbound) {
size = _outbound.size();
currentOutboundSet = (_currentOutbound != null);
}
// perhaps we could take into account the size of the queued messages too, our // perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration? // current transmission rate, and how much time is left before the new message's expiration?
...@@ -333,15 +316,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -333,15 +316,16 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if (getUptime() < 10*1000) // allow some slack just after establishment if (getUptime() < 10*1000) // allow some slack just after establishment
return false; return false;
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime... if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
int writeBufs = 0; int size = _outbound.size();
synchronized (_writeBufs) { writeBufs = _writeBufs.size(); } if (_log.shouldLog(Log.WARN)) {
if (_log.shouldLog(Log.WARN)) int writeBufs = _writeBufs.size();
try { try {
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE)) + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet + ", currentOut set? " + currentOutboundSet
+ ", writeBufs: " + writeBufs + " on " + toString()); + ", writeBufs: " + writeBufs + " on " + toString());
} catch (Exception e) {} // java.nio.channels.CancelledKeyException } catch (Exception e) {} // java.nio.channels.CancelledKeyException
}
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size); _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime, size);
return true; return true;
//} else if (size > 32) { // another arbitrary limit. //} else if (size > 32) { // another arbitrary limit.
...@@ -440,10 +424,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -440,10 +424,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_establishState = null; _establishState = null;
_transport.markReachable(getRemotePeer().calculateHash(), false); _transport.markReachable(getRemotePeer().calculateHash(), false);
//_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); //_context.shitlist().unshitlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
boolean msgs = false; boolean msgs = !_outbound.isEmpty();
synchronized (_outbound) {
msgs = (_outbound.size() > 0);
}
_nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY); _nextMetaTime = System.currentTimeMillis() + _context.random().nextInt(META_FREQUENCY);
_nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
if (msgs) if (msgs)
...@@ -469,6 +450,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -469,6 +450,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* prepare the next i2np message for transmission. this should be run from * prepare the next i2np message for transmission. this should be run from
* the Writer thread pool. * the Writer thread pool.
* *
* Todo: remove synchronization?
*
*/ */
synchronized void prepareNextWrite() { synchronized void prepareNextWrite() {
//if (FAST_LARGE) //if (FAST_LARGE)
...@@ -579,6 +562,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -579,6 +562,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* prepare the next i2np message for transmission. this should be run from * prepare the next i2np message for transmission. this should be run from
* the Writer thread pool. * the Writer thread pool.
* *
* Todo: remove synchronization?
*
*/ */
synchronized void prepareNextWriteFast() { synchronized void prepareNextWriteFast() {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
...@@ -600,6 +585,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -600,6 +585,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
} }
OutNetMessage msg = null; OutNetMessage msg = null;
// this is synchronized only for _currentOutbound
// Todo: figure out how to remove the synchronization
synchronized (_outbound) { synchronized (_outbound) {
if (_currentOutbound != null) { if (_currentOutbound != null) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
...@@ -608,21 +595,28 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -608,21 +595,28 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
} }
//throw new RuntimeException("We should not be preparing a write while we still have one pending"); //throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (queueTime() > 3*1000) { // don't stall low-priority messages if (queueTime() > 3*1000) { // don't stall low-priority messages
msg = (OutNetMessage)_outbound.remove(0); msg = _outbound.poll();
if (msg == null)
return;
} else { } else {
Iterator it = _outbound.iterator(); int slot = 0; // only for logging
Iterator<OutNetMessage> it = _outbound.iterator();
for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound for (int i = 0; it.hasNext() && i < 75; i++) { //arbitrary bound
OutNetMessage mmsg = (OutNetMessage) it.next(); OutNetMessage mmsg = it.next();
if (msg == null || mmsg.getPriority() > msg.getPriority()) if (msg == null || mmsg.getPriority() > msg.getPriority()) {
msg = mmsg; msg = mmsg;
slot = i;
}
} }
if (msg == null) if (msg == null)
return; return;
// if (_outbound.indexOf(msg) > 0) // if (_outbound.indexOf(msg) > 0)
// _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size()); // _log.debug("Priority message sent, pri = " + msg.getPriority() + " pos = " + _outbound.indexOf(msg) + "/" +_outbound.size());
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + _outbound.indexOf(msg)); _log.info("Type " + msg.getMessage().getType() + " pri " + msg.getPriority() + " slot " + slot);
_outbound.remove(msg); boolean removed = _outbound.remove(msg);
if ((!removed) && _log.shouldLog(Log.ERROR))
_log.info("Already removed??? " + msg.getMessage().getType());
} }
_currentOutbound = msg; _currentOutbound = msg;
} }
...@@ -815,17 +809,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -815,17 +809,13 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
write(buf); write(buf);
} }
} }
private void removeRequest(FIFOBandwidthLimiter.Request req) { private void removeRequest(FIFOBandwidthLimiter.Request req) {
synchronized (_bwRequests) { _bwRequests.remove(req); } _bwRequests.remove(req);
} }
private void addRequest(FIFOBandwidthLimiter.Request req) { private void addRequest(FIFOBandwidthLimiter.Request req) {
synchronized (_bwRequests) { _bwRequests.add(req); } _bwRequests.add(req);
}
public int outboundQueueSize() {
synchronized (_writeBufs) {
return _writeBufs.size();
}
} }
/** /**
...@@ -852,10 +842,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -852,10 +842,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/ */
public void recv(ByteBuffer buf) { public void recv(ByteBuffer buf) {
_bytesReceived += buf.remaining(); _bytesReceived += buf.remaining();
synchronized (_readBufs) {
//buf.flip(); //buf.flip();
_readBufs.add(buf); _readBufs.offer(buf);
}
_transport.getReader().wantsRead(this); _transport.getReader().wantsRead(this);
updateStats(); updateStats();
} }
...@@ -865,34 +853,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -865,34 +853,27 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
*/ */
public void write(ByteBuffer buf) { public void write(ByteBuffer buf) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Before write(buf)");
synchronized (_writeBufs) { _writeBufs.offer(buf);
_writeBufs.add(buf);
}
if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)"); if (_log.shouldLog(Log.DEBUG)) _log.debug("After write(buf)");
_transport.getPumper().wantsWrite(this); _transport.getPumper().wantsWrite(this);
} }
/** @return null if none available */
public ByteBuffer getNextReadBuf() { public ByteBuffer getNextReadBuf() {
synchronized (_readBufs) { return _readBufs.poll();
if (_readBufs.size() > 0)
return (ByteBuffer)_readBufs.get(0);
}
return null;
} }
/** since getNextReadBuf() removes, this should not be necessary */
public void removeReadBuf(ByteBuffer buf) { public void removeReadBuf(ByteBuffer buf) {
synchronized (_readBufs) { _readBufs.remove(buf);
_readBufs.remove(buf);
}
//_transport.getPumper().releaseBuf(buf); //_transport.getPumper().releaseBuf(buf);
} }
public int getWriteBufCount() { synchronized (_writeBufs) { return _writeBufs.size(); } } public int getWriteBufCount() { return _writeBufs.size(); }
/** @return null if none available */
public ByteBuffer getNextWriteBuf() { public ByteBuffer getNextWriteBuf() {
synchronized (_writeBufs) { return _writeBufs.peek(); // not remove! we removeWriteBuf afterwards
if (_writeBufs.size() > 0)
return (ByteBuffer)_writeBufs.get(0); // not remove! we removeWriteBuf afterwards
}
return null;
} }
public void removeWriteBuf(ByteBuffer buf) { public void removeWriteBuf(ByteBuffer buf) {
...@@ -900,16 +881,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -900,16 +881,15 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
OutNetMessage msg = null; OutNetMessage msg = null;
boolean bufsRemain = false; boolean bufsRemain = false;
boolean clearMessage = false; boolean clearMessage = false;
synchronized (_writeBufs) { if (_sendingMeta && (buf.capacity() == _meta.length)) {
if (_sendingMeta && (buf.capacity() == _meta.length)) { _sendingMeta = false;
_sendingMeta = false; } else {
} else { clearMessage = true;
clearMessage = true;
}
_writeBufs.remove(buf);
bufsRemain = _writeBufs.size() > 0;
} }
_writeBufs.remove(buf);
bufsRemain = !_writeBufs.isEmpty();
if (clearMessage) { if (clearMessage) {
// see synchronization comments in prepareNextWriteFast()
synchronized (_outbound) { synchronized (_outbound) {
if (_currentOutbound != null) if (_currentOutbound != null)
msg = _currentOutbound; msg = _currentOutbound;
...@@ -935,10 +915,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -935,10 +915,7 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_log.info("I2NP meta message sent completely"); _log.info("I2NP meta message sent completely");
} }
boolean msgs = false; boolean msgs = ((!_outbound.isEmpty()) || (_currentOutbound != null));
synchronized (_outbound) {
msgs = ((_outbound.size() > 0) || (_currentOutbound != null));
}
if (msgs) // push through the bw limiter to reach _writeBufs if (msgs) // push through the bw limiter to reach _writeBufs
_transport.getWriter().wantsWrite(this, "write completed"); _transport.getWriter().wantsWrite(this, "write completed");
if (bufsRemain) // send asap if (bufsRemain) // send asap
...@@ -1113,22 +1090,17 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -1113,22 +1090,17 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
} }
private static final int MAX_HANDLERS = 4; private static final int MAX_HANDLERS = 4;
private final static List _i2npHandlers = new ArrayList(MAX_HANDLERS); private final static LinkedBlockingQueue<I2NPMessageHandler> _i2npHandlers = new LinkedBlockingQueue(MAX_HANDLERS);
private final static I2NPMessageHandler acquireHandler(RouterContext ctx) { private final static I2NPMessageHandler acquireHandler(RouterContext ctx) {
I2NPMessageHandler rv = null; I2NPMessageHandler rv = _i2npHandlers.poll();
synchronized (_i2npHandlers) {
if (_i2npHandlers.size() > 0)
rv = (I2NPMessageHandler)_i2npHandlers.remove(0);
}
if (rv == null) if (rv == null)
rv = new I2NPMessageHandler(ctx); rv = new I2NPMessageHandler(ctx);
return rv; return rv;
} }
private static void releaseHandler(I2NPMessageHandler handler) { private static void releaseHandler(I2NPMessageHandler handler) {
synchronized (_i2npHandlers) { _i2npHandlers.offer(handler);
if (_i2npHandlers.size() < MAX_HANDLERS)
_i2npHandlers.add(handler);
}
} }
...@@ -1144,21 +1116,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { ...@@ -1144,21 +1116,20 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
} }
private static final int MAX_DATA_READ_BUFS = 16; private static final int MAX_DATA_READ_BUFS = 16;
private final static List _dataReadBufs = new ArrayList(MAX_DATA_READ_BUFS); private final static LinkedBlockingQueue<DataBuf> _dataReadBufs = new LinkedBlockingQueue(MAX_DATA_READ_BUFS);
private static DataBuf acquireReadBuf() { private static DataBuf acquireReadBuf() {
synchronized (_dataReadBufs) { DataBuf rv = _dataReadBufs.poll();
if (_dataReadBufs.size() > 0) if (rv != null)
return (DataBuf)_dataReadBufs.remove(0); return rv;
}
return new DataBuf(); return new DataBuf();
} }
private static void releaseReadBuf(DataBuf buf) { private static void releaseReadBuf(DataBuf buf) {
buf.bais.reset(); buf.bais.reset();
synchronized (_dataReadBufs) { _dataReadBufs.offer(buf);
if (_dataReadBufs.size() < MAX_DATA_READ_BUFS)
_dataReadBufs.add(buf);
}
} }
/** /**
* sizeof(data)+data+pad+crc. * sizeof(data)+data+pad+crc.
* *
......
...@@ -17,10 +17,10 @@ import net.i2p.util.Log; ...@@ -17,10 +17,10 @@ import net.i2p.util.Log;
class Reader { class Reader {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private final List _pendingConnections; private final List<NTCPConnection> _pendingConnections;
private List _liveReads; private List<NTCPConnection> _liveReads;
private List _readAfterLive; private List<NTCPConnection> _readAfterLive;
private List _runners; private List<Runner> _runners;
public Reader(RouterContext ctx) { public Reader(RouterContext ctx) {
_context = ctx; _context = ctx;
...@@ -41,7 +41,7 @@ class Reader { ...@@ -41,7 +41,7 @@ class Reader {
} }
public void stopReading() { public void stopReading() {
while (_runners.size() > 0) { while (_runners.size() > 0) {
Runner r = (Runner)_runners.remove(0); Runner r = _runners.remove(0);
r.stop(); r.stop();
} }
synchronized (_pendingConnections) { synchronized (_pendingConnections) {
...@@ -93,7 +93,7 @@ class Reader { ...@@ -93,7 +93,7 @@ class Reader {
if (_pendingConnections.size() <= 0) { if (_pendingConnections.size() <= 0) {
_pendingConnections.wait(); _pendingConnections.wait();
} else { } else {
con = (NTCPConnection)_pendingConnections.remove(0); con = _pendingConnections.remove(0);
_liveReads.add(con); _liveReads.add(con);
} }
} }
...@@ -155,7 +155,8 @@ class Reader { ...@@ -155,7 +155,8 @@ class Reader {
con.close(); con.close();
return; return;
} else if (buf.remaining() <= 0) { } else if (buf.remaining() <= 0) {
con.removeReadBuf(buf); // not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
} }
if (est.isComplete() && est.getExtraBytes() != null) if (est.isComplete() && est.getExtraBytes() != null)
con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes())); con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
...@@ -165,7 +166,8 @@ class Reader { ...@@ -165,7 +166,8 @@ class Reader {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)"); _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
con.recvEncryptedI2NP(buf); con.recvEncryptedI2NP(buf);
con.removeReadBuf(buf); // not necessary, getNextReadBuf() removes
//con.removeReadBuf(buf);
} }
} }
} }
...@@ -16,10 +16,10 @@ import net.i2p.util.Log; ...@@ -16,10 +16,10 @@ import net.i2p.util.Log;
class Writer { class Writer {
private RouterContext _context; private RouterContext _context;
private Log _log; private Log _log;
private final List _pendingConnections; private final List<NTCPConnection> _pendingConnections;
private List _liveWrites; private List<NTCPConnection> _liveWrites;
private List _writeAfterLive; private List<NTCPConnection> _writeAfterLive;
private List _runners; private List<Runner> _runners;
public Writer(RouterContext ctx) { public Writer(RouterContext ctx) {
_context = ctx; _context = ctx;
...@@ -40,7 +40,7 @@ class Writer { ...@@ -40,7 +40,7 @@ class Writer {
} }
public void stopWriting() { public void stopWriting() {
while (_runners.size() > 0) { while (_runners.size() > 0) {
Runner r = (Runner)_runners.remove(0); Runner r = _runners.remove(0);
r.stop(); r.stop();
} }
synchronized (_pendingConnections) { synchronized (_pendingConnections) {
...@@ -100,7 +100,7 @@ class Writer { ...@@ -100,7 +100,7 @@ class Writer {
_log.debug("Done writing, but nothing pending, so wait"); _log.debug("Done writing, but nothing pending, so wait");
_pendingConnections.wait(); _pendingConnections.wait();
} else { } else {
con = (NTCPConnection)_pendingConnections.remove(0); con = _pendingConnections.remove(0);
_liveWrites.add(con); _liveWrites.add(con);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Switch to writing on: " + con); _log.debug("Switch to writing on: " + con);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment