diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index 64b745e46bc5210aa6b7dc00d13d6a5cb03dc311..35121f59b3e9ecf04473237cfa8d84e52d44b689 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -13,9 +13,6 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; @@ -35,10 +32,10 @@ public class EventPumper implements Runnable { private volatile boolean _alive; private Selector _selector; private final LinkedBlockingQueue<ByteBuffer> _bufCache; - private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue(); - private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue(); - private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue(); - private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue(); + private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue<NTCPConnection>(); + private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue<NTCPConnection>(); + private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>(); + private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>(); private NTCPTransport _transport; private long _expireIdleWriteTime; @@ -61,7 +58,7 @@ public class EventPumper implements Runnable { _log = ctx.logManager().getLog(getClass()); _transport = transport; _alive = false; - _bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); + _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE); _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; } @@ -107,10 +104,9 @@ public class EventPumper implements Runnable { public void run() { long lastFailsafeIteration = System.currentTimeMillis(); - List bufList = new ArrayList(16); while (_alive && _selector.isOpen()) { try { - runDelayedEvents(bufList); + runDelayedEvents(); int count = 0; try { //if (_log.shouldLog(Log.DEBUG)) @@ -125,7 +121,7 @@ public class EventPumper implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("select returned " + count); - Set selected = null; + Set<SelectionKey> selected = null; try { selected = _selector.selectedKeys(); } catch (ClosedSelectorException cse) { @@ -142,7 +138,7 @@ public class EventPumper implements Runnable { // properly marked as such, etc lastFailsafeIteration = System.currentTimeMillis(); try { - Set all = _selector.keys(); + Set<SelectionKey> all = _selector.keys(); int failsafeWrites = 0; int failsafeCloses = 0; @@ -153,9 +149,8 @@ public class EventPumper implements Runnable { _expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME); else _expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME); - for (Iterator iter = all.iterator(); iter.hasNext(); ) { + for (SelectionKey key : all) { try { - SelectionKey key = (SelectionKey)iter.next(); Object att = key.attachment(); if (!(att instanceof NTCPConnection)) continue; // to the next con @@ -225,9 +220,8 @@ public class EventPumper implements Runnable { if (_selector.isOpen()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Closing down the event pumper with selection keys remaining"); - Set keys = _selector.keys(); - for (Iterator iter = keys.iterator(); iter.hasNext(); ) { - SelectionKey key = (SelectionKey)iter.next(); + Set<SelectionKey> keys = _selector.keys(); + for (SelectionKey key : keys) { try { Object att = key.attachment(); if (att instanceof ServerSocketChannel) { @@ -257,10 +251,9 @@ public class EventPumper implements Runnable { _wantsWrite.clear(); } - private void processKeys(Set selected) { - for (Iterator iter = selected.iterator(); iter.hasNext(); ) { + private void processKeys(Set<SelectionKey> selected) { + for (SelectionKey key : selected) { try { - SelectionKey key = (SelectionKey)iter.next(); int ops = key.readyOps(); boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0; boolean connect = (ops & SelectionKey.OP_CONNECT) != 0; @@ -346,8 +339,7 @@ public class EventPumper implements Runnable { private static int __liveBufs = 0; private static int __consecutiveExtra; ByteBuffer acquireBuf() { - //if (false) return ByteBuffer.allocate(BUF_SIZE); - ByteBuffer rv = (ByteBuffer)_bufCache.poll(); + ByteBuffer rv = _bufCache.poll(); if (rv == null) { rv = ByteBuffer.allocate(BUF_SIZE); NUM_BUFS = ++__liveBufs; @@ -470,6 +462,7 @@ public class EventPumper implements Runnable { buf.flip(); buf.get(data); releaseBuf(buf); + buf=null; ByteBuffer rbuf = ByteBuffer.wrap(data); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); if (req.getPendingInboundRequested() > 0) { @@ -567,7 +560,7 @@ public class EventPumper implements Runnable { + " after " + (after-before)); } - private void runDelayedEvents(List buf) { + private void runDelayedEvents() { NTCPConnection con; while ((con = _wantsRead.poll()) != null) { SelectionKey key = con.getKey();