From 7e7f97d72a7a056effad673157704822d8b66c20 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Wed, 25 Aug 2004 20:17:46 +0000 Subject: [PATCH] * add a new simplified version of java.util.Timer/TimedEvent * removed all of the "temporary" threads used for adding timeouts to blocking socket operations: - use the ConnectionEstablisher's thread + a SimpleTimer.TimedEvent callback to timeout socket create - added a pool of socket handler threads (size=3 atm) for receiving any inbound sockets, which are pulled off a queue, after which a handshake occurs to verify the other side is I2NP (along side another SimpleTimer.TimedEvent callback in case that blocks) this should get the last of the temporary threads (Jetty has its own thread pool for dealing with HTTP requests, so we can ignore that thread created in the AdminRunner). The only significant reduction in threads left is to go with either NIO or UDP, but neither are happening in the immediate future. --- core/java/src/net/i2p/util/SimpleTimer.java | 105 +++++++++++ .../tcp/RestrictiveTCPConnection.java | 24 +-- .../router/transport/tcp/SocketCreator.java | 125 +++++++++++-- .../i2p/router/transport/tcp/TCPListener.java | 172 +++++++++++++----- .../router/transport/tcp/TCPTransport.java | 23 +-- 5 files changed, 354 insertions(+), 95 deletions(-) create mode 100644 core/java/src/net/i2p/util/SimpleTimer.java diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java new file mode 100644 index 0000000000..0229b3ea02 --- /dev/null +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -0,0 +1,105 @@ +package net.i2p.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import net.i2p.I2PAppContext; + +/** + * Simple event scheduler - toss an event on the queue and it gets fired at the + * appropriate time. The method that is fired however should NOT block (otherwise + * they b0rk the timer). + * + */ +public class SimpleTimer { + private static final SimpleTimer _instance = new SimpleTimer(); + public static SimpleTimer getInstance() { return _instance; } + private Log _log; + private Map _events; + + private SimpleTimer() { + _events = new TreeMap(); + I2PThread runner = new I2PThread(new SimpleTimerRunner()); + runner.setName("SimpleTimer"); + runner.setDaemon(true); + runner.start(); + } + + /** + * Queue up the given event to be fired no sooner than timeoutMs from now + * + */ + public void addEvent(TimedEvent event, long timeoutMs) { + long eventTime = System.currentTimeMillis() + timeoutMs; + synchronized (_events) { + while (_events.containsKey(new Long(eventTime))) + eventTime++; + _events.put(new Long(eventTime), event); + _events.notifyAll(); + } + } + + /** + * Simple interface for events to be queued up and notified on expiration + */ + public interface TimedEvent { + /** + * the time requested has been reached (this call should NOT block, + * otherwise the whole SimpleTimer gets backed up) + * + */ + public void timeReached(); + } + + private void log(String msg, Throwable t) { + synchronized (this) { + if (_log == null) + _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); + } + _log.log(Log.CRIT, msg, t); + } + + private class SimpleTimerRunner implements Runnable { + public void run() { + while (true) { + try { + synchronized (_events) { + if (_events.size() <= 0) + _events.wait(); + long now = System.currentTimeMillis(); + long nextEventDelay = -1; + List removed = null; + for (Iterator iter = _events.keySet().iterator(); iter.hasNext(); ) { + Long when = (Long)iter.next(); + if (when.longValue() <= now) { + TimedEvent evt = (TimedEvent)_events.get(when); + try { + evt.timeReached(); + } catch (Throwable t) { + log("wtf, event borked: " + evt, t); + } + if (removed == null) + removed = new ArrayList(1); + removed.add(when); + } else { + nextEventDelay = when.longValue() - now; + break; + } + } + if (removed != null) { + for (int i = 0; i < removed.size(); i++) + _events.remove(removed.get(i)); + } + if (nextEventDelay != -1) + _events.wait(nextEventDelay); + else + _events.wait(); + } + } catch (InterruptedException ie) {} + } + } + } +} diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 91f3065419..0fd4397c5d 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -220,26 +220,10 @@ class RestrictiveTCPConnection extends TCPConnection { private boolean sendsUsData(TCPAddress peer) { SocketCreator creator = new SocketCreator(peer.getHost(), peer.getPort(), false); - I2PThread sockCreator = new I2PThread(creator); - sockCreator.setDaemon(true); - sockCreator.setName("PeerCallback:" + _transport.getListenPort()); - //sockCreator.setPriority(I2PThread.MIN_PRIORITY); - sockCreator.start(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Before joining socket creator via peer callback..."); - try { - synchronized (creator) { - creator.wait(TCPTransport.SOCKET_CREATE_TIMEOUT); - } - } catch (InterruptedException ie) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Timed out waiting to connect to " + peer.getHost() - + ':' + peer.getPort()); - } - - boolean established = creator.couldEstablish(); - // returns a socket if and only if the connection was established and the I2P handshake byte sent and received + // blocking call, timing out after the SOCKET_CREATE_TIMEOUT if there + // isn't a definitive yes or no on whether the peer is running I2NP or not + // the call closes the socket created regardless + boolean established = creator.verifyReachability(TCPTransport.SOCKET_CREATE_TIMEOUT); if (_log.shouldLog(Log.DEBUG)) _log.debug("After joining socket creator via peer callback [could establish? " + established + "]"); return established; diff --git a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java index a9a8e0713f..bc8e48e95c 100644 --- a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java +++ b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java @@ -6,14 +6,22 @@ import java.net.Socket; import java.net.UnknownHostException; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; -class SocketCreator implements Runnable { +/** + * Helper class to coordinate the creation of sockets to I2P routers + * + */ +class SocketCreator implements SimpleTimer.TimedEvent { private final static Log _log = new Log(SocketCreator.class); private String _host; private int _port; private Socket _socket; private boolean _keepOpen; private boolean _established; + private long _created; + private long _timeoutMs; + private String _caller; public SocketCreator(String host, int port) { this(host, port, true); @@ -24,6 +32,7 @@ class SocketCreator implements Runnable { _socket = null; _keepOpen = keepOpen; _established = false; + _created = System.currentTimeMillis(); } public Socket getSocket() { return _socket; } @@ -35,48 +44,124 @@ class SocketCreator implements Runnable { /** sent if we arent trying to talk */ private final static int NOT_I2P_FLAG = 0x2B; - public void run() { - if (_keepOpen) { - doEstablish(); + /** + * Blocking call to determine whether the socket configured can be reached + * (and whether it is a valid I2P router). The socket created to test this + * will be closed afterwards. + * + * @param timeoutMs max time to wait for validation + * @return true if the peer is reachable and sends us the I2P_FLAG, false + * otherwise + */ + public boolean verifyReachability(long timeoutMs) { + _timeoutMs = timeoutMs; + _caller = Thread.currentThread().getName(); + SimpleTimer.getInstance().addEvent(this, timeoutMs); + checkEstablish(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("veriyReachability complete, established? " + _established); + return _established; + } + + /** + * Blocking call to establish a socket connection to the peer. After either + * the timeout has expired or the socket has been created, the socket and/or + * its status can be accessed via couldEstablish() and getSocket(), + * respectively. If the socket could not be established in the given time + * frame, the socket is closed. + * + */ + public void establishConnection(long timeoutMs) { + _timeoutMs = timeoutMs; + _caller = Thread.currentThread().getName(); + SimpleTimer.getInstance().addEvent(this, timeoutMs); + doEstablish(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("EstablishConnection complete, established? " + _established); + } + + /** + * Called when the timeout was reached - depending on our configuration and + * whether a connection was established, we may want to tear down the socket. + * + */ + public void timeReached() { + long duration = System.currentTimeMillis() - _created; + if (!_keepOpen) { + if (_socket != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), dont keep open, and we have a socket. kill it (" + + duration + "ms, delay " + _timeoutMs + ")"); + try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), dont keep open, but we don't have a socket. noop"); + } } else { - checkEstablish(); + if (_established) { + // noop + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), keep open, and we have an established socket. noop"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), keep open, but we havent established yet. kill the socket! (" + + duration + "ms, delay " + _timeoutMs + ")"); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + } } } + /** + * Create the socket with the intent of keeping it open + * + */ private void doEstablish() { try { _socket = new Socket(_host, _port); if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created"); + + if (_socket == null) return; OutputStream os = _socket.getOutputStream(); os.write(I2P_FLAG); os.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P flag sent"); + + if (_socket == null) return; int val = _socket.getInputStream().read(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); if (val != I2P_FLAG) { - _socket.close(); + if (_socket != null) + _socket.close(); _socket = null; } + _established = true; return; } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} + _socket = null; return; } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Error establishing", ioe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} _socket = null; return; - } finally { - synchronized (this) { - notifyAll(); - } - } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage()); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} + _socket = null; + return; + } } /** @@ -92,14 +177,18 @@ class SocketCreator implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); + if (_socket == null) return; OutputStream os = _socket.getOutputStream(); os.write(NOT_I2P_FLAG); os.flush(); + if (_socket == null) return; int val = _socket.getInputStream().read(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); + + if (_socket == null) return; _socket.close(); _socket = null; _established = (val == I2P_FLAG); @@ -107,18 +196,22 @@ class SocketCreator implements Runnable { } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} return; } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Error establishing", ioe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} _socket = null; return; - } finally { - synchronized (this) { - notifyAll(); - } - } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage()); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + return; + } } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java index 2b6e1ada5f..9b205b92b0 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java @@ -16,9 +16,13 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Listen for TCP connections with a listener thread @@ -31,18 +35,32 @@ class TCPListener { private ServerSocket _socket; private ListenerRunner _listener; private RouterContext _context; + private List _pendingSockets; + private List _handlers; public TCPListener(RouterContext context, TCPTransport transport) { _context = context; _log = context.logManager().getLog(TCPListener.class); _myAddress = null; _transport = transport; + _pendingSockets = new ArrayList(10); + _handlers = new ArrayList(CONCURRENT_HANDLERS); } public void setAddress(TCPAddress address) { _myAddress = address; } public TCPAddress getAddress() { return _myAddress; } + private static final int CONCURRENT_HANDLERS = 3; + public void startListening() { + for (int i = 0; i < CONCURRENT_HANDLERS; i++) { + SocketHandler handler = new SocketHandler(); + _handlers.add(handler); + Thread t = new I2PThread(handler); + t.setName("Handler" + i+" [" + _myAddress.getPort()+"]"); + t.setDaemon(true); + t.start(); + } _listener = new ListenerRunner(); Thread t = new I2PThread(_listener); t.setName("Listener [" + _myAddress.getPort()+"]"); @@ -52,6 +70,11 @@ class TCPListener { public void stopListening() { _listener.stopListening(); + for (int i = 0; i < _handlers.size(); i++) { + SocketHandler h = (SocketHandler)_handlers.get(i); + h.stopHandling(); + } + _handlers.clear(); if (_socket != null) try { _socket.close(); @@ -84,7 +107,8 @@ class TCPListener { public void stopListening() { _isRunning = false; } public void run() { - _log.info("Beginning TCP listener"); + if (_log.shouldLog(Log.INFO)) + _log.info("Beginning TCP listener"); int curDelay = 0; while ( (_isRunning) && (curDelay < MAX_FAIL_DELAY) ) { @@ -94,11 +118,14 @@ class TCPListener { } else { _socket = new ServerSocket(_myAddress.getPort()); } - _log.info("Begin looping for host " + _myAddress.getHost() + ":" + _myAddress.getPort()); + if (_log.shouldLog(Log.INFO)) + _log.info("Begin looping for host " + _myAddress.getHost() + ":" + _myAddress.getPort()); curDelay = 0; loop(); } catch (IOException ioe) { - _log.error("Error listening to tcp connection " + _myAddress.getHost() + ":" + _myAddress.getPort(), ioe); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error listening to tcp connection " + _myAddress.getHost() + ":" + + _myAddress.getPort(), ioe); } if (_socket != null) { @@ -107,12 +134,14 @@ class TCPListener { _socket = null; } - _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {} curDelay += _nextFailDelay; _nextFailDelay *= 5; } - _log.error("CANCELING TCP LISTEN. delay = " + curDelay, new Exception("TCP Listen cancelled!!!")); + if (_log.shouldLog(Log.ERROR)) + _log.error("CANCELING TCP LISTEN. delay = " + curDelay); _isRunning = false; } private void loop() { @@ -128,60 +157,96 @@ class TCPListener { handle(s); } catch (SocketException se) { - _log.error("Error handling a connection - closed?", se); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error handling a connection - closed?", se); return; } catch (Throwable t) { - _log.error("Error handling a connection", t); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error handling a connection", t); } } } } + /** + * Just toss it on a queue for our pool of handlers to deal with (but also + * queue up a timeout event in case they're swamped) + * + */ private void handle(Socket s) { - I2PThread t = new I2PThread(new BlockingHandler(s)); - t.setDaemon(true); - t.setName("BlockingHandler:"+_transport.getListenPort()); - t.start(); + SimpleTimer.getInstance().addEvent(new CloseUnhandled(s), HANDLE_TIMEOUT); + synchronized (_pendingSockets) { + _pendingSockets.add(s); + _pendingSockets.notifyAll(); + } } - private class BlockingHandler implements Runnable { - private Socket _handledSocket; - public BlockingHandler(Socket socket) { - _handledSocket = socket; + /** callback to close an unhandled socket (if the handlers are overwhelmed) */ + private class CloseUnhandled implements SimpleTimer.TimedEvent { + private Socket _cur; + public CloseUnhandled(Socket socket) { + _cur = socket; } - public void run() { - TimedHandler h = new TimedHandler(_handledSocket); - I2PThread t = new I2PThread(h); - t.setDaemon(true); - t.start(); - try { - synchronized (h) { - h.wait(HANDLE_TIMEOUT); - } - } catch (InterruptedException ie) { - // got through early... + public void timeReached() { + boolean removed; + synchronized (_pendingSockets) { + removed = _pendingSockets.remove(_cur); } - if (h.wasSuccessful()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle successful"); - } else { - if (h.receivedIdentByte()) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to handle in the time allotted"); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); - } - try { _handledSocket.close(); } catch (IOException ioe) {} + if (removed) { + // handlers hadn't taken it yet, so close it + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing unhandled socket " + _cur); + try { _cur.close(); } catch (IOException ioe) {} } } + } + /** + * Implement a runner for the pool of handlers, pulling sockets out of the + * _pendingSockets queue and synchronously pumping them through a + * TimedHandler. + * + */ + private class SocketHandler implements Runnable { + private boolean _handle; + public SocketHandler() { + _handle = true; + } + public void run () { + while (_handle) { + Socket cur = null; + try { + synchronized (_pendingSockets) { + if (_pendingSockets.size() <= 0) + _pendingSockets.wait(); + else + cur = (Socket)_pendingSockets.remove(0); + } + } catch (InterruptedException ie) {} + + if (cur != null) + handleSocket(cur); + cur = null; + } + } + public void stopHandling() { _handle = false; } + + /** + * blocking call to establish the basic connection, but with a timeout + * in the TimedHandler + */ + private void handleSocket(Socket s) { + TimedHandler h = new TimedHandler(s); + h.handle(); + } + } + /** if we're not making progress in 30s, drop 'em */ private final static long HANDLE_TIMEOUT = 10*1000; private static volatile int __handlerId = 0; - private class TimedHandler implements Runnable { + private class TimedHandler implements SimpleTimer.TimedEvent { private int _handlerId; private Socket _socket; private boolean _wasSuccessful; @@ -192,8 +257,9 @@ class TCPListener { _handlerId = ++__handlerId; _receivedIdentByte = false; } - public void run() { - Thread.currentThread().setName("TimedHandler"+_handlerId + ':' + _transport.getListenPort()); + public int getHandlerId() { return _handlerId; } + public void handle() { + SimpleTimer.getInstance().addEvent(TimedHandler.this, HANDLE_TIMEOUT); try { OutputStream os = _socket.getOutputStream(); os.write(SocketCreator.I2P_FLAG); @@ -227,11 +293,29 @@ class TCPListener { _log.error("Error handling", t); _wasSuccessful = false; } - synchronized (TimedHandler.this) { - TimedHandler.this.notifyAll(); - } } public boolean wasSuccessful() { return _wasSuccessful; } public boolean receivedIdentByte() { return _receivedIdentByte; } + + /** + * Called after a timeout period - if we haven't already established the + * connection, close the socket (interrupting any blocking ops) + * + */ + public void timeReached() { + if (wasSuccessful()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handle successful"); + } else { + if (receivedIdentByte()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unable to handle in the time allotted"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); + } + try { _socket.close(); } catch (IOException ioe) {} + } + } } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 08412f25e9..53c24282f6 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -240,17 +240,9 @@ public class TCPTransport extends TransportImpl { long start = _context.clock().now(); SocketCreator creator = new SocketCreator(host, port); - I2PThread sockCreator = new I2PThread(creator); - sockCreator.setDaemon(true); - sockCreator.setName("SocketCreator_:" + _listenPort); - //sockCreator.setPriority(I2PThread.MIN_PRIORITY); - sockCreator.start(); - - try { - synchronized (creator) { - creator.wait(SOCKET_CREATE_TIMEOUT); - } - } catch (InterruptedException ie) {} + // blocking call, timing out after the SOCKET_CREATE_TIMEOUT and + // killing the socket if it hasn't established the connection yet + creator.establishConnection(SOCKET_CREATE_TIMEOUT); long finish = _context.clock().now(); long diff = finish - start; @@ -258,7 +250,10 @@ public class TCPTransport extends TransportImpl { if (_log.shouldLog(Log.WARN)) _log.warn("Creating a new socket took too long? wtf?! " + diff + "ms for " + host + ':' + port); } - return creator.getSocket(); + if (creator.couldEstablish()) + return creator.getSocket(); + else + return null; } private boolean isConnected(RouterInfo info) { @@ -356,7 +351,7 @@ public class TCPTransport extends TransportImpl { _running = true; for (int i = 0; i < _numConnectionEstablishers; i++) { - Thread t = new I2PThread(new ConnEstablisher(i)); + Thread t = new I2PThread(new ConnEstablisher(i), "Conn Establisher" + i + ':' + _listenPort); t.setDaemon(true); t.start(); } @@ -583,8 +578,6 @@ public class TCPTransport extends TransportImpl { public int getId() { return _id; } public void run() { - Thread.currentThread().setName("Conn Establisher" + _id + ':' + _listenPort); - while (_running) { try { PendingMessages pending = nextPeer(this); -- GitLab