diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index fc89437bd0215f3979222685a4db6668b5cdab23..ae729337399a9317254fe2ebac13bfb07c4506e8 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -4,71 +4,147 @@ import java.net.ConnectException; import net.i2p.I2PException; import net.i2p.util.Log; +import net.i2p.util.Clock; +import net.i2p.I2PAppContext; + +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; /** - * Initial stub implementation for the server socket + * Server socket implementation, allowing multiple threads to accept I2PSockets + * and pull from a queue populated by various threads (each of whom have their own + * timeout) * */ class I2PServerSocketImpl implements I2PServerSocket { private final static Log _log = new Log(I2PServerSocketImpl.class); private I2PSocketManager mgr; - private I2PSocket cached = null; // buffer one socket here - - private boolean closing = false; // Are we being closed? - - private Object acceptLock = new Object(); - + /** list of sockets waiting for the client to accept them */ + private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); + + /** have we been closed */ + private volatile boolean closing = false; + + /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ + private Object socketAcceptedLock = new Object(); + /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ + private Object socketAddedLock = new Object(); + public I2PServerSocketImpl(I2PSocketManager mgr) { this.mgr = mgr; } - - public synchronized I2PSocket accept() throws I2PException, ConnectException { - I2PSocket ret; - - synchronized (acceptLock) { - while ((cached == null) && !closing) { - myWait(); - } - - if (closing) { - throw new ConnectException("I2PServerSocket closed"); - } - - ret = cached; - cached = null; - acceptLock.notifyAll(); - } - - _log.debug("TIMING: handed out accept result " + ret.hashCode()); + + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public I2PSocket accept() throws I2PException, ConnectException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("accept() called, pending: " + pendingSockets.size()); + + I2PSocket ret = null; + + while ( (ret == null) && (!closing) ){ + while (pendingSockets.size() <= 0) { + if (closing) throw new ConnectException("I2PServerSocket closed"); + try { + synchronized(socketAddedLock) { + socketAddedLock.wait(); + } + } catch (InterruptedException ie) {} + } + synchronized (pendingSockets) { + if (pendingSockets.size() > 0) { + ret = (I2PSocket)pendingSockets.remove(0); + } + } + if (ret != null) { + synchronized (socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } + } + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("TIMING: handed out accept result " + ret.hashCode()); return ret; } - - public boolean getNewSocket(I2PSocket s) { - synchronized (acceptLock) { - while (cached != null) { - myWait(); - } - cached = s; - acceptLock.notifyAll(); - } - + + /** + * Make the socket available and wait until the client app accepts it, or until + * the given timeout elapses. This doesn't have any limits on the queue size - + * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse) + * + * @param timeoutMs how long to wait until accept + * @return true if the socket was accepted, false if the timeout expired + * or the socket was closed + */ + public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("addWaitForAccept [new socket arrived, pending: " + pendingSockets.size()); + + if (closing) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Already closing the socket"); + return false; + } + + Clock clock = I2PAppContext.getGlobalContext().clock(); + long start = clock.now(); + long end = start + timeoutMs; + pendingSockets.add(s); + synchronized (socketAddedLock) { + socketAddedLock.notifyAll(); + } + + // keep looping until the socket has been grabbed by the accept() + // (or the expiration passes, or the socket is closed) + while (pendingSockets.contains(s)) { + long now = clock.now(); + if (now >= end) { + if (_log.shouldLog(Log.INFO)) + _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms"); + pendingSockets.remove(s); + return false; + } + if (closing) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Server socket closed while waiting for accept"); + pendingSockets.remove(s); + return false; + } + long remaining = end - now; + try { + synchronized (socketAcceptedLock) { + socketAcceptedLock.wait(remaining); + } + } catch (InterruptedException ie) {} + } + long now = clock.now(); + if (_log.shouldLog(Log.DEBUG)) + _log.info("Socket accepted after " + (now-start) + "ms"); return true; } - - public void close() throws I2PException { - synchronized (acceptLock) { - closing = true; - acceptLock.notifyAll(); - } - } - - public I2PSocketManager getManager() { - return mgr; - } - - private void myWait() { - try { - acceptLock.wait(); - } catch (InterruptedException ex) {} + + public void close() { + closing = true; + // let anyone .accept()ing know to fsck off + synchronized (socketAddedLock) { + socketAddedLock.notifyAll(); + } + // let anyone addWaitForAccept()ing know to fsck off + synchronized (socketAcceptedLock) { + socketAcceptedLock.notifyAll(); + } } + + public I2PSocketManager getManager() { return mgr; } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index d71ef46368bf41be29e4e233ac176c7331be517c..36673e43ae5c980dfc5b646d1ce78419f20a34d2 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -268,6 +268,7 @@ class I2PSocketImpl implements I2PSocket { } } + private static volatile long __runnerId = 0; public class I2PSocketRunner extends I2PThread { public InputStream in; @@ -276,7 +277,7 @@ class I2PSocketImpl implements I2PSocket { _log.debug("Runner's input stream is: " + in.hashCode()); this.in = in; String peer = I2PSocketImpl.this.remote.calculateHash().toBase64(); - setName("SocketRunner from " + peer.substring(0, 4)); + setName("SocketRunner " + (++__runnerId) + " " + peer.substring(0, 4)); start(); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index cdc391b63f8064832282d04fea6f7a0f53b88e40..821ac3f9e31b5d0263cdbd5d1e44ee8f9cfe1af6 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -41,6 +41,7 @@ public class I2PSocketManager implements I2PSessionListener { private HashMap _outSockets; private HashMap _inSockets; private I2PSocketOptions _defaultOptions; + private long _acceptTimeout; public static final short ACK = 0x51; public static final short CLOSE_OUT = 0x52; @@ -50,10 +51,17 @@ public class I2PSocketManager implements I2PSessionListener { public static final short DATA_IN = 0xA0; public static final short CHAFF = 0xFF; + /** + * How long to wait for the client app to accept() before sending back CLOSE? + * This includes the time waiting in the queue. Currently set to 5 seconds. + */ + private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; + public I2PSocketManager() { _session = null; _inSockets = new HashMap(16); _outSockets = new HashMap(16); + _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; } public I2PSession getSession() { @@ -64,6 +72,15 @@ public class I2PSocketManager implements I2PSessionListener { _session = session; if (session != null) session.setSessionListener(this); } + + /** + * How long should we wait for the client to .accept() a socket before + * sending back a NACK/Close? + * + * @param ms milliseconds to wait, maximum + */ + public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } + public long getAcceptTimeout() { return _acceptTimeout; } public void disconnected(I2PSession session) { _log.info("Disconnected from the session"); @@ -260,23 +277,25 @@ public class I2PSocketManager implements I2PSessionListener { return; } - if (_serverSocket.getNewSocket(s)) { + if (_serverSocket.addWaitForAccept(s, _acceptTimeout)) { _inSockets.put(newLocalID, s); byte[] packet = makePacket((byte) ACK, id, toBytes(newLocalID)); boolean replySentOk = false; replySentOk = _session.sendMessage(d, packet); if (!replySentOk) { - _log.error("Error sending reply to " + d.calculateHash().toBase64() - + " in response to a new con message", - new Exception("Failed creation")); + if (_log.shouldLog(Log.WARN)) + _log.warn("Error sending reply to " + d.calculateHash().toBase64() + + " in response to a new con message", + new Exception("Failed creation")); s.internalClose(); } } else { + // timed out or serverSocket closed byte[] packet = toBytes(" " + id); packet[0] = CLOSE_OUT; boolean nackSent = session.sendMessage(d, packet); if (!nackSent) { - _log.error("Error sending NACK for session creation"); + _log.warn("Error sending NACK for session creation"); } s.internalClose(); } @@ -461,14 +480,9 @@ public class I2PSocketManager implements I2PSessionListener { * */ public void destroySocketManager() { - - try { - if (_serverSocket != null) { - _serverSocket.close(); - _serverSocket = null; - } - } catch (I2PException ex) { - _log.error("Error closing I2PServerSocket", ex); + if (_serverSocket != null) { + _serverSocket.close(); + _serverSocket = null; } synchronized (lock) {