From 4cdd42f39176c6ecbb9ef5a2c7088d7dfdcb40eb Mon Sep 17 00:00:00 2001 From: aum <aum> Date: Tue, 13 Apr 2004 17:40:07 +0000 Subject: [PATCH] Fixed build.xml to detect os, and launch 'jythonc' or 'jythonc.bat' according to whether we're running on *nix or windoze. build.xml should now work on your platform, as long as you have jython installed and jython is on your execution path. Got SAM STREAMs working - test code added to i2psamclient.py as function demoSTREAM() --- apps/sam/code.leo | 669 +++++++++++++++++++++++++--- apps/sam/jython/build.xml | 20 +- apps/sam/jython/src/i2psam.py | 374 +++++++++++----- apps/sam/python/src/i2psamclient.py | 423 +++++++++++++++++- 4 files changed, 1265 insertions(+), 221 deletions(-) diff --git a/apps/sam/code.leo b/apps/sam/code.leo index 52cd3703f6..8869595259 100644 --- a/apps/sam/code.leo +++ b/apps/sam/code.leo @@ -1,8 +1,8 @@ <?xml version="1.0" encoding="UTF-8"?> <leo_file> -<leo_header file_format="2" tnodes="0" max_tnode_index="216" clone_windows="0"/> -<globals body_outline_ratio="0.37624999999999997"> - <global_window_position top="155" left="108" height="585" width="890"/> +<leo_header file_format="2" tnodes="0" max_tnode_index="217" clone_windows="0"/> +<globals body_outline_ratio="0.35262008733624456"> + <global_window_position top="70" left="219" height="649" width="978"/> <global_log_window_position top="0" left="0" height="0" width="0"/> </globals> <preferences> @@ -13,9 +13,9 @@ </find_panel_settings> <vnodes> <v t="davidmcnab.041004143447" a="E"><vh>I2P SAM Server and Client</vh> -<v t="davidmcnab.041004144338" tnodeList="davidmcnab.041004144338,davidmcnab.041004144338.1,davidmcnab.041004144338.2,davidmcnab.041004144338.4,davidmcnab.041004144338.5,davidmcnab.041004144338.6,davidmcnab.041004144338.8,davidmcnab.041004144338.9,davidmcnab.041004144338.10,davidmcnab.041004144338.11,davidmcnab.041004144338.12,davidmcnab.041004144338.13,davidmcnab.041004144338.14,davidmcnab.041004144338.15,davidmcnab.041004144338.17,davidmcnab.041004144338.18,davidmcnab.041004144338.19,davidmcnab.041004144338.20,davidmcnab.041004144338.21,davidmcnab.041004144338.22,davidmcnab.041004144338.23,davidmcnab.041004144338.24,davidmcnab.041004144338.26,davidmcnab.041004144338.27,davidmcnab.041004144338.29,davidmcnab.041004144338.30,davidmcnab.041004144338.31,davidmcnab.041004144338.32,davidmcnab.041004144338.33,davidmcnab.041004144338.34,davidmcnab.041004144338.35,davidmcnab.041004144338.36,davidmcnab.041004144338.37,davidmcnab.041004144338.38,davidmcnab.041004144338.39,davidmcnab.041004144338.40,davidmcnab.041004144338.41,davidmcnab.041004144338.42,davidmcnab.041004144338.43,davidmcnab.041004144338.44,davidmcnab.041004144338.45,davidmcnab.041004144338.46,davidmcnab.041004144338.47,davidmcnab.041004144338.49,davidmcnab.041004144338.50,davidmcnab.041004144338.51,davidmcnab.041004144338.52,davidmcnab.041004144338.53,davidmcnab.041004144338.54,davidmcnab.041004144338.55,davidmcnab.041004144338.56,davidmcnab.041004144338.57,davidmcnab.041004144338.58,davidmcnab.041004144338.59,davidmcnab.041004144338.60,davidmcnab.041004144338.62,davidmcnab.041004144338.63,davidmcnab.041004144338.64,davidmcnab.041004144338.65,davidmcnab.041004144338.66,davidmcnab.041004144338.67,davidmcnab.041004144338.68,davidmcnab.041004144338.69,davidmcnab.041004144338.70,davidmcnab.041004144338.71,davidmcnab.041004144338.72,davidmcnab.041004144338.73,davidmcnab.041004144338.74,davidmcnab.041004144338.75,davidmcnab.041004144338.76,davidmcnab.041004144338.77,davidmcnab.041004144338.78,davidmcnab.041004144338.79,davidmcnab.041004144338.80,davidmcnab.041004144338.81,davidmcnab.041004144338.82,davidmcnab.041004144338.83,davidmcnab.041004144338.84,davidmcnab.041004144338.85,davidmcnab.041004144338.86,davidmcnab.041004144338.87,davidmcnab.041004144338.88,davidmcnab.041004144338.89,davidmcnab.041004144338.90,davidmcnab.041004144338.92,davidmcnab.041004144338.93,davidmcnab.041004144338.94,davidmcnab.041004144338.95,davidmcnab.041004144338.96,davidmcnab.041004144338.97,davidmcnab.041004144338.98,davidmcnab.041004144338.99,davidmcnab.041004144338.100,davidmcnab.041004144338.101,davidmcnab.041004144338.102,davidmcnab.041004144338.103,davidmcnab.041004144338.105,davidmcnab.041004144338.106,davidmcnab.041004144338.107,davidmcnab.041004144338.108,davidmcnab.041004144338.109"><vh>@file jython/src/i2psam.py</vh> -<v t="davidmcnab.041004144338.1"><vh>imports</vh></v> -<v t="davidmcnab.041004144338.2"><vh>globals</vh></v> +<v t="davidmcnab.041004144338" a="E" tnodeList="davidmcnab.041004144338,davidmcnab.041004144338.1,davidmcnab.041004144338.2,davidmcnab.041004144338.4,davidmcnab.041004144338.5,davidmcnab.041004144338.6,davidmcnab.041004144338.8,davidmcnab.041004144338.9,davidmcnab.041004144338.10,davidmcnab.041004144338.11,davidmcnab.041004144338.12,davidmcnab.041004144338.13,davidmcnab.041004144338.14,davidmcnab.041004144338.15,davidmcnab.041004144338.17,davidmcnab.041004144338.18,davidmcnab.041004144338.19,davidmcnab.041004144338.20,davidmcnab.041004144338.21,davidmcnab.041004144338.22,davidmcnab.041004144338.23,davidmcnab.041004144338.24,davidmcnab.041004144338.26,davidmcnab.041004144338.27,davidmcnab.041004144338.29,davidmcnab.041004144338.30,davidmcnab.041004144338.31,davidmcnab.041004144338.32,davidmcnab.041004144338.33,davidmcnab.041004144338.34,davidmcnab.041004144338.35,davidmcnab.041004144338.36,davidmcnab.041004144338.37,davidmcnab.041004144338.38,davidmcnab.041004144338.39,davidmcnab.041004144338.40,davidmcnab.041004144338.41,davidmcnab.041004144338.42,davidmcnab.041004144338.43,davidmcnab.041004144338.44,davidmcnab.041004144338.45,davidmcnab.041004144338.46,davidmcnab.041004144338.47,davidmcnab.041004144338.49,davidmcnab.041004144338.50,davidmcnab.041004144338.51,davidmcnab.041004144338.52,davidmcnab.041004144338.53,davidmcnab.041004144338.54,davidmcnab.041004144338.55,davidmcnab.041004144338.56,davidmcnab.041004144338.57,davidmcnab.041004144338.58,davidmcnab.041004144338.59,davidmcnab.041004144338.60,davidmcnab.041004144338.62,davidmcnab.041004144338.63,davidmcnab.041004144338.64,davidmcnab.041004144338.65,davidmcnab.041004144338.66,davidmcnab.041004144338.67,davidmcnab.041004144338.68,davidmcnab.041004144338.69,davidmcnab.041004144338.70,davidmcnab.041004144338.71,davidmcnab.041004144338.72,davidmcnab.041004144338.73,davidmcnab.041004144338.74,davidmcnab.041004144338.75,davidmcnab.041004144338.76,davidmcnab.041004144338.77,davidmcnab.041004144338.78,davidmcnab.041004144338.79,davidmcnab.041004144338.80,davidmcnab.041004144338.81,davidmcnab.041004144338.82,davidmcnab.041004144338.83,davidmcnab.041004144338.84,davidmcnab.041304205426,davidmcnab.041004144338.85,davidmcnab.041004144338.86,davidmcnab.041004144338.87,davidmcnab.041004144338.88,davidmcnab.041004144338.89,davidmcnab.041004144338.90,davidmcnab.041004144338.92,davidmcnab.041004144338.93,davidmcnab.041004144338.94,davidmcnab.041004144338.95,davidmcnab.041004144338.96,davidmcnab.041004144338.97,davidmcnab.041004144338.98,davidmcnab.041004144338.99,davidmcnab.041004144338.100,davidmcnab.041004144338.101,davidmcnab.041004144338.105,davidmcnab.041004144338.106,davidmcnab.041004144338.107,davidmcnab.041004144338.108,davidmcnab.041004144338.102,davidmcnab.041004144338.103,davidmcnab.041004144338.109"><vh>@file jython/src/i2psam.py</vh> +<v t="davidmcnab.041004144338.1" a="M"><vh>imports</vh></v> +<v t="davidmcnab.041004144338.2" a="V"><vh>globals</vh></v> <v t="davidmcnab.041004144338.3" a="E"><vh>I2CP Interface Classes</vh> <v t="davidmcnab.041004144338.4"><vh>class JavaWrapper</vh></v> <v t="davidmcnab.041004144338.5" a="E"><vh>class I2PDestination</vh> @@ -110,6 +110,7 @@ <v t="davidmcnab.041004144338.82"><vh>on_DEST</vh></v> <v t="davidmcnab.041004144338.83"><vh>on_message</vh></v> <v t="davidmcnab.041004144338.84"><vh>threadSocketListener</vh></v> +<v t="davidmcnab.041304205426"><vh>threadSocketReceiver</vh></v> <v t="davidmcnab.041004144338.85"><vh>samParse</vh></v> <v t="davidmcnab.041004144338.86"><vh>samSend</vh></v> <v t="davidmcnab.041004144338.87"><vh>samCreateArgsList</vh></v> @@ -129,18 +130,18 @@ <v t="davidmcnab.041004144338.99"><vh>takeKey</vh></v> <v t="davidmcnab.041004144338.100"><vh>log</vh></v> <v t="davidmcnab.041004144338.101"><vh>logException</vh></v> -<v t="davidmcnab.041004144338.102"><vh>usage</vh></v> -<v t="davidmcnab.041004144338.103"><vh>main</vh></v> -</v> <v t="davidmcnab.041004144338.104" a="E"><vh>Tests</vh> <v t="davidmcnab.041004144338.105" tnodeList="davidmcnab.041004144338.105"><vh>testdests</vh></v> <v t="davidmcnab.041004144338.106"><vh>testsigs</vh></v> <v t="davidmcnab.041004144338.107"><vh>testsession</vh></v> <v t="davidmcnab.041004144338.108"><vh>testsocket</vh></v> </v> +<v t="davidmcnab.041004144338.102"><vh>usage</vh></v> +<v t="davidmcnab.041004144338.103"><vh>main</vh></v> +</v> <v t="davidmcnab.041004144338.109"><vh>MAINLINE</vh></v> </v> -<v t="davidmcnab.041004144551" a="EV" tnodeList="davidmcnab.041004144551,davidmcnab.041004144551.1,davidmcnab.041004144551.2,davidmcnab.041004144551.3,davidmcnab.041004144551.4,davidmcnab.041004144551.5,davidmcnab.041004144551.6,davidmcnab.041004144551.7,davidmcnab.041004144551.8,davidmcnab.041004144551.9,davidmcnab.041004144551.10,davidmcnab.041004144551.12,davidmcnab.041004144551.13,davidmcnab.041004144551.14,davidmcnab.041004144551.15,davidmcnab.041004144551.16,davidmcnab.041004144551.17,davidmcnab.041004144551.18,davidmcnab.041004144551.19,davidmcnab.041004144551.20,davidmcnab.041004144551.21,davidmcnab.041004144551.22,davidmcnab.041004144551.23,davidmcnab.041004144551.24,davidmcnab.041004144551.26,davidmcnab.041004144551.27,davidmcnab.041004144551.28,davidmcnab.041004144551.29,davidmcnab.041004144551.30,davidmcnab.041004144551.31,davidmcnab.041004144551.32,davidmcnab.041004144551.33,davidmcnab.041004144551.35,davidmcnab.041004144551.36,davidmcnab.041004144551.37,davidmcnab.041004144551.38,davidmcnab.041004144551.39,davidmcnab.041004144551.40,davidmcnab.041004144551.41,davidmcnab.041004144551.42,davidmcnab.041004144551.43,davidmcnab.041004144551.45,davidmcnab.041004144551.46,davidmcnab.041004144551.47,davidmcnab.041004144551.48,davidmcnab.041004144551.49,davidmcnab.041004144551.50,davidmcnab.041004144551.51,davidmcnab.041004144551.52"><vh>@file python/src/i2psamclient.py</vh> +<v t="davidmcnab.041004144551" a="E" tnodeList="davidmcnab.041004144551,davidmcnab.041004144551.1,davidmcnab.041004144551.2,davidmcnab.041004144551.3,davidmcnab.041004144551.4,davidmcnab.041004144551.5,davidmcnab.041004144551.6,davidmcnab.041004144551.7,davidmcnab.041004144551.8,davidmcnab.041004144551.9,davidmcnab.041004144551.10,davidmcnab.041004144551.12,davidmcnab.041004144551.13,davidmcnab.041004144551.14,davidmcnab.041004144551.15,davidmcnab.041004144551.16,davidmcnab.041004144551.17,davidmcnab.041004144551.18,davidmcnab.041004144551.19,davidmcnab.041004144551.20,davidmcnab.041204020513,davidmcnab.041204204235,davidmcnab.041204044735,davidmcnab.041204050339,davidmcnab.041004144551.21,davidmcnab.041004144551.22,davidmcnab.041004144551.23,davidmcnab.041004144551.24,davidmcnab.041004144551.26,davidmcnab.041004144551.27,davidmcnab.041004144551.28,davidmcnab.041004144551.29,davidmcnab.041004144551.30,davidmcnab.041004144551.31,davidmcnab.041004144551.32,davidmcnab.041004144551.33,davidmcnab.041204042212,davidmcnab.041004144551.35,davidmcnab.041004144551.36,davidmcnab.041004144551.37,davidmcnab.041004144551.38,davidmcnab.041204042212.1,davidmcnab.041204042212.2,davidmcnab.041204044735.1,davidmcnab.041204050339.1,davidmcnab.041304235615,davidmcnab.041204050339.2,davidmcnab.041204050511,davidmcnab.041204044135,davidmcnab.041004144551.39,davidmcnab.041004144551.40,davidmcnab.041004144551.41,davidmcnab.041004144551.42,davidmcnab.041004144551.43,davidmcnab.041004144551.45,davidmcnab.041004144551.46,davidmcnab.041004144551.47,davidmcnab.041004144551.48,davidmcnab.041004144551.49,davidmcnab.041004144551.50,davidmcnab.041204203651,davidmcnab.041004144551.51,davidmcnab.041004144551.52"><vh>@file python/src/i2psamclient.py</vh> <v t="davidmcnab.041004144551.1"><vh>imports</vh></v> <v t="davidmcnab.041004144551.2"><vh>globals</vh></v> <v t="davidmcnab.041004144551.3"><vh>exceptions</vh></v> @@ -161,6 +162,10 @@ <v t="davidmcnab.041004144551.18"><vh>samDatagramSend</vh></v> <v t="davidmcnab.041004144551.19"><vh>samDatagramCheck</vh></v> <v t="davidmcnab.041004144551.20"><vh>samDatagramReceive</vh></v> +<v t="davidmcnab.041204020513"><vh>samStreamConnect</vh></v> +<v t="davidmcnab.041204204235"><vh>samStreamAccept</vh></v> +<v t="davidmcnab.041204044735"><vh>samStreamSend</vh></v> +<v t="davidmcnab.041204050339"><vh>samStreamClose</vh></v> <v t="davidmcnab.041004144551.21"><vh>samNamingLookup</vh></v> <v t="davidmcnab.041004144551.22"><vh>samParse</vh></v> <v t="davidmcnab.041004144551.23"><vh>samSend</vh></v> @@ -177,12 +182,22 @@ <v t="davidmcnab.041004144551.33"><vh>on_DEST</vh></v> </v> <v t="davidmcnab.041004144551.34" a="E"><vh>Utility Methods</vh> +<v t="davidmcnab.041204042212"><vh>samAllocId</vh></v> <v t="davidmcnab.041004144551.35"><vh>_recvline</vh></v> <v t="davidmcnab.041004144551.36"><vh>_recvbytes</vh></v> <v t="davidmcnab.041004144551.37"><vh>_sendbytes</vh></v> <v t="davidmcnab.041004144551.38"><vh>_sendline</vh></v> </v> </v> +<v t="davidmcnab.041204042212.1" a="E"><vh>class I2PSAMStream</vh> +<v t="davidmcnab.041204042212.2"><vh>__init__</vh></v> +<v t="davidmcnab.041204044735.1"><vh>send</vh></v> +<v t="davidmcnab.041204050339.1"><vh>recv</vh></v> +<v t="davidmcnab.041304235615"><vh>readline</vh></v> +<v t="davidmcnab.041204050339.2"><vh>close</vh></v> +<v t="davidmcnab.041204050511"><vh>__del__</vh></v> +<v t="davidmcnab.041204044135"><vh>_notifyIncomingData</vh></v> +</v> <v t="davidmcnab.041004144551.39" a="E"><vh>class I2PRemoteSession</vh> <v t="davidmcnab.041004144551.40"><vh>__init__</vh></v> <v t="davidmcnab.041004144551.41"><vh>send</vh></v> @@ -196,6 +211,7 @@ <v t="davidmcnab.041004144551.48"><vh>demoRAW</vh></v> <v t="davidmcnab.041004144551.49"><vh>demoDATAGRAM</vh></v> <v t="davidmcnab.041004144551.50"><vh>demoSTREAM</vh></v> +<v t="davidmcnab.041204203651"><vh>demoSTREAM_thread</vh></v> <v t="davidmcnab.041004144551.51"><vh>demo</vh></v> </v> <v t="davidmcnab.041004144551.52"><vh>MAINLINE</vh></v> @@ -244,15 +260,13 @@ import java # i2p-specific imports import net.i2p import net.i2p.client # to shut up epydoc - -# shut up java with a few more imports +#import net.i2p.client.I2PClient +#import net.i2p.client.I2PClientFactory +#import net.i2p.client.I2PSessionListener +import net.i2p.client.naming import net.i2p.client.streaming import net.i2p.crypto import net.i2p.data -import net.i2p.client.I2PClient -import net.i2p.client.I2PClientFactory -import net.i2p.client.naming -#import net.i2p.client.I2PSessionListener # handy shorthand refs i2p = net.i2p @@ -285,7 +299,7 @@ i2cpPort = 7654 # logging settings # 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful -verbosity = 5 +verbosity = 2 # change to a filename to log there instead logfile = sys.stdout @@ -1017,23 +1031,28 @@ port = i2cpPort self.dest = dest if kw.has_key('sock') \ - and kw.has_key('dest') \ and kw.has_key('remdest') \ and kw.has_key('instream') \ and kw.has_key('outstream'): + # wrapping an accept()'ed connection + log(4, "accept()'ed a connection, wrapping...") + self.sock = kw['sock'] - self.dest = kw['dest'] + self.dest = dest self.remdest = kw['remdest'] self.instream = kw['instream'] self.outstream = kw['outstream'] else: + log(4, "creating new I2PSocket %s" % dest) + # process keywords self.host = kw.get('host', self.host) self.port = int(kw.get('port', self.port)) # we need a factory, don't we? self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory() + </t> <t tx="davidmcnab.041004144338.52">def bind(self, dest=None): """ @@ -1047,7 +1066,8 @@ port = i2cpPort self.dest = dest elif not self.dest: # create new dest, client should interrogate it at some time - self.dest = Destination() + log(4, "bind: socket has no dest, creating one") + self.dest = I2PDestination() </t> <t tx="davidmcnab.041004144338.53">def listen(self, *args, **kw): """ @@ -1058,6 +1078,8 @@ port = i2cpPort raise I2PSocketError(".sockmgr already present - have you already called listen?") if not self.dest: raise I2PSocketError("socket is not bound to a destination") + + log(4, "listening on socket") # create the socket manager self._createSockmgr() @@ -1090,6 +1112,10 @@ port = i2cpPort <t tx="davidmcnab.041004144338.55">def connect(self, remdest): """ Connects to a remote destination + + This has one totally major difference from the normal socket + paradigm, and that is that you can have n outbound connections + to different dests. """ # sanity check if self.sockmgr: @@ -1097,6 +1123,7 @@ port = i2cpPort # create whole new dest if none was provided to constructor if self.dest is None: + log(4, "connect: creating whole new dest") self.dest = I2PDestination() # create the socket manager @@ -1107,13 +1134,22 @@ port = i2cpPort opts = net.i2p.client.streaming.I2PSocketOptions() try: - self.sock = self.sockmgr.connect(remdest._item, opts) + log(4, "trying to connect to %s" % remdest.toBase64()) + sock = self.sock = self.sockmgr.connect(remdest._item, opts) self.remdest = remdest except: logException(2, "apparent exception, continuing...") - self.instream = self.sock.getInputStream() - self.outstream = self.sock.getOutputStream() + + self.instream = sock.getInputStream() + self.outstream = sock.getOutputStream() + + sockobj = I2PSocket(dest=self.dest, + remdest=remdest, + sock=sock, + instream=self.instream, + outstream=self.outstream) self._connected = 1 + return sockobj </t> <t tx="davidmcnab.041004144338.56">def recv(self, nbytes): """ @@ -1150,22 +1186,26 @@ port = i2cpPort raise I2PSocketError("Socket is not connected") # and write it out - #print "send: writing '%s' to outstream..." % repr(buf) + log(4, "send: writing '%s' to outstream..." % repr(buf)) outstream = self.outstream for c in buf: outstream.write(ord(c)) # flush just in case - #print "send: flushing..." + log(4, "send: flushing...") self.outstream.flush() - #print "send: done" + log(4, "send: done") + </t> <t tx="davidmcnab.041004144338.58">def available(self): """ Returns the number of bytes available for recv() """ - return self.sock.available() + #print "available: sock is %s" % repr(self.sock) + + return self.instream.available() + </t> <t tx="davidmcnab.041004144338.59">def close(self): @@ -1192,6 +1232,9 @@ port = i2cpPort </t> <t tx="davidmcnab.041004144338.60">def _createSockmgr(self): + if getattr(self, 'sockmgr', None): + return + #options = {jI2PClient.PROP_TCP_HOST: self.host, # jI2PClient.PROP_TCP_PORT: self.port} options = {} @@ -1328,6 +1371,9 @@ version = version self.samSessionIsOpen = 0 self.samSessionStyle = '' + # localise the id allocator + self.samAllocId = self.server.samAllocId + # need a local sending lock self.sendLock = threading.Lock() @@ -1729,6 +1775,26 @@ version = version else: # STREAM # no need to create session object, because we're using streaming api + log(4, "Creating STREAM session") + + # what kind of stream? + direction = args.get('DIRECTION', 'BOTH') + if direction not in ['BOTH', 'RECEIVE', 'CREATE']: + self.samSend("SESSION", "STATUS", + RESULT="I2P_ERROR", + MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"), + ) + return + + if direction == 'BOTH': + self.canConnect = 1 + self.canAccept = 1 + elif direction == 'RECEIVE': + self.canConnect = 0 + self.canAccept = 1 + elif direction == 'CREATE': + self.canConnect = 1 + self.canAccept = 0 # but we do need to mark it as being in use localsessions[destb64] = globalsessions[destb64] = None @@ -1736,8 +1802,9 @@ version = version # make a local socket sock = self.samSock = I2PSocket(dest) - # and we also need to fire up a socket listener - thread.start_new_thread(self.threadSocketListener, (sock, dest)) + # and we also need to fire up a socket listener, if not CREATE-only + if self.canAccept: + thread.start_new_thread(self.threadSocketListener, (sock, dest)) # finally, we can reply with the good news self.samSend("SESSION", "STATUS", @@ -1784,21 +1851,40 @@ version = version if subtopic == 'CONNECT': # who are we connecting to again? - remdest = I2PDestionation(b64=args['DESTINATION']) - id = args['ID'] + remdest = I2PDestination(base64=args['DESTINATION']) + id = int(args['ID']) try: - self.samSock.connect(remdest) + log(4, "Trying to connect to remote peer %s..." % args['DESTINATION']) + sock = self.samSock.connect(remdest) + log(4, "Connected to remote peer %s..." % args['DESTINATION']) + self.localstreams[id] = sock self.samSend("STREAM", "STATUS", RESULT='OK', ID=id, ) + thread.start_new_thread(self.threadSocketReceiver, (sock, id)) + except: + log(4, "Failed to connect to remote peer %s..." % args['DESTINATION']) self.samSend("STREAM", "STATUS", RESULT='I2P_ERROR', - MESSAGE='exception on connect', + MESSAGE='exception_on_connect', + ID=id, ) + elif subtopic == 'SEND': + # send to someone + id = int(args['ID']) + try: + sock = self.localstreams[id] + sock.send(args['DATA']) + except: + logException(4, "send failed") + + + + </t> <t tx="davidmcnab.041004144338.79">def on_DATAGRAM(self, topic, subtopic, args): r""" @@ -1953,15 +2039,24 @@ version = version """ destb64 = dest.toBase64() - log(4, "Listening for connections to %s..." % destb64[:40]) + log(4, "Listening for connections to %s..." % destb64) + + sock.bind() + sock.listen() + while 1: + log(4, "Awaiting next connection to %s..." % destb64) newsock = sock.accept() - + log(4, "Got connection to %s..." % destb64) + # need an id, negative id = - self.server.samAllocId() # register it in local and global streams self.localstreams[id] = self.globalstreams[id] = newsock + + # fire up the receiver thread + thread.start_new_thread(self.threadSocketReceiver, (newsock, id)) # who is connected to us? remdest = newsock.remdest @@ -1971,6 +2066,7 @@ version = version self.samSend("STREAM", "CONNECTED", DESTINATION=remdest_b64, ID=id) + </t> <t tx="davidmcnab.041004144338.85">def samParse(self, flds): """ @@ -1988,7 +2084,7 @@ version = version try: name, val = arg.split("=", 1) except: - logException(3, "failed to process %s" % repr(arg)) + logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val @@ -2224,18 +2320,20 @@ class I2PSocketError(Exception): print " (run with no commands to launch SAM server)" print " samserver - runs as a SAM server" print " test - run a suite of self-tests" + print " testsocket - run only the socket test" + print " testbidirsocket - run socket test in bidirectional mode" print sys.exit(0) - - - </t> <t tx="davidmcnab.041004144338.103">def main(): argv = sys.argv argc = len(argv) + # ------------------------------------------------- + # do the getopt command line parsing + try: opts, args = getopt.getopt(sys.argv[1:], "h?vV:H:P:", @@ -2247,14 +2345,9 @@ class I2PSocketError(Exception): traceback.print_exc(file=sys.stdout) usage("You entered an invalid option") - cmd = 'samserver' + #print "args=%s" % args - # we prolly should pass all these parms in constructor call, but - # what the heck! - #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort - serveropts = {} - for opt, val in opts: if opt in ['-h', '-?', '--help']: usage(1) @@ -2274,6 +2367,9 @@ class I2PSocketError(Exception): else: usage(0) + # -------------------------------------------------- + # now run in required mode, default is 'samserver' + if len(args) == 0: cmd = 'samserver' else: @@ -2287,14 +2383,25 @@ class I2PSocketError(Exception): elif cmd == 'test': - print "RUNNING I2P Jython TESTS" + print "RUNNING full I2PSAM Jython TEST SUITE" testsigs() testdests() testsession() testsocket() + elif cmd == 'testsocket': + + print "RUNNING SOCKET TEST" + testsocket(0) + + elif cmd == 'testbidirsocket': + print "RUNNING BIDIRECTIONAL SOCKET TEST" + testsocket(1) + else: + # spit at unrecognised option usage(0) + </t> <t tx="davidmcnab.041004144338.104"></t> <t tx="davidmcnab.041004144338.105">def testdests(): @@ -2441,7 +2548,7 @@ class I2PSocketError(Exception): print "session tests passed!" </t> -<t tx="davidmcnab.041004144338.108">def testsocket(): +<t tx="davidmcnab.041004144338.108">def testsocket(bidirectional=0): global d1, d2, s1, s2 @@ -2501,6 +2608,19 @@ class I2PSocketError(Exception): print "launching server thread..." thread.start_new_thread(servThread, (sServer,)) + if bidirectional: + # dummy thread which accepts connections TO client socket + def threadDummy(s): + print "dummy: listening" + s.listen() + print "dummy: accepting" + + sock = s.accept() + print "dummy: got connection" + + print "test - launching dummy client accept thread" + thread.start_new_thread(threadDummy, (sClient,)) + print "client: trying to connect" sClient.connect(dServer) @@ -2517,6 +2637,7 @@ class I2PSocketError(Exception): print "I2PSocket test apparently succeeded" + </t> <t tx="davidmcnab.041004144338.109">if __name__ == '__main__': main() @@ -2539,6 +2660,8 @@ Run this module without arguments to see a demo in action </t> <t tx="davidmcnab.041004144551.1">import sys, os, socket, thread, threading, Queue, traceback, StringIO, time +from pdb import set_trace + </t> <t tx="davidmcnab.041004144551.2"># ----------------------------------------- # server access settings @@ -2573,6 +2696,11 @@ class I2PCommandFail(Exception): A failure in an I2CP command """ pass + +class I2PStreamClosed(Exception): + """ + Stream is not open + """ </t> <t tx="davidmcnab.041004144551.4">class I2PSamClient: """ @@ -2628,10 +2756,20 @@ i2cpPort = None self.qSession = Queue.Queue() self.qDatagrams = Queue.Queue() self.qRawMessages = Queue.Queue() + self.namingReplies = {} self.namingCache = {} + + self.streams = {} # currently open streams, keyed by id + self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id + self.qNewStreams = Queue.Queue() # incoming connections + + self.samNextIdLock = threading.Lock() + self.samNextId = 1 + self.isRunning = 1 + log(4, "trying connection to SAM server...") try: self.sock.connect((self.host, self.port)) @@ -2825,6 +2963,8 @@ i2cpPort = None - dest - base64 private destination Keywords: + - direction - only used for STREAM sessions, can be RECEIVE, + CREATE or BOTH (default BOTH) - i2cphost - hostname for the SAM bridge to contact i2p router on - i2cpport - port for the SAM bridge to contact i2p router on @@ -2835,6 +2975,20 @@ i2cpPort = None kw1 = dict(kw) kw1['STYLE'] = self.samStyle = style kw1['DESTINATION'] = dest + if style == 'STREAM': + direction = kw.get('direction', 'BOTH') + kw1['DIRECTION'] = direction + if direction == 'BOTH': + self.canAccept = 1 + self.canConnect = 1 + elif direction == 'RECEIVE': + self.canAccept = 1 + self.canConnect = 0 + elif direction == 'CREATE': + self.canAccept = 0 + self.canConnect = 1 + else: + raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH") # stick in i2cp host/port if specified if kw.has_key('i2cphost'): @@ -2974,7 +3128,7 @@ i2cpPort = None try: name, val = arg.split("=", 1) except: - logException(3, "failed to process %s" % repr(arg)) + logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val @@ -2990,6 +3144,8 @@ i2cpPort = None + + </t> <t tx="davidmcnab.041004144551.23">def samSend(self, topic, subtopic, data=None, **kw): """ @@ -3083,7 +3239,90 @@ i2cpPort = None <t tx="davidmcnab.041004144551.29">def on_STREAM(self, topic, subtopic, args): """ Handles STREAM messages from server + + STREAM STATUS + RESULT=$result + ID=$id + [MESSAGE=...] + + STREAM CONNECTED + DESTINATION=$base64key + ID=$id + + STREAM RECEIVED + ID=$id + SIZE=$numBytes\n[$numBytes of data] + + STREAM CLOSED + RESULT=$result + ID=$id + [MESSAGE=...] """ + log(4, "got %s %s %s" % (topic, subtopic, args)) + + # which stream? + id = int(args['ID']) + + # result of prior connection attempt + if subtopic == 'STATUS': + # stick it on the queue that the caller is waiting on and let the + # caller interpret the result + self.streamConnectReplies[id].put(args) + return + + # notice of incoming connection + if subtopic == 'CONNECTED': + + # grab details + dest = args['DESTINATION'] + + # wrap it in a stream obj + conn = I2PSAMStream(self, id, dest) + self.streams[id] = conn + + # and put it there for anyone calling samStreamAccept() + self.qNewStreams.put(conn) + + # done + return + + # notice of received data + elif subtopic == 'RECEIVED': + # grab details + data = args['DATA'] + + # lookup the connection + conn = self.streams.get(id, None) + if not conn: + # conn not known, just ditch + log(2, "got data, but don't recall any conn with id %s" % id) + return + + # and post the received data + conn._notifyIncomingData(data) + + log(4, "wrote data to conn's inbound queue") + + # done + return + + elif subtopic == 'CLOSED': + # lookup the connection + conn = self.streams.get(id, None) + if not conn: + # conn not known, just ditch + return + + # mark conn as closed and forget it + conn._notifyIncomingData("") # special signal to close + conn.isOpen = 0 + del self.streams[id] + + # done + return + + + </t> <t tx="davidmcnab.041004144551.30">def on_DATAGRAM(self, topic, subtopic, args): """ @@ -3187,7 +3426,8 @@ i2cpPort = None self.dest = dest </t> <t tx="davidmcnab.041004144551.41">def send(self, peerdest, msg): - + """ + """ return self.client.send(self.dest, peerdest, msg) </t> <t tx="davidmcnab.041004144551.42">def receive(self): @@ -3360,35 +3600,59 @@ i2cpPort = None print "Starting SAM STREAM demo..." print - print "Instantiating 2 more client connections..." - c5 = I2PSamClient() + print "Instantiating client c6..." c6 = I2PSamClient() - print "Creating more dests via SAM" - pub5, priv5 = c5.samDestGenerate() + print "Creating dest for c6" pub6, priv6 = c6.samDestGenerate() - print "Creating SAM STREAM SESSION on connection c3..." - res = c5.samSessionCreate("STREAM", priv5) + print "Creating SAM STREAM SESSION on connection c6..." + res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE") if res != 'OK': - print "Failed to create STREAM session on connection c5: %s" % repr(res) + print "Failed to create STREAM session on connection c6: %s" % repr(res) return - print "STREAM Session on connection c5 created successfully" + print "STREAM Session on connection c6 created successfully" - print "Creating SAM STREAM SESSION on connection c6..." - res = c6.samSessionCreate("STREAM", priv6) + print "Launching acceptor thread..." + thread.start_new_thread(demoSTREAM_thread, (c6,)) + + #print "sleep a while and give the server a chance..." + #time.sleep(10) + + print "----------------------------------------" + + print "Instantiating client c5..." + c5 = I2PSamClient() + + print "Creating dest for c5" + pub5, priv5 = c5.samDestGenerate() + + print "Creating SAM STREAM SESSION on connection c5..." + res = c5.samSessionCreate("STREAM", priv5, direction="CREATE") if res != 'OK': - print "Failed to create STREAM session on connection c4: %s" % repr(res) + print "Failed to create STREAM session on connection c5: %s" % repr(res) return - print "STREAM Session on connection c4 created successfully" + print "STREAM Session on connection c5 created successfully" - msg = "Hi there, this is a datagram!" - print "sending from c5 to c6: %s" % repr(msg) - c5.samStreamSend(pub6, msg) + print "----------------------------------------" + + print "Making connection from c5 to c6..." + + #set_trace() + + try: + conn_c5 = c5.samStreamConnect(pub6) + except: + print "Stream Connection failed" + return + print "Stream connect succeeded" + + print "Receiving from c5..." + buf = conn_c5.readline() + print "Got %s" % repr(buf) - print "now try to receive from c6 (will block)..." - msg1 = c6.samStreamReceive() - print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest)) + #print "Try to accept connection on c6..." + #conn_c6 = c6.sam print print "--------------------------------------" @@ -3396,6 +3660,10 @@ i2cpPort = None print "--------------------------------------" print + + + + </t> <t tx="davidmcnab.041004144551.51">def demo(): """ @@ -3410,10 +3678,10 @@ i2cpPort = None print "-----------------------------------------" print - demoNAMING() - demoRAW() - demoDATAGRAM() - #demoSTREAM() + #demoNAMING() + #demoRAW() + #demoDATAGRAM() + demoSTREAM() print print "-----------------------------------------" @@ -3426,5 +3694,264 @@ i2cpPort = None demo() </t> +<t tx="davidmcnab.041204020513">def samStreamConnect(self, dest): + """ + Makes a STREAM connection to a remote dest + + STREAM STATUS + RESULT=$result + ID=$id + [MESSAGE=...] + """ + # need an ID + id = self.samAllocId() + + # create queue for connect reply + q = self.streamConnectReplies[id] = Queue.Queue() + + # send req + self.samSend("STREAM", "CONNECT", + ID=id, + DESTINATION=dest, + ) + + # await reply - comes back as a dict + resp = q.get() + + # ditch queue + del self.streamConnectReplies[id] + del q + + # check out response + result = resp['RESULT'] + if result == 'OK': + conn = I2PSAMStream(self, id, dest) + self.streams[id] = conn + return conn + else: + msg = resp.get('MESSAGE', '') + raise I2PCommandFail(result, msg, "STREAM CONNECT") + +</t> +<t tx="davidmcnab.041204042212">def samAllocId(self): + """ + Allocates a new unique id as required by SAM protocol + """ + self.samNextIdLock.acquire() + id = self.samNextId + self.samNextId += 1 + self.samNextIdLock.release() + return id +</t> +<t tx="davidmcnab.041204042212.1">class I2PSAMStream: + """ + Wrapper for a stream object + """ + @others +</t> +<t tx="davidmcnab.041204042212.2">def __init__(self, client, id, dest): + """ + """ + self.client = client + self.id = id + self.dest = dest + + self.qIncomingData = Queue.Queue() + + self.inbuf = '' + self.isOpen = 1 +</t> +<t tx="davidmcnab.041204044135">def _notifyIncomingData(self, data): + """ + Called by client receiver to notify incoming data + """ + log(4, "got %s" % repr(data)) + self.qIncomingData.put(data) +</t> +<t tx="davidmcnab.041204044735">def samStreamSend(self, conn, data): + """ + DO NOT CALL THIS DIRECTLY + + Invoked by an I2PSAMStream object to transfer data + Use the object's .send() method instead. + + conn is the I2PSAMStream + + STREAM SEND + ID=$id + SIZE=$numBytes\n[$numBytes of data] + """ + # dispatch + self.samSend("STREAM", "SEND", data, ID=conn.id) + + # useless, but mimics socket paradigm + return len(data) + +</t> +<t tx="davidmcnab.041204044735.1">def send(self, data): + """ + Sends data to a stream connection + """ + # barf if stream not open + if not self.isOpen: + raise I2PStreamClosed + + # can send + return self.client.samStreamSend(self, data) +</t> +<t tx="davidmcnab.041204050339">def samStreamClose(self, conn): + """ + DO NOT CALL DIRECTLY + + Invoked by I2PSAMStream to close stream + Use the object's .send() method instead. + + STREAM CLOSE + ID=$id + """ + self.samSend("STREAM", "CLOSE", ID=conn.id) + del self.streams[conn.id] + +</t> +<t tx="davidmcnab.041204050339.1">def recv(self, size): + """ + Retrieves n bytes from peer + """ + chunks = [] + + while self.isOpen and size > 0: + # try internal buffer first + if self.inbuf: + chunk = self.inbuf[:size] + chunklen = len(chunk) + self.inbuf = self.inbuf[chunklen:] + chunks.append(chunk) + size -= chunklen + else: + # replenish input buffer + log(4, "I2PSAMStream.recv: replenishing input buffer") + buf = self.qIncomingData.get() + if buf == '': + # connection closed by peer + self.isOpen = 0 + break + else: + # got more data + log(4, "I2PSAMStream: queue returned %s" % repr(buf)) + self.inbuf += buf + + # return whatever we've got, hopefully all + return "".join(chunks) + + +</t> +<t tx="davidmcnab.041204050339.2">def close(self): + """ + close this stream connection + """ + log(4, "closing stream") + self.client.samStreamClose(self) + log(4, "stream closed") + self.isOpen = 0 + + # and just to make sure... + self.qIncomingData.put("") # busts out of recv() loops + +</t> +<t tx="davidmcnab.041204050511">def __del__(self): + """ + Dropping last ref to this object closes stream + """ + self.close() +</t> +<t tx="davidmcnab.041204203651">def demoSTREAM_thread(sess): + + while 1: + sock = sess.samStreamAccept() + log(4, "got incoming connection") + + print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING" + + time.sleep(10) + + sock.send("Hi there, what do you want?\n") + + print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING" + time.sleep(300) + print "**ACCEPTOR CLOSING STREAM" + + sock.close() + +</t> +<t tx="davidmcnab.041204204235">def samStreamAccept(self): + """ + Waits for an incoming connection, returning a wrapped conn obj + """ + log(4, "waiting for connection") + conn = self.qNewStreams.get() + log(4, "got connection") + return conn +</t> +<t tx="davidmcnab.041304205426">def threadSocketReceiver(self, sock, id): + """ + One of these gets launched each time a new stream connection + is created. Due to the lack of callback mechanism within the + ministreaming API, we have to actively poll for and send back + received data + """ + while 1: + #avail = sock.available() + #if avail <= 0: + # print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail) + # time.sleep(5) + # continue + #log(4, "reading a byte") + + try: + buf = sock.recv(1) + except: + logException(4, "Exception reading first byte") + + if buf == '': + log(4, "stream closed") + + # notify a close + self.samSend("STREAM", "CLOSED", + ID=id) + return + + # grab more if there's any available + navail = sock.available() + if navail > 0: + #log(4, "%d more bytes available, reading..." % navail) + rest = sock.recv(navail) + buf += rest + + # send if off + log(4, "got from peer: %s" % repr(buf)) + + self.samSend("STREAM", "RECEIVED", buf, + ID=id, + ) + + + + +</t> +<t tx="davidmcnab.041304235615">def readline(self): + """ + Read a line of text from stream, return the line without trailing newline + + This method really shouldn't exist in a class that's trying to look a bit + like a socket object, but what the hell! + """ + chars = [] + while 1: + char = self.recv(1) + if char in ['', '\n']: + break + chars.append(char) + return "".join(chars) +</t> </tnodes> </leo_file> diff --git a/apps/sam/jython/build.xml b/apps/sam/jython/build.xml index fed7912659..4a7656152f 100644 --- a/apps/sam/jython/build.xml +++ b/apps/sam/jython/build.xml @@ -11,17 +11,25 @@ </target> <target name="jar"> - <mkdir dir="./build" /> - <exec executable="jythonc"> + + <condition property="jythonext" value=".bat"> + <os family="windows" /> + </condition> + <condition property="jythonext" value=""> + <not> + <os family="windows" /> + </not> + </condition> + + <exec executable="jythonc${jythonext}" dir="."> <env key="CLASSPATH" path="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar"/> - <arg value="--jar"/> - <arg value="./build/i2psam.jar"/> - <arg value="./src/i2psam.py"/> + <arg value="--jar"/><arg path="./i2psam.jar"/> + <arg path="./src/i2psam.py"/> </exec> </target> <target name="clean"> - <delete dir="./build" /> + <delete file="i2psam.jar" /> <delete dir="./jpywork" /> </target> diff --git a/apps/sam/jython/src/i2psam.py b/apps/sam/jython/src/i2psam.py index b5ad210a93..fef0b482ee 100644 --- a/apps/sam/jython/src/i2psam.py +++ b/apps/sam/jython/src/i2psam.py @@ -79,7 +79,7 @@ i2cpPort = 7654 # logging settings # 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful -verbosity = 5 +verbosity = 2 # change to a filename to log there instead logfile = sys.stdout @@ -851,23 +851,28 @@ class I2PSocket: self.dest = dest if kw.has_key('sock') \ - and kw.has_key('dest') \ and kw.has_key('remdest') \ and kw.has_key('instream') \ and kw.has_key('outstream'): + # wrapping an accept()'ed connection + log(4, "accept()'ed a connection, wrapping...") + self.sock = kw['sock'] - self.dest = kw['dest'] + self.dest = dest self.remdest = kw['remdest'] self.instream = kw['instream'] self.outstream = kw['outstream'] else: + log(4, "creating new I2PSocket %s" % dest) + # process keywords self.host = kw.get('host', self.host) self.port = int(kw.get('port', self.port)) # we need a factory, don't we? self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory() + #@-node:__init__ #@+node:bind def bind(self, dest=None): @@ -882,7 +887,8 @@ class I2PSocket: self.dest = dest elif not self.dest: # create new dest, client should interrogate it at some time - self.dest = Destination() + log(4, "bind: socket has no dest, creating one") + self.dest = I2PDestination() #@-node:bind #@+node:listen def listen(self, *args, **kw): @@ -894,6 +900,8 @@ class I2PSocket: raise I2PSocketError(".sockmgr already present - have you already called listen?") if not self.dest: raise I2PSocketError("socket is not bound to a destination") + + log(4, "listening on socket") # create the socket manager self._createSockmgr() @@ -936,11 +944,12 @@ class I2PSocket: to different dests. """ # sanity check - #if self.sockmgr: - # raise I2PSocketError(".sockmgr already present - have you already called listen/connect?") + if self.sockmgr: + raise I2PSocketError(".sockmgr already present - have you already called listen/connect?") # create whole new dest if none was provided to constructor if self.dest is None: + log(4, "connect: creating whole new dest") self.dest = I2PDestination() # create the socket manager @@ -951,6 +960,7 @@ class I2PSocket: opts = net.i2p.client.streaming.I2PSocketOptions() try: + log(4, "trying to connect to %s" % remdest.toBase64()) sock = self.sock = self.sockmgr.connect(remdest._item, opts) self.remdest = remdest except: @@ -962,8 +972,8 @@ class I2PSocket: sockobj = I2PSocket(dest=self.dest, remdest=remdest, sock=sock, - instream=instream, - outstream=outstream) + instream=self.instream, + outstream=self.outstream) self._connected = 1 return sockobj #@-node:connect @@ -1004,23 +1014,27 @@ class I2PSocket: raise I2PSocketError("Socket is not connected") # and write it out - #print "send: writing '%s' to outstream..." % repr(buf) + log(4, "send: writing '%s' to outstream..." % repr(buf)) outstream = self.outstream for c in buf: outstream.write(ord(c)) # flush just in case - #print "send: flushing..." + log(4, "send: flushing...") self.outstream.flush() - #print "send: done" + log(4, "send: done") + #@-node:send #@+node:available def available(self): """ Returns the number of bytes available for recv() """ - return self.sock.available() + #print "available: sock is %s" % repr(self.sock) + + return self.instream.available() + #@-node:available #@+node:close @@ -1049,6 +1063,9 @@ class I2PSocket: #@+node:_createSockmgr def _createSockmgr(self): + if getattr(self, 'sockmgr', None): + return + #options = {jI2PClient.PROP_TCP_HOST: self.host, # jI2PClient.PROP_TCP_PORT: self.port} options = {} @@ -1606,6 +1623,26 @@ class I2PSamClientHandler(StreamRequestHandler): else: # STREAM # no need to create session object, because we're using streaming api + log(4, "Creating STREAM session") + + # what kind of stream? + direction = args.get('DIRECTION', 'BOTH') + if direction not in ['BOTH', 'RECEIVE', 'CREATE']: + self.samSend("SESSION", "STATUS", + RESULT="I2P_ERROR", + MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"), + ) + return + + if direction == 'BOTH': + self.canConnect = 1 + self.canAccept = 1 + elif direction == 'RECEIVE': + self.canConnect = 0 + self.canAccept = 1 + elif direction == 'CREATE': + self.canConnect = 1 + self.canAccept = 0 # but we do need to mark it as being in use localsessions[destb64] = globalsessions[destb64] = None @@ -1613,8 +1650,9 @@ class I2PSamClientHandler(StreamRequestHandler): # make a local socket sock = self.samSock = I2PSocket(dest) - # and we also need to fire up a socket listener - thread.start_new_thread(self.threadSocketListener, (sock, dest)) + # and we also need to fire up a socket listener, if not CREATE-only + if self.canAccept: + thread.start_new_thread(self.threadSocketListener, (sock, dest)) # finally, we can reply with the good news self.samSend("SESSION", "STATUS", @@ -1663,22 +1701,39 @@ class I2PSamClientHandler(StreamRequestHandler): if subtopic == 'CONNECT': # who are we connecting to again? - remdest = I2PDestionation(b64=args['DESTINATION']) - id = args['ID'] + remdest = I2PDestination(base64=args['DESTINATION']) + id = int(args['ID']) try: + log(4, "Trying to connect to remote peer %s..." % args['DESTINATION']) sock = self.samSock.connect(remdest) + log(4, "Connected to remote peer %s..." % args['DESTINATION']) self.localstreams[id] = sock self.samSend("STREAM", "STATUS", RESULT='OK', ID=id, ) + thread.start_new_thread(self.threadSocketReceiver, (sock, id)) + except: + log(4, "Failed to connect to remote peer %s..." % args['DESTINATION']) self.samSend("STREAM", "STATUS", RESULT='I2P_ERROR', MESSAGE='exception_on_connect', + ID=id, ) + elif subtopic == 'SEND': + # send to someone + id = int(args['ID']) + try: + sock = self.localstreams[id] + sock.send(args['DATA']) + except: + logException(4, "send failed") + + + #@-node:on_STREAM #@+node:on_DATAGRAM @@ -1840,18 +1895,24 @@ class I2PSamClientHandler(StreamRequestHandler): """ destb64 = dest.toBase64() - log(4, "Listening for connections to %s..." % destb64[:40]) + log(4, "Listening for connections to %s..." % destb64) + sock.bind() sock.listen() while 1: + log(4, "Awaiting next connection to %s..." % destb64) newsock = sock.accept() - + log(4, "Got connection to %s..." % destb64) + # need an id, negative id = - self.server.samAllocId() # register it in local and global streams self.localstreams[id] = self.globalstreams[id] = newsock + + # fire up the receiver thread + thread.start_new_thread(self.threadSocketReceiver, (newsock, id)) # who is connected to us? remdest = newsock.remdest @@ -1861,7 +1922,55 @@ class I2PSamClientHandler(StreamRequestHandler): self.samSend("STREAM", "CONNECTED", DESTINATION=remdest_b64, ID=id) + #@-node:threadSocketListener + #@+node:threadSocketReceiver + def threadSocketReceiver(self, sock, id): + """ + One of these gets launched each time a new stream connection + is created. Due to the lack of callback mechanism within the + ministreaming API, we have to actively poll for and send back + received data + """ + while 1: + #avail = sock.available() + #if avail <= 0: + # print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail) + # time.sleep(5) + # continue + #log(4, "reading a byte") + + try: + buf = sock.recv(1) + except: + logException(4, "Exception reading first byte") + + if buf == '': + log(4, "stream closed") + + # notify a close + self.samSend("STREAM", "CLOSED", + ID=id) + return + + # grab more if there's any available + navail = sock.available() + if navail > 0: + #log(4, "%d more bytes available, reading..." % navail) + rest = sock.recv(navail) + buf += rest + + # send if off + log(4, "got from peer: %s" % repr(buf)) + + self.samSend("STREAM", "RECEIVED", buf, + ID=id, + ) + + + + + #@-node:threadSocketReceiver #@+node:samParse def samParse(self, flds): """ @@ -1879,7 +1988,7 @@ class I2PSamClientHandler(StreamRequestHandler): try: name, val = arg.split("=", 1) except: - logException(3, "failed to process %s" % repr(arg)) + logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val @@ -2108,106 +2217,6 @@ def logException(level, msg=''): traceback.print_exc(file=s) log(level, "%s\n%s" % (s.getvalue(), msg), 1) #@-node:logException -#@+node:usage -def usage(detailed=0): - - print "Usage: %s <options> [<command>]" % sys.argv[0] - if not detailed: - print "Run with '-h' to get detailed help" - sys.exit(0) - - print "I2PSAM is a bridge that allows I2P client programs to access the" - print "I2P network by talking over a plaintext socket connection." - print "References:" - print " - http://www.freenet.org.nz/i2p - source, doco, downloadables" - print " - http://drupal.i2p.net/node/view/144 - I2P SAM specification" - print - print "Options:" - print " -h, -?, --help - display this help" - print " -v, --version - print program version" - print " -V, --verbosity=n - set verbosity to n, default 2, 1==quiet, 4==noisy" - print " -H, --listenhost=host - specify host to listen on for client connections" - print " -P, --listenport=port - port to listen on for client connections" - print " --i2cphost=host - hostname of I2P router's I2CP interface" - print " --i2cpport=port - port of I2P router's I2CP interface" - print - print "Commands:" - print " (run with no commands to launch SAM server)" - print " samserver - runs as a SAM server" - print " test - run a suite of self-tests" - print - - sys.exit(0) - - - -#@-node:usage -#@+node:main -def main(): - - argv = sys.argv - argc = len(argv) - - try: - opts, args = getopt.getopt(sys.argv[1:], - "h?vV:H:P:", - ['help', 'version', 'verbosity=', - 'listenhost=', 'listenport=', - 'i2cphost=', 'i2cpport=', - ]) - except: - traceback.print_exc(file=sys.stdout) - usage("You entered an invalid option") - - cmd = 'samserver' - - # we prolly should pass all these parms in constructor call, but - # what the heck! - #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort - - serveropts = {} - - for opt, val in opts: - if opt in ['-h', '-?', '--help']: - usage(1) - elif opt in ['-v', '--version']: - print "I2P SAM version %s" % version - sys.exit(0) - elif opt in ['-V', '--verbosity']: - serveropts['verbosity'] = int(val) - elif opt in ['-H', '--listenhost']: - serveropts['host'] = val - elif opt in ['-P', '--listenport']: - serveropts['port'] = int(val) - elif opt in ['--i2cphost']: - serveropts['i2cphost'] = val - elif opt in ['--i2cpport']: - serveropts['i2cpport'] = int(val) - else: - usage(0) - - if len(args) == 0: - cmd = 'samserver' - else: - cmd = args[0] - - if cmd == 'samserver': - - log(2, "Running I2P SAM Server...") - server = I2PSamServer(**serveropts) - server.run() - - elif cmd == 'test': - - print "RUNNING I2P Jython TESTS" - testsigs() - testdests() - testsession() - testsocket() - - else: - usage(0) -#@-node:main #@+node:testdests def testdests(): """ @@ -2356,7 +2365,7 @@ def testsession(): print "session tests passed!" #@-node:testsession #@+node:testsocket -def testsocket(): +def testsocket(bidirectional=0): global d1, d2, s1, s2 @@ -2416,6 +2425,19 @@ def testsocket(): print "launching server thread..." thread.start_new_thread(servThread, (sServer,)) + if bidirectional: + # dummy thread which accepts connections TO client socket + def threadDummy(s): + print "dummy: listening" + s.listen() + print "dummy: accepting" + + sock = s.accept() + print "dummy: got connection" + + print "test - launching dummy client accept thread" + thread.start_new_thread(threadDummy, (sClient,)) + print "client: trying to connect" sClient.connect(dServer) @@ -2432,7 +2454,119 @@ def testsocket(): print "I2PSocket test apparently succeeded" + #@-node:testsocket +#@+node:usage +def usage(detailed=0): + + print "Usage: %s <options> [<command>]" % sys.argv[0] + if not detailed: + print "Run with '-h' to get detailed help" + sys.exit(0) + + print "I2PSAM is a bridge that allows I2P client programs to access the" + print "I2P network by talking over a plaintext socket connection." + print "References:" + print " - http://www.freenet.org.nz/i2p - source, doco, downloadables" + print " - http://drupal.i2p.net/node/view/144 - I2P SAM specification" + print + print "Options:" + print " -h, -?, --help - display this help" + print " -v, --version - print program version" + print " -V, --verbosity=n - set verbosity to n, default 2, 1==quiet, 4==noisy" + print " -H, --listenhost=host - specify host to listen on for client connections" + print " -P, --listenport=port - port to listen on for client connections" + print " --i2cphost=host - hostname of I2P router's I2CP interface" + print " --i2cpport=port - port of I2P router's I2CP interface" + print + print "Commands:" + print " (run with no commands to launch SAM server)" + print " samserver - runs as a SAM server" + print " test - run a suite of self-tests" + print " testsocket - run only the socket test" + print " testbidirsocket - run socket test in bidirectional mode" + print + + sys.exit(0) +#@-node:usage +#@+node:main +def main(): + + argv = sys.argv + argc = len(argv) + + # ------------------------------------------------- + # do the getopt command line parsing + + try: + opts, args = getopt.getopt(sys.argv[1:], + "h?vV:H:P:", + ['help', 'version', 'verbosity=', + 'listenhost=', 'listenport=', + 'i2cphost=', 'i2cpport=', + ]) + except: + traceback.print_exc(file=sys.stdout) + usage("You entered an invalid option") + + #print "args=%s" % args + + serveropts = {} + for opt, val in opts: + if opt in ['-h', '-?', '--help']: + usage(1) + elif opt in ['-v', '--version']: + print "I2P SAM version %s" % version + sys.exit(0) + elif opt in ['-V', '--verbosity']: + serveropts['verbosity'] = int(val) + elif opt in ['-H', '--listenhost']: + serveropts['host'] = val + elif opt in ['-P', '--listenport']: + serveropts['port'] = int(val) + elif opt in ['--i2cphost']: + serveropts['i2cphost'] = val + elif opt in ['--i2cpport']: + serveropts['i2cpport'] = int(val) + else: + usage(0) + + # -------------------------------------------------- + # now run in required mode, default is 'samserver' + + if len(args) == 0: + cmd = 'samserver' + else: + cmd = args[0] + + if cmd == 'samserver': + + log(2, "Running I2P SAM Server...") + server = I2PSamServer(**serveropts) + server.run() + + elif cmd == 'test': + + print "RUNNING full I2PSAM Jython TEST SUITE" + testsigs() + testdests() + testsession() + testsocket() + + elif cmd == 'testsocket': + + print "RUNNING SOCKET TEST" + testsocket(0) + + elif cmd == 'testbidirsocket': + print "RUNNING BIDIRECTIONAL SOCKET TEST" + testsocket(1) + + else: + # spit at unrecognised option + usage(0) + +#@-node:main #@+node:MAINLINE if __name__ == '__main__': main() diff --git a/apps/sam/python/src/i2psamclient.py b/apps/sam/python/src/i2psamclient.py index 03a2c8e6cd..2070bfd9ab 100644 --- a/apps/sam/python/src/i2psamclient.py +++ b/apps/sam/python/src/i2psamclient.py @@ -17,6 +17,8 @@ Run this module without arguments to see a demo in action #@+node:imports import sys, os, socket, thread, threading, Queue, traceback, StringIO, time +from pdb import set_trace + #@-node:imports #@+node:globals # ----------------------------------------- @@ -53,6 +55,11 @@ class I2PCommandFail(Exception): A failure in an I2CP command """ pass + +class I2PStreamClosed(Exception): + """ + Stream is not open + """ #@-node:exceptions #@+node:class I2PSamClient class I2PSamClient: @@ -110,10 +117,20 @@ class I2PSamClient: self.qSession = Queue.Queue() self.qDatagrams = Queue.Queue() self.qRawMessages = Queue.Queue() + self.namingReplies = {} self.namingCache = {} + + self.streams = {} # currently open streams, keyed by id + self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id + self.qNewStreams = Queue.Queue() # incoming connections + + self.samNextIdLock = threading.Lock() + self.samNextId = 1 + self.isRunning = 1 + log(4, "trying connection to SAM server...") try: self.sock.connect((self.host, self.port)) @@ -312,6 +329,8 @@ class I2PSamClient: - dest - base64 private destination Keywords: + - direction - only used for STREAM sessions, can be RECEIVE, + CREATE or BOTH (default BOTH) - i2cphost - hostname for the SAM bridge to contact i2p router on - i2cpport - port for the SAM bridge to contact i2p router on @@ -322,6 +341,20 @@ class I2PSamClient: kw1 = dict(kw) kw1['STYLE'] = self.samStyle = style kw1['DESTINATION'] = dest + if style == 'STREAM': + direction = kw.get('direction', 'BOTH') + kw1['DIRECTION'] = direction + if direction == 'BOTH': + self.canAccept = 1 + self.canConnect = 1 + elif direction == 'RECEIVE': + self.canAccept = 1 + self.canConnect = 0 + elif direction == 'CREATE': + self.canAccept = 0 + self.canConnect = 1 + else: + raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH") # stick in i2cp host/port if specified if kw.has_key('i2cphost'): @@ -423,6 +456,92 @@ class I2PSamClient: return None return self.qDatagrams.get() #@-node:samDatagramReceive + #@+node:samStreamConnect + def samStreamConnect(self, dest): + """ + Makes a STREAM connection to a remote dest + + STREAM STATUS + RESULT=$result + ID=$id + [MESSAGE=...] + """ + # need an ID + id = self.samAllocId() + + # create queue for connect reply + q = self.streamConnectReplies[id] = Queue.Queue() + + # send req + self.samSend("STREAM", "CONNECT", + ID=id, + DESTINATION=dest, + ) + + # await reply - comes back as a dict + resp = q.get() + + # ditch queue + del self.streamConnectReplies[id] + del q + + # check out response + result = resp['RESULT'] + if result == 'OK': + conn = I2PSAMStream(self, id, dest) + self.streams[id] = conn + return conn + else: + msg = resp.get('MESSAGE', '') + raise I2PCommandFail(result, msg, "STREAM CONNECT") + + #@-node:samStreamConnect + #@+node:samStreamAccept + def samStreamAccept(self): + """ + Waits for an incoming connection, returning a wrapped conn obj + """ + log(4, "waiting for connection") + conn = self.qNewStreams.get() + log(4, "got connection") + return conn + #@-node:samStreamAccept + #@+node:samStreamSend + def samStreamSend(self, conn, data): + """ + DO NOT CALL THIS DIRECTLY + + Invoked by an I2PSAMStream object to transfer data + Use the object's .send() method instead. + + conn is the I2PSAMStream + + STREAM SEND + ID=$id + SIZE=$numBytes\n[$numBytes of data] + """ + # dispatch + self.samSend("STREAM", "SEND", data, ID=conn.id) + + # useless, but mimics socket paradigm + return len(data) + + #@-node:samStreamSend + #@+node:samStreamClose + def samStreamClose(self, conn): + """ + DO NOT CALL DIRECTLY + + Invoked by I2PSAMStream to close stream + Use the object's .send() method instead. + + STREAM CLOSE + ID=$id + """ + self.samSend("STREAM", "CLOSE", ID=conn.id) + del self.streams[conn.id] + + #@-node:samStreamClose #@+node:samNamingLookup def samNamingLookup(self, host): """ @@ -472,7 +591,7 @@ class I2PSamClient: try: name, val = arg.split("=", 1) except: - logException(3, "failed to process %s" % repr(arg)) + logException(3, "failed to process %s in %s" % (repr(arg), repr(flds))) raise dargs[name] = val @@ -488,6 +607,8 @@ class I2PSamClient: + + #@-node:samParse #@+node:samSend def samSend(self, topic, subtopic, data=None, **kw): @@ -586,7 +707,90 @@ class I2PSamClient: def on_STREAM(self, topic, subtopic, args): """ Handles STREAM messages from server + + STREAM STATUS + RESULT=$result + ID=$id + [MESSAGE=...] + + STREAM CONNECTED + DESTINATION=$base64key + ID=$id + + STREAM RECEIVED + ID=$id + SIZE=$numBytes\n[$numBytes of data] + + STREAM CLOSED + RESULT=$result + ID=$id + [MESSAGE=...] """ + log(4, "got %s %s %s" % (topic, subtopic, args)) + + # which stream? + id = int(args['ID']) + + # result of prior connection attempt + if subtopic == 'STATUS': + # stick it on the queue that the caller is waiting on and let the + # caller interpret the result + self.streamConnectReplies[id].put(args) + return + + # notice of incoming connection + if subtopic == 'CONNECTED': + + # grab details + dest = args['DESTINATION'] + + # wrap it in a stream obj + conn = I2PSAMStream(self, id, dest) + self.streams[id] = conn + + # and put it there for anyone calling samStreamAccept() + self.qNewStreams.put(conn) + + # done + return + + # notice of received data + elif subtopic == 'RECEIVED': + # grab details + data = args['DATA'] + + # lookup the connection + conn = self.streams.get(id, None) + if not conn: + # conn not known, just ditch + log(2, "got data, but don't recall any conn with id %s" % id) + return + + # and post the received data + conn._notifyIncomingData(data) + + log(4, "wrote data to conn's inbound queue") + + # done + return + + elif subtopic == 'CLOSED': + # lookup the connection + conn = self.streams.get(id, None) + if not conn: + # conn not known, just ditch + return + + # mark conn as closed and forget it + conn._notifyIncomingData("") # special signal to close + conn.isOpen = 0 + del self.streams[id] + + # done + return + + + #@-node:on_STREAM #@+node:on_DATAGRAM def on_DATAGRAM(self, topic, subtopic, args): @@ -627,6 +831,17 @@ class I2PSamClient: res = pubkey, privkey self.qNewDests.put(res) #@-node:on_DEST + #@+node:samAllocId + def samAllocId(self): + """ + Allocates a new unique id as required by SAM protocol + """ + self.samNextIdLock.acquire() + id = self.samNextId + self.samNextId += 1 + self.samNextIdLock.release() + return id + #@-node:samAllocId #@+node:_recvline def _recvline(self): """ @@ -680,6 +895,117 @@ class I2PSamClient: #@-node:_sendline #@-others #@-node:class I2PSamClient +#@+node:class I2PSAMStream +class I2PSAMStream: + """ + Wrapper for a stream object + """ + #@ @+others + #@+node:__init__ + def __init__(self, client, id, dest): + """ + """ + self.client = client + self.id = id + self.dest = dest + + self.qIncomingData = Queue.Queue() + + self.inbuf = '' + self.isOpen = 1 + #@-node:__init__ + #@+node:send + def send(self, data): + """ + Sends data to a stream connection + """ + # barf if stream not open + if not self.isOpen: + raise I2PStreamClosed + + # can send + return self.client.samStreamSend(self, data) + #@-node:send + #@+node:recv + def recv(self, size): + """ + Retrieves n bytes from peer + """ + chunks = [] + + while self.isOpen and size > 0: + # try internal buffer first + if self.inbuf: + chunk = self.inbuf[:size] + chunklen = len(chunk) + self.inbuf = self.inbuf[chunklen:] + chunks.append(chunk) + size -= chunklen + else: + # replenish input buffer + log(4, "I2PSAMStream.recv: replenishing input buffer") + buf = self.qIncomingData.get() + if buf == '': + # connection closed by peer + self.isOpen = 0 + break + else: + # got more data + log(4, "I2PSAMStream: queue returned %s" % repr(buf)) + self.inbuf += buf + + # return whatever we've got, hopefully all + return "".join(chunks) + + + #@-node:recv + #@+node:readline + def readline(self): + """ + Read a line of text from stream, return the line without trailing newline + + This method really shouldn't exist in a class that's trying to look a bit + like a socket object, but what the hell! + """ + chars = [] + while 1: + char = self.recv(1) + if char in ['', '\n']: + break + chars.append(char) + return "".join(chars) + #@-node:readline + #@+node:close + def close(self): + """ + close this stream connection + """ + log(4, "closing stream") + self.client.samStreamClose(self) + log(4, "stream closed") + self.isOpen = 0 + + # and just to make sure... + self.qIncomingData.put("") # busts out of recv() loops + + #@-node:close + #@+node:__del__ + def __del__(self): + """ + Dropping last ref to this object closes stream + """ + self.close() + #@-node:__del__ + #@+node:_notifyIncomingData + def _notifyIncomingData(self, data): + """ + Called by client receiver to notify incoming data + """ + log(4, "got %s" % repr(data)) + self.qIncomingData.put(data) + #@-node:_notifyIncomingData + #@-others +#@-node:class I2PSAMStream #@+node:class I2PRemoteSession class I2PRemoteSession: """ @@ -701,7 +1027,8 @@ class I2PRemoteSession: #@-node:__init__ #@+node:send def send(self, peerdest, msg): - + """ + """ return self.client.send(self.dest, peerdest, msg) #@-node:send #@+node:recv @@ -885,35 +1212,59 @@ def demoSTREAM(): print "Starting SAM STREAM demo..." print - print "Instantiating 2 more client connections..." - c5 = I2PSamClient() + print "Instantiating client c6..." c6 = I2PSamClient() - print "Creating more dests via SAM" - pub5, priv5 = c5.samDestGenerate() + print "Creating dest for c6" pub6, priv6 = c6.samDestGenerate() - print "Creating SAM STREAM SESSION on connection c3..." - res = c5.samSessionCreate("STREAM", priv5) + print "Creating SAM STREAM SESSION on connection c6..." + res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE") if res != 'OK': - print "Failed to create STREAM session on connection c5: %s" % repr(res) + print "Failed to create STREAM session on connection c6: %s" % repr(res) return - print "STREAM Session on connection c5 created successfully" + print "STREAM Session on connection c6 created successfully" - print "Creating SAM STREAM SESSION on connection c6..." - res = c6.samSessionCreate("STREAM", priv6) + print "Launching acceptor thread..." + thread.start_new_thread(demoSTREAM_thread, (c6,)) + + #print "sleep a while and give the server a chance..." + #time.sleep(10) + + print "----------------------------------------" + + print "Instantiating client c5..." + c5 = I2PSamClient() + + print "Creating dest for c5" + pub5, priv5 = c5.samDestGenerate() + + print "Creating SAM STREAM SESSION on connection c5..." + res = c5.samSessionCreate("STREAM", priv5, direction="CREATE") if res != 'OK': - print "Failed to create STREAM session on connection c4: %s" % repr(res) + print "Failed to create STREAM session on connection c5: %s" % repr(res) return - print "STREAM Session on connection c4 created successfully" + print "STREAM Session on connection c5 created successfully" - msg = "Hi there, this is a datagram!" - print "sending from c5 to c6: %s" % repr(msg) - c5.samStreamSend(pub6, msg) + print "----------------------------------------" - print "now try to receive from c6 (will block)..." - msg1 = c6.samStreamReceive() - print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest)) + print "Making connection from c5 to c6..." + + #set_trace() + + try: + conn_c5 = c5.samStreamConnect(pub6) + except: + print "Stream Connection failed" + return + print "Stream connect succeeded" + + print "Receiving from c5..." + buf = conn_c5.readline() + print "Got %s" % repr(buf) + + #print "Try to accept connection on c6..." + #conn_c6 = c6.sam print print "--------------------------------------" @@ -921,7 +1272,31 @@ def demoSTREAM(): print "--------------------------------------" print + + + + #@-node:demoSTREAM +#@+node:demoSTREAM_thread +def demoSTREAM_thread(sess): + + while 1: + sock = sess.samStreamAccept() + log(4, "got incoming connection") + + print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING" + + time.sleep(10) + + sock.send("Hi there, what do you want?\n") + + print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING" + time.sleep(300) + print "**ACCEPTOR CLOSING STREAM" + + sock.close() + +#@-node:demoSTREAM_thread #@+node:demo def demo(): """ @@ -936,10 +1311,10 @@ def demo(): print "-----------------------------------------" print - demoNAMING() - demoRAW() - demoDATAGRAM() - #demoSTREAM() + #demoNAMING() + #demoRAW() + #demoDATAGRAM() + demoSTREAM() print print "-----------------------------------------" -- GitLab