I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit a9c7748a authored by dev's avatar dev
Browse files

minor code style updates to ntcp EventPumper

parent cd35b219
No related branches found
No related tags found
No related merge requests found
...@@ -13,9 +13,6 @@ import java.nio.channels.Selector; ...@@ -13,9 +13,6 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException; import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -35,10 +32,10 @@ public class EventPumper implements Runnable { ...@@ -35,10 +32,10 @@ public class EventPumper implements Runnable {
private volatile boolean _alive; private volatile boolean _alive;
private Selector _selector; private Selector _selector;
private final LinkedBlockingQueue<ByteBuffer> _bufCache; private final LinkedBlockingQueue<ByteBuffer> _bufCache;
private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue(); private final LinkedBlockingQueue<NTCPConnection> _wantsRead = new LinkedBlockingQueue<NTCPConnection>();
private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue(); private final LinkedBlockingQueue<NTCPConnection> _wantsWrite = new LinkedBlockingQueue<NTCPConnection>();
private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue(); private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>();
private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue(); private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>();
private NTCPTransport _transport; private NTCPTransport _transport;
private long _expireIdleWriteTime; private long _expireIdleWriteTime;
...@@ -61,7 +58,7 @@ public class EventPumper implements Runnable { ...@@ -61,7 +58,7 @@ public class EventPumper implements Runnable {
_log = ctx.logManager().getLog(getClass()); _log = ctx.logManager().getLog(getClass());
_transport = transport; _transport = transport;
_alive = false; _alive = false;
_bufCache = new LinkedBlockingQueue(MAX_CACHE_SIZE); _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
_expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
} }
...@@ -107,10 +104,9 @@ public class EventPumper implements Runnable { ...@@ -107,10 +104,9 @@ public class EventPumper implements Runnable {
public void run() { public void run() {
long lastFailsafeIteration = System.currentTimeMillis(); long lastFailsafeIteration = System.currentTimeMillis();
List bufList = new ArrayList(16);
while (_alive && _selector.isOpen()) { while (_alive && _selector.isOpen()) {
try { try {
runDelayedEvents(bufList); runDelayedEvents();
int count = 0; int count = 0;
try { try {
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
...@@ -125,7 +121,7 @@ public class EventPumper implements Runnable { ...@@ -125,7 +121,7 @@ public class EventPumper implements Runnable {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("select returned " + count); _log.debug("select returned " + count);
Set selected = null; Set<SelectionKey> selected = null;
try { try {
selected = _selector.selectedKeys(); selected = _selector.selectedKeys();
} catch (ClosedSelectorException cse) { } catch (ClosedSelectorException cse) {
...@@ -142,7 +138,7 @@ public class EventPumper implements Runnable { ...@@ -142,7 +138,7 @@ public class EventPumper implements Runnable {
// properly marked as such, etc // properly marked as such, etc
lastFailsafeIteration = System.currentTimeMillis(); lastFailsafeIteration = System.currentTimeMillis();
try { try {
Set all = _selector.keys(); Set<SelectionKey> all = _selector.keys();
int failsafeWrites = 0; int failsafeWrites = 0;
int failsafeCloses = 0; int failsafeCloses = 0;
...@@ -153,9 +149,8 @@ public class EventPumper implements Runnable { ...@@ -153,9 +149,8 @@ public class EventPumper implements Runnable {
_expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME); _expireIdleWriteTime = Math.min(_expireIdleWriteTime + 1000, MAX_EXPIRE_IDLE_TIME);
else else
_expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME); _expireIdleWriteTime = Math.max(_expireIdleWriteTime - 3000, MIN_EXPIRE_IDLE_TIME);
for (Iterator iter = all.iterator(); iter.hasNext(); ) { for (SelectionKey key : all) {
try { try {
SelectionKey key = (SelectionKey)iter.next();
Object att = key.attachment(); Object att = key.attachment();
if (!(att instanceof NTCPConnection)) if (!(att instanceof NTCPConnection))
continue; // to the next con continue; // to the next con
...@@ -225,9 +220,8 @@ public class EventPumper implements Runnable { ...@@ -225,9 +220,8 @@ public class EventPumper implements Runnable {
if (_selector.isOpen()) { if (_selector.isOpen()) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Closing down the event pumper with selection keys remaining"); _log.debug("Closing down the event pumper with selection keys remaining");
Set keys = _selector.keys(); Set<SelectionKey> keys = _selector.keys();
for (Iterator iter = keys.iterator(); iter.hasNext(); ) { for (SelectionKey key : keys) {
SelectionKey key = (SelectionKey)iter.next();
try { try {
Object att = key.attachment(); Object att = key.attachment();
if (att instanceof ServerSocketChannel) { if (att instanceof ServerSocketChannel) {
...@@ -257,10 +251,9 @@ public class EventPumper implements Runnable { ...@@ -257,10 +251,9 @@ public class EventPumper implements Runnable {
_wantsWrite.clear(); _wantsWrite.clear();
} }
private void processKeys(Set selected) { private void processKeys(Set<SelectionKey> selected) {
for (Iterator iter = selected.iterator(); iter.hasNext(); ) { for (SelectionKey key : selected) {
try { try {
SelectionKey key = (SelectionKey)iter.next();
int ops = key.readyOps(); int ops = key.readyOps();
boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0; boolean accept = (ops & SelectionKey.OP_ACCEPT) != 0;
boolean connect = (ops & SelectionKey.OP_CONNECT) != 0; boolean connect = (ops & SelectionKey.OP_CONNECT) != 0;
...@@ -346,8 +339,7 @@ public class EventPumper implements Runnable { ...@@ -346,8 +339,7 @@ public class EventPumper implements Runnable {
private static int __liveBufs = 0; private static int __liveBufs = 0;
private static int __consecutiveExtra; private static int __consecutiveExtra;
ByteBuffer acquireBuf() { ByteBuffer acquireBuf() {
//if (false) return ByteBuffer.allocate(BUF_SIZE); ByteBuffer rv = _bufCache.poll();
ByteBuffer rv = (ByteBuffer)_bufCache.poll();
if (rv == null) { if (rv == null) {
rv = ByteBuffer.allocate(BUF_SIZE); rv = ByteBuffer.allocate(BUF_SIZE);
NUM_BUFS = ++__liveBufs; NUM_BUFS = ++__liveBufs;
...@@ -470,6 +462,7 @@ public class EventPumper implements Runnable { ...@@ -470,6 +462,7 @@ public class EventPumper implements Runnable {
buf.flip(); buf.flip();
buf.get(data); buf.get(data);
releaseBuf(buf); releaseBuf(buf);
buf=null;
ByteBuffer rbuf = ByteBuffer.wrap(data); ByteBuffer rbuf = ByteBuffer.wrap(data);
FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
if (req.getPendingInboundRequested() > 0) { if (req.getPendingInboundRequested() > 0) {
...@@ -567,7 +560,7 @@ public class EventPumper implements Runnable { ...@@ -567,7 +560,7 @@ public class EventPumper implements Runnable {
+ " after " + (after-before)); + " after " + (after-before));
} }
private void runDelayedEvents(List buf) { private void runDelayedEvents() {
NTCPConnection con; NTCPConnection con;
while ((con = _wantsRead.poll()) != null) { while ((con = _wantsRead.poll()) != null) {
SelectionKey key = con.getKey(); SelectionKey key = con.getKey();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment