diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java b/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java new file mode 100644 index 0000000000000000000000000000000000000000..be468b68e4f99f91e3f3ca16bee8a34507f4cc3f --- /dev/null +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java @@ -0,0 +1,15 @@ +package net.i2p.client.streaming; + +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.I2PException; +import java.net.ConnectException; +import java.nio.channels.SelectableChannel; + +public abstract class AcceptingChannel extends SelectableChannel { + abstract I2PSocket accept() throws I2PException, ConnectException; + I2PSocketManager _socketManager; + AcceptingChannel(I2PSocketManager manager) { + this._socketManager = manager; + } +} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 9f43aa24644b1fb1f8d77c8214b7ca9b4ce4d547..b118a6b368135e5a79789b4bb8aff63c01094693 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -31,6 +31,8 @@ public interface I2PServerSocket { */ public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; + public AcceptingChannel getChannel(); + /** * Set Sock Option accept timeout * @param x timeout in ms diff --git a/apps/streaming/java/src/net/i2p/client/streaming/AcceptingChannelImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/AcceptingChannelImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..0c05f09ff3ddf62348ec34d644bc96ca16ea015b --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/AcceptingChannelImpl.java @@ -0,0 +1,141 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PException; + +import java.net.SocketTimeoutException; +import java.net.ConnectException; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AbstractSelectionKey; +import java.nio.channels.spi.SelectorProvider; + +public class AcceptingChannelImpl extends AcceptingChannel { + boolean _isRegistered = false; + SelectionKey whichKey = null; + SelectorProvider provider = null; + Selector sel = null; + Object lock = null; + I2PSocket next = null; + I2PServerSocket socket; + + I2PSocket accept() throws I2PException, ConnectException { + I2PSocket sock; + try { + sock = socket.accept(); + } catch(SocketTimeoutException ex) { + return null; + } + I2PSocket temp = next; + next = sock; + return temp; + } + + AcceptingChannelImpl(I2PSocketManager manager) { + super(manager); + // this cheats and just sets the manager timeout low in order to repeatedly poll it. + // that means we can "only" accept one new connection every 100 milliseconds. + socket = manager.getServerSocket(); + socket.setSoTimeout(100); + } + + @Override + public SelectorProvider provider() { + return provider; + } + + @Override + public int validOps() { + return SelectionKey.OP_ACCEPT; + } + + @Override + public boolean isRegistered() { + return _isRegistered; + } + + @Override + public SelectionKey keyFor(Selector arg0) { + return whichKey; + } + + @Override + public SelectionKey register(final Selector sel, final int ops, Object lock) throws ClosedChannelException { + this.sel = sel; + this.provider = sel.provider(); + this.lock = lock; + this._isRegistered = true; + final AcceptingChannel that = this; // lol java + SelectionKey key = new AbstractSelectionKey() { + int operations = ops; + @Override + public SelectableChannel channel() { + return that; + } + + @Override + public Selector selector() { + return sel; + } + + @Override + public int interestOps() { + return this.operations; + } + + @Override + public SelectionKey interestOps(int ops) { + this.operations = ops; + return this; + } + + @Override + public int readyOps() { + if((operations & OP_ACCEPT) != 0) + if(next != null) { + return OP_ACCEPT; + } else { + try { + accept(); // ping it again. + } catch(I2PException ex) { + } catch(ConnectException ex) {} + if(next != null) + return OP_ACCEPT; + } + return 0; + } + }; + key.attach(lock); + // I... THINK this is right? + sel.keys().add(key); + return key; + } + + @Override + public SelectableChannel configureBlocking(boolean blocking) throws IOException { + if (blocking == false) { + return this; + } + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isBlocking() { + return false; + } + + @Override + public Object blockingLock() { + return this.lock; + } + + @Override + protected void implCloseChannel() throws IOException { + if(next != null) { + next.close(); + } + _socketManager.destroySocketManager(); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index f40dbd0c7929045f06f2c1402d79017dd1a9a049..f688086aec046ef5c7468ff8b7089dbc08af03a7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -24,6 +24,10 @@ class I2PServerSocketFull implements I2PServerSocket { public I2PSocket accept() throws I2PException, SocketTimeoutException { return _socketManager.receiveSocket(); } + + public AcceptingChannel getChannel() { + return new AcceptingChannelImpl(_socketManager); + } public long getSoTimeout() { return _socketManager.getConnectionManager().getSoTimeout();