Newer
Older
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

jrandom
committed
import net.i2p.util.Clock;

jrandom
committed
* Server socket implementation, allowing multiple threads to accept I2PSockets
* and pull from a queue populated by various threads (each of whom have their own
* timeout)
*
*/
class I2PServerSocketImpl implements I2PServerSocket {
private final static Log _log = new Log(I2PServerSocketImpl.class);
private I2PSocketManager mgr;
/** list of sockets waiting for the client to accept them */
private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
/** have we been closed */
private volatile boolean closing = false;
/** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
private Object socketAcceptedLock = new Object();
/** lock on this when adding a new socket to the pending list, and wait on it accordingly */
private Object socketAddedLock = new Object();
/**
* Set Sock Option accept timeout stub, does nothing
* @param x
*/
public void setSoTimeout(long x) {
}
/**
* Get Sock Option accept timeout stub, does nothing
* @return timeout
*/
public long getSoTimeout() {
return -1;
}
public I2PServerSocketImpl(I2PSocketManager mgr) {
this.mgr = mgr;
}
/**
* Waits for the next socket connecting. If a remote user tried to make a
* connection and the local application wasn't .accept()ing new connections,
* they should get refused (if .accept() doesnt occur in some small period -
* currently 5 seconds)
*
* @return a connected I2PSocket
*
* @throws I2PException if there is a problem with reading a new socket
* from the data available (aka the I2PSession closed, etc)
* @throws ConnectException if the I2PServerSocket is closed
*/
public I2PSocket accept() throws I2PException, ConnectException {
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("accept() called, pending: " + pendingSockets.size());
I2PSocket ret = null;
while ( (ret == null) && (!closing) ){
while (pendingSockets.size() <= 0) {
if(closing) {
throw new ConnectException("I2PServerSocket closed");
}
try {
synchronized(socketAddedLock) {
socketAddedLock.wait();
}
} catch(InterruptedException ie) {
}
}
synchronized (pendingSockets) {
if (pendingSockets.size() > 0) {
ret = (I2PSocket)pendingSockets.remove(0);
}
}
if (ret != null) {
synchronized (socketAcceptedLock) {
socketAcceptedLock.notifyAll();
}
}
}
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("TIMING: handed out accept result " + ret.hashCode());
return ret;
}
/**
* Make the socket available and wait until the client app accepts it, or until
* the given timeout elapses. This doesn't have any limits on the queue size -
* perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
*
* @param timeoutMs how long to wait until accept
* @return true if the socket was accepted, false if the timeout expired
* or the socket was closed
*/
public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
if (closing) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Already closing the socket");
return false;
}
Clock clock = I2PAppContext.getGlobalContext().clock();
long start = clock.now();
long end = start + timeoutMs;
pendingSockets.add(s);
synchronized (socketAddedLock) {
socketAddedLock.notifyAll();
}
// keep looping until the socket has been grabbed by the accept()
// (or the expiration passes, or the socket is closed)
while (pendingSockets.contains(s)) {
long now = clock.now();
if (now >= end) {
if(_log.shouldLog(Log.INFO)) {
_log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
pendingSockets.remove(s);
return false;
}
if (closing) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Server socket closed while waiting for accept");
pendingSockets.remove(s);
return false;
}
long remaining = end - now;
try {
synchronized (socketAcceptedLock) {
socketAcceptedLock.wait(remaining);
}
} catch(InterruptedException ie) {
}
}
long now = clock.now();
if(_log.shouldLog(Log.DEBUG)) {
_log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
return true;
}
public void close() {
closing = true;
// let anyone .accept()ing know to fsck off
synchronized (socketAddedLock) {
socketAddedLock.notifyAll();
}
// let anyone addWaitForAccept()ing know to fsck off
synchronized (socketAcceptedLock) {
socketAcceptedLock.notifyAll();
}
}
public I2PSocketManager getManager() {
return mgr;
}