From a4d16af95d36d3df3f8a1a22fb66660f721bfc96 Mon Sep 17 00:00:00 2001 From: mkvore-commit <mkvore-commit@mail.i2p> Date: Thu, 2 Apr 2009 08:22:31 +0000 Subject: [PATCH] SAM version 3 : - Raw and Datagram sessions implemented - option "SILENT=true" added to the stream protocol - java 6 warnings removed ministreaming : - java 6 warnings removed ministreaming and streaming : - added functions : I2PServerSocket.waitIncoming(long timeout) I2PServerSocket.accept(boolean block) --- .../i2p/client/streaming/I2PServerSocket.java | 30 + .../client/streaming/I2PServerSocketImpl.java | 95 ++- .../i2p/client/streaming/I2PSocketImpl.java | 1 - .../streaming/I2PSocketManagerFactory.java | 1 - .../streaming/I2PSocketManagerImpl.java | 19 +- .../net/i2p/client/streaming/TestSwarm.java | 14 + apps/sam/Demos/datagramTests/README.txt | 15 + apps/sam/Demos/datagramTests/samForward.py | 35 + apps/sam/Demos/datagramTests/samIn.py | 29 + apps/sam/Demos/datagramTests/samOut.py | 31 + apps/sam/Demos/rawTests/README.txt | 15 + apps/sam/Demos/rawTests/samForward.py | 36 + apps/sam/Demos/rawTests/samIn.py | 31 + apps/sam/Demos/rawTests/samOut.py | 31 + apps/sam/Demos/streamTests/README.txt | 24 + apps/sam/Demos/streamTests/samForward.py | 58 ++ apps/sam/Demos/streamTests/samIn.py | 89 +++ apps/sam/Demos/streamTests/samOut.py | 52 ++ .../sam/Demos/streamTests/samOutWithNaming.py | 51 ++ apps/sam/Demos/streamTests/server.py | 41 + apps/sam/doc/protocol-v3.txt | 17 + apps/sam/java/src/net/i2p/sam/SAMBridge.java | 36 +- .../src/net/i2p/sam/SAMDatagramSession.java | 9 +- .../java/src/net/i2p/sam/SAMException.java | 6 +- apps/sam/java/src/net/i2p/sam/SAMHandler.java | 46 +- .../src/net/i2p/sam/SAMHandlerFactory.java | 21 +- .../i2p/sam/SAMInvalidDirectionException.java | 3 +- .../src/net/i2p/sam/SAMMessageSession.java | 3 +- .../java/src/net/i2p/sam/SAMRawSession.java | 2 +- .../src/net/i2p/sam/SAMStreamReceiver.java | 3 +- .../src/net/i2p/sam/SAMStreamSession.java | 27 +- apps/sam/java/src/net/i2p/sam/SAMUtils.java | 18 + .../java/src/net/i2p/sam/SAMv1Handler.java | 103 ++- .../java/src/net/i2p/sam/SAMv2Handler.java | 6 +- .../src/net/i2p/sam/SAMv2StreamSession.java | 18 +- .../src/net/i2p/sam/SAMv3DatagramSession.java | 90 +++ .../java/src/net/i2p/sam/SAMv3Handler.java | 748 ++++++++++++++++++ .../java/src/net/i2p/sam/SAMv3RawSession.java | 88 +++ .../src/net/i2p/sam/SAMv3StreamSession.java | 389 +++++++++ .../net/i2p/sam/client/SAMEventHandler.java | 6 +- .../src/net/i2p/sam/client/SAMStreamSend.java | 11 +- .../src/net/i2p/sam/client/SAMStreamSink.java | 11 +- .../client/streaming/ConnectionHandler.java | 46 +- .../client/streaming/I2PServerSocketFull.java | 44 ++ 44 files changed, 2272 insertions(+), 177 deletions(-) create mode 100644 apps/sam/Demos/datagramTests/README.txt create mode 100755 apps/sam/Demos/datagramTests/samForward.py create mode 100755 apps/sam/Demos/datagramTests/samIn.py create mode 100755 apps/sam/Demos/datagramTests/samOut.py create mode 100644 apps/sam/Demos/rawTests/README.txt create mode 100755 apps/sam/Demos/rawTests/samForward.py create mode 100755 apps/sam/Demos/rawTests/samIn.py create mode 100755 apps/sam/Demos/rawTests/samOut.py create mode 100644 apps/sam/Demos/streamTests/README.txt create mode 100755 apps/sam/Demos/streamTests/samForward.py create mode 100755 apps/sam/Demos/streamTests/samIn.py create mode 100755 apps/sam/Demos/streamTests/samOut.py create mode 100755 apps/sam/Demos/streamTests/samOutWithNaming.py create mode 100755 apps/sam/Demos/streamTests/server.py create mode 100644 apps/sam/doc/protocol-v3.txt create mode 100644 apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java create mode 100644 apps/sam/java/src/net/i2p/sam/SAMv3Handler.java create mode 100644 apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java create mode 100644 apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index d0028fdb81..e7db251fa7 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -30,6 +30,36 @@ public interface I2PServerSocket { */ public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; + /** + * accept(true) has the same behaviour as accept(). + * accept(false) does not wait for a socket connecting. If a socket is + * available in the queue, it is accepted. Else, null is returned. + * + * @param true if the call should block until a socket is available + * + * @return a connected I2PSocket, or null + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + * @throws SocketTimeoutException + */ + public I2PSocket accept(boolean blocking) throws I2PException, ConnectException, SocketTimeoutException; + + /** + * Waits until there is a socket waiting for acception or the timeout is + * reached. + * + * @param timeoutMs timeout in ms. A negative value waits forever. + * + * @return true if a socket is available, false if not + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException; + /** * Set Sock Option accept timeout * @param x timeout in ms diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 93db8595b7..a2c075e3e4 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -20,7 +20,7 @@ class I2PServerSocketImpl implements I2PServerSocket { private final static Log _log = new Log(I2PServerSocketImpl.class); private I2PSocketManager mgr; /** list of sockets waiting for the client to accept them */ - private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); + private List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4)); /** have we been closed */ private volatile boolean closing = false; @@ -49,25 +49,42 @@ class I2PServerSocketImpl implements I2PServerSocket { this.mgr = mgr; } + + + + + /** - * Waits for the next socket connecting. If a remote user tried to make a - * connection and the local application wasn't .accept()ing new connections, - * they should get refused (if .accept() doesnt occur in some small period - - * currently 5 seconds) + * Waits until there is a socket waiting for acception or the timeout is + * reached. + * + * @param timeoutMs timeout in ms. A negative value waits forever. * - * @return a connected I2PSocket + * @return true if a socket is available, false if not * * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed */ - public I2PSocket accept() throws I2PException, ConnectException { + public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException { if (_log.shouldLog(Log.DEBUG)) - _log.debug("accept() called, pending: " + pendingSockets.size()); + _log.debug("waitIncoming() called, pending: " + pendingSockets.size()); - I2PSocket ret = null; - - while ( (ret == null) && (!closing) ){ + boolean isTimed = (timeoutMs>=0); + if (isTimed) { + Clock clock = I2PAppContext.getGlobalContext().clock(); + long now = clock.now(); + long end = now + timeoutMs; + while (pendingSockets.size() <= 0 && now<end) { + if (closing) throw new ConnectException("I2PServerSocket closed"); + try { + synchronized(socketAddedLock) { + socketAddedLock.wait(end - now); + } + } catch (InterruptedException ie) {} + now = clock.now(); + } + } else { while (pendingSockets.size() <= 0) { if (closing) throw new ConnectException("I2PServerSocket closed"); try { @@ -76,7 +93,32 @@ class I2PServerSocketImpl implements I2PServerSocket { } } catch (InterruptedException ie) {} } - synchronized (pendingSockets) { + } + return (pendingSockets.size()>0); + } + + + /** + * accept(true) has the same behaviour as accept(). + * accept(false) does not wait for a socket connecting. If a socket is + * available in the queue, it is accepted. Else, null is returned. + * + * @param true if the call should block until a socket is available + * + * @return a connected I2PSocket, or null + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + + public I2PSocket accept(boolean blocking) throws I2PException, ConnectException { + I2PSocket ret = null; + + if (blocking) { + ret = accept(); + } else { + synchronized (pendingSockets) { if (pendingSockets.size() > 0) { ret = (I2PSocket)pendingSockets.remove(0); } @@ -85,7 +127,34 @@ class I2PServerSocketImpl implements I2PServerSocket { synchronized (socketAcceptedLock) { socketAcceptedLock.notifyAll(); } - } + } + } + return ret; + } + + /** + * Waits for the next socket connecting. If a remote user tried to make a + * connection and the local application wasn't .accept()ing new connections, + * they should get refused (if .accept() doesnt occur in some small period - + * currently 5 seconds) + * + * @return a connected I2PSocket + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws ConnectException if the I2PServerSocket is closed + */ + public I2PSocket accept() throws I2PException, ConnectException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("accept() called, pending: " + pendingSockets.size()); + + I2PSocket ret = null; + + while ( (ret == null) && (!closing) ){ + + this.waitIncoming(-1); + + ret = accept(false); } if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index dc7a48fd46..b37bf10850 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -350,7 +350,6 @@ class I2PSocketImpl implements I2PSocket { read = bc.startToByteArray(len); bc.notifyAll(); } - boolean timedOut = false; while ( (read.length == 0) && (!inStreamClosed) ) { synchronized (flagLock) { diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 33477a4a80..2cf23f1347 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -13,7 +13,6 @@ import net.i2p.client.I2PClient; import net.i2p.client.I2PClientFactory; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; -import net.i2p.data.Destination; import net.i2p.util.Log; /** diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java index 406f718474..a272e99df2 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java @@ -43,12 +43,12 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { private I2PSession _session; private I2PServerSocketImpl _serverSocket = null; private Object lock = new Object(); // for locking socket lists - private HashMap _outSockets; - private HashMap _inSockets; + private HashMap<String,I2PSocket> _outSockets; + private HashMap<String,I2PSocket> _inSockets; private I2PSocketOptions _defaultOptions; private long _acceptTimeout; private String _name; - private List _listeners; + private List<DisconnectListener> _listeners; private static int __managerId = 0; public static final short ACK = 0x51; @@ -76,10 +76,10 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { _name = name; _context = context; _log = _context.logManager().getLog(I2PSocketManager.class); - _inSockets = new HashMap(16); - _outSockets = new HashMap(16); + _inSockets = new HashMap<String,I2PSocket>(16); + _outSockets = new HashMap<String,I2PSocket>(16); _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; - _listeners = new ArrayList(1); + _listeners = new ArrayList<DisconnectListener>(1); setSession(session); setDefaultOptions(buildOptions(opts)); _context.statManager().createRateStat("streaming.lifetime", "How long before the socket is closed?", "streaming", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -113,9 +113,9 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { public void disconnected(I2PSession session) { _log.info(getName() + ": Disconnected from the session"); destroySocketManager(); - List listeners = null; + List<DisconnectListener> listeners = null; synchronized (_listeners) { - listeners = new ArrayList(_listeners); + listeners = new ArrayList<DisconnectListener>(_listeners); _listeners.clear(); } for (int i = 0; i < listeners.size(); i++) { @@ -130,7 +130,6 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { public void messageAvailable(I2PSession session, int msgId, long size) { try { - I2PSocketImpl s; byte msg[] = session.receiveMessage(msgId); if (msg.length == 1 && msg[0] == -1) { _log.debug(getName() + ": Ping received"); @@ -660,7 +659,7 @@ class I2PSocketManagerImpl implements I2PSocketManager, I2PSessionListener { * */ public Set listSockets() { - Set sockets = new HashSet(8); + Set<I2PSocket> sockets = new HashSet<I2PSocket>(8); synchronized (lock) { sockets.addAll(_inSockets.values()); sockets.addAll(_outSockets.values()); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java index 1c3ef54e94..a29b9f6f11 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java @@ -28,6 +28,10 @@ public class TestSwarm { private String _conOptions; // unused? used elsewhere? private boolean _dead; // unused? used elsewhere? + public void antiCompilationWarnings() { + _log.debug(""+_conOptions+_dead); + } + public static void main(String args[]) { if (args.length < 1) { System.err.println("Usage: TestSwarm myDestFile [peerDestFile ]*"); @@ -131,6 +135,14 @@ public class TestSwarm { _context.statManager().createRateStat("swarm." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); } + public void antiCompilationWarnings() { + _log.debug(""+this._lastReceived+this._lastReceivedOn+this._started); + } + public void antiCompilationWarnings(long x, long y) { + if (false) + _log.debug(""+x+y); + } + public Flooder(I2PSocket socket) { _socket = socket; _remoteDestination = socket.getPeerDestination(); @@ -154,6 +166,8 @@ public class TestSwarm { _context.random().nextBytes(data); long value = 0; long lastSend = _context.clock().now(); + this.antiCompilationWarnings(value, lastSend); + if (_socket == null) { try { _socket = _manager.connect(_remoteDestination); diff --git a/apps/sam/Demos/datagramTests/README.txt b/apps/sam/Demos/datagramTests/README.txt new file mode 100644 index 0000000000..8e79434f95 --- /dev/null +++ b/apps/sam/Demos/datagramTests/README.txt @@ -0,0 +1,15 @@ +# test example + +#in a first terminal, launch : + ./samIn.py inTest + +#in a second terminal, launch : + ./samForward.py 25000 forward + +#in a third terminal, launch : +l=0 +while [ $l -lt 1000 ] +do + l=$((l+1)) + ./samOut.py forward this is message n. $l +done diff --git a/apps/sam/Demos/datagramTests/samForward.py b/apps/sam/Demos/datagramTests/samForward.py new file mode 100755 index 0000000000..56590e7ef5 --- /dev/null +++ b/apps/sam/Demos/datagramTests/samForward.py @@ -0,0 +1,35 @@ +#!/usr/bin/python + +import socket +import sys + +# create a forward style SAM datagram session +# that forwards messages on specified port (default port : 25000) +# creates a standard datagram server that listens on this port forever +# usage : ./samForward.py [port [SAM session name]] + +if len(sys.argv)>=2 : + port = eval(sys.argv[1]) +else : + port = 25000 + +if len(sys.argv)==3 : + name = sys.argv[2] +else : + name = "essaiSamForward" + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=DATAGRAM PORT="+str(port)+" ID="+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n") +sys.stdout.write(sess.recv(10000)) + +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(("", port)) +print "waiting on port:", port +while 1: + data, addr = s.recvfrom(40000) + print data, " received from ", addr, "length=", len(data) + diff --git a/apps/sam/Demos/datagramTests/samIn.py b/apps/sam/Demos/datagramTests/samIn.py new file mode 100755 index 0000000000..c2c0589f30 --- /dev/null +++ b/apps/sam/Demos/datagramTests/samIn.py @@ -0,0 +1,29 @@ +#!/usr/bin/python + + +# create a SAM datagram session that writes incoming messages on its master session stream +# and a listen forever +# usage : ./samIn.py [session name] + +import socket +import sys + +if len(sys.argv)==2 : + name = sys.argv[1] +else : + name = "datagramSamIn" + + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=DATAGRAM ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n") +sys.stdout.write(sess.recv(1000)) +while 1 : + chunk = sess.recv(10000) + sys.stdout.write(chunk+'\n') + if not chunk : break +print + diff --git a/apps/sam/Demos/datagramTests/samOut.py b/apps/sam/Demos/datagramTests/samOut.py new file mode 100755 index 0000000000..bb9be4db2d --- /dev/null +++ b/apps/sam/Demos/datagramTests/samOut.py @@ -0,0 +1,31 @@ +#!/usr/bin/python + +# sends a message to datagram destinations opened by samForward.py and samIn.py, using specified sending session name +# at least samForward.py should be running for results to be seen +# usage : ./samOut.py [ sendingSessionName [ message ... ] ] +# sendingSessionName : default = datagramSamForward +# message : default = "this is nice message" + +import socket +import sys +import time + +if len(sys.argv)>=2 : + name = sys.argv[1] +else : + name = "datagramSamForward" + +if len(sys.argv)>2 : + message = ''.join([s+' ' for s in sys.argv[2:]]).strip() +else : + message = "This is a nice message" + + +# client.py +port = 7655 +host = "localhost" +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(("", 0)) +s.sendto(name+" tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA\n"+message, (host, port)) +s.sendto(name+" EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAA\n"+message, (host, port)) + diff --git a/apps/sam/Demos/rawTests/README.txt b/apps/sam/Demos/rawTests/README.txt new file mode 100644 index 0000000000..8e79434f95 --- /dev/null +++ b/apps/sam/Demos/rawTests/README.txt @@ -0,0 +1,15 @@ +# test example + +#in a first terminal, launch : + ./samIn.py inTest + +#in a second terminal, launch : + ./samForward.py 25000 forward + +#in a third terminal, launch : +l=0 +while [ $l -lt 1000 ] +do + l=$((l+1)) + ./samOut.py forward this is message n. $l +done diff --git a/apps/sam/Demos/rawTests/samForward.py b/apps/sam/Demos/rawTests/samForward.py new file mode 100755 index 0000000000..6d55da5f77 --- /dev/null +++ b/apps/sam/Demos/rawTests/samForward.py @@ -0,0 +1,36 @@ +#!/usr/bin/python + +import socket +import sys + +# create a forward style SAM raw datagram session +# that forwards messages on specified port (default port : 25000) +# creates a standard datagram server that listens on this port forever +# usage : ./samForward.py [port [SAM session name]] + +if len(sys.argv)>=2 : + port = eval(sys.argv[1]) +else : + port = 25000 + +if len(sys.argv)==3 : + name = sys.argv[2] +else : + name = "essaiSamForward" + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=RAW PORT="+str(port)+" ID="+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n") +sys.stdout.write(sess.recv(10000)) + +# listening server +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(("", port)) +print "waiting on port:", port +while 1: + data, addr = s.recvfrom(40000) + print data, " received from ", addr, "length=", len(data) + diff --git a/apps/sam/Demos/rawTests/samIn.py b/apps/sam/Demos/rawTests/samIn.py new file mode 100755 index 0000000000..0f89deb17b --- /dev/null +++ b/apps/sam/Demos/rawTests/samIn.py @@ -0,0 +1,31 @@ +#!/usr/bin/python + + +# create a SAM datagram session that writes incoming messages on its master session stream +# and a listen forever +# usage : ./samIn.py [session name] + +import socket +import sys + +if len(sys.argv)==2 : + name = sys.argv[1] +else : + name = "datagramSamIn" + + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=RAW ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n") +sys.stdout.write(sess.recv(1000)) + +# listen incoming messages +while 1 : + chunk = sess.recv(10000) + sys.stdout.write(chunk+'\n') + if not chunk : break +print + diff --git a/apps/sam/Demos/rawTests/samOut.py b/apps/sam/Demos/rawTests/samOut.py new file mode 100755 index 0000000000..bb9be4db2d --- /dev/null +++ b/apps/sam/Demos/rawTests/samOut.py @@ -0,0 +1,31 @@ +#!/usr/bin/python + +# sends a message to datagram destinations opened by samForward.py and samIn.py, using specified sending session name +# at least samForward.py should be running for results to be seen +# usage : ./samOut.py [ sendingSessionName [ message ... ] ] +# sendingSessionName : default = datagramSamForward +# message : default = "this is nice message" + +import socket +import sys +import time + +if len(sys.argv)>=2 : + name = sys.argv[1] +else : + name = "datagramSamForward" + +if len(sys.argv)>2 : + message = ''.join([s+' ' for s in sys.argv[2:]]).strip() +else : + message = "This is a nice message" + + +# client.py +port = 7655 +host = "localhost" +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +s.bind(("", 0)) +s.sendto(name+" tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA\n"+message, (host, port)) +s.sendto(name+" EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAA\n"+message, (host, port)) + diff --git a/apps/sam/Demos/streamTests/README.txt b/apps/sam/Demos/streamTests/README.txt new file mode 100644 index 0000000000..6ce7ae57b5 --- /dev/null +++ b/apps/sam/Demos/streamTests/README.txt @@ -0,0 +1,24 @@ +# test example + +#in a first terminal, launch : + ./samIn.py inTest + +#in a second terminal, launch : + ./samOut.py + +#and again + ./samOut.py + +########## + +# test example n°2 + +#in a first terminal, launch : + ./samForward.py inTest + +#in a second terminal, launch : + ./server.py + +#in a third terminal, launch : + ./samOut.py + diff --git a/apps/sam/Demos/streamTests/samForward.py b/apps/sam/Demos/streamTests/samForward.py new file mode 100755 index 0000000000..c127534056 --- /dev/null +++ b/apps/sam/Demos/streamTests/samForward.py @@ -0,0 +1,58 @@ +#!/usr/bin/python + +import socket +import sys + +# create a master SAM stream session that opens a destination in I2P world +# then open another session that tells SAM to forward incoming connections +# to the specified address +# +# usage : +# ./samForward.py [ silent [ port [ sessionName [ host ] ] ] ] +# +# silent : should the first line of incoming socket contain the peer destination (true or false) +# port : port to which connections are forwarded (default : 25000) +# sessionName : session id (default : "forward") +# host : host to which connections are forwarded (default : this host) + +if len(sys.argv)>=2 : + silent = " SILENT="+sys.argv[1] +else : silent = " SILENT=false" + +if len(sys.argv)>=3 : + port = " PORT="+sys.argv[2] +else : port = " PORT=25000" + +if len(sys.argv)>=4 : + name = " ID="+sys.argv[3] +else : name = " ID=forward" + +if len(sys.argv)>=5 : + host = " HOST="+sys.argv[4] +else : host = "" + + + + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n") +sys.stdout.write(sess.recv(1000)) + +sock = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("127.0.0.1",7656)); +sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sock.recv(1000)) +sock.send("STREAM FORWARD" + name + host + port + silent + "\n") +sys.stdout.write(sock.recv(1000)) + +l=0 +while 1 : + chunk = sock.recv(100) + sys.stdout.write(chunk) + if not chunk : break + diff --git a/apps/sam/Demos/streamTests/samIn.py b/apps/sam/Demos/streamTests/samIn.py new file mode 100755 index 0000000000..2edfd65c62 --- /dev/null +++ b/apps/sam/Demos/streamTests/samIn.py @@ -0,0 +1,89 @@ +#!/usr/bin/python + + +# create an stream session +# then an "accept" stream connected to this session +# then another "accept" stream from the same session +# then listen from the first stream and then listen from the second +# usage : ./samIn.py [ silent [ name ] ] +# name : the session id ( defaults to InTest ) +# silent : true or false : tells wether we want to receive the incoming stream destination +# as first line + +import socket +import sys +import time + +if len(sys.argv)>=2 : + silent = " SILENT="+sys.argv[1] +else : silent = " SILENT=false" + +if len(sys.argv)>=3 : + name = sys.argv[2] +else : name = "inTest" + + + + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=STREAM ID="+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p\n") +sys.stdout.write(sess.recv(1000)) + + + + + +def accept() : + sock = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("127.0.0.1",7656)); + sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n") + sys.stdout.write(sock.recv(1000)) + sock.send("STREAM ACCEPT ID=" + name + silent+"\n") + print "STREAM ACCEPT ID="+name+silent+"\n" + return sock + +def echo( sock, lines ) : + l = 0 + while lines==-1 or l<lines : + chunk = sock.recv(1000) + sys.stdout.write(chunk) + if lines!=-1 : l = l + 1 + if not chunk : break + sock.send(chunk) + print + + + +sock1 = accept() +time.sleep(1) + +sock2 = accept() + +print "Second listening session" +try : + echo(sock2, -1) +except : + print sock2 + +if silent == " SILENT=false" : + sys.stdout.write(sock1.recv(1000)) +else : + # we know sock1 is accepted if it receives a byte + sock1.recv(1) + +sock3 = accept() + + +print "First listening session" +echo(sock1, 2) +sock1.close() + +print "Third listening session" +echo(sock3, -1) + + diff --git a/apps/sam/Demos/streamTests/samOut.py b/apps/sam/Demos/streamTests/samOut.py new file mode 100755 index 0000000000..7e132f2c4b --- /dev/null +++ b/apps/sam/Demos/streamTests/samOut.py @@ -0,0 +1,52 @@ +#!/usr/bin/python + + +# open a I2P stream destination +# then open another stream that connects to the destination created by samForward.py or samIn.py +# then send bytes through the stream +# usage : +# ./samOut.py [ silent [ sessionName ] ] +# +# silent : should the first incoming after the connection request contain the connection status message (true or false) +# sessionName : session id (default : "forward") + +import socket +import sys +import time + +if len(sys.argv)>=2 : + silent = " SILENT="+sys.argv[1] +else : silent = " SILENT=false" + +if len(sys.argv)>=3 : + name = " ID="+sys.argv[2] +else : name = " ID=testOutStream" + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n") +sys.stdout.write(sess.recv(1000)) + +sock = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("127.0.0.1",7656)); +sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sock.recv(1000)) +sock.send("STREAM CONNECT"+name+" DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA"+silent+"\n") + +# wait for acknowledgement before sending data, if we asked for it +if (silent==" SILENT=false") : + sys.stdout.write(sock.recv(1000)) + +for i in range(1,11) : + sock.send(str(i)+'\n') + buf=sock.recv(1000) + sys.stdout.write(str(i)+' '+buf) + if not buf : break + +print + + diff --git a/apps/sam/Demos/streamTests/samOutWithNaming.py b/apps/sam/Demos/streamTests/samOutWithNaming.py new file mode 100755 index 0000000000..6aa6476bc2 --- /dev/null +++ b/apps/sam/Demos/streamTests/samOutWithNaming.py @@ -0,0 +1,51 @@ +#!/usr/bin/python + + +# open a I2P stream destination +# then open another stream that connects to the destination created by samForward.py or samIn.py +# then send bytes through the stream +# usage : +# ./samOut.py [ silent [ sessionName ] ] +# +# silent : should the first incoming after the connection request contain the connection status message (true or false) +# sessionName : session id (default : "forward") + +import socket +import sys +import time + +if len(sys.argv)>=2 : + silent = " SILENT="+sys.argv[1] +else : silent = " SILENT=false" + +if len(sys.argv)>=3 : + name = " ID="+sys.argv[2] +else : name = " ID=testOutStream" + +sess = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sess.connect(("127.0.0.1",7656)); +sess.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sess.recv(1000)) +sess.send("SESSION CREATE STYLE=STREAM"+name+" DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW\n") +sys.stdout.write(sess.recv(1000)) + +sock = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +sock.connect(("127.0.0.1",7656)); +sock.send("HELLO VERSION MIN=3.0 MAX=3.0\n") +sys.stdout.write(sock.recv(1000)) +sock.send("STREAM CONNECT"+name+" DESTINATION=http://amiga.i2p"+silent+"\n") + +# wait for acknowledgement before sending data, if we asked for it +if (silent==" SILENT=false") : + sys.stdout.write(sock.recv(1000)) + +while (1) : + buf=sock.recv(1000) + sys.stdout.write(buf) + if not buf : break + +print + + diff --git a/apps/sam/Demos/streamTests/server.py b/apps/sam/Demos/streamTests/server.py new file mode 100755 index 0000000000..829a37e321 --- /dev/null +++ b/apps/sam/Demos/streamTests/server.py @@ -0,0 +1,41 @@ +#!/usr/bin/python + + +# echo server +# accepts a socket on specified port, writes on stdout and send back incoming data + +import socket +import sys + +if len(sys.argv)>=2 : + port = eval(sys.argv[1]) +else : port = 25000 + +#create an INET, STREAMing socket +serversocket = socket.socket( + socket.AF_INET, socket.SOCK_STREAM) +#bind the socket to a public host, +# and a well-known port +serversocket.bind(("0.0.0.0", port)) + #become a server socket +serversocket.listen(1) + + + #accept connections from outside +(clientsocket, address) = serversocket.accept() + #now do something with the clientsocket + #in this case, we'll pretend this is a threaded server + +i = 0 +while 1 : + chunk = clientsocket.recv(1024) + i = i + 1 + sys.stdout.write(str(i)+' '+chunk) + if not chunk: break + clientsocket.send(str(i)+' '+chunk) + + +clientsocket.close() + +print + diff --git a/apps/sam/doc/protocol-v3.txt b/apps/sam/doc/protocol-v3.txt new file mode 100644 index 0000000000..9aacfa124b --- /dev/null +++ b/apps/sam/doc/protocol-v3.txt @@ -0,0 +1,17 @@ +telnet localhost 7656 +HELLO VERSION MIN=3.0 MAX=3.0 +SESSION CREATE STYLE=STREAM ID=essaiSamIn DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABngJSS8xMyF4t82otZmCDhrKjbm-QLMtOLoumwR28ebDHEd4clF6O7aRa3d3yRH7p + +telnet localhost 7656 +HELLO VERSION MIN=3.0 MAX=3.0 +STREAM ACCEPT ID=essaiSamIn + + +telnet localhost 7656 +HELLO VERSION MIN=3.0 MAX=3.0 +SESSION CREATE STYLE=STREAM ID=essaiSamOut DESTINATION=EYUpJFeW9tiubXR0aOjvCJ~ndj3xN0Wn-ljuGdbpOEttPg7nj0VCTOQDJ~FAolzn9FIDdmR3VjM0OFFDT46Q5HN4vShXFE2VNC8e3~GjzxJfaJhijRC2R9oIOzsNlzKtInD2o9lh0PxPioNMCigwmgWuqlQHs4tjWeaYRAtooHxbrtuoCIhIdGfyVV-nAcPiyYbouKq3leETXE~4kBXm-LfWfyPtrv6OuDk3GBVVcthv19GYBmnl2YI8HpJjc-G-TvNkgYishjzIJyEW-Xrpy43R4ZBXlyQqnheGLlbOEY8NLDbyNHLRMMOGbcr~67SVE3Iw3RqQ3Dhrkq2FCaQwcDucfIUCCbOfCZgu0hlnCkS42xsUvegQeiwMxbdI~h9v7vcR3yFFOrHX6WQvIZSbFLKNGArGJcfmOJVLqw1wTC4AgYXjk3csVDPd-QWbMXOuodyBgrg27Ds2BBYTsVXWskoo6ASsMIQZ6jMfL7PkY9dPLCRParIyzb9aPmf~MntNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABHNqwgkhJnBW4ymaRsdVmITAha-ff0UiALfKSlznqp5HcSewgMHbzQ0I01TQytFnW + +telnet localhost 7656 +HELLO VERSION MIN=3.0 MAX=3.0 +STREAM CONNECT ID=essaiSamOut DESTINATION=tYhjbFlFL38WFuO5eCzTvE0UBr4RfaqWMKlekGeMoB-Ouz7nYaWfiS-9j3jMiZT7FH~pwdmoSREOs2ZbXK84sR59P~pPfeCMxnJrk57f3U9uKzXkesjkKWYco3YAGs-G8sw8Fu2FBx0Do57yBdA9~j8Zq6pMjmgPBXCLuXG3vo0Z8zUWCjApJyFY6OXYopHck9Fz9vKy7YhC6zXFHfEuNHVkAooduiLd~aCoGij0TW3lH2rTVU-lx-DUdi6edxQ5-RvDNkXfikvytoCpRkivbNVytjCJLk~7RNU4FpBD20wTZWNJmEG3OY3cjNjawJVFdNjtgczh9K7gZ7ad-NjVjZVhXEj1lU8mk~vAH-2QE5om8dstWUwWoNDwmVDlvIJNKzQmahG~VrpFexFHXO0n3fKIXcSgWGOHDExM8w9neCt7AxUjxPDtXXuYNW~bRwcfiL-C9~z4K9rmwiTPZX0lmsToSXTF28l7WAoj~TMT9kZAjQeFRRWU5oW5oxVuonVvAAAA + diff --git a/apps/sam/java/src/net/i2p/sam/SAMBridge.java b/apps/sam/java/src/net/i2p/sam/SAMBridge.java index ee7e33bb4a..95ece882fc 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMBridge.java +++ b/apps/sam/java/src/net/i2p/sam/SAMBridge.java @@ -14,9 +14,10 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -34,7 +35,7 @@ import net.i2p.util.Log; */ public class SAMBridge implements Runnable { private final static Log _log = new Log(SAMBridge.class); - private ServerSocket serverSocket; + private ServerSocketChannel serverSocket; private Properties i2cpProps; /** * filename in which the name to private key mapping should @@ -45,12 +46,17 @@ public class SAMBridge implements Runnable { * app designated destination name to the base64 of the I2P formatted * destination keys (Destination+PrivateKey+SigningPrivateKey) */ - private Map nameToPrivKeys; + private Map<String,String> nameToPrivKeys; private boolean acceptConnections = true; private static final int SAM_LISTENPORT = 7656; public static final String DEFAULT_SAM_KEYFILE = "sam.keys"; + public static final String PROP_DATAGRAM_HOST = "sam.datagram.host"; + public static final String PROP_DATAGRAM_PORT = "sam.datagram.port"; + public static final String DEFAULT_DATAGRAM_HOST = "0.0.0.0"; + public static final String DEFAULT_DATAGRAM_PORT = "7655"; + private SAMBridge() {} @@ -64,16 +70,18 @@ public class SAMBridge implements Runnable { */ public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) { persistFilename = persistFile; - nameToPrivKeys = new HashMap(8); + nameToPrivKeys = new HashMap<String,String>(8); loadKeys(); try { if ( (listenHost != null) && !("0.0.0.0".equals(listenHost)) ) { - serverSocket = new ServerSocket(listenPort, 0, InetAddress.getByName(listenHost)); + serverSocket = ServerSocketChannel.open(); + serverSocket.socket().bind(new InetSocketAddress(listenHost, listenPort)); if (_log.shouldLog(Log.DEBUG)) _log.debug("SAM bridge listening on " + listenHost + ":" + listenPort); } else { - serverSocket = new ServerSocket(listenPort); + serverSocket = ServerSocketChannel.open(); + serverSocket.socket().bind(new InetSocketAddress(listenPort)); if (_log.shouldLog(Log.DEBUG)) _log.debug("SAM bridge listening on 0.0.0.0:" + listenPort); } @@ -193,12 +201,12 @@ public class SAMBridge implements Runnable { /** * Usage: - * <pre>SAMBridge [[listenHost ]listenPort[ name=val]*]</pre> + * <pre>SAMBridge [ keyfile [listenHost ] listenPort [ name=val ]* ]</pre> * * name=val options are passed to the I2CP code to build a session, * allowing the bridge to specify an alternate I2CP host and port, tunnel * depth, etc. - * @param args [[listenHost ]listenPort[ name=val]*] + * @param args [ keyfile [ listenHost ] listenPort [ name=val ]* ] */ public static void main(String args[]) { String keyfile = DEFAULT_SAM_KEYFILE; @@ -266,11 +274,11 @@ public class SAMBridge implements Runnable { if (serverSocket == null) return; try { while (acceptConnections) { - Socket s = serverSocket.accept(); + SocketChannel s = serverSocket.accept(); if (_log.shouldLog(Log.DEBUG)) _log.debug("New connection from " - + s.getInetAddress().toString() + ":" - + s.getPort()); + + s.socket().getInetAddress().toString() + ":" + + s.socket().getPort()); try { SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps); @@ -289,7 +297,7 @@ public class SAMBridge implements Runnable { _log.error("SAM error: " + e.getMessage(), e); try { String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n"; - s.getOutputStream().write(reply.getBytes("ISO-8859-1")); + s.write(ByteBuffer.wrap(reply.getBytes("ISO-8859-1"))); } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("SAM Error sending error reply", ioe); diff --git a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java index a3e20f7df8..c8d31b489d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java @@ -30,7 +30,7 @@ public class SAMDatagramSession extends SAMMessageSession { private final static Log _log = new Log(SAMDatagramSession.class); public static int DGRAM_SIZE_MAX = 31*1024; - private SAMDatagramReceiver recv = null; + protected SAMDatagramReceiver recv = null; private I2PDatagramMaker dgramMaker; private I2PDatagramDissector dgramDissector = new I2PDatagramDissector(); @@ -84,9 +84,10 @@ public class SAMDatagramSession extends SAMMessageSession { public boolean sendBytes(String dest, byte[] data) throws DataFormatException { if (data.length > DGRAM_SIZE_MAX) throw new DataFormatException("Datagram size exceeded (" + data.length + ")"); - - byte[] dgram = dgramMaker.makeI2PDatagram(data); - + byte[] dgram ; + synchronized (dgramMaker) { + dgram = dgramMaker.makeI2PDatagram(data); + } return sendBytesThroughMessageSession(dest, dgram); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMException.java b/apps/sam/java/src/net/i2p/sam/SAMException.java index e51e35ea4f..ae965a4c8c 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMException.java +++ b/apps/sam/java/src/net/i2p/sam/SAMException.java @@ -15,11 +15,13 @@ package net.i2p.sam; */ public class SAMException extends Exception { + static final long serialVersionUID = 1 ; + public SAMException() { - super(); + super(); } public SAMException(String s) { - super(s); + super(s); } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandler.java b/apps/sam/java/src/net/i2p/sam/SAMHandler.java index 64d824a578..d53a5a6621 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandler.java @@ -9,9 +9,8 @@ package net.i2p.sam; */ import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; import java.util.Properties; import net.i2p.util.I2PAppThread; @@ -32,8 +31,7 @@ public abstract class SAMHandler implements Runnable { protected SAMBridge bridge = null; private Object socketWLock = new Object(); // Guards writings on socket - private Socket socket = null; - private OutputStream socketOS = null; // Stream associated to socket + protected SocketChannel socket = null; protected int verMajor = 0; protected int verMinor = 0; @@ -53,10 +51,9 @@ public abstract class SAMHandler implements Runnable { * @param i2cpProps properties to configure the I2CP connection (host, port, etc) * @throws IOException */ - protected SAMHandler(Socket s, + protected SAMHandler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws IOException { socket = s; - socketOS = socket.getOutputStream(); this.verMajor = verMajor; this.verMinor = verMinor; @@ -86,8 +83,8 @@ public abstract class SAMHandler implements Runnable { * @return input stream * @throws IOException */ - protected final InputStream getClientSocketInputStream() throws IOException { - return socket.getInputStream(); + protected final SocketChannel getClientSocket() { + return socket ; } /** @@ -98,13 +95,17 @@ public abstract class SAMHandler implements Runnable { * @param data A byte array to be written * @throws IOException */ - protected final void writeBytes(byte[] data) throws IOException { + protected final void writeBytes(ByteBuffer data) throws IOException { synchronized (socketWLock) { - socketOS.write(data); - socketOS.flush(); + writeBytes(data, socket); } } + static public void writeBytes(ByteBuffer data, SocketChannel out) throws IOException { + while (data.hasRemaining()) out.write(data); + out.socket().getOutputStream().flush(); + } + /** * If you're crazy enough to write to the raw socket, grab the write lock * with getWriteLock(), synchronize against it, and write to the getOut() @@ -112,7 +113,6 @@ public abstract class SAMHandler implements Runnable { * @return socket Write lock object */ protected Object getWriteLock() { return socketWLock; } - protected OutputStream getOut() { return socketOS; } /** * Write a string to the handler's socket. This method must @@ -121,21 +121,25 @@ public abstract class SAMHandler implements Runnable { * * @param str A byte array to be written * - * @return True is the string was successfully written, false otherwise + * @return True if the string was successfully written, false otherwise */ protected final boolean writeString(String str) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending the client: [" + str + "]"); - try { - writeBytes(str.getBytes("ISO-8859-1")); + return writeString(str, socket); + } + + public static boolean writeString(String str, SocketChannel out) + { + try { + writeBytes(ByteBuffer.wrap(str.getBytes("ISO-8859-1")), out); } catch (IOException e) { _log.debug("Caught IOException", e); return false; } - - return true; + return true ; } - + /** * Close the socket connected to the SAM client. * @@ -178,8 +182,8 @@ public abstract class SAMHandler implements Runnable { return ("SAM handler (class: " + this.getClass().getName() + "; SAM version: " + verMajor + "." + verMinor + "; client: " - + this.socket.getInetAddress().toString() + ":" - + this.socket.getPort() + ")"); + + this.socket.socket().getInetAddress().toString() + ":" + + this.socket.socket().getPort() + ")"); } public final void run() { diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index 21a0e97d27..b5c3d198c3 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -9,9 +9,9 @@ package net.i2p.sam; */ import java.io.IOException; -import java.io.OutputStream; import java.io.UnsupportedEncodingException; -import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; import java.util.Properties; import java.util.StringTokenizer; @@ -34,17 +34,17 @@ public class SAMHandlerFactory { * @throws SAMException if the connection handshake (HELLO message) was malformed * @return A SAM protocol handler, or null if the client closed before the handshake */ - public static SAMHandler createSAMHandler(Socket s, Properties i2cpProps) throws SAMException { + public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException { String line; StringTokenizer tok; try { - line = DataHelper.readLine(s.getInputStream()); + line = DataHelper.readLine(s.socket().getInputStream()); if (line == null) { _log.debug("Connection closed by client"); return null; } - tok = new StringTokenizer(line, " "); + tok = new StringTokenizer(line.trim(), " "); } catch (IOException e) { throw new SAMException("Error reading from socket: " + e.getMessage()); @@ -89,9 +89,8 @@ public class SAMHandlerFactory { // Let's answer positively try { - OutputStream out = s.getOutputStream(); - out.write(("HELLO REPLY RESULT=OK VERSION=" - + ver + "\n").getBytes("ISO-8859-1")); + s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=OK VERSION=" + + ver + "\n").getBytes("ISO-8859-1"))); } catch (UnsupportedEncodingException e) { _log.error("Caught UnsupportedEncodingException (" + e.getMessage() + ")"); @@ -115,6 +114,9 @@ public class SAMHandlerFactory { case 2: handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps); break; + case 3: + handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps); + break; default: _log.error("BUG! Trying to initialize the wrong SAM version!"); throw new SAMException("BUG! (in handler instantiation)"); @@ -128,6 +130,7 @@ public class SAMHandlerFactory { /* Return the best version we can use, or null on failure */ private static String chooseBestVersion(String minVer, String maxVer) { + int minMajor = getMajor(minVer), minMinor = getMinor(minVer); int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer); @@ -143,6 +146,8 @@ public class SAMHandlerFactory { float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ; + if ( ( fminVer <= 3.0 ) && ( fmaxVer >= 3.0 ) ) return "3.0" ; + if ( ( fminVer <= 2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ; if ( ( fminVer <= 1.0 ) && ( fmaxVer >= 1.0 ) ) return "1.0" ; diff --git a/apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java b/apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java index b52ecda65d..cd1c6b1a57 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java +++ b/apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java @@ -15,7 +15,8 @@ package net.i2p.sam; * @author human */ public class SAMInvalidDirectionException extends Exception { - + static final long serialVersionUID = 1 ; + public SAMInvalidDirectionException() { super(); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java index b29b2f84c5..2c8ed2756b 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java @@ -109,8 +109,7 @@ public abstract class SAMMessageSession { * @throws DataFormatException */ protected boolean sendBytesThroughMessageSession(String dest, byte[] data) throws DataFormatException { - Destination d = new Destination(); - d.fromBase64(dest); + Destination d = SAMUtils.getDest(dest); if (_log.shouldLog(Log.DEBUG)) { _log.debug("Sending " + data.length + " bytes to " + dest); diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java index 7f56066b1b..92bf4960dd 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java @@ -26,7 +26,7 @@ public class SAMRawSession extends SAMMessageSession { private final static Log _log = new Log(SAMRawSession.class); public static final int RAW_SIZE_MAX = 32*1024; - private SAMRawReceiver recv = null; + protected SAMRawReceiver recv = null; /** * Create a new SAM RAW session. * diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java index 6d6d824b5f..326c81020e 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java @@ -9,6 +9,7 @@ package net.i2p.sam; */ import java.io.IOException; +import java.nio.ByteBuffer; import net.i2p.data.Destination; @@ -60,7 +61,7 @@ public interface SAMStreamReceiver { * @param len Number of bytes in data * @throws IOException */ - public void receiveStreamBytes(int id, byte data[], int len) throws IOException; + public void receiveStreamBytes(int id, ByteBuffer data) throws IOException; /** * Notify that a connection has been closed diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 280562e48e..aef2802bd8 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -13,6 +13,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.util.ArrayList; @@ -51,15 +53,15 @@ public class SAMStreamSession { protected SAMStreamReceiver recv = null; - private SAMStreamSessionServer server = null; + protected SAMStreamSessionServer server = null; protected I2PSocketManager socketMgr = null; private Object handlersMapLock = new Object(); /** stream id (Long) to SAMStreamSessionSocketReader */ - private HashMap handlersMap = new HashMap(); + private HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>(); /** stream id (Long) to StreamSender */ - private HashMap sendersMap = new HashMap(); + private HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>(); private Object idLock = new Object(); private int lastNegativeId = 0; @@ -76,6 +78,10 @@ public class SAMStreamSession { public static String PROP_FORCE_FLUSH = "sam.forceFlush"; public static String DEFAULT_FORCE_FLUSH = "false"; + public SAMStreamSession() { + + } + /** * Create a new SAM STREAM session. * @@ -166,7 +172,7 @@ public class SAMStreamSession { } } - private class DisconnectListener implements I2PSocketManager.DisconnectListener { + protected class DisconnectListener implements I2PSocketManager.DisconnectListener { public void sessionDisconnected() { close(); } @@ -572,19 +578,20 @@ public class SAMStreamSession { _log.debug("run() called for socket reader " + id); int read = -1; - byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; + ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE); try { InputStream in = i2pSocket.getInputStream(); while (stillRunning) { - read = in.read(data); + data.clear(); + read = Channels.newChannel(in).read(data); if (read == -1) { _log.debug("Handler " + id + ": connection closed"); break; } - - recv.receiveStreamBytes(id, data, read); + data.flip(); + recv.receiveStreamBytes(id, data); } } catch (IOException e) { _log.debug("Caught IOException", e); @@ -650,7 +657,7 @@ public class SAMStreamSession { protected class v1StreamSender extends StreamSender { - private List _data; + private List<ByteArray> _data; private int _id; private ByteCache _cache; private OutputStream _out = null; @@ -660,7 +667,7 @@ public class SAMStreamSession { public v1StreamSender ( I2PSocket s, int id ) throws IOException { super ( s, id ); - _data = new ArrayList(1); + _data = new ArrayList<ByteArray>(1); _id = id; _cache = ByteCache.getInstance(4, 32*1024); _out = s.getOutputStream(); diff --git a/apps/sam/java/src/net/i2p/sam/SAMUtils.java b/apps/sam/java/src/net/i2p/sam/SAMUtils.java index 8bb3fac300..6a6d81f880 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMUtils.java +++ b/apps/sam/java/src/net/i2p/sam/SAMUtils.java @@ -101,6 +101,24 @@ public class SAMUtils { return dest; } + /** + * Resolve the destination from a key or a hostname + * + * @param s Hostname or key to be resolved + * + * @return the Destination for the specified hostname, or null if not found + */ + public static Destination getDest(String s) + { + Destination d = new Destination() ; + try { + d.fromBase64(s); + return d ; + } catch (DataFormatException e) { + return lookupHost(s, null); + } + } + /** * Parse SAM parameters, and put them into a Propetries object * diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index 93a9a8d669..ac86d69346 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -12,12 +12,11 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.io.InterruptedIOException; -import java.io.OutputStream; import java.net.ConnectException; import java.net.NoRouteToHostException; -import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; import java.util.Properties; import java.util.StringTokenizer; @@ -40,14 +39,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag private final static Log _log = new Log(SAMv1Handler.class); - private final static int IN_BUFSIZE = 2048; + protected SAMRawSession rawSession = null; + protected SAMDatagramSession datagramSession = null; + protected SAMStreamSession streamSession = null; + protected SAMDatagramSession getDatagramSession() {return datagramSession ;} + protected SAMRawSession getRawSession() {return rawSession ;} - private SAMRawSession rawSession = null; - private SAMDatagramSession datagramSession = null; - protected SAMStreamSession streamSession = null; - - private long _id; - private static volatile long __id = 0; + protected long _id; + protected static volatile long __id = 0; /** * Create a new SAM version 1 handler. This constructor expects @@ -60,7 +59,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag * @throws SAMException * @throws IOException */ - public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException { + public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException { this(s, verMajor, verMinor, new Properties()); } /** @@ -75,7 +74,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag * @throws SAMException * @throws IOException */ - public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException { + public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException { super(s, verMajor, verMinor, i2cpProps); _id = ++__id; _log.debug("SAM version 1 handler instantiated"); @@ -101,16 +100,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag _log.debug("SAM handling started"); try { - InputStream in = getClientSocketInputStream(); - int b = -1; - while (true) { if (shouldStop()) { _log.debug("Stop request found"); break; } - msg = DataHelper.readLine(in); + msg = DataHelper.readLine(getClientSocket().socket().getInputStream()).trim(); if (msg == null) { _log.debug("Connection closed by client"); break; @@ -175,11 +171,11 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } catch (IOException e) { _log.error("Error closing socket: " + e.getMessage()); } - if (rawSession != null) { - rawSession.close(); + if (getRawSession() != null) { + getRawSession().close(); } - if (datagramSession != null) { - datagramSession.close(); + if (getDatagramSession() != null) { + getDatagramSession().close(); } if (streamSession != null) { streamSession.close(); @@ -188,13 +184,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } /* Parse and execute a SESSION message */ - private boolean execSessionMessage(String opcode, Properties props) { + protected boolean execSessionMessage(String opcode, Properties props) { String dest = "BUG!"; try{ if (opcode.equals("CREATE")) { - if ((rawSession != null) || (datagramSession != null) + if ((getRawSession() != null) || (getDatagramSession() != null) || (streamSession != null)) { _log.debug("Trying to create a session, but one still exists"); return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n"); @@ -293,7 +289,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } /* Parse and execute a DEST message*/ - private boolean execDestMessage(String opcode, Properties props) { + protected boolean execDestMessage(String opcode, Properties props) { if (opcode.equals("GENERATE")) { if (props.size() > 0) { @@ -318,7 +314,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } /* Parse and execute a NAMING message */ - private boolean execNamingMessage(String opcode, Properties props) { + protected boolean execNamingMessage(String opcode, Properties props) { if (opcode.equals("LOOKUP")) { if (props == null) { _log.debug("No parameters specified in NAMING LOOKUP message"); @@ -333,18 +329,18 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag Destination dest; if (name.equals("ME")) { - if (rawSession != null) { - dest = rawSession.getDestination(); + if (getRawSession() != null) { + dest = getRawSession().getDestination(); } else if (streamSession != null) { dest = streamSession.getDestination(); - } else if (datagramSession != null) { - dest = datagramSession.getDestination(); + } else if (getDatagramSession() != null) { + dest = getDatagramSession().getDestination(); } else { _log.debug("Lookup for SESSION destination, but session is null"); return false; } } else { - dest = SAMUtils.lookupHost(name, null); + dest = SAMUtils.getDest(name); } if (dest == null) { @@ -364,8 +360,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag /* Parse and execute a DATAGRAM message */ - private boolean execDatagramMessage(String opcode, Properties props) { - if (datagramSession == null) { + protected boolean execDatagramMessage(String opcode, Properties props) { + if (getDatagramSession() == null) { _log.error("DATAGRAM message received, but no DATAGRAM session exists"); return false; } @@ -403,7 +399,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } try { - DataInputStream in = new DataInputStream(getClientSocketInputStream()); + DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream()); byte[] data = new byte[size]; in.readFully(data); @@ -435,8 +431,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } /* Parse and execute a RAW message */ - private boolean execRawMessage(String opcode, Properties props) { - if (rawSession == null) { + protected boolean execRawMessage(String opcode, Properties props) { + if (getRawSession() == null) { _log.error("RAW message received, but no RAW session exists"); return false; } @@ -474,12 +470,12 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } try { - DataInputStream in = new DataInputStream(getClientSocketInputStream()); + DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream()); byte[] data = new byte[size]; in.readFully(data); - if (!rawSession.sendBytes(dest, data)) { + if (!getRawSession().sendBytes(dest, data)) { _log.error("RAW SEND failed"); return true; } @@ -567,7 +563,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } try { - if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) { + if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) { if (_log.shouldLog(Log.WARN)) _log.warn("STREAM SEND [" + size + "] failed"); boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n"); @@ -691,7 +687,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag // SAMRawReceiver implementation public void receiveRawBytes(byte data[]) throws IOException { - if (rawSession == null) { + if (getRawSession() == null) { _log.error("BUG! Received raw bytes, but session is null!"); throw new NullPointerException("BUG! RAW session is null!"); } @@ -701,17 +697,18 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag String msgText = "RAW RECEIVED SIZE=" + data.length + "\n"; msg.write(msgText.getBytes("ISO-8859-1")); msg.write(data); + msg.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("sending to client: " + msgText); - writeBytes(msg.toByteArray()); + writeBytes(ByteBuffer.wrap(msg.toByteArray())); } public void stopRawReceiving() { _log.debug("stopRawReceiving() invoked"); - if (rawSession == null) { + if (getRawSession() == null) { _log.error("BUG! Got raw receiving stop, but session is null!"); throw new NullPointerException("BUG! RAW session is null!"); } @@ -726,7 +723,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag // SAMDatagramReceiver implementation public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException { - if (datagramSession == null) { + if (getDatagramSession() == null) { _log.error("BUG! Received datagram bytes, but session is null!"); throw new NullPointerException("BUG! DATAGRAM session is null!"); } @@ -740,14 +737,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag if (_log.shouldLog(Log.DEBUG)) _log.debug("sending to client: " + msgText); msg.write(data); - - writeBytes(msg.toByteArray()); + msg.flush(); + writeBytes(ByteBuffer.wrap(msg.toByteArray())); } public void stopDatagramReceiving() { _log.debug("stopDatagramReceiving() invoked"); - if (datagramSession == null) { + if (getDatagramSession() == null) { _log.error("BUG! Got datagram receiving stop, but session is null!"); throw new NullPointerException("BUG! DATAGRAM session is null!"); } @@ -830,29 +827,23 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag } } - public void receiveStreamBytes(int id, byte data[], int len) throws IOException { + public void receiveStreamBytes(int id, ByteBuffer data) throws IOException { if (streamSession == null) { _log.error("Received stream bytes, but session is null!"); throw new NullPointerException("BUG! STREAM session is null!"); } - String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + len + "\n"; + String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n"; if (_log.shouldLog(Log.DEBUG)) _log.debug("sending to client: " + msgText); - byte prefix[] = msgText.getBytes("ISO-8859-1"); + ByteBuffer prefix = ByteBuffer.wrap(msgText.getBytes("ISO-8859-1")); - // dont waste so much memory - //ByteArrayOutputStream msg = new ByteArrayOutputStream(); - //msg.write(msgText.getBytes("ISO-8859-1")); - //msg.write(data, 0, len); - // writeBytes(msg.toByteArray()); Object writeLock = getWriteLock(); - OutputStream out = getOut(); synchronized (writeLock) { - out.write(prefix); - out.write(data, 0, len); - out.flush(); + while (prefix.hasRemaining()) socket.write(prefix); + while (data.hasRemaining()) socket.write(data); + socket.socket().getOutputStream().flush(); } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java index 75f1bd4b47..0800e17e90 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java @@ -9,7 +9,7 @@ package net.i2p.sam; */ import java.io.IOException; -import java.net.Socket; +import java.nio.channels.SocketChannel; import java.util.Properties; import net.i2p.data.DataFormatException; @@ -36,7 +36,7 @@ public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDat * @param verMajor SAM major version to manage (should be 2) * @param verMinor SAM minor version to manage */ - public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException + public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException { this ( s, verMajor, verMinor, new Properties() ); } @@ -52,7 +52,7 @@ public class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDat * @param i2cpProps properties to configure the I2CP connection (host, port, etc) */ - public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException + public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException { super ( s, verMajor, verMinor, i2cpProps ); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java index de5b7851bc..4197597eb0 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java @@ -12,6 +12,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.ByteBuffer; import java.net.ConnectException; import java.net.NoRouteToHostException; import java.util.ArrayList; @@ -140,9 +142,6 @@ public class SAMv2StreamSession extends SAMStreamSession public class StreamConnector implements Runnable { - private Object runningLock = new Object(); - private boolean stillRunning = true; - private int id; private Destination dest ; private I2PSocketOptions opts ; @@ -245,7 +244,7 @@ public class SAMv2StreamSession extends SAMStreamSession protected class v2StreamSender extends StreamSender { - private List _data; + private List<ByteArray> _data; private int _dataSize; private int _id; private ByteCache _cache; @@ -257,7 +256,7 @@ public class SAMv2StreamSession extends SAMStreamSession public v2StreamSender ( I2PSocket s, int id ) throws IOException { super ( s, id ); - _data = new ArrayList ( 1 ); + _data = new ArrayList<ByteArray> ( 1 ); _dataSize = 0; _id = id; _cache = ByteCache.getInstance ( 10, 32 * 1024 ); @@ -511,7 +510,7 @@ public class SAMv2StreamSession extends SAMStreamSession _log.debug ( "run() called for socket reader " + id ); int read = -1; - byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; + ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE); try { @@ -533,7 +532,8 @@ public class SAMv2StreamSession extends SAMStreamSession break ; } - read = in.read ( data ); + data.clear(); + read = Channels.newChannel(in).read ( data ); if ( read == -1 ) { @@ -542,8 +542,8 @@ public class SAMv2StreamSession extends SAMStreamSession } totalReceived += read ; - - recv.receiveStreamBytes ( id, data, read ); + data.flip(); + recv.receiveStreamBytes ( id, data ); } } catch ( IOException e ) diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java new file mode 100644 index 0000000000..dcca03c141 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java @@ -0,0 +1,90 @@ +/** + * @author MKVore + * + */ + +package net.i2p.sam; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Properties; + +import net.i2p.client.I2PSessionException; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +import java.net.InetSocketAddress; +import java.net.SocketAddress ; +import java.nio.ByteBuffer; + +public class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Session, SAMDatagramReceiver { + + private final static Log _log = new Log ( SAMv3DatagramSession.class ); + + SAMv3Handler handler = null ; + SAMv3Handler.DatagramServer server = null ; + String nick = null ; + SocketAddress clientAddress = null ; + + public String getNick() { return nick; } + + /** + * @param nick nickname of the session + * @param server DatagramServer used for communication with the client + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + */ + public SAMv3DatagramSession(String nick) + throws IOException, DataFormatException, I2PSessionException { + + super(SAMv3Handler.sSessionsHash.get(nick).getDest(), + SAMv3Handler.sSessionsHash.get(nick).getProps(), + null + ); + this.nick = nick ; + this.recv = this ; + this.server = SAMv3Handler.DatagramServer.getInstance() ; + + SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + if ( rec==null ) throw new InterruptedIOException() ; + + this.handler = rec.getHandler(); + + Properties props = rec.getProps(); + String portStr = props.getProperty("PORT") ; + if ( portStr==null ) { + _log.debug("receiver port not specified. Current socket will be used."); + } + else { + int port = Integer.parseInt(portStr); + + String host = props.getProperty("HOST"); + if ( host==null ) { + _log.debug("no host specified. Take from the client socket"); + + host = rec.getHandler().getClientIP(); + } + + + this.clientAddress = new InetSocketAddress(host,port); + } + } + + public void receiveDatagramBytes(Destination sender, byte[] data) throws IOException { + if (this.clientAddress==null) { + this.handler.receiveDatagramBytes(sender, data); + } else { + String msg = sender.toBase64()+"\n"; + ByteBuffer msgBuf = ByteBuffer.allocate(msg.length()+data.length); + msgBuf.put(msg.getBytes("ISO-8859-1")); + msgBuf.put(data); + msgBuf.flip(); + this.server.send(this.clientAddress, msgBuf); + } + } + + public void stopDatagramReceiving() { + } +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java new file mode 100644 index 0000000000..21ff0df680 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -0,0 +1,748 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.NoRouteToHostException; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer; +import java.util.Properties; +import java.util.HashMap; +import java.util.StringTokenizer; + +import net.i2p.I2PException; +import net.i2p.client.I2PSessionException; +import net.i2p.data.Base64; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.Log; +import net.i2p.data.VerifiedDestination; +import net.i2p.util.I2PAppThread; + +/** + * Class able to handle a SAM version 3 client connection. + * + * @author mkvore + */ + +public class SAMv3Handler extends SAMv1Handler +{ + private final static Log _log = new Log ( SAMv3Handler.class ); + + protected SAMv3StreamSession streamSession = null ; + protected SAMv3RawSession rawSession = null ; + protected SAMv3DatagramSession datagramSession = null ; + + protected SAMDatagramSession getDatagramSession() { + return datagramSession ; + } + + protected SAMRawSession getRawSession() { + return rawSession ; + } + + protected Session session = null ; + + interface Session { + String getNick(); + void close(); + boolean sendBytes(String dest, byte[] data) throws DataFormatException; + } + + /** + * Create a new SAM version 3 handler. This constructor expects + * that the SAM HELLO message has been still answered (and + * stripped) from the socket input stream. + * + * @param s Socket attached to a SAM client + * @param verMajor SAM major version to manage (should be 3) + * @param verMinor SAM minor version to manage + */ + public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException + { + this ( s, verMajor, verMinor, new Properties() ); + } + + /** + * Create a new SAM version 3 handler. This constructor expects + * that the SAM HELLO message has been still answered (and + * stripped) from the socket input stream. + * + * @param s Socket attached to a SAM client + * @param verMajor SAM major version to manage (should be 3) + * @param verMinor SAM minor version to manage + * @param i2cpProps properties to configure the I2CP connection (host, port, etc) + */ + + public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException + { + super ( s, verMajor, verMinor, i2cpProps ); + _log.debug("SAM version 3 handler instantiated"); + } + + public boolean verifVersion() + { + return (verMajor == 3 && verMinor == 0) ; + } + + static public class DatagramServer { + + private static DatagramServer _instance = null ; + private static DatagramChannel server = null ; + + public static DatagramServer getInstance() throws IOException { + return getInstance(new Properties()); + } + + public static DatagramServer getInstance(Properties props) throws IOException { + if (_instance==null) { + _instance = new DatagramServer(props); + } + return _instance ; + } + + public DatagramServer(Properties props) throws IOException { + if (server==null) { + server = DatagramChannel.open(); + } + + String host = props.getProperty(SAMBridge.PROP_DATAGRAM_HOST, SAMBridge.DEFAULT_DATAGRAM_HOST); + String portStr = props.getProperty(SAMBridge.PROP_DATAGRAM_PORT, SAMBridge.DEFAULT_DATAGRAM_PORT); + int port ; + try { + port = Integer.parseInt(portStr); + } catch (NumberFormatException e) { + port = Integer.parseInt(SAMBridge.DEFAULT_DATAGRAM_PORT); + } + + server.socket().bind(new InetSocketAddress(host, port)); + new I2PAppThread(new Listener(server), "DatagramListener").start(); + } + + public void send(SocketAddress addr, ByteBuffer msg) throws IOException { + server.send(msg, addr); + } + + class Listener implements Runnable { + + DatagramChannel server = null; + + public Listener(DatagramChannel server) + { + this.server = server ; + } + public void run() + { + ByteBuffer inBuf = ByteBuffer.allocateDirect(SAMRawSession.RAW_SIZE_MAX+1024); + + while (!Thread.interrupted()) + { + inBuf.clear(); + try { + server.receive(inBuf); + } catch (IOException e) { + break ; + } + inBuf.flip(); + ByteBuffer outBuf = ByteBuffer.wrap(new byte[inBuf.remaining()]); + outBuf.put(inBuf); + outBuf.flip(); + new I2PAppThread(new MessageDispatcher(outBuf.array()), "MessageDispatcher").start(); + } + } + } + } + + public static class MessageDispatcher implements Runnable + { + ByteArrayInputStream is = null ; + + public MessageDispatcher(byte[] buf) + { + this.is = new java.io.ByteArrayInputStream(buf) ; + } + + public void run() { + String header = null ; + String nick ; + String dest ; + + try { + header = DataHelper.readLine(is).trim(); + StringTokenizer tok = new StringTokenizer(header, " "); + if (tok.countTokens() != 2) { + // This is not a correct message, for sure + _log.debug("Error in message format"); + return; + } + nick = tok.nextToken(); + dest = tok.nextToken(); + + byte[] data = new byte[is.available()]; + is.read(data); + SessionRecord rec = sSessionsHash.get(nick); + if (rec!=null) { + rec.getHandler().session.sendBytes(dest,data); + } + } catch (Exception e) {} + } + } + + public class SessionRecord + { + protected String m_dest ; + protected Properties m_props ; + protected ThreadGroup m_threadgroup ; + protected SAMv3Handler m_handler ; + + public SessionRecord( String dest, Properties props, SAMv3Handler handler ) + { + m_dest = new String(dest) ; + m_props = new Properties() ; + m_props.putAll(props); + m_threadgroup = null ; + m_handler = handler ; + } + + public SessionRecord( SessionRecord in ) + { + m_dest = in.getDest(); + m_props = in.getProps(); + m_threadgroup = in.getThreadGroup(); + m_handler = in.getHandler(); + } + + synchronized public String getDest() + { + return new String(m_dest) ; + } + synchronized public Properties getProps() + { + Properties p = new Properties(); + p.putAll(m_props); + return m_props; + } + synchronized public SAMv3Handler getHandler() + { + return m_handler ; + } + synchronized public ThreadGroup getThreadGroup() + { + return m_threadgroup ; + } + synchronized public void createThreadGroup(String name) + { + if (m_threadgroup == null) + m_threadgroup = new ThreadGroup(name); + } + } + + public static class SessionsDB + { + static final long serialVersionUID = 0x1 ; + + HashMap<String, SessionRecord> map ; + + public SessionsDB() { + map = new HashMap<String, SessionRecord>() ; + } + + synchronized public boolean put( String nick, SessionRecord session ) + { + if ( !map.containsKey(nick) ) { + session.createThreadGroup("SAM session "+nick); + map.put(nick, session) ; + return true ; + } + else + return false ; + } + synchronized public boolean del( String nick ) + { + SessionRecord rec = map.get(nick); + + if ( rec!=null ) { + map.remove(nick); + return true ; + } + else + return false ; + } + synchronized public SessionRecord get(String nick) + { + return map.get(nick); + } + synchronized public boolean containsKey( String nick ) + { + return map.containsKey(nick); + } + } + + public static SessionsDB sSessionsHash = new SessionsDB() ; + + public String getClientIP() + { + return this.socket.socket().getInetAddress().getHostAddress(); + } + + boolean stolenSocket = false ; + + public void stealSocket() + { + stolenSocket = true ; + this.stopHandling(); + } + + public void handle() { + String msg = null; + String domain = null; + String opcode = null; + boolean canContinue = false; + StringTokenizer tok; + Properties props; + + this.thread.setName("SAMv3Handler " + _id); + _log.debug("SAM handling started"); + + try { + InputStream in = getClientSocket().socket().getInputStream(); + + while (true) { + if (shouldStop()) { + _log.debug("Stop request found"); + break; + } + + msg = DataHelper.readLine(in).trim(); + if (msg == null) { + _log.debug("Connection closed by client"); + break; + } + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("New message received: [" + msg + "]"); + } + + if(msg.equals("")) { + _log.debug("Ignoring newline"); + continue; + } + + tok = new StringTokenizer(msg, " "); + if (tok.countTokens() < 2) { + // This is not a correct message, for sure + _log.debug("Error in message format"); + break; + } + domain = tok.nextToken(); + opcode = tok.nextToken(); + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Parsing (domain: \"" + domain + + "\"; opcode: \"" + opcode + "\")"); + } + props = SAMUtils.parseParams(tok); + + if (domain.equals("STREAM")) { + canContinue = execStreamMessage(opcode, props); + } else if (domain.equals("SESSION")) { + if (i2cpProps != null) + props.putAll(i2cpProps); // make sure we've got the i2cp settings + canContinue = execSessionMessage(opcode, props); + } else if (domain.equals("DEST")) { + canContinue = execDestMessage(opcode, props); + } else if (domain.equals("NAMING")) { + canContinue = execNamingMessage(opcode, props); + } else { + _log.debug("Unrecognized message domain: \"" + + domain + "\""); + break; + } + + if (!canContinue) { + break; + } + } + } catch (IOException e) { + _log.debug("Caught IOException (" + + e.getMessage() + ") for message [" + msg + "]", e); + } catch (Exception e) { + _log.error("Unexpected exception for message [" + msg + "]", e); + } finally { + _log.debug("Stopping handler"); + + if (!this.stolenSocket) + { + try { + closeClientSocket(); + } catch (IOException e) { + _log.error("Error closing socket: " + e.getMessage()); + } + } + + die(); + } + } + + protected void die() { + SessionRecord rec = null ; + + if (session!=null) { + session.close(); + rec = sSessionsHash.get(session.getNick()); + } + if (rec!=null) { + rec.getThreadGroup().interrupt() ; + while (rec.getThreadGroup().activeCount()>0) + try { + Thread.sleep(1000); + } catch ( InterruptedException e) {} + rec.getThreadGroup().destroy(); + sSessionsHash.del(session.getNick()); + } + } + + /* Parse and execute a SESSION message */ + @Override + protected boolean execSessionMessage(String opcode, Properties props) { + + String dest = "BUG!"; + String nick = null ; + boolean ok = false ; + + try{ + if (opcode.equals("CREATE")) { + if ((this.getRawSession()!= null) || (this.getDatagramSession() != null) + || (streamSession != null)) { + _log.debug("Trying to create a session, but one still exists"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n"); + } + if (props == null) { + _log.debug("No parameters specified in SESSION CREATE message"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n"); + } + + dest = props.getProperty("DESTINATION"); + if (dest == null) { + _log.debug("SESSION DESTINATION parameter not specified"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n"); + } + props.remove("DESTINATION"); + + + if (dest.equals("TRANSIENT")) { + _log.debug("TRANSIENT destination requested"); + ByteArrayOutputStream priv = new ByteArrayOutputStream(640); + SAMUtils.genRandomKey(priv, null); + + dest = Base64.encode(priv.toByteArray()); + } else { + _log.debug("Custom destination specified [" + dest + "]"); + } + + boolean good_key = false ; + try { + good_key = (new VerifiedDestination(dest)).verifyCert(true); + } catch (DataFormatException e) { + good_key = false ; + } + if (!good_key) + { + _log.debug("Bad destination key"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"bad destination key\"\n"); + } + + nick = props.getProperty("ID"); + if (nick == null) { + _log.debug("SESSION ID parameter not specified"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n"); + } + props.remove("ID"); + + + String style = props.getProperty("STYLE"); + if (style == null) { + _log.debug("SESSION STYLE parameter not specified"); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n"); + } + props.remove("STYLE"); + + + + // Record the session in the database sSessionsHash + Properties allProps = new Properties(); + allProps.putAll(i2cpProps); + allProps.putAll(props); + + if (! sSessionsHash.put( nick, new SessionRecord(dest, allProps, this) ) ) { + _log.debug("SESSION ID parameter already in use"); + String n = nick ; + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID "+n+" already in use\"\n"); + } + + // Create the session + + if (style.equals("RAW")) { + DatagramServer.getInstance(i2cpProps); + rawSession = newSAMRawSession(nick); + this.session = rawSession ; + } else if (style.equals("DATAGRAM")) { + DatagramServer.getInstance(i2cpProps); + datagramSession = newSAMDatagramSession(nick); + this.session = datagramSession ; + } else if (style.equals("STREAM")) { + streamSession = newSAMStreamSession(nick); + this.session = streamSession ; + } else { + _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n"); + } + ok = true ; + return writeString("SESSION STATUS RESULT=OK DESTINATION=" + + dest + "\n"); + } else { + _log.debug("Unrecognized SESSION message opcode: \"" + + opcode + "\""); + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n"); + } + } catch (DataFormatException e) { + _log.debug("Invalid destination specified"); + return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + } catch (I2PSessionException e) { + _log.debug("I2P error when instantiating session", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + } catch (SAMException e) { + _log.error("Unexpected SAM error", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + } catch (IOException e) { + _log.error("Unexpected IOException", e); + return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n"); + } finally { + // unregister the session if it has not been created + if ( !ok && nick!=null ) { + sSessionsHash.del(nick) ; + session = null ; + } + } + } + + SAMv3StreamSession newSAMStreamSession(String login ) + throws IOException, DataFormatException, SAMException + { + return new SAMv3StreamSession( login ) ; + } + + SAMv3RawSession newSAMRawSession(String login ) + throws IOException, DataFormatException, SAMException, I2PSessionException + { + return new SAMv3RawSession( login ) ; + } + + SAMv3DatagramSession newSAMDatagramSession(String login ) + throws IOException, DataFormatException, SAMException, I2PSessionException + { + return new SAMv3DatagramSession( login ) ; + } + + /* Parse and execute a STREAM message */ + protected boolean execStreamMessage ( String opcode, Properties props ) + { + String nick = null ; + SessionRecord rec = null ; + + if ( session != null ) + { + _log.error ( "STREAM message received, but this session is a master session" ); + writeString("STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"master session cannot be used for streams"); + return false; + } + + nick = props.getProperty("ID"); + if (nick == null) { + _log.debug("SESSION ID parameter not specified"); + writeString("STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n"); + return false ; + } + props.remove("ID"); + + rec = sSessionsHash.get(nick); + + if ( rec==null ) { + _log.debug("STREAM SESSION ID does not exist"); + writeString("STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"STREAM SESSION ID does not exist\"\n"); + return false ; + } + + streamSession = rec.getHandler().streamSession ; + + if (streamSession==null) { + _log.debug("specified ID is not a stream session"); + writeString("STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"specified ID is not a STREAM session\"\n"); + return false ; + } + + if ( opcode.equals ( "CONNECT" ) ) + { + return execStreamConnect ( props ); + } + else if ( opcode.equals ( "ACCEPT" ) ) + { + return execStreamAccept ( props ); + } + else if ( opcode.equals ( "FORWARD") ) + { + return execStreamForwardIncoming( props ); + } + else + { + _log.debug ( "Unrecognized RAW message opcode: \"" + + opcode + "\"" ); + writeString("STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized RAW message opcode: \"" + + opcode + "\"" ); + return false; + } + } + + protected boolean execStreamConnect( Properties props) { + if (props == null) { + _log.debug("No parameters specified in STREAM CONNECT message"); + return false; + } + boolean verbose = props.getProperty("SILENT","false").equals("false"); + + String dest = props.getProperty("DESTINATION"); + if (dest == null) { + _log.debug("Destination not specified in RAW SEND message"); + return false; + } + props.remove("DESTINATION"); + + try { + try { + streamSession.connect( this, dest, props ); + return true ; + } catch (DataFormatException e) { + _log.debug("Invalid destination in STREAM CONNECT message"); + if (verbose) notifyStreamAccept ( "INVALID_KEY" ); + } catch (ConnectException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "CONNECTION_REFUSED" ); + } catch (NoRouteToHostException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "CANT_REACH_PEER" ); + } catch (InterruptedIOException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "TIMEOUT" ); + } catch (I2PException e) { + _log.debug("STREAM CONNECT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "I2P_ERROR" ); + } + } catch (IOException e) { + } + return false ; + } + + protected boolean execStreamForwardIncoming( Properties props ) { + try { + try { + streamSession.startForwardingIncoming(props); + notifyStreamAccept("OK"); + return true ; + } catch (SAMException e) { + _log.debug("Forwarding STREAM connections failed: " + e.getMessage()); + notifyStreamAccept ( "FORWARDER_FAILED" ); + } + } catch (IOException e) { + } + return false ; + } + + protected boolean execStreamAccept( Properties props ) + { + boolean verbose = props.getProperty( "SILENT", "false").equals("false"); + try { + try { + streamSession.accept(this, verbose); + return true ; + } catch (InterruptedIOException e) { + _log.debug("STREAM ACCEPT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept( "TIMEOUT" ); + } catch (I2PException e) { + _log.debug("STREAM ACCEPT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "I2P_ERROR" ); + } catch (SAMException e) { + _log.debug("STREAM ACCEPT failed: " + e.getMessage()); + if (verbose) notifyStreamAccept ( "ALREADY_ACCEPTING" ); + } + } catch (IOException e) { + } + return false ; + } + + + public void notifyStreamAccept(String status) throws IOException + { + if ( streamSession == null ) + { + _log.error ( "BUG! Received stream connection, but session is null!" ); + throw new NullPointerException ( "BUG! STREAM session is null!" ); + } + + if ( !writeString ( "STREAM STATUS RESULT=" + + status + + "\n" ) ) + { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } + + public void notifyStreamOutgoingConnection(String result) throws IOException + { + if ( streamSession == null ) + { + _log.error ( "BUG! Received stream connection, but session is null!" ); + throw new NullPointerException ( "BUG! STREAM session is null!" ); + } + + if ( !writeString ( "STREAM STATUS RESULT=" + + result + + "\n" ) ) + { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } + + public void notifyStreamIncomingConnection(Destination d) throws IOException { + if (streamSession == null) { + _log.error("BUG! Received stream connection, but session is null!"); + throw new NullPointerException("BUG! STREAM session is null!"); + } + + if (!writeString(d.toBase64() + "\n")) { + throw new IOException("Error notifying connection to SAM client"); + } + } + + public static void notifyStreamIncomingConnection(SocketChannel client, Destination d) throws IOException { + if (!writeString(d.toBase64() + "\n", client)) { + throw new IOException("Error notifying connection to SAM client"); + } + } + +} + diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java new file mode 100644 index 0000000000..1bf7d18a0b --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java @@ -0,0 +1,88 @@ +/** + * + */ +package net.i2p.sam; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Properties; + +import net.i2p.client.I2PSessionException; +import net.i2p.data.DataFormatException; +import net.i2p.util.Log; + +/** + * @author MKVore + * + */ +public class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SAMRawReceiver { + + String nick = null ; + SAMv3Handler handler = null ; + SAMv3Handler.DatagramServer server ; + private final static Log _log = new Log ( SAMv3DatagramSession.class ); + SocketAddress clientAddress = null ; + + public String getNick() { return nick; } + + /** + * @param nick nickname of the session + * @param server DatagramServer used for communication with the client + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + */ + public SAMv3RawSession(String nick) + throws IOException, DataFormatException, I2PSessionException { + + super(SAMv3Handler.sSessionsHash.get(nick).getDest(), + SAMv3Handler.sSessionsHash.get(nick).getProps(), + SAMv3Handler.sSessionsHash.get(nick).getHandler() + ); + this.nick = nick ; + this.recv = this ; + this.server = SAMv3Handler.DatagramServer.getInstance() ; + + SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + if ( rec==null ) throw new InterruptedIOException() ; + + this.handler = rec.getHandler(); + + Properties props = rec.getProps(); + + + String portStr = props.getProperty("PORT") ; + if ( portStr==null ) { + _log.debug("receiver port not specified. Current socket will be used."); + } + else { + int port = Integer.parseInt(portStr); + + String host = props.getProperty("HOST"); + if ( host==null ) { + _log.debug("no host specified. Take from the client socket"); + + host = rec.getHandler().getClientIP(); + } + + + this.clientAddress = new InetSocketAddress(host,port); + } + } + + public void receiveRawBytes(byte[] data) throws IOException { + if (this.clientAddress==null) { + this.handler.receiveRawBytes(data); + } else { + ByteBuffer msgBuf = ByteBuffer.allocate(data.length); + msgBuf.put(data); + msgBuf.flip(); + this.server.send(this.clientAddress, msgBuf); + } + } + + public void stopRawReceiving() {} +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java new file mode 100644 index 0000000000..b9ea710c65 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -0,0 +1,389 @@ +package net.i2p.sam; +/* + * free (adj.): unencumbered; not under the control of others + * Written by human in 2004 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.util.Properties; + +import net.i2p.I2PException; +import net.i2p.client.I2PClient; +import net.i2p.client.streaming.I2PServerSocket; +import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManagerFactory; +import net.i2p.client.streaming.I2PSocketOptions; +import net.i2p.data.Base64; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; +import net.i2p.util.I2PAppThread; +import net.i2p.util.Log; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.ByteBuffer ; +import java.nio.channels.SocketChannel; + +/** + * SAMv3 STREAM session class. + * + * @author mkvore + */ + +public class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Session +{ + + private final static Log _log = new Log ( SAMv3StreamSession.class ); + + protected final int BUFFER_SIZE = 1024 ; + + protected Object socketServerLock = new Object(); + protected I2PServerSocket socketServer = null; + + protected String nick ; + + public String getNick() { + return nick ; + } + + /** + * Create a new SAM STREAM session. + * + * @param dest Base64-encoded destination (private key) + * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") + * @param props Properties to setup the I2P session + * @param recv Object that will receive incoming data + * @throws IOException + * @throws DataFormatException + * @throws SAMException + */ + public SAMv3StreamSession(String login) + throws IOException, DataFormatException, SAMException + { + initSAMStreamSession(login); + } + + public static SAMv3Handler.SessionsDB getDB() + { + return SAMv3Handler.sSessionsHash ; + } + + private void initSAMStreamSession(String login) + throws IOException, DataFormatException, SAMException{ + + SAMv3Handler.SessionRecord rec = getDB().get(login); + String dest = rec.getDest() ; + ByteArrayInputStream ba_dest = new ByteArrayInputStream(Base64.decode(dest)); + + this.recv = rec.getHandler(); + + _log.debug("SAM STREAM session instantiated"); + + Properties allprops = new Properties(); + allprops.putAll(System.getProperties()); + allprops.putAll(rec.getProps()); + + String i2cpHost = allprops.getProperty(I2PClient.PROP_TCP_HOST, "127.0.0.1"); + int i2cpPort ; + String port = allprops.getProperty(I2PClient.PROP_TCP_PORT, "7654"); + try { + i2cpPort = Integer.parseInt(port); + } catch (NumberFormatException nfe) { + throw new SAMException("Invalid I2CP port specified [" + port + "]"); + } + + _log.debug("Creating I2PSocketManager..."); + socketMgr = I2PSocketManagerFactory.createManager(ba_dest, + i2cpHost, + i2cpPort, + allprops); + if (socketMgr == null) { + throw new SAMException("Error creating I2PSocketManager towards "+i2cpHost+":"+i2cpPort); + } + + socketMgr.addDisconnectListener(new DisconnectListener()); + this.nick = login ; + } + + /** + * Connect the SAM STREAM session to the specified Destination + * + * @param id Unique id for the connection + * @param dest Base64-encoded Destination to connect to + * @param props Options to be used for connection + * + * @return true if successful + * @throws DataFormatException if the destination is not valid + * @throws ConnectException if the destination refuses connections + * @throws NoRouteToHostException if the destination can't be reached + * @throws InterruptedIOException if the connection timeouts + * @throws I2PException if there's another I2P-related error + * @throws IOException + */ + public void connect ( SAMv3Handler handler, String dest, Properties props ) throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, IOException { + + boolean verbose = (props.getProperty("SILENT", "false").equals("false")); + Destination d = new Destination(); + d = SAMUtils.getDest(dest); + + I2PSocketOptions opts = socketMgr.buildOptions(props); + if (props.getProperty(I2PSocketOptions.PROP_CONNECT_TIMEOUT) == null) + opts.setConnectTimeout(60 * 1000); + + _log.debug("Connecting new I2PSocket..."); + + // blocking connection (SAMv3) + + I2PSocket i2ps = socketMgr.connect(d, opts); + + SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + + if ( rec==null ) throw new InterruptedIOException() ; + + if (verbose) handler.notifyStreamOutgoingConnection("OK") ; + + handler.stealSocket() ; + + ReadableByteChannel fromClient = handler.getClientSocket(); + ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream()); + WritableByteChannel toClient = handler.getClientSocket(); + WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); + + (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P"))).start(); + (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P"))).start(); + + } + + /** + * Accept an incoming STREAM + * + * @param id Unique id for the connection + * @param dest Base64-encoded Destination to connect to + * @param props Options to be used for connection + * + * @return true if successful + * @throws DataFormatException if the destination is not valid + * @throws ConnectException if the destination refuses connections + * @throws NoRouteToHostException if the destination can't be reached + * @throws InterruptedIOException if the connection timeouts + * @throws I2PException if there's another I2P-related error + * @throws IOException + */ + public void accept(SAMv3Handler handler, boolean verbose) + throws I2PException, InterruptedIOException, IOException, SAMException { + + synchronized( this.socketServerLock ) + { + if (this.socketServer!=null) { + _log.debug("a socket server is already defined for this destination"); + throw new SAMException("a socket server is already defined for this destination"); + } + this.socketServer = this.socketMgr.getServerSocket(); + } + + I2PSocket i2ps; + i2ps = this.socketServer.accept(); + + synchronized( this.socketServerLock ) + { + this.socketServer = null ; + } + + SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + + if ( rec==null ) throw new InterruptedIOException() ; + + if (verbose) + handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ; + + handler.stealSocket() ; + ReadableByteChannel fromClient = handler.getClientSocket(); + ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream()); + WritableByteChannel toClient = handler.getClientSocket(); + WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); + + (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P"))).start(); + (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P"))).start(); + } + + + public void startForwardingIncoming( Properties props ) throws SAMException, InterruptedIOException + { + SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + boolean verbose = props.getProperty("SILENT", "false").equals("false"); + + if ( rec==null ) throw new InterruptedIOException() ; + + String portStr = props.getProperty("PORT") ; + if ( portStr==null ) { + _log.debug("receiver port not specified"); + throw new SAMException("receiver port not specified"); + } + int port = Integer.parseInt(portStr); + + String host = props.getProperty("HOST"); + if ( host==null ) { + _log.debug("no host specified. Take from the client socket"); + + host = rec.getHandler().getClientIP(); + } + + + synchronized( this.socketServerLock ) + { + if (this.socketServer!=null) { + _log.debug("a socket server is already defined for this destination"); + throw new SAMException("a socket server is already defined for this destination"); + } + this.socketServer = this.socketMgr.getServerSocket(); + } + + SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose); + (new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"))).start(); + + } + + public class SocketForwarder extends Thread + { + String host = null ; + int port = 0 ; + SAMv3StreamSession session; + boolean verbose; + + SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) { + this.host = host ; + this.port = port ; + this.session = session ; + this.verbose = verbose ; + } + + public void run() + { + while (session.socketServer!=null) { + + boolean available = false ; + I2PSocket i2ps = null ; + try { + available = session.socketServer.waitIncoming(-1); + } catch (ConnectException e) { + _log.debug("ConnectException"); + break ; + } catch (I2PException e) { + _log.debug("I2PServerSocket has been closed"); + break ; + } catch (InterruptedException e) { + _log.debug("InterruptedException"); + break ; + } + if ( !available ) continue ; + + java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port); + + SocketChannel clientServerSock = null ; + try { + clientServerSock = SocketChannel.open(addr) ; + } + catch ( IOException e ) { + continue ; + } + + try { + i2ps = session.socketServer.accept(false); + } catch (Exception e) {} + + if (i2ps==null) { + try { + clientServerSock.close(); + } catch (IOException ee) {} + continue ; + } + try { + if (this.verbose) + SAMv3Handler.notifyStreamIncomingConnection( + clientServerSock, i2ps.getPeerDestination()); + ReadableByteChannel fromClient = clientServerSock ; + ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream()); + WritableByteChannel toClient = clientServerSock ; + WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); + new I2PAppThread(new Pipe(fromClient,toI2P), "SAMPipeClientToI2P").start(); + new I2PAppThread(new Pipe(fromI2P,toClient), "SAMPipeClientToI2P").start(); + + } catch (IOException e) { + try { + clientServerSock.close(); + } catch (IOException ee) {} + try { + i2ps.close(); + } catch (IOException ee) {} + continue ; + } + } + } + } + public class Pipe extends Thread + { + ReadableByteChannel in ; + WritableByteChannel out ; + ByteBuffer buf ; + + public Pipe(ReadableByteChannel in, WritableByteChannel out) + { + this.in = in ; + this.out = out ; + this.buf = ByteBuffer.allocate(BUFFER_SIZE) ; + } + + public void run() + { + try { + while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) { + buf.flip(); + out.write(buf); + buf.compact(); + } + } + catch (IOException e) + { + this.interrupt(); + } + try { + in.close(); + } + catch (IOException e) {} + try { + buf.flip(); + while (buf.hasRemaining()) + out.write(buf); + } + catch (IOException e) {} + try { + out.close(); + } + catch (IOException e) {} + } + } + + + + /** + * Close the stream session + */ + @Override + public void close() { + socketMgr.destroySocketManager(); + } + + public boolean sendBytes(String s, byte[] b) throws DataFormatException + { + throw new DataFormatException(null); + } + +} diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java index 7df1a23242..9df867aa55 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java @@ -12,17 +12,17 @@ import net.i2p.util.Log; * */ public class SAMEventHandler extends SAMClientEventListenerImpl { - private I2PAppContext _context; + //private I2PAppContext _context; private Log _log; private Boolean _helloOk; private Object _helloLock = new Object(); private Boolean _sessionCreateOk; private Object _sessionCreateLock = new Object(); private Object _namingReplyLock = new Object(); - private Map _namingReplies = new HashMap(); + private Map<String,String> _namingReplies = new HashMap<String,String>(); public SAMEventHandler(I2PAppContext ctx) { - _context = ctx; + //_context = ctx; _log = ctx.logManager().getLog(getClass()); } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 4e9d1133b0..80db744a31 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -31,10 +31,10 @@ public class SAMStreamSend { private OutputStream _samOut; private InputStream _samIn; private SAMReader _reader; - private boolean _dead; + //private boolean _dead; private SAMEventHandler _eventHandler; /** Connection id (Integer) to peer (Flooder) */ - private Map _remotePeers; + private Map<Integer, Sender> _remotePeers; public static void main(String args[]) { if (args.length < 4) { @@ -42,7 +42,7 @@ public class SAMStreamSend { return; } I2PAppContext ctx = new I2PAppContext(); - String files[] = new String[args.length - 3]; + //String files[] = new String[args.length - 3]; SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]); sender.startup(); } @@ -50,14 +50,14 @@ public class SAMStreamSend { public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { _context = ctx; _log = ctx.logManager().getLog(SAMStreamSend.class); - _dead = false; + //_dead = false; _samHost = samHost; _samPort = samPort; _destFile = destFile; _dataFile = dataFile; _conOptions = ""; _eventHandler = new SendEventHandler(_context); - _remotePeers = new HashMap(); + _remotePeers = new HashMap<Integer,Sender>(); } public void startup() { @@ -207,7 +207,6 @@ public class SAMStreamSend { _started = _context.clock().now(); _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); byte data[] = new byte[1024]; - long value = 0; long lastSend = _context.clock().now(); while (!_closed) { try { diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 8d29e37994..406150b367 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -31,10 +31,10 @@ public class SAMStreamSink { private OutputStream _samOut; private InputStream _samIn; private SAMReader _reader; - private boolean _dead; + //private boolean _dead; private SAMEventHandler _eventHandler; /** Connection id (Integer) to peer (Flooder) */ - private Map _remotePeers; + private Map<Integer, Sink> _remotePeers; public static void main(String args[]) { if (args.length < 4) { @@ -49,14 +49,14 @@ public class SAMStreamSink { public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { _context = ctx; _log = ctx.logManager().getLog(SAMStreamSink.class); - _dead = false; + //_dead = false; _samHost = samHost; _samPort = samPort; _destFile = destFile; _sinkDir = sinkDir; _conOptions = ""; _eventHandler = new SinkEventHandler(_context); - _remotePeers = new HashMap(); + _remotePeers = new HashMap<Integer,Sink>(); } public void startup() { @@ -70,7 +70,8 @@ public class SAMStreamSink { String ourDest = handshake(); _log.debug("Handshake complete. we are " + ourDest); if (ourDest != null) { - boolean written = writeDest(ourDest); + //boolean written = + writeDest(ourDest); _log.debug("Dest written"); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 7d1d4827f8..35eca57e5f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,9 +1,11 @@ package net.i2p.client.streaming; +import java.net.ConnectException; import java.util.ArrayList; import java.util.List; import net.i2p.I2PAppContext; +import net.i2p.util.Clock; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -14,7 +16,7 @@ class ConnectionHandler { private I2PAppContext _context; private Log _log; private ConnectionManager _manager; - private List _synQueue; + private List<Packet> _synQueue; private boolean _active; private int _acceptTimeout; @@ -61,13 +63,45 @@ class ConnectionHandler { } } + public boolean waitSyn( long ms ) throws InterruptedException { + boolean incoming = false ; + boolean isTimed = (ms>=0); + + Clock clock = I2PAppContext.getGlobalContext().clock(); + long now = clock.now(); + long end = now + ms; + while (!incoming && (!isTimed || now<=end) ) { + synchronized (_synQueue) { + + for (Packet p : _synQueue) + { + if (p.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + incoming = true ; + break; + } + } + if (!incoming) { + if (!isTimed) { + _synQueue.wait(); + } else { + now = clock.now(); + if (now < end) { + _synQueue.wait(end-now); + } + } + } + } + } + return incoming ; + } + /** * Receive an incoming connection (built from a received SYN) * Non-SYN packets with a zero SendStreamID may also be queued here so * that they don't get thrown away while the SYN packet before it is queued. * - * @param timeoutMs max amount of time to wait for a connection (if less - * than 1ms, wait indefinitely) + * @param timeoutMs max amount of time to wait for a connection (if negative, + * wait indefinitely) * @return connection received, or null if there was a timeout or the * handler was shut down */ @@ -77,8 +111,6 @@ class ConnectionHandler { long expiration = timeoutMs + _context.clock().now(); while (true) { - if ( (timeoutMs > 0) && (expiration < _context.clock().now()) ) - return null; if (!_active) { // fail all the ones we had queued up synchronized (_synQueue) { @@ -97,7 +129,7 @@ class ConnectionHandler { if (_log.shouldLog(Log.DEBUG)) _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + _synQueue.size()); - if (timeoutMs <= 0) { + if (timeoutMs < 0) { try { _synQueue.wait(); } catch (InterruptedException ie) {} } else { long remaining = expiration - _context.clock().now(); @@ -129,6 +161,8 @@ class ConnectionHandler { } } // keep looping... + if ( (timeoutMs >= 0) && (expiration < _context.clock().now()) ) + return null; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 83f7c8376a..ab9cb1c9d4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,7 +1,12 @@ package net.i2p.client.streaming; +import java.net.ConnectException; import java.net.SocketTimeoutException; + +import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.util.Clock; +import net.i2p.util.Log; /** * Bridge to allow accepting new connections @@ -45,4 +50,43 @@ public class I2PServerSocketFull implements I2PServerSocket { public I2PSocketManager getManager() { return _socketManager; } + + /** + * accept(true) has the same behaviour as accept(). + * accept(false) does not wait for a socket connecting. If a socket is + * available in the queue, it is accepted. Else, null is returned. + * + * @param true if the call should block until a socket is available + * + * @return a connected I2PSocket, or null + * + * @throws I2PException if there is a problem with reading a new socket + * from the data available (aka the I2PSession closed, etc) + * @throws SocketTimeoutException if the timeout has been reached + */ + + public I2PSocket accept(boolean blocking) throws I2PException, SocketTimeoutException { + long timeout = this.getSoTimeout(); + + try { + if (blocking) + { + this.setSoTimeout(-1); + } else { + this.setSoTimeout(0); + } + try { + return this.accept(); + } catch (SocketTimeoutException e) { + if (blocking) throw e; + else return null ; + } + } finally { + this.setSoTimeout(timeout); + } + } + + public boolean waitIncoming(long timeoutMs) throws InterruptedException { + return this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs); + } } -- GitLab