From 1420c773a6bd4abc12f23367c523eb1257b4c0b9 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Tue, 24 Jun 2014 12:49:18 +0000 Subject: [PATCH] * Streaming; Drop the preliminary channel implementations, as added by 'dream' in late 2011. Testing couldn't have happened, as they don't work and can't ever work as designed. Channels must have underlying file descriptors unless you implement your own Selector and that would probably require JNI. See http://zzz.i2p/topics/1229 for details. Also http://stackoverflow.com/questions/911780/how-do-i-define-my-own-selectablechannel --- .../client/streaming/AcceptingChannel.java | 4 +- .../i2p/client/streaming/I2PServerSocket.java | 4 + .../net/i2p/client/streaming/I2PSocket.java | 4 + .../streaming/impl/AcceptingChannelImpl.java | 157 -------------- .../streaming/impl/I2PServerSocketFull.java | 9 +- .../client/streaming/impl/I2PSocketFull.java | 9 +- .../client/streaming/impl/MessageChannel.java | 194 ------------------ .../streaming/impl/StandardServerSocket.java | 2 +- .../client/streaming/impl/StandardSocket.java | 2 +- 9 files changed, 22 insertions(+), 363 deletions(-) delete mode 100644 apps/streaming/java/src/net/i2p/client/streaming/impl/AcceptingChannelImpl.java delete mode 100644 apps/streaming/java/src/net/i2p/client/streaming/impl/MessageChannel.java diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java b/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java index 3cf5949dba..43f392dbab 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/AcceptingChannel.java @@ -1,7 +1,5 @@ package net.i2p.client.streaming; -import net.i2p.client.streaming.I2PSocket; -import net.i2p.client.streaming.I2PSocketManager; import net.i2p.I2PException; import java.net.ConnectException; import java.nio.channels.SelectableChannel; @@ -12,6 +10,8 @@ import java.nio.channels.SelectableChannel; * * Warning, this interface and implementation is preliminary and subject to change without notice. * + * Unimplemented, unlikely to ever be implemented. + * * @since 0.8.11 */ public abstract class AcceptingChannel extends SelectableChannel { diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 6a8e3a6cf5..35fb68daaa 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -32,6 +32,10 @@ public interface I2PServerSocket { public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; /** + * Unimplemented, unlikely to ever be implemented. + * + * @deprecated + * @return null always * @since 0.8.11 */ public AcceptingChannel getChannel(); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index 3ddcb95333..f84fad967e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -43,6 +43,10 @@ public interface I2PSocket extends Closeable { public OutputStream getOutputStream() throws IOException; /** + * Unimplemented, unlikely to ever be implemented. + * + * @deprecated + * @return null always * @since 0.8.9 */ public SelectableChannel getChannel() throws IOException; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/AcceptingChannelImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/AcceptingChannelImpl.java deleted file mode 100644 index b4952438db..0000000000 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/AcceptingChannelImpl.java +++ /dev/null @@ -1,157 +0,0 @@ -package net.i2p.client.streaming.impl; - -import java.net.SocketTimeoutException; -import java.net.ConnectException; -import java.io.IOException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.AbstractSelectionKey; -import java.nio.channels.spi.SelectorProvider; - -import net.i2p.I2PException; -import net.i2p.client.streaming.AcceptingChannel; -import net.i2p.client.streaming.I2PServerSocket; -import net.i2p.client.streaming.I2PSocket; -import net.i2p.client.streaming.I2PSocketManager; - -/** - * As this does not (yet) extend ServerSocketChannel it cannot be returned by StandardServerSocket.getChannel(), - * until we implement an I2P SocketAddress class. - * - * Warning, this interface and implementation is preliminary and subject to change without notice. - * - * @since 0.8.11 - */ -class AcceptingChannelImpl extends AcceptingChannel { - private boolean _isRegistered; - private SelectionKey whichKey; - private SelectorProvider provider; - private Selector sel; - private Object lock; - private volatile I2PSocket next; - private final I2PServerSocket socket; - - protected I2PSocket accept() throws I2PException, ConnectException { - I2PSocket sock; - try { - sock = socket.accept(); - } catch(SocketTimeoutException ex) { - return null; - } - synchronized (this) { - I2PSocket temp = next; - next = sock; - return temp; - } - } - - protected AcceptingChannelImpl(I2PSocketManager manager) { - super(manager); - // this cheats and just sets the manager timeout low in order to repeatedly poll it. - // that means we can "only" accept one new connection every 100 milliseconds. - socket = manager.getServerSocket(); - socket.setSoTimeout(100); - } - - @Override - public SelectorProvider provider() { - return provider; - } - - @Override - public int validOps() { - return SelectionKey.OP_ACCEPT; - } - - @Override - public boolean isRegistered() { - return _isRegistered; - } - - @Override - public SelectionKey keyFor(Selector arg0) { - return whichKey; - } - - @Override - public SelectionKey register(final Selector sel, final int ops, Object lock) throws ClosedChannelException { - this.sel = sel; - this.provider = sel.provider(); - this.lock = lock; - this._isRegistered = true; - final AcceptingChannel that = this; // lol java - SelectionKey key = new AbstractSelectionKey() { - int operations = ops; - @Override - public SelectableChannel channel() { - return that; - } - - @Override - public Selector selector() { - return sel; - } - - @Override - public int interestOps() { - return this.operations; - } - - @Override - public SelectionKey interestOps(int ops) { - this.operations = ops; - return this; - } - - @Override - public int readyOps() { - if((operations & OP_ACCEPT) != 0) { - if(next != null) { - return OP_ACCEPT; - } else { - try { - accept(); // ping it again. - } catch(I2PException ex) { - } catch(ConnectException ex) {} - if(next != null) - return OP_ACCEPT; - } - } - return 0; - } - }; - key.attach(lock); - // I... THINK this is right? - sel.keys().add(key); - return key; - } - - @Override - public SelectableChannel configureBlocking(boolean blocking) throws IOException { - if (blocking == false) { - return this; - } - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isBlocking() { - return false; - } - - @Override - public Object blockingLock() { - return this.lock; - } - - @Override - protected void implCloseChannel() throws IOException { - I2PSocket nxt = next; - if(nxt != null) { - nxt.close(); - } - _socketManager.destroySocketManager(); - } -} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PServerSocketFull.java index bf673667c2..c41c5082e8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PServerSocketFull.java @@ -14,7 +14,6 @@ import net.i2p.client.streaming.I2PSocketManager; */ class I2PServerSocketFull implements I2PServerSocket { private final I2PSocketManagerFull _socketManager; - private volatile AcceptingChannel _channel; public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; @@ -32,12 +31,14 @@ class I2PServerSocketFull implements I2PServerSocket { } /** + * Unimplemented, unlikely to ever be implemented. + * + * @deprecated + * @return null always * @since 0.8.11 */ public synchronized AcceptingChannel getChannel() { - if (_channel == null) - _channel = new AcceptingChannelImpl(_socketManager); - return _channel; + return null; } public long getSoTimeout() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java index 6e59ab1bb0..2885db4c93 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java @@ -22,7 +22,6 @@ class I2PSocketFull implements I2PSocket { private volatile Connection _connection; private final Destination _remotePeer; private final Destination _localPeer; - private volatile MessageChannel _channel; private final AtomicBoolean _closed = new AtomicBoolean(); public I2PSocketFull(Connection con, I2PAppContext context) { @@ -89,12 +88,14 @@ class I2PSocketFull implements I2PSocket { } /** + * Unimplemented, unlikely to ever be implemented. + * + * @deprecated + * @return null always * @since 0.8.9 */ public synchronized SelectableChannel getChannel() { - if (_channel == null) - _channel = new MessageChannel(this); - return _channel; + return null; } /** diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageChannel.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageChannel.java deleted file mode 100644 index 5e3cecacdf..0000000000 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageChannel.java +++ /dev/null @@ -1,194 +0,0 @@ -package net.i2p.client.streaming.impl; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.WritableByteChannel; -import java.nio.channels.spi.AbstractSelectionKey; -import java.nio.channels.spi.SelectorProvider; -import java.util.logging.Level; -import java.util.logging.Logger; - -import net.i2p.client.streaming.I2PSocket; - -/** - * As this does not (yet) extend SocketChannel it cannot be returned by StandardSocket.getChannel(), - * until we implement an I2P SocketAddress class. - * - * Warning, this interface and implementation is preliminary and subject to change without notice. - * - * @since 0.8.9 - */ -public class MessageChannel extends SelectableChannel implements ReadableByteChannel, WritableByteChannel { - - private final MessageInputStream in; - private final MessageOutputStream out; - private boolean _isRegistered; - private SelectionKey whichKey; - private SelectorProvider provider; - private Selector sel; - private Object lock; - private final I2PSocket socket; - - MessageChannel(I2PSocket socket) { - try { - this.socket = socket; - in = (MessageInputStream) socket.getInputStream(); - out = (MessageOutputStream) socket.getOutputStream(); - in.setReadTimeout(0); - out.setWriteTimeout(0); - out.setBufferSize(0x1000); - } catch (IOException ex) { - Logger.getLogger(MessageChannel.class.getName()).log(Level.SEVERE, null, ex); - // dunno what to do with this for now - throw new RuntimeException(ex); - } - } - - @Override - public SelectorProvider provider() { - return provider; - } - - @Override - public int validOps() { - return SelectionKey.OP_READ | SelectionKey.OP_WRITE; - } - - @Override - public boolean isRegistered() { - return _isRegistered; - } - - @Override - public SelectionKey keyFor(Selector arg0) { - return whichKey; - } - - @Override - public SelectionKey register(final Selector sel, final int ops, Object lock) throws ClosedChannelException { - this.sel = sel; - this.provider = sel.provider(); - this.lock = lock; - this._isRegistered = true; - final MessageChannel that = this; // lol java - SelectionKey key = new AbstractSelectionKey() { - int operations = ops; - @Override - public SelectableChannel channel() { - return that; - } - - @Override - public Selector selector() { - return sel; - } - - @Override - public int interestOps() { - return this.operations; - } - - @Override - public SelectionKey interestOps(int ops) { - this.operations = ops; - return this; - } - - @Override - public int readyOps() { - int readyOps = 0; - if((operations & OP_READ) != 0) { - try { - // check the input stream - if (in.available() > 0) { - readyOps |= OP_READ; - } - } catch (IOException ex) {} - } - if((operations & OP_WRITE) != 0) { - if(!out.getClosed()) - readyOps |= OP_WRITE; - } - return readyOps; - } - }; - key.attach(lock); - // I... THINK this is right? - sel.keys().add(key); - return key; - } - - @Override - public SelectableChannel configureBlocking(boolean blocking) throws IOException { - if (blocking == false) { - return this; - } - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isBlocking() { - return false; - } - - @Override - public Object blockingLock() { - return this.lock; - } - - @Override - protected void implCloseChannel() throws IOException { - this.socket.close(); - } - - /* Read no more than buf.remaining() - * Continue to read until that, or in.read - * returns 0, which happens when there's - * no more data available. - */ - public int read(ByteBuffer buf) throws IOException { - int amount = 0; - for (;;) { - // TODO if buf.hasArray() ... getArray() ... getArrayOffset() ... - byte[] lbuf = new byte[buf.remaining()]; - int samount = in.read(lbuf); - if (samount <= 0) { - this.close(); - } - if (samount == 0) { - break; - } - amount += samount; - buf.put(lbuf, 0, samount); - } - return amount; - } - - /* Write in 0x1000 increments, the MessageOutputStream's - * already set buffer size. Once it starts to fail - * (wait timeout is 0) then put the bytes back and return. - */ - public int write(ByteBuffer buf) throws IOException { - int written = 0; - for (;;) { - if(buf.remaining()==0) - return written; - // TODO if buf.hasArray() ... getArray() ... getArrayOffset() ... - byte[] lbuf = new byte[Math.min(buf.remaining(), 0x1000)]; - buf.get(lbuf); - try { - out.write(lbuf, 0, lbuf.length); - written += lbuf.length; - } catch(InterruptedIOException ex) { - buf.put(lbuf); - return written; - } - } - } -} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardServerSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardServerSocket.java index 3bcf53a4d9..aaab086f05 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardServerSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardServerSocket.java @@ -74,7 +74,7 @@ class StandardServerSocket extends ServerSocket { } /** - * @return null always, see AcceptingChannelImpl for more info + * @return null always, unimplemented */ @Override public ServerSocketChannel getChannel() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java index 549df289a2..686eadd987 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java @@ -68,7 +68,7 @@ class StandardSocket extends Socket { } /** - * @return null always, see MessageChannel for more info + * @return null always, unimplemented */ @Override public SocketChannel getChannel() { -- GitLab