I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 1420c773 authored by zzz's avatar zzz
Browse files

* Streaming; Drop the preliminary channel implementations,

   as added by 'dream' in late 2011. Testing couldn't have happened,
   as they don't work and can't ever work as designed.
   Channels must have underlying file descriptors unless you implement
   your own Selector and that would probably require JNI.
   See http://zzz.i2p/topics/1229 for details.
   Also http://stackoverflow.com/questions/911780/how-do-i-define-my-own-selectablechannel
parent 690b40ed
No related branches found
No related tags found
No related merge requests found
Showing
with 22 additions and 363 deletions
package net.i2p.client.streaming;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.I2PException;
import java.net.ConnectException;
import java.nio.channels.SelectableChannel;
......@@ -12,6 +10,8 @@ import java.nio.channels.SelectableChannel;
*
* Warning, this interface and implementation is preliminary and subject to change without notice.
*
* Unimplemented, unlikely to ever be implemented.
*
* @since 0.8.11
*/
public abstract class AcceptingChannel extends SelectableChannel {
......
......@@ -32,6 +32,10 @@ public interface I2PServerSocket {
public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
/**
* Unimplemented, unlikely to ever be implemented.
*
* @deprecated
* @return null always
* @since 0.8.11
*/
public AcceptingChannel getChannel();
......
......@@ -43,6 +43,10 @@ public interface I2PSocket extends Closeable {
public OutputStream getOutputStream() throws IOException;
/**
* Unimplemented, unlikely to ever be implemented.
*
* @deprecated
* @return null always
* @since 0.8.9
*/
public SelectableChannel getChannel() throws IOException;
......
package net.i2p.client.streaming.impl;
import java.net.SocketTimeoutException;
import java.net.ConnectException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectionKey;
import java.nio.channels.spi.SelectorProvider;
import net.i2p.I2PException;
import net.i2p.client.streaming.AcceptingChannel;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
/**
* As this does not (yet) extend ServerSocketChannel it cannot be returned by StandardServerSocket.getChannel(),
* until we implement an I2P SocketAddress class.
*
* Warning, this interface and implementation is preliminary and subject to change without notice.
*
* @since 0.8.11
*/
class AcceptingChannelImpl extends AcceptingChannel {
private boolean _isRegistered;
private SelectionKey whichKey;
private SelectorProvider provider;
private Selector sel;
private Object lock;
private volatile I2PSocket next;
private final I2PServerSocket socket;
protected I2PSocket accept() throws I2PException, ConnectException {
I2PSocket sock;
try {
sock = socket.accept();
} catch(SocketTimeoutException ex) {
return null;
}
synchronized (this) {
I2PSocket temp = next;
next = sock;
return temp;
}
}
protected AcceptingChannelImpl(I2PSocketManager manager) {
super(manager);
// this cheats and just sets the manager timeout low in order to repeatedly poll it.
// that means we can "only" accept one new connection every 100 milliseconds.
socket = manager.getServerSocket();
socket.setSoTimeout(100);
}
@Override
public SelectorProvider provider() {
return provider;
}
@Override
public int validOps() {
return SelectionKey.OP_ACCEPT;
}
@Override
public boolean isRegistered() {
return _isRegistered;
}
@Override
public SelectionKey keyFor(Selector arg0) {
return whichKey;
}
@Override
public SelectionKey register(final Selector sel, final int ops, Object lock) throws ClosedChannelException {
this.sel = sel;
this.provider = sel.provider();
this.lock = lock;
this._isRegistered = true;
final AcceptingChannel that = this; // lol java
SelectionKey key = new AbstractSelectionKey() {
int operations = ops;
@Override
public SelectableChannel channel() {
return that;
}
@Override
public Selector selector() {
return sel;
}
@Override
public int interestOps() {
return this.operations;
}
@Override
public SelectionKey interestOps(int ops) {
this.operations = ops;
return this;
}
@Override
public int readyOps() {
if((operations & OP_ACCEPT) != 0) {
if(next != null) {
return OP_ACCEPT;
} else {
try {
accept(); // ping it again.
} catch(I2PException ex) {
} catch(ConnectException ex) {}
if(next != null)
return OP_ACCEPT;
}
}
return 0;
}
};
key.attach(lock);
// I... THINK this is right?
sel.keys().add(key);
return key;
}
@Override
public SelectableChannel configureBlocking(boolean blocking) throws IOException {
if (blocking == false) {
return this;
}
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public boolean isBlocking() {
return false;
}
@Override
public Object blockingLock() {
return this.lock;
}
@Override
protected void implCloseChannel() throws IOException {
I2PSocket nxt = next;
if(nxt != null) {
nxt.close();
}
_socketManager.destroySocketManager();
}
}
......@@ -14,7 +14,6 @@ import net.i2p.client.streaming.I2PSocketManager;
*/
class I2PServerSocketFull implements I2PServerSocket {
private final I2PSocketManagerFull _socketManager;
private volatile AcceptingChannel _channel;
public I2PServerSocketFull(I2PSocketManagerFull mgr) {
_socketManager = mgr;
......@@ -32,12 +31,14 @@ class I2PServerSocketFull implements I2PServerSocket {
}
/**
* Unimplemented, unlikely to ever be implemented.
*
* @deprecated
* @return null always
* @since 0.8.11
*/
public synchronized AcceptingChannel getChannel() {
if (_channel == null)
_channel = new AcceptingChannelImpl(_socketManager);
return _channel;
return null;
}
public long getSoTimeout() {
......
......@@ -22,7 +22,6 @@ class I2PSocketFull implements I2PSocket {
private volatile Connection _connection;
private final Destination _remotePeer;
private final Destination _localPeer;
private volatile MessageChannel _channel;
private final AtomicBoolean _closed = new AtomicBoolean();
public I2PSocketFull(Connection con, I2PAppContext context) {
......@@ -89,12 +88,14 @@ class I2PSocketFull implements I2PSocket {
}
/**
* Unimplemented, unlikely to ever be implemented.
*
* @deprecated
* @return null always
* @since 0.8.9
*/
public synchronized SelectableChannel getChannel() {
if (_channel == null)
_channel = new MessageChannel(this);
return _channel;
return null;
}
/**
......
package net.i2p.client.streaming.impl;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.channels.spi.AbstractSelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.i2p.client.streaming.I2PSocket;
/**
* As this does not (yet) extend SocketChannel it cannot be returned by StandardSocket.getChannel(),
* until we implement an I2P SocketAddress class.
*
* Warning, this interface and implementation is preliminary and subject to change without notice.
*
* @since 0.8.9
*/
public class MessageChannel extends SelectableChannel implements ReadableByteChannel, WritableByteChannel {
private final MessageInputStream in;
private final MessageOutputStream out;
private boolean _isRegistered;
private SelectionKey whichKey;
private SelectorProvider provider;
private Selector sel;
private Object lock;
private final I2PSocket socket;
MessageChannel(I2PSocket socket) {
try {
this.socket = socket;
in = (MessageInputStream) socket.getInputStream();
out = (MessageOutputStream) socket.getOutputStream();
in.setReadTimeout(0);
out.setWriteTimeout(0);
out.setBufferSize(0x1000);
} catch (IOException ex) {
Logger.getLogger(MessageChannel.class.getName()).log(Level.SEVERE, null, ex);
// dunno what to do with this for now
throw new RuntimeException(ex);
}
}
@Override
public SelectorProvider provider() {
return provider;
}
@Override
public int validOps() {
return SelectionKey.OP_READ | SelectionKey.OP_WRITE;
}
@Override
public boolean isRegistered() {
return _isRegistered;
}
@Override
public SelectionKey keyFor(Selector arg0) {
return whichKey;
}
@Override
public SelectionKey register(final Selector sel, final int ops, Object lock) throws ClosedChannelException {
this.sel = sel;
this.provider = sel.provider();
this.lock = lock;
this._isRegistered = true;
final MessageChannel that = this; // lol java
SelectionKey key = new AbstractSelectionKey() {
int operations = ops;
@Override
public SelectableChannel channel() {
return that;
}
@Override
public Selector selector() {
return sel;
}
@Override
public int interestOps() {
return this.operations;
}
@Override
public SelectionKey interestOps(int ops) {
this.operations = ops;
return this;
}
@Override
public int readyOps() {
int readyOps = 0;
if((operations & OP_READ) != 0) {
try {
// check the input stream
if (in.available() > 0) {
readyOps |= OP_READ;
}
} catch (IOException ex) {}
}
if((operations & OP_WRITE) != 0) {
if(!out.getClosed())
readyOps |= OP_WRITE;
}
return readyOps;
}
};
key.attach(lock);
// I... THINK this is right?
sel.keys().add(key);
return key;
}
@Override
public SelectableChannel configureBlocking(boolean blocking) throws IOException {
if (blocking == false) {
return this;
}
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public boolean isBlocking() {
return false;
}
@Override
public Object blockingLock() {
return this.lock;
}
@Override
protected void implCloseChannel() throws IOException {
this.socket.close();
}
/* Read no more than buf.remaining()
* Continue to read until that, or in.read
* returns 0, which happens when there's
* no more data available.
*/
public int read(ByteBuffer buf) throws IOException {
int amount = 0;
for (;;) {
// TODO if buf.hasArray() ... getArray() ... getArrayOffset() ...
byte[] lbuf = new byte[buf.remaining()];
int samount = in.read(lbuf);
if (samount <= 0) {
this.close();
}
if (samount == 0) {
break;
}
amount += samount;
buf.put(lbuf, 0, samount);
}
return amount;
}
/* Write in 0x1000 increments, the MessageOutputStream's
* already set buffer size. Once it starts to fail
* (wait timeout is 0) then put the bytes back and return.
*/
public int write(ByteBuffer buf) throws IOException {
int written = 0;
for (;;) {
if(buf.remaining()==0)
return written;
// TODO if buf.hasArray() ... getArray() ... getArrayOffset() ...
byte[] lbuf = new byte[Math.min(buf.remaining(), 0x1000)];
buf.get(lbuf);
try {
out.write(lbuf, 0, lbuf.length);
written += lbuf.length;
} catch(InterruptedIOException ex) {
buf.put(lbuf);
return written;
}
}
}
}
......@@ -74,7 +74,7 @@ class StandardServerSocket extends ServerSocket {
}
/**
* @return null always, see AcceptingChannelImpl for more info
* @return null always, unimplemented
*/
@Override
public ServerSocketChannel getChannel() {
......
......@@ -68,7 +68,7 @@ class StandardSocket extends Socket {
}
/**
* @return null always, see MessageChannel for more info
* @return null always, unimplemented
*/
@Override
public SocketChannel getChannel() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment