diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java new file mode 100644 index 0000000000000000000000000000000000000000..0229b3ea02c86a0b1ba1ce084b34ac6243453a8b --- /dev/null +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -0,0 +1,105 @@ +package net.i2p.util; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import net.i2p.I2PAppContext; + +/** + * Simple event scheduler - toss an event on the queue and it gets fired at the + * appropriate time. The method that is fired however should NOT block (otherwise + * they b0rk the timer). + * + */ +public class SimpleTimer { + private static final SimpleTimer _instance = new SimpleTimer(); + public static SimpleTimer getInstance() { return _instance; } + private Log _log; + private Map _events; + + private SimpleTimer() { + _events = new TreeMap(); + I2PThread runner = new I2PThread(new SimpleTimerRunner()); + runner.setName("SimpleTimer"); + runner.setDaemon(true); + runner.start(); + } + + /** + * Queue up the given event to be fired no sooner than timeoutMs from now + * + */ + public void addEvent(TimedEvent event, long timeoutMs) { + long eventTime = System.currentTimeMillis() + timeoutMs; + synchronized (_events) { + while (_events.containsKey(new Long(eventTime))) + eventTime++; + _events.put(new Long(eventTime), event); + _events.notifyAll(); + } + } + + /** + * Simple interface for events to be queued up and notified on expiration + */ + public interface TimedEvent { + /** + * the time requested has been reached (this call should NOT block, + * otherwise the whole SimpleTimer gets backed up) + * + */ + public void timeReached(); + } + + private void log(String msg, Throwable t) { + synchronized (this) { + if (_log == null) + _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); + } + _log.log(Log.CRIT, msg, t); + } + + private class SimpleTimerRunner implements Runnable { + public void run() { + while (true) { + try { + synchronized (_events) { + if (_events.size() <= 0) + _events.wait(); + long now = System.currentTimeMillis(); + long nextEventDelay = -1; + List removed = null; + for (Iterator iter = _events.keySet().iterator(); iter.hasNext(); ) { + Long when = (Long)iter.next(); + if (when.longValue() <= now) { + TimedEvent evt = (TimedEvent)_events.get(when); + try { + evt.timeReached(); + } catch (Throwable t) { + log("wtf, event borked: " + evt, t); + } + if (removed == null) + removed = new ArrayList(1); + removed.add(when); + } else { + nextEventDelay = when.longValue() - now; + break; + } + } + if (removed != null) { + for (int i = 0; i < removed.size(); i++) + _events.remove(removed.get(i)); + } + if (nextEventDelay != -1) + _events.wait(nextEventDelay); + else + _events.wait(); + } + } catch (InterruptedException ie) {} + } + } + } +} diff --git a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java index 91f3065419855b0c8788660c53ee71ef5f8ae6cf..0fd4397c5d4da8468493b73c86415a79720c141c 100644 --- a/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/RestrictiveTCPConnection.java @@ -220,26 +220,10 @@ class RestrictiveTCPConnection extends TCPConnection { private boolean sendsUsData(TCPAddress peer) { SocketCreator creator = new SocketCreator(peer.getHost(), peer.getPort(), false); - I2PThread sockCreator = new I2PThread(creator); - sockCreator.setDaemon(true); - sockCreator.setName("PeerCallback:" + _transport.getListenPort()); - //sockCreator.setPriority(I2PThread.MIN_PRIORITY); - sockCreator.start(); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Before joining socket creator via peer callback..."); - try { - synchronized (creator) { - creator.wait(TCPTransport.SOCKET_CREATE_TIMEOUT); - } - } catch (InterruptedException ie) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Timed out waiting to connect to " + peer.getHost() - + ':' + peer.getPort()); - } - - boolean established = creator.couldEstablish(); - // returns a socket if and only if the connection was established and the I2P handshake byte sent and received + // blocking call, timing out after the SOCKET_CREATE_TIMEOUT if there + // isn't a definitive yes or no on whether the peer is running I2NP or not + // the call closes the socket created regardless + boolean established = creator.verifyReachability(TCPTransport.SOCKET_CREATE_TIMEOUT); if (_log.shouldLog(Log.DEBUG)) _log.debug("After joining socket creator via peer callback [could establish? " + established + "]"); return established; diff --git a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java index a9a8e0713f5bde2511a8985916b64d37eaaa6fdf..bc8e48e95c8fd3398736611c0e5cf1e8a51df281 100644 --- a/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java +++ b/router/java/src/net/i2p/router/transport/tcp/SocketCreator.java @@ -6,14 +6,22 @@ import java.net.Socket; import java.net.UnknownHostException; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; -class SocketCreator implements Runnable { +/** + * Helper class to coordinate the creation of sockets to I2P routers + * + */ +class SocketCreator implements SimpleTimer.TimedEvent { private final static Log _log = new Log(SocketCreator.class); private String _host; private int _port; private Socket _socket; private boolean _keepOpen; private boolean _established; + private long _created; + private long _timeoutMs; + private String _caller; public SocketCreator(String host, int port) { this(host, port, true); @@ -24,6 +32,7 @@ class SocketCreator implements Runnable { _socket = null; _keepOpen = keepOpen; _established = false; + _created = System.currentTimeMillis(); } public Socket getSocket() { return _socket; } @@ -35,48 +44,124 @@ class SocketCreator implements Runnable { /** sent if we arent trying to talk */ private final static int NOT_I2P_FLAG = 0x2B; - public void run() { - if (_keepOpen) { - doEstablish(); + /** + * Blocking call to determine whether the socket configured can be reached + * (and whether it is a valid I2P router). The socket created to test this + * will be closed afterwards. + * + * @param timeoutMs max time to wait for validation + * @return true if the peer is reachable and sends us the I2P_FLAG, false + * otherwise + */ + public boolean verifyReachability(long timeoutMs) { + _timeoutMs = timeoutMs; + _caller = Thread.currentThread().getName(); + SimpleTimer.getInstance().addEvent(this, timeoutMs); + checkEstablish(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("veriyReachability complete, established? " + _established); + return _established; + } + + /** + * Blocking call to establish a socket connection to the peer. After either + * the timeout has expired or the socket has been created, the socket and/or + * its status can be accessed via couldEstablish() and getSocket(), + * respectively. If the socket could not be established in the given time + * frame, the socket is closed. + * + */ + public void establishConnection(long timeoutMs) { + _timeoutMs = timeoutMs; + _caller = Thread.currentThread().getName(); + SimpleTimer.getInstance().addEvent(this, timeoutMs); + doEstablish(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("EstablishConnection complete, established? " + _established); + } + + /** + * Called when the timeout was reached - depending on our configuration and + * whether a connection was established, we may want to tear down the socket. + * + */ + public void timeReached() { + long duration = System.currentTimeMillis() - _created; + if (!_keepOpen) { + if (_socket != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), dont keep open, and we have a socket. kill it (" + + duration + "ms, delay " + _timeoutMs + ")"); + try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), dont keep open, but we don't have a socket. noop"); + } } else { - checkEstablish(); + if (_established) { + // noop + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), keep open, and we have an established socket. noop"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(_caller + ": timeReached(), keep open, but we havent established yet. kill the socket! (" + + duration + "ms, delay " + _timeoutMs + ")"); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + } } } + /** + * Create the socket with the intent of keeping it open + * + */ private void doEstablish() { try { _socket = new Socket(_host, _port); if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created"); + + if (_socket == null) return; OutputStream os = _socket.getOutputStream(); os.write(I2P_FLAG); os.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P flag sent"); + + if (_socket == null) return; int val = _socket.getInputStream().read(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); if (val != I2P_FLAG) { - _socket.close(); + if (_socket != null) + _socket.close(); _socket = null; } + _established = true; return; } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} + _socket = null; return; } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Error establishing", ioe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} _socket = null; return; - } finally { - synchronized (this) { - notifyAll(); - } - } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage()); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} + _socket = null; + return; + } } /** @@ -92,14 +177,18 @@ class SocketCreator implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("Socket created (but we're not sending the flag, since we're just testing them)"); + if (_socket == null) return; OutputStream os = _socket.getOutputStream(); os.write(NOT_I2P_FLAG); os.flush(); + if (_socket == null) return; int val = _socket.getInputStream().read(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Value read: [" + val + "] == flag? [" + I2P_FLAG + "]"); + + if (_socket == null) return; _socket.close(); _socket = null; _established = (val == I2P_FLAG); @@ -107,18 +196,22 @@ class SocketCreator implements Runnable { } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port, uhe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} return; } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) _log.warn("Error establishing connection to " + _host + ':' + _port + ": "+ ioe.getMessage()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Error establishing", ioe); + if (_socket != null) try { _socket.close(); } catch (IOException ioe2) {} _socket = null; return; - } finally { - synchronized (this) { - notifyAll(); - } - } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unknown error establishing connection to " + _host + ':' + _port + ": " + e.getMessage()); + if (_socket != null) try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + return; + } } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java index 2b6e1ada5fbca7cbce36a88b1bdf9af7793b0bcf..9b205b92b01f385c5cc790f702452144c39bbc31 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPListener.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPListener.java @@ -16,9 +16,13 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Listen for TCP connections with a listener thread @@ -31,18 +35,32 @@ class TCPListener { private ServerSocket _socket; private ListenerRunner _listener; private RouterContext _context; + private List _pendingSockets; + private List _handlers; public TCPListener(RouterContext context, TCPTransport transport) { _context = context; _log = context.logManager().getLog(TCPListener.class); _myAddress = null; _transport = transport; + _pendingSockets = new ArrayList(10); + _handlers = new ArrayList(CONCURRENT_HANDLERS); } public void setAddress(TCPAddress address) { _myAddress = address; } public TCPAddress getAddress() { return _myAddress; } + private static final int CONCURRENT_HANDLERS = 3; + public void startListening() { + for (int i = 0; i < CONCURRENT_HANDLERS; i++) { + SocketHandler handler = new SocketHandler(); + _handlers.add(handler); + Thread t = new I2PThread(handler); + t.setName("Handler" + i+" [" + _myAddress.getPort()+"]"); + t.setDaemon(true); + t.start(); + } _listener = new ListenerRunner(); Thread t = new I2PThread(_listener); t.setName("Listener [" + _myAddress.getPort()+"]"); @@ -52,6 +70,11 @@ class TCPListener { public void stopListening() { _listener.stopListening(); + for (int i = 0; i < _handlers.size(); i++) { + SocketHandler h = (SocketHandler)_handlers.get(i); + h.stopHandling(); + } + _handlers.clear(); if (_socket != null) try { _socket.close(); @@ -84,7 +107,8 @@ class TCPListener { public void stopListening() { _isRunning = false; } public void run() { - _log.info("Beginning TCP listener"); + if (_log.shouldLog(Log.INFO)) + _log.info("Beginning TCP listener"); int curDelay = 0; while ( (_isRunning) && (curDelay < MAX_FAIL_DELAY) ) { @@ -94,11 +118,14 @@ class TCPListener { } else { _socket = new ServerSocket(_myAddress.getPort()); } - _log.info("Begin looping for host " + _myAddress.getHost() + ":" + _myAddress.getPort()); + if (_log.shouldLog(Log.INFO)) + _log.info("Begin looping for host " + _myAddress.getHost() + ":" + _myAddress.getPort()); curDelay = 0; loop(); } catch (IOException ioe) { - _log.error("Error listening to tcp connection " + _myAddress.getHost() + ":" + _myAddress.getPort(), ioe); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error listening to tcp connection " + _myAddress.getHost() + ":" + + _myAddress.getPort(), ioe); } if (_socket != null) { @@ -107,12 +134,14 @@ class TCPListener { _socket = null; } - _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {} curDelay += _nextFailDelay; _nextFailDelay *= 5; } - _log.error("CANCELING TCP LISTEN. delay = " + curDelay, new Exception("TCP Listen cancelled!!!")); + if (_log.shouldLog(Log.ERROR)) + _log.error("CANCELING TCP LISTEN. delay = " + curDelay); _isRunning = false; } private void loop() { @@ -128,60 +157,96 @@ class TCPListener { handle(s); } catch (SocketException se) { - _log.error("Error handling a connection - closed?", se); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error handling a connection - closed?", se); return; } catch (Throwable t) { - _log.error("Error handling a connection", t); + if (_log.shouldLog(Log.ERROR)) + _log.error("Error handling a connection", t); } } } } + /** + * Just toss it on a queue for our pool of handlers to deal with (but also + * queue up a timeout event in case they're swamped) + * + */ private void handle(Socket s) { - I2PThread t = new I2PThread(new BlockingHandler(s)); - t.setDaemon(true); - t.setName("BlockingHandler:"+_transport.getListenPort()); - t.start(); + SimpleTimer.getInstance().addEvent(new CloseUnhandled(s), HANDLE_TIMEOUT); + synchronized (_pendingSockets) { + _pendingSockets.add(s); + _pendingSockets.notifyAll(); + } } - private class BlockingHandler implements Runnable { - private Socket _handledSocket; - public BlockingHandler(Socket socket) { - _handledSocket = socket; + /** callback to close an unhandled socket (if the handlers are overwhelmed) */ + private class CloseUnhandled implements SimpleTimer.TimedEvent { + private Socket _cur; + public CloseUnhandled(Socket socket) { + _cur = socket; } - public void run() { - TimedHandler h = new TimedHandler(_handledSocket); - I2PThread t = new I2PThread(h); - t.setDaemon(true); - t.start(); - try { - synchronized (h) { - h.wait(HANDLE_TIMEOUT); - } - } catch (InterruptedException ie) { - // got through early... + public void timeReached() { + boolean removed; + synchronized (_pendingSockets) { + removed = _pendingSockets.remove(_cur); } - if (h.wasSuccessful()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handle successful"); - } else { - if (h.receivedIdentByte()) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to handle in the time allotted"); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); - } - try { _handledSocket.close(); } catch (IOException ioe) {} + if (removed) { + // handlers hadn't taken it yet, so close it + if (_log.shouldLog(Log.WARN)) + _log.warn("Closing unhandled socket " + _cur); + try { _cur.close(); } catch (IOException ioe) {} } } + } + /** + * Implement a runner for the pool of handlers, pulling sockets out of the + * _pendingSockets queue and synchronously pumping them through a + * TimedHandler. + * + */ + private class SocketHandler implements Runnable { + private boolean _handle; + public SocketHandler() { + _handle = true; + } + public void run () { + while (_handle) { + Socket cur = null; + try { + synchronized (_pendingSockets) { + if (_pendingSockets.size() <= 0) + _pendingSockets.wait(); + else + cur = (Socket)_pendingSockets.remove(0); + } + } catch (InterruptedException ie) {} + + if (cur != null) + handleSocket(cur); + cur = null; + } + } + public void stopHandling() { _handle = false; } + + /** + * blocking call to establish the basic connection, but with a timeout + * in the TimedHandler + */ + private void handleSocket(Socket s) { + TimedHandler h = new TimedHandler(s); + h.handle(); + } + } + /** if we're not making progress in 30s, drop 'em */ private final static long HANDLE_TIMEOUT = 10*1000; private static volatile int __handlerId = 0; - private class TimedHandler implements Runnable { + private class TimedHandler implements SimpleTimer.TimedEvent { private int _handlerId; private Socket _socket; private boolean _wasSuccessful; @@ -192,8 +257,9 @@ class TCPListener { _handlerId = ++__handlerId; _receivedIdentByte = false; } - public void run() { - Thread.currentThread().setName("TimedHandler"+_handlerId + ':' + _transport.getListenPort()); + public int getHandlerId() { return _handlerId; } + public void handle() { + SimpleTimer.getInstance().addEvent(TimedHandler.this, HANDLE_TIMEOUT); try { OutputStream os = _socket.getOutputStream(); os.write(SocketCreator.I2P_FLAG); @@ -227,11 +293,29 @@ class TCPListener { _log.error("Error handling", t); _wasSuccessful = false; } - synchronized (TimedHandler.this) { - TimedHandler.this.notifyAll(); - } } public boolean wasSuccessful() { return _wasSuccessful; } public boolean receivedIdentByte() { return _receivedIdentByte; } + + /** + * Called after a timeout period - if we haven't already established the + * connection, close the socket (interrupting any blocking ops) + * + */ + public void timeReached() { + if (wasSuccessful()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handle successful"); + } else { + if (receivedIdentByte()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unable to handle in the time allotted"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Peer didn't send the ident byte, so either they were testing us, or portscanning"); + } + try { _socket.close(); } catch (IOException ioe) {} + } + } } } diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java index 08412f25e99f62b8f6c8b186fc91aaefe993f963..53c24282f60a4be804582f5bc339c9d2d32f2269 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPTransport.java @@ -240,17 +240,9 @@ public class TCPTransport extends TransportImpl { long start = _context.clock().now(); SocketCreator creator = new SocketCreator(host, port); - I2PThread sockCreator = new I2PThread(creator); - sockCreator.setDaemon(true); - sockCreator.setName("SocketCreator_:" + _listenPort); - //sockCreator.setPriority(I2PThread.MIN_PRIORITY); - sockCreator.start(); - - try { - synchronized (creator) { - creator.wait(SOCKET_CREATE_TIMEOUT); - } - } catch (InterruptedException ie) {} + // blocking call, timing out after the SOCKET_CREATE_TIMEOUT and + // killing the socket if it hasn't established the connection yet + creator.establishConnection(SOCKET_CREATE_TIMEOUT); long finish = _context.clock().now(); long diff = finish - start; @@ -258,7 +250,10 @@ public class TCPTransport extends TransportImpl { if (_log.shouldLog(Log.WARN)) _log.warn("Creating a new socket took too long? wtf?! " + diff + "ms for " + host + ':' + port); } - return creator.getSocket(); + if (creator.couldEstablish()) + return creator.getSocket(); + else + return null; } private boolean isConnected(RouterInfo info) { @@ -356,7 +351,7 @@ public class TCPTransport extends TransportImpl { _running = true; for (int i = 0; i < _numConnectionEstablishers; i++) { - Thread t = new I2PThread(new ConnEstablisher(i)); + Thread t = new I2PThread(new ConnEstablisher(i), "Conn Establisher" + i + ':' + _listenPort); t.setDaemon(true); t.start(); } @@ -583,8 +578,6 @@ public class TCPTransport extends TransportImpl { public int getId() { return _id; } public void run() { - Thread.currentThread().setName("Conn Establisher" + _id + ':' + _listenPort); - while (_running) { try { PendingMessages pending = nextPeer(this);