diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 733958736abafe0f5b48b6f8956197cb89a47ef1..d0028fdb81dba2d83e47844d722c4efdad513cff 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -30,37 +30,6 @@ public interface I2PServerSocket { */ public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; - /** - * accept(timeout) waits timeout ms for a socket connecting. If a socket is - * not available during the timeout, return null. accept(0) behaves like accept() - * - * @param timeout in ms - * - * @return a connected I2PSocket, or null - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - * @throws InterruptedException if thread is interrupted while waiting - */ - public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException; - - /** - * Wait until there is a socket waiting for acception or the timeout is - * reached. - * - * @param timeoutMs timeout in ms. If ms is 0, wait forever. - * - * @return true if a socket is available, false if not - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - * @throws InterruptedException if the thread is interrupted before - * completion - */ - public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; - /** * Set Sock Option accept timeout * @param x timeout in ms diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 2da8320fcb5f84e54a7f27561757701479fec108..662fd9e5721eb40de7446f84a59006ab473711af 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -1,7 +1,6 @@ package net.i2p.client.streaming; import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -51,33 +50,24 @@ class I2PServerSocketImpl implements I2PServerSocket { } /** - * Waits until there is a socket waiting for acception or the timeout is - * reached. - * - * @param timeoutMs timeout in ms. A negative value waits forever. + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period - + * currently 5 seconds) + * + * @return a connected I2PSocket * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed - * @throws InterruptedException if thread is interrupted while waiting */ - public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException { + public I2PSocket accept() throws I2PException, ConnectException { if (_log.shouldLog(Log.DEBUG)) - _log.debug("waitIncoming() called, pending: " + pendingSockets.size()); + _log.debug("accept() called, pending: " + pendingSockets.size()); - boolean isTimed = (timeoutMs>0); - if (isTimed) { - Clock clock = I2PAppContext.getGlobalContext().clock(); - long now = clock.now(); - long end = now + timeoutMs; - while (pendingSockets.size() <= 0 && now<end) { - if (closing) throw new ConnectException("I2PServerSocket closed"); - synchronized(socketAddedLock) { - socketAddedLock.wait(end - now); - } - now = clock.now(); - } - } else { + I2PSocket ret = null; + + while ( (ret == null) && (!closing) ){ while (pendingSockets.size() <= 0) { if (closing) throw new ConnectException("I2PServerSocket closed"); try { @@ -86,71 +76,16 @@ class I2PServerSocketImpl implements I2PServerSocket { } } catch (InterruptedException ie) {} } - } - } - - /** - * accept(timeout) waits timeout ms for a socket connecting. If a socket is - * not available during the timeout, return null. accept(0) behaves like accept() - * - * @param timeout in ms - * - * @return a connected I2PSocket, or null - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - * @throws InterruptedException if thread is interrupted while waiting - */ - public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException { - I2PSocket ret = null; - - if (timeout<=0) { - ret = accept(); - } else { - long now = I2PAppContext.getGlobalContext().clock().now(); - long expiration = timeout + now ; - synchronized (pendingSockets) { - while (pendingSockets.size() == 0 && expiration>now) { - pendingSockets.wait(expiration-now); - now = I2PAppContext.getGlobalContext().clock().now(); + synchronized (pendingSockets) { + if (pendingSockets.size() > 0) { + ret = (I2PSocket)pendingSockets.remove(0); } - ret = (I2PSocket)pendingSockets.remove(0); } if (ret != null) { synchronized (socketAcceptedLock) { socketAcceptedLock.notifyAll(); } - } - } - return ret; - } - - /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period - - * currently 5 seconds) - * - * @return a connected I2PSocket - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - * @throws ConnectException if the I2PServerSocket is closed - */ - public I2PSocket accept() throws I2PException, ConnectException { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("accept() called, pending: " + pendingSockets.size()); - - I2PSocket ret = null; - - while ( (ret == null) && (!closing) ){ - try { - this.waitIncoming(0); - ret = accept(1); - } catch (InterruptedException e) { - throw new I2PException("Thread interrupted") ; - } + } } if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index b37bf10850c457c3a02b91488e4125bd017a10ca..dc7a48fd466da98fb3bf53495f39093af040530d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -350,6 +350,7 @@ class I2PSocketImpl implements I2PSocket { read = bc.startToByteArray(len); bc.notifyAll(); } + boolean timedOut = false; while ( (read.length == 0) && (!inStreamClosed) ) { synchronized (flagLock) { diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 374ec35ce6e395ef9f570701cdf0131f0534a2f6..0370ab16be25cf82d44679ae1a03e20f9a7a3806 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -13,6 +13,7 @@ import net.i2p.client.I2PClient; import net.i2p.client.I2PClientFactory; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; +import net.i2p.data.Destination; import net.i2p.util.Log; /** diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java index a272e99df27eb9430acf84c7438a1086ca2cb022..b40b9091f79e79caa8dae5faa44b713742b63b91 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java @@ -130,6 +130,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { public void messageAvailable(I2PSession session, int msgId, long size) { try { + I2PSocketImpl s; byte msg[] = session.receiveMessage(msgId); if (msg.length == 1 && msg[0] == -1) { _log.debug(getName() + ": Ping received"); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java index a29b9f6f1195f31e252d43e59d1408a12122f2a1..1c3ef54e945cf6abba208c520932ba0cd9d69169 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java @@ -28,10 +28,6 @@ public class TestSwarm { private String _conOptions; // unused? used elsewhere? private boolean _dead; // unused? used elsewhere? - public void antiCompilationWarnings() { - _log.debug(""+_conOptions+_dead); - } - public static void main(String args[]) { if (args.length < 1) { System.err.println("Usage: TestSwarm myDestFile [peerDestFile ]*"); @@ -135,14 +131,6 @@ public class TestSwarm { _context.statManager().createRateStat("swarm." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); } - public void antiCompilationWarnings() { - _log.debug(""+this._lastReceived+this._lastReceivedOn+this._started); - } - public void antiCompilationWarnings(long x, long y) { - if (false) - _log.debug(""+x+y); - } - public Flooder(I2PSocket socket) { _socket = socket; _remoteDestination = socket.getPeerDestination(); @@ -166,8 +154,6 @@ public class TestSwarm { _context.random().nextBytes(data); long value = 0; long lastSend = _context.clock().now(); - this.antiCompilationWarnings(value, lastSend); - if (_socket == null) { try { _socket = _manager.connect(_remoteDestination); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index a352e438737f2c8220046e8d7d102d1851b58682..30d849ba4411de595c149ebaaf12e56b2bf4820a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -21,7 +21,6 @@ class ConnectionHandler { private Log _log; private ConnectionManager _manager; private LinkedBlockingQueue<Packet> _synQueue; - private Object _synSignal; private boolean _active; private int _acceptTimeout; @@ -41,7 +40,6 @@ class ConnectionHandler { _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); - _synSignal= new Object(); _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } @@ -81,10 +79,6 @@ class ConnectionHandler { boolean success = _synQueue.offer(packet); // fail immediately if full if (success) { SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); - // advertise the new syn packet to threads that could be waiting - // (by calling waitSyn(long) - if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) - synchronized (this._synSignal) {this._synSignal.notifyAll();} } else { if (_log.shouldLog(Log.WARN)) _log.warn("Dropping new SYN request, as the queue is full"); @@ -93,33 +87,6 @@ class ConnectionHandler { } } - /** - * Wait until some SYN packet is available - * @param ms max amount of time to wait for a connection (if negative or null, - * wait indefinitely) - * @throws InterruptedException - */ - public void waitSyn( long ms ) throws InterruptedException { - synchronized (this._synSignal) - { - long now = this._context.clock().now() ; - long expiration = now + ms ; - while ( expiration > now || ms<=0 ) { - // check if there is a SYN packet in the queue - for ( Packet p : this._synQueue ) { - if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ; - } - // wait until a SYN is signaled - if ( ms == 0) { - this._synSignal.wait(); - } else { - this._synSignal.wait(expiration-now); - now = this._context.clock().now(); - } - } - } - } - /** * Receive an incoming connection (built from a received SYN) * Non-SYN packets with a zero SendStreamID may also be queued here so diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 0a183afef23b66d98a5fd30012323a31e0100492..83f7c8376aa23710e4cf22347694598d6ca53194 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,8 +1,6 @@ package net.i2p.client.streaming; import java.net.SocketTimeoutException; - -import net.i2p.I2PAppContext; import net.i2p.I2PException; /** @@ -47,43 +45,4 @@ public class I2PServerSocketFull implements I2PServerSocket { public I2PSocketManager getManager() { return _socketManager; } - - /** - * accept(timeout) waits timeout ms for a socket connecting. If a socket is - * not available during the timeout, return null. accept(0) behaves like accept() - * - * @param timeout in ms - * - * @return a connected I2PSocket, or null - * - * @throws I2PException if there is a problem with reading a new socket - * from the data available (aka the I2PSession closed, etc) - */ - - public I2PSocket accept(long timeout) throws I2PException { - long reset_timeout = this.getSoTimeout(); - - try { - this.setSoTimeout(timeout); - return this.accept(); - } catch (SocketTimeoutException e) { - return null ; - } finally { - this.setSoTimeout(reset_timeout); - } - } - - /** - * block until a SYN packet is detected or the timeout is reached. If timeout is 0, - * block until a SYN packet is detected. - * - * @param timeoutMs - * @throws InterruptedException - * @throws I2PException - */ - public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException { - if (this._socketManager.getConnectionManager().getSession().isClosed()) - throw new I2PException("Session is closed"); - this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); - } }