diff --git a/apps/sam/python/src/i2p/BaseHTTPServer.py b/apps/sam/python/src/i2p/BaseHTTPServer.py new file mode 100644 index 0000000000000000000000000000000000000000..a621bd660304de2e85a3805dcaf8522dab964a0d --- /dev/null +++ b/apps/sam/python/src/i2p/BaseHTTPServer.py @@ -0,0 +1,83 @@ +#! /usr/bin/env python + +""" +Emulation of Python BaseHTTPServer module using I2P sockets. + +The Python module is described at +http://www.python.org/doc/current/lib/module-BaseHTTPServer.html + +To get a server going, use: + + >>> from i2p import BaseHTTPServer + >>> BaseHTTPServer.test(). + +Consult the documentation for function test() to change basic +server settings, such as the session name. + +A fully customizable example: + + >>> from i2p import BaseHTTPServer + >>> session = "mytestxxx.i2p" # SAM session name + >>> class MyServer(BaseHTTPServer.HTTPServer): pass + >>> class MyRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): pass + >>> httpd = MyServer(session, MyRequestHandler) + >>> httpd.socket.dest + (Base64 Destination of server) + >>> httpd.serve_forever() + +""" + +# By aum. + +# Hack to keep Python from importing from current directory: +# Use pylib package, then use = signs instead of from x import y. +import pylib +BaseHTTPServer = pylib.BaseHTTPServer + +import sys +import i2p.SocketServer + +__version__ = "0.3" + +__all__ = ["HTTPServer", "BaseHTTPRequestHandler", "test"] + +DEFAULT_ERROR_MESSAGE = BaseHTTPServer.DEFAULT_ERROR_MESSAGE + +class HTTPServer(i2p.SocketServer.TCPServer, BaseHTTPServer.HTTPServer): + """ + Same interface as Python class + BaseHTTPServer.HTTPServer. + """ + +class BaseHTTPRequestHandler( + i2p.SocketServer.StreamRequestHandler, + BaseHTTPServer.BaseHTTPRequestHandler): + """ + Same interface as Python class + BaseHTTPServer.BaseHTTPRequestHandler. + """ + +def test(HandlerClass = BaseHTTPRequestHandler, + ServerClass = HTTPServer, protocol="HTTP/1.0", + session = "mytestxxx.i2p"): + """ + Test the HTTP request handler class. + + This runs an I2P TCP server under SAM session 'session'. + If a single command line argument is given, the argument is used + instead as the SAM session name. + """ + + if sys.argv[1:] and __name__ == '__main__': + session = sys.argv[1] + + HandlerClass.protocol_version = protocol + httpd = ServerClass(session, HandlerClass) + + print "Serving HTTP on", session, "..." + print "Destination follows:" + print httpd.socket.dest + httpd.serve_forever() + +if __name__ == '__main__': + test() diff --git a/apps/sam/python/src/i2p/CGIHTTPServer.py b/apps/sam/python/src/i2p/CGIHTTPServer.py new file mode 100644 index 0000000000000000000000000000000000000000..69a33fb4275db0b349c1e6b45158dfbafbf1f329 --- /dev/null +++ b/apps/sam/python/src/i2p/CGIHTTPServer.py @@ -0,0 +1,69 @@ +#! /usr/bin/env python + +""" +Emulation of Python CGIHTTPServer module using I2P sockets. + +The Python module is described at +http://www.python.org/doc/current/lib/module-CGIHTTPServer.html + +To get a server going, use: + + >>> from i2p import CGIHTTPServer + >>> CGIHTTPServer.test(). + +Consult the documentation for function test() to change basic +server settings, such as the session name. + +A fully customizable example: + + >>> from i2p import BaseHTTPServer, CGIHTTPServer + >>> session = "mytestxxx.i2p" # SAM session name + >>> class MyServer(BaseHTTPServer.HTTPServer): pass + >>> class MyRequestHandler(CGIHTTPServer.CGIHTTPRequestHandler): pass + >>> httpd = MyServer(session, MyRequestHandler) + >>> httpd.socket.dest + (Base64 Destination of server) + >>> httpd.serve_forever() + +""" + +# By aum. +__all__ = ["CGIHTTPRequestHandler", "test"] + +# Hack to keep Python from importing from current directory: +# Use pylib package, then use = signs instead of from x import y. +import pylib +CGIHTTPServer = pylib.CGIHTTPServer +nobody_uid = CGIHTTPServer.nobody_uid +executable = CGIHTTPServer.executable + +import sys +import i2p.BaseHTTPServer +import i2p.SimpleHTTPServer + +HTTPServer = i2p.BaseHTTPServer.HTTPServer +class CGIHTTPRequestHandler(CGIHTTPServer.CGIHTTPRequestHandler): + """ + Same interface as Python class + CGIHTTPServer.CGIHTTPRequestHandler. + """ + +def test(HandlerClass = CGIHTTPRequestHandler, + ServerClass = i2p.BaseHTTPServer.HTTPServer, + session = "mytestxxx.i2p"): + """ + Test the HTTP CGI request handler class. + + This runs an I2P TCP server under SAM session 'session'. + If a single command line argument is given, the argument is used + instead as the SAM session name. + """ + if sys.argv[1:] and __name__ == '__main__': + session = sys.argv[1] + + i2p.SimpleHTTPServer.test(HandlerClass, ServerClass, + session=session) + +if __name__ == '__main__': + test() + diff --git a/apps/sam/python/src/i2p/SimpleHTTPServer.py b/apps/sam/python/src/i2p/SimpleHTTPServer.py new file mode 100644 index 0000000000000000000000000000000000000000..6b9e4a80725fe3c01babd81a74c0fca4d35af32a --- /dev/null +++ b/apps/sam/python/src/i2p/SimpleHTTPServer.py @@ -0,0 +1,68 @@ +#! /usr/bin/env python + +""" +Emulation of Python SimpleHTTPServer module using I2P sockets. + +The Python module is described at +http://www.python.org/doc/current/lib/module-SimpleHTTPServer.html + +To get a server going, use: + + >>> from i2p import SimpleHTTPServer + >>> SimpleHTTPServer.test(). + +Consult the documentation for function test() to change basic +server settings, such as the session name. + +A fully customizable example: + + >>> from i2p import BaseHTTPServer, SimpleHTTPServer + >>> session = "mytestxxx.i2p" # SAM session name + >>> class MyServer(BaseHTTPServer.HTTPServer): pass + >>> class MyRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): pass + >>> httpd = MyServer(session, MyRequestHandler) + >>> httpd.socket.dest + (Base64 Destination of server) + >>> httpd.serve_forever() + +""" + +# By aum. + +# Hack to keep Python from importing from current directory: +# Use pylib package, then use = signs instead of from x import y. +import pylib +SimpleHTTPServer = pylib.SimpleHTTPServer + +import sys +import i2p.BaseHTTPServer + +__version__ = "0.1.0" + +__all__ = ["SimpleHTTPRequestHandler", "test"] + +HTTPServer = i2p.BaseHTTPServer.HTTPServer +class SimpleHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + """ + Same interface as Python class + SimpleHTTPServer.SimpleHTTPRequestHandler. + """ + +def test(HandlerClass = SimpleHTTPRequestHandler, + ServerClass = i2p.BaseHTTPServer.HTTPServer, + session = "mytestxxx.i2p"): + """ + Test the HTTP simple request handler class. + + This runs an I2P TCP server under SAM session 'session'. + If a single command line argument is given, the argument is used + instead as the SAM session name. + """ + if sys.argv[1:] and __name__ == '__main__': + session = sys.argv[1] + + i2p.BaseHTTPServer.test(HandlerClass, ServerClass, + session=session) + +if __name__ == '__main__': + test() diff --git a/apps/sam/python/src/i2p/SocketServer.py b/apps/sam/python/src/i2p/SocketServer.py new file mode 100644 index 0000000000000000000000000000000000000000..5273e09b22a8e721bf92dd8a41f428165da6edbe --- /dev/null +++ b/apps/sam/python/src/i2p/SocketServer.py @@ -0,0 +1,72 @@ + +""" +Emulation of Python SocketServer module using I2P sockets. + +The Python module is described at +http://www.python.org/doc/current/lib/module-SocketServer.html + +""" + +# By aum. + +# Hack to keep Python from importing from current directory: +# Use pylib package, then use = signs instead of from x import y. +import pylib +SocketServer = pylib.SocketServer + +import i2p.socket +class BaseServer(SocketServer.BaseServer): + pass +class TCPServer(SocketServer.TCPServer, BaseServer): + + socket_type = i2p.socket.SOCK_STREAM + + def __init__(self, session, RequestHandlerClass): + """ + Constructor. May be extended, do not override. + + The 'session' argument indicates the SAM session + name that should be used for the server. See module + i2p.socket for details on SAM sessions. + """ + BaseServer.__init__(self, session, RequestHandlerClass) + + #self.socket = socket.socket(self.address_family, + # self.socket_type) + self.session = session + self.socket = i2p.socket.socket(session, self.socket_type) + + self.server_bind() + self.server_activate() + +class UDPServer(TCPServer, SocketServer.UDPServer): + pass + +class ForkingMixIn(SocketServer.ForkingMixIn): + pass + +class ThreadingMixIn(SocketServer.ThreadingMixIn): + pass + +class ForkingUDPServer(ForkingMixIn, UDPServer): + pass + +class ForkingTCPServer(ForkingMixIn, TCPServer): + pass + +class ThreadingUDPServer(ThreadingMixIn, UDPServer): + pass + +class ThreadingTCPServer(ThreadingMixIn, TCPServer): + pass + +class BaseRequestHandler(SocketServer.BaseRequestHandler): + pass + +class StreamRequestHandler(SocketServer.StreamRequestHandler): + pass + +class DatagramRequestHandler(SocketServer.DatagramRequestHandler): + pass + + diff --git a/apps/sam/python/src/i2p/__init__.py b/apps/sam/python/src/i2p/__init__.py index 7d043ee70eabf953ea687c26a8ef673ad545a440..bff245121c4a7b65ff9c87f4498fbb8ac3dc724d 100644 --- a/apps/sam/python/src/i2p/__init__.py +++ b/apps/sam/python/src/i2p/__init__.py @@ -2,10 +2,17 @@ i2p -- I2P Python interface """ -__all__ = ['Error', 'RouterError', 'sam', 'eep', 'router', - 'I2PSocketServer', 'I2PBaseHTTPServer', - 'I2PSimpleHTTPServer', 'I2PCGIHTTPServer', - ] +__all__ = [ + 'BaseHTTPServer', + 'CGIHTTPServer', + 'eep', + 'router', + 'select', + 'SimpleHTTPServer', + 'socket', + 'SocketServer', + 'tunnel', +] class Error(Exception): """Base class for all I2P errors.""" @@ -13,9 +20,3 @@ class Error(Exception): class RouterError(Error): """Could not connect to router.""" -import sam -import eep -import router - -# Internal use only -#import samclasses as _samclasses diff --git a/apps/sam/python/src/i2p/eep.py b/apps/sam/python/src/i2p/eep.py index 08b410c2c2e7e613cae81846e09e4724e6bfd7de..6d7bfd0c7e38cdae91393e5cc365291a8ab9f1a0 100644 --- a/apps/sam/python/src/i2p/eep.py +++ b/apps/sam/python/src/i2p/eep.py @@ -1,10 +1,10 @@ # ------------------------------------------------------------- -# eep.py: I2P Project -- Eeproxy Python API +# eep.py: Eeproxy access module # ------------------------------------------------------------- """ -Eeproxy Python API +Eeproxy access module """ import urllib2 diff --git a/apps/sam/python/src/i2p/pylib/__init__.py b/apps/sam/python/src/i2p/pylib/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..78cfc08e3c253dd47a742c3ac0a2e7807a59f1cb --- /dev/null +++ b/apps/sam/python/src/i2p/pylib/__init__.py @@ -0,0 +1,17 @@ + +# ------------------------------------------------ +# Hack to import the Python library modules +# when names conflict in our package. +# ------------------------------------------------ + +import sys +sys.path.reverse() + +import socket +import select +import BaseHTTPServer +import SocketServer +import CGIHTTPServer +import SimpleHTTPServer + +sys.path.reverse() diff --git a/apps/sam/python/src/i2p/router.py b/apps/sam/python/src/i2p/router.py index b625b0ace09730caf88fc5b73f28a719273674cf..b0d0e20d319285b6e58160c8f8e99ec95bf4a8cd 100644 --- a/apps/sam/python/src/i2p/router.py +++ b/apps/sam/python/src/i2p/router.py @@ -1,24 +1,24 @@ # ------------------------------------------------------------- -# router.py: I2P Project -- Router Control API for Python +# router.py: Router control module # ------------------------------------------------------------- """ -Router Control API for Python +Router control module """ import i2p -import i2p.sam +import i2p.socket import i2p.eep +from i2p.pylib import socket as pysocket # Import Python socket -import socket as pysocket import os, sys import os.path import time import threading import urllib2 -check_addrlist = [i2p.sam.samaddr, i2p.eep.eepaddr] +check_addrlist = [i2p.socket.samaddr, i2p.eep.eepaddr] router_config = 'router.config' # Router config filename @@ -31,19 +31,16 @@ def find(dir=None): """Find the absolute path to a locally installed I2P router. An I2P installation is located by looking in the - environment I2P, then in PATH, then in the dir argument - given to the function. It looks for startRouter.sh or - startRouter.bat. Raises ValueError if an I2P installation + the dir argument given to the function, then in the + environment I2P, then in PATH. It looks for startRouter.sh + or startRouter.bat. Raises ValueError if an I2P installation could not be located. """ - if sys.platform[:3] == 'win': - sep = ';' - else: - sep = ':' + sep = os.pathsep # Path separator L = [] - if 'PATH' in os.environ: L += os.environ['PATH'].split(sep) - if 'I2P' in os.environ: L += os.environ['I2P'].split(sep) if dir != None and dir != '': L += dir.split(sep) + if 'I2P' in os.environ: L += os.environ['I2P'].split(sep) + if 'PATH' in os.environ: L += os.environ['PATH'].split(sep) for dirname in L: filename = os.path.join(dirname, 'startRouter.bat') if os.path.exists(filename): diff --git a/apps/sam/python/src/i2p/samclasses.py b/apps/sam/python/src/i2p/samclasses.py index db402b8f878bc1852f2c5a3ae3150d8a60cacdd0..f6e043b083d6bf09237f784c22103fe28b1ea6a3 100644 --- a/apps/sam/python/src/i2p/samclasses.py +++ b/apps/sam/python/src/i2p/samclasses.py @@ -1,6 +1,6 @@ # ------------------------------------------------------------- -# samclasses.py: Lower-level SAM API, interfaces with SAM Bridge. +# samclasses.py: Lower-level SAM classes, for internal use. # ------------------------------------------------------------- """ @@ -8,46 +8,45 @@ Lower-level SAM API, interfaces with SAM Bridge. For internal use only. -Use the higher level i2p.sam module for your own programs. +Use the higher level i2p.socket module for your own programs. For details on SAM, see "Simple Anonymous Messaging (SAM) v1.0," as published by jrandom. Class Overview: - SAMTerminal: Message sender/reader, talks to SAM Bridge - through a single socket. - StringBuffer: Queue for character data. - BaseSession: SAM session classes are derived from this. - StreamSession: Manipulate a SAM stream session through a - threadsafe, high-level interface. - DatagramSession: SAM datagram session, threadsafe, high level. - RawSession: SAM raw session, threadsafe, high level. + - SAMTerminal: Message sender/reader, talks to SAM Bridge. + - StringBuffer: Queue for character data. + - BaseSession: SAM session classes are derived from this. + - StreamSession: SAM stream session class, threadsafe, high level. + - DatagramSession: SAM datagram session, threadsafe, high level. + - RawSession: SAM raw session, threadsafe, high level. Note that a 'None' timeout is an infinite timeout: it blocks forever if necessary. Todo: - * Error handling is a huge mess. Neaten it up. + - Error handling is a huge mess. Neaten it up. Subclass a ErrorMixin class, then use set_error(e), check_error(), get_error(). - * Streams are a huge mess. Neaten them up. - * This whole interface is a tad confusing. Neaten it up. + - Streams are a huge mess. Neaten them up. + - This whole interface is a tad confusing. Neaten it up. """ # --------------------------------------------------------- # Imports # --------------------------------------------------------- -import socket, thread, threading, time, string import Queue, traceback, random, sys, shlex +import thread, threading, time, string # --------------------------------------------------------- -# Import i2p and i2p.sam (for defaults and errors) +# Import i2p and i2p.socket (for defaults and errors) # --------------------------------------------------------- import i2p -import i2p.sam +import i2p.socket +from i2p.pylib import socket as pysocket # Import Python socket # --------------------------------------------------------- # Functions @@ -55,7 +54,7 @@ import i2p.sam def sleep(): time.sleep(0.01) # Sleep between thread polls -sam_log = False # Logging flag. Logs to ./log.txt. +log = False # Logging flag. Logs to ./log.txt. # ----------------------------------------------------- # SAMTerminal @@ -63,13 +62,13 @@ sam_log = False # Logging flag. Logs to ./log.txt. class SAMTerminal: """Message-by-message communication with SAM through a single - socket. _on_* messages are dispatched to msgobj.""" + pysocket. _on_* messages are dispatched to msgobj.""" def __init__(self, addr, msgobj): try: self.host, self.port = addr.split(':') except: raise ValueError('sam port required') self.port = int(self.port) - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock=pysocket.socket(pysocket.AF_INET,pysocket.SOCK_STREAM) self.msgobj = msgobj try: self.sock.connect((self.host, self.port)) @@ -88,13 +87,13 @@ class SAMTerminal: line = [] while True: try: c = self.sock.recv(1) - except socket.error, ex: self.error = self.lost_error + except pysocket.error, ex: self.error = self.lost_error if c == '': self.error = self.lost_error if self.error != None: return if c == '\n': break if c != '': line += [c] line = ''.join(line) - if sam_log: + if log: logf = open('log.txt', 'a') logf.write('\n' + line + '\n') logf.close() @@ -105,7 +104,7 @@ class SAMTerminal: remain = int(kwargs['SIZE']) while True: try: s = self.sock.recv(remain) - except socket.error, ex: self.error = self.lost_error + except pysocket.error, ex: self.error = self.lost_error if s == '': self.error = self.lost_error if self.error != None: return if s != '': data += [s] @@ -142,7 +141,7 @@ class SAMTerminal: def check_message(self, kwargs): """Raises an error if kwargs['RESULT'] != 'OK'.""" if not kwargs.get('RESULT', '') in ['OK', '']: - raise i2p.sam.NetworkError((kwargs['RESULT'], + raise i2p.socket.NetworkError((kwargs['RESULT'], kwargs.get('MESSAGE', ''))) def on_message(self, msg, kwargs): @@ -156,12 +155,12 @@ class SAMTerminal: automatically added if none is present.""" self.check() if not '\n' in msg: msg = msg + '\n' - if sam_log: + if log: logf = open('log.txt', 'a') logf.write('\n' + msg) logf.close() try: self.sock.sendall(msg) - except socket.error: self.error = self.lost_error + except pysocket.error: self.error = self.lost_error self.check() def check(self): @@ -175,7 +174,7 @@ class SAMTerminal: # immediately, the data will be lost. Delay 0.01 s to fix this # bug (tested Windows, Linux). time.sleep(0.01) - self.error = i2p.sam.ClosedError() + self.error = i2p.socket.ClosedError() self.sock.close() def queue_get(self, q): @@ -230,10 +229,12 @@ class StringBuffer(Deque): appended to the end, and read from the beginning. Example: - B = StringBuffer('Hello W') - B.append('orld!') - print B.read(5) # 'Hello' - print B.read() # 'World!' + >>> B = StringBuffer('Hello W') + >>> B.append('orld!') + >>> B.read(5) + 'Hello' + >>> B.read() + 'World!' """ def __init__(self, s=''): Deque.__init__(self) @@ -285,7 +286,7 @@ class BaseSession: and RawSession are derived.""" def __init__(self, addr=''): - if addr == '': addr = i2p.sam.samaddr + if addr == '': addr = i2p.socket.samaddr self.term = SAMTerminal(addr=addr, msgobj=self) self.lock = threading.Lock() # Data lock. self.closed = False @@ -297,7 +298,7 @@ class BaseSession: def _hello(self): """Internal command, handshake with SAM terminal.""" self.term.send_message('HELLO VERSION MIN=' + - str(i2p.sam.samver) + ' MAX=' + str(i2p.sam.samver)) + str(i2p.socket.samver) + ' MAX=' + str(i2p.socket.samver)) self.term.check_message(self.term.queue_get(self.qhello)) def _on_HELLO_REPLY(self, **kwargs): @@ -358,7 +359,7 @@ class StreamSession(BaseSession): """Stream session. All methods are blocking and threadsafe.""" def __init__(self, name, addr='', **kwargs): - if addr == '': addr = i2p.sam.samaddr + if addr == '': addr = i2p.socket.samaddr BaseSession.__init__(self, addr) self.idmap = {} # Maps id to Stream object. self.qaccept = Queue.Queue() # Thread messaging, accept. @@ -443,9 +444,9 @@ class StreamSession(BaseSession): # Handle timeout and blocking errors if timeout == 0.0: - raise i2p.sam.BlockError('command would have blocked') + raise i2p.socket.BlockError('command would have blocked') else: - raise i2p.sam.Timeout('timed out') + raise i2p.socket.Timeout('timed out') def listen(self, backlog): """Set maximum number of queued connections.""" @@ -541,7 +542,7 @@ class Stream: id = self.id if self.closed or id == None: if self.err != None: raise self.err - raise i2p.sam.ClosedError('stream closed') + raise i2p.socket.ClosedError('stream closed') if len(s) == 0: return nmax = 32768 for block in [s[i:i+nmax] for i in range(0,len(s),nmax)]: @@ -584,9 +585,9 @@ class Stream: # Handle timeout and blocking error if timeout == 0.0: - raise i2p.sam.BlockError('command would have blocked') + raise i2p.socket.BlockError('command would have blocked') else: - raise i2p.sam.Timeout('timed out') + raise i2p.socket.Timeout('timed out') def __len__(self): """Current length of read buffer.""" @@ -632,7 +633,7 @@ class DatagramSession(BaseSession): """Datagram session. All methods are blocking and threadsafe.""" def __init__(self, name, addr='', **kwargs): - if addr == '': addr = i2p.sam.samaddr + if addr == '': addr = i2p.socket.samaddr BaseSession.__init__(self, addr) self.buf = Deque() # FIFO of incoming packets. self.name = name @@ -656,9 +657,9 @@ class DatagramSession(BaseSession): def send(self, s, dest): """Send packet with contents s to given destination.""" # Raise error if packet is too large. - if len(s) > i2p.sam.MAX_DGRAM: + if len(s) > i2p.socket.MAX_DGRAM: raise ValueError('packets must have length <= ' + - str(i2p.sam.MAX_DGRAM) + ' bytes') + str(i2p.socket.MAX_DGRAM) + ' bytes') self.term.send_message('DATAGRAM SEND DESTINATION=' + dest + ' SIZE=' + str(len(s)) + '\n' + s) @@ -687,9 +688,9 @@ class DatagramSession(BaseSession): # Handle timeout and blocking error if timeout == 0.0: - raise i2p.sam.BlockError('command would have blocked') + raise i2p.socket.BlockError('command would have blocked') else: - raise i2p.sam.Timeout('timed out') + raise i2p.socket.Timeout('timed out') def __len__(self): """Number of packets in read buffer.""" @@ -703,7 +704,7 @@ class RawSession(BaseSession): """Raw session. All methods are blocking and threadsafe.""" def __init__(self, name, addr='', **kwargs): - if addr == '': addr = i2p.sam.samaddr + if addr == '': addr = i2p.socket.samaddr BaseSession.__init__(self, addr) self.buf = Deque() # FIFO of incoming packets. self.name = name @@ -727,9 +728,9 @@ class RawSession(BaseSession): def send(self, s, dest): """Send packet with contents s to given destination.""" # Raise error if packet is too big - if len(s) > i2p.sam.MAX_RAW: + if len(s) > i2p.socket.MAX_RAW: raise ValueError('packets must have length <= ' + - str(i2p.sam.MAX_RAW) + ' bytes') + str(i2p.socket.MAX_RAW) + ' bytes') self.term.send_message('RAW SEND DESTINATION=' + dest + ' SIZE=' + str(len(s)) + '\n' + s) @@ -755,9 +756,9 @@ class RawSession(BaseSession): # Handle timeout and blocking error if timeout == 0.0: - raise i2p.sam.BlockError('command would have blocked') + raise i2p.socket.BlockError('command would have blocked') else: - raise i2p.sam.Timeout('timed out') + raise i2p.socket.Timeout('timed out') def __len__(self): """Number of packets in read buffer.""" diff --git a/apps/sam/python/src/i2p/select.py b/apps/sam/python/src/i2p/select.py new file mode 100644 index 0000000000000000000000000000000000000000..9e0a0e0f435dcd99a41d0fb68dd0dc6cf2c52019 --- /dev/null +++ b/apps/sam/python/src/i2p/select.py @@ -0,0 +1,138 @@ + +# ------------------------------------------------------------- +# select.py: Emulation of Python select module. +# ------------------------------------------------------------- + +""" +I2P Python API - Emulation of Python select module. +""" + +import time + +import i2p.socket +from i2p.pylib import select as pyselect # Import Python select + +# -------------------------------------------------- +# Poll and select +# -------------------------------------------------- + +POLLIN = 1 # There is data to read +POLLPRI = 1 # Same as POLLIN +POLLOUT = 4 # Ready for output +POLLERR = 8 # Wait for error condition +POLLHUP = 16 # Not implemented +POLLNVAL = 32 # Not implemented + +class Poll: + """Class implementing poll interface. Works for Python sockets + and SAM sockets.""" + def __init__(self): + self.fds = {} # Maps _hash() -> (socket, mask) + def _hash(self, fd): + """Get a unique number for each object.""" + if isinstance(fd, int): + return fd # Use the fd itself if integer. + else: + return id(fd) # Use object address (no copies!) + def register(self, fd, eventmask=POLLIN|POLLOUT|POLLERR): + self.fds[self._hash(fd)] = (fd, eventmask) + def unregister(self, fd): + del self.fds[self._hash(fd)] + def poll(self, timeout=None): + readlist, writelist, errlist = [], [], [] + for F, mask in self.fds: + if mask & POLLIN: readlist += [F] + if mask & POLLOUT: writelist += [F] + if mask & POLLERR: errlist += [F] + (Rs, Ws, Es) = select(readlist, writelist, errlist, + timeout=timeout) + ans = [] + for R in Rs: ans.append((R, POLLIN)) + for W in Ws: ans.append((W, POLLOUT)) + for E in Es: ans.append((E, POLLERR)) + return ans + +def poll(): + """Returns a polling object. Works on SAM sockets and Python + sockets. See select.poll() in the Python library for more + information. + + Polling flags specified in this module: + - POLLIN + - POLLOUT + - POLLERR + - POLLHUP + - POLLNVAL + - POLLPRI +""" + return Poll() + +def select(readlist, writelist, errlist, timeout=None): + """Performs a select call. Works on SAM sockets and Python + sockets. See select.select() in the Python library for more + information.""" + Rans = [] + Wans = [] + Eans = [] + if timeout != None: end = time.clock() + timeout + while True: + # FIXME: Check performance. + # Use pyselect.poll for Python sockets, if needed for speed. + + # Check for read availability. + for R in readlist: + if isinstance(R, int) or hasattr(R, 'fileno'): + # Python socket + if len(pyselect.select([R], [], [], 0.0)[0]) > 0: + Rans.append(R) + else: + # SAM Socket + if R.type == i2p.socket.SOCK_STREAM: + try: + R._verify_connected() + Rans.append(R) + except: + pass + else: + if len(R) > 0: Rans.append(R) + + # Check for write availability. + for W in writelist: + if isinstance(W, int) or hasattr(W, 'fileno'): + # Python socket + if len(pyselect.select([], [W], [], 0.0)[1]) > 0: + Wans.append(W) + else: + # SAM Socket + if W.type == i2p.socket.SOCK_STREAM: + try: + W._verify_connected() + Wans.append(W) + except: + pass + else: + Wans.append(W) + + # Check for error conditions. + # These can only be stream errors. + for E in errlist: + if isinstance(E, int) or hasattr(E, 'fileno'): + # Python socket + if len(pyselect.select([], [], [E], 0.0)[2]) > 0: + Eans.append(E) + else: + if E.type == i2p.socket.SOCK_STREAM: + try: + # FIXME: Use a ._get_error() function for errors. + # Socket can only have an error if it connected. + E._verify_connected() + if E.sessobj.err != None: + Eans.append(E) + except: + pass + if timeout != None and time.clock() >= end: break + if len(Rans) != 0 or len(Wans) != 0 or len(Eans) != 0: break + + time.sleep(0.01) + + return (Rans, Wans, Eans) diff --git a/apps/sam/python/src/i2p/socket.py b/apps/sam/python/src/i2p/socket.py new file mode 100644 index 0000000000000000000000000000000000000000..bd67a21a4a4adb6e54ed607c01b3bdfcd089ee7e --- /dev/null +++ b/apps/sam/python/src/i2p/socket.py @@ -0,0 +1,529 @@ + +# ------------------------------------------------------------- +# socket.py: Emulation of Python socket module. +# ------------------------------------------------------------- + +""" +Emulation of Python socket module using SAM. +""" + +import i2p + +import samclasses, threading, time, copy, Queue, thread +from i2p.pylib import socket as pysocket # Import Python socket +from i2p.pylib import select as pyselect # Import Python select + +# -------------------------------------------------- +# Global variables +# -------------------------------------------------- + +# Ports +samaddr = '127.0.0.1:7656' # Default port for SAM Bridge + +# Flags for recv, recvfrom. +MSG_PEEK = 2 # Peek at incoming message +MSG_WAITALL = 64 # Wait for data or error +MSG_DONTWAIT = 128 # Nonblocking + +# Packet sizes +MAX_DGRAM = 31744 # Max size of datagram packet +MAX_RAW = 32768 # Max size of raw packet + +# Socket types +SOCK_STREAM = 1 # Stream socket +SOCK_DGRAM = 2 # Datagram socket +SOCK_RAW = 3 # Raw socket + +# Miscellaneous +samver = 1.0 # SAM version implemented + +# -------------------------------------------------- +# Errors +# -------------------------------------------------- + +class Error(i2p.Error): + """Base class for all SAM errors.""" + +class NetworkError(Error): + """Network error occurred within I2P. + The error object is a 2-tuple: (errtag, errdesc). + errtag is a SAM error string, + errdesc is a human readable error description. + """ + +class ClosedError(Error): + """A command was used on a socket that closed gracefully.""" + +class BlockError(Error): + """Socket call would have blocked.""" + +class Timeout(Error): + """Time out occurred for a socket which had timeouts enabled + via a prior call to settimeout().""" + +# -------------------------------------------------- +# Sockets +# -------------------------------------------------- + +# Note: socket(), __make_session() and Socket() should have same args +def socket(session, type, samaddr=samaddr, **kwargs): + r"""Create a new socket. Argument session should be a session + name -- if the name has not yet been used, an I2P + Destination will be created for it, otherwise, the + existing Destination will be re-used. An empty session + string causes a transient session to be created. Argument + type is one of SOCK_STREAM, SOCK_DGRAM, or SOCK_RAW. + + I2P configuration keyword arguments: + + - in_depth - depth of incoming tunnel (default 2) + - out_depth - depth of outgoing tunnel (default 2) + + A single session may be shared by more than one socket, if + the sockets are the same type, and if the sockets are + created within the same Python process. The socket + objects are multithread-safe. + + Examples: + >>> a = i2p.socket('Alice', i2p.SOCK_STREAM) + >>> b = i2p.socket('Bob', i2p.SOCK_DGRAM, + in_depth=2, out_depth=5) + + The created object behaves identically to a socket from + module socket, with the following exceptions: + + - I2P Destinations are used as address arguments [1]. + - bind is a no-op: sockets are always bound. + - send* methods send all data and are non-blocking. + + A given session name can only be open in a single Python + program at a time. If you need to overcome this + limitation, consider patching I2P. + + [1]. + Alternatively, a host name can be used as an address. + It will be resolved using hosts.txt. + + For details on how to use socket objects, see + http://www.python.org/doc/current/lib/socket-objects.html + + See the examples directory for code examples. + """ + + return Socket(session, type, samaddr, **kwargs) + + +# -------------------------------------------------- +# Socket session objects +# -------------------------------------------------- + +# Global list of session objects. +_sessions = {} +_session_lock = threading.Lock() + +def _make_session(session, type, samaddr, **kwargs): + """Make a session object (eg samclasses.StreamSession). Same + arguments as socket(). Return an existing session object + if one has been previously created under the given name. + """ + # Synchronize + _session_lock.acquire() + try: + if type == SOCK_STREAM: C = samclasses.StreamSession + elif type == SOCK_DGRAM: C = samclasses.DatagramSession + elif type == SOCK_RAW: C = samclasses.RawSession + else: raise ValueError('bad socket type') + # Get existing session, if available + if session != '' and _sessions.has_key(session): + if _sessions[session].__class__ != C: + raise ValueError('session ' + repr(session) + ' was ' + + 'created with a different session type.') + return _sessions[session] + # Create new session + if type == SOCK_STREAM: ans = C(session, samaddr, **kwargs) + elif type == SOCK_DGRAM: ans = C(session, samaddr, **kwargs) + elif type == SOCK_RAW: ans = C(session, samaddr, **kwargs) + if session != '': _sessions[session] = ans + return ans + finally: _session_lock.release() + +def _wrap_stream(stream, parent_socket): + """Wraps a Socket object around a samclasses.Stream object.""" + s = Socket('', 0, dummy_socket=True) + s.sessobj = stream + s.remotedest = stream.remotedest + s.dest = parent_socket.dest + s.session = parent_socket.session + s.type = parent_socket.type + s.timeout = None + s.samaddr = parent_socket.samaddr + s.closed = False + return s + +# -------------------------------------------------- +# Socket class +# -------------------------------------------------- + +class Socket: + """A socket object.""" + + # Docstrings for pydoc. These variables will be overwritten. + dest = property(doc='Local I2P Destination of socket') + session = property(doc='Session name') + type = property(doc='Socket type: SOCK_STREAM, SOCK_DGRAM,' + + ' or SOCK_RAW.') + # FIXME: Use getsockopt to detect errors. + + def __init__(self, session, type, samaddr=samaddr, **kwargs): + """Equivalent to socket().""" + if kwargs.has_key('dummy_socket'): return + self.sessobj = _make_session(session, type, samaddr, **kwargs) + self.dest = self.sessobj.dest + self.session = session + self.type = type + self.timeout = None # None indicates blocking mode + self.samaddr = samaddr + self.closed = False # Was current object closed? + self.lock = threading.Lock() + + def _verify_open(self): + """Verify that the socket has not been closed.""" + if self.closed == True: + raise ClosedError('socket closed') + + def _verify_stream(self): + """Raise an error if socket is not a SOCK_STREAM.""" + if self.type != SOCK_STREAM: + raise i2p.Error('operation not supported') + # FIXME: Check for errors also. + + def _verify_connected(self, needs_to_be_connected=True): + """Raise an error if socket is not a connected stream socket.""" + self._verify_stream() + if not hasattr(self.sessobj, 'remotedest'): + raise i2p.Error('socket is not connected') + if needs_to_be_connected and not self.sessobj.didconnect: + raise i2p.Error('socket is in the process of connecting') + # FIXME: Check for errors also. + + def _verify_not_connected(self): + """Verify that the socket is not currently connected, and is not + in the process of connecting.""" + self._verify_stream() + if hasattr(self.sessobj, 'remotedest'): + raise i2p.Error('socket is already connected') + # FIXME: Check for errors also. + + def accept(self): + """Accept an incoming connection. The socket must be type + SOCK_STREAM, and listen() must be called prior to this + command. The return value is (conn, remotedest), where + conn is a new socket object made for the connection, and + remotedest is the remote Destination from which the + connection was made. + + Example: + + >>> from i2p import socket + >>> s = socket.socket('Alice', socket.SOCK_STREAM) + >>> s.listen(10) + + This prepares the server. Now accept an incoming connection: + + >>> c, remotedest = s.accept() + >>> c.send('hello world!') + + If accept() is called on a socket that is in non-blocking + mode or has a timeout, i2p.socket.BlockError or + i2p.socket.Timeout may be raised. This indicates that no + incoming connection is currently available.""" + + self._verify_open() + self._verify_not_connected() + # Raises BlockError or Timeout if not ready. + C = _wrap_stream(self.sessobj.accept(self.timeout), self) + return (C, C.remotedest) + + def bind(self, address): + """Does nothing. Provided for compatibility with the Python + socket command bind(), which binds a server to a port.""" + self._verify_open() + self._verify_not_connected() + + def close(self): + """Closes the socket. It is an error to call any method + other than recv() or recvfrom() on a closed socket. + For streams, the receive methods return data that was + received prior to the closing of the socket. For + datagram and raw sockets, the receive methods cannot + be used on a closed socket.""" + try: + self._verify_connected() + connected = True + except i2p.Error: + connected = False + if connected: + # Close the Stream object. + self.sessobj.close() + else: + # Never close a session object. + pass + self.closed = True + + def connect(self, address): + """ + Connect to a remote dest, identified in local SAM bridge's hosts + file as host 'address'. + + For example: + >>> s.connect('duck.i2p') + + Alternatively, you can use a full base64 Destination: + + Example: + >>> s.connect('238797sdfh2k34kjh....AAAA') + + If connect() is called on a socket that is in non-blocking + mode or has a timeout, i2p.socket.BlockError or + i2p.socket.Timeout may be raised. This indicates that the + connection is still being initiated. Use i2p.select.select() + to determine when the connection is ready. + """ + # Synchronized. Lock prevents two connects from occurring at the + # same time in different threads. + self.lock.acquire() + try: + self._verify_open() + if self.type == SOCK_DGRAM or self.type == SOCK_RAW: + self.packet_dest = address + return + + self._verify_not_connected() + + address = resolve(address, self.samaddr) + + timeout = self.timeout + unwrap = self.sessobj.connect(address, timeout=timeout) + w = _wrap_stream(unwrap, self) + self.sessobj = w.sessobj + self.remotedest = w.remotedest + + if self.sessobj.err != None: + raise self.sessobj.err + + # Raise error if not yet connected + if not self.sessobj.didconnect: + if timeout == 0.0: + raise BlockError('command would have blocked. use ' + + 'i2p.select.select() to find when socket is connected') + else: raise Timeout('timed out. use i2p.select.select()' + + ' to find when socket is connected') + + finally: self.lock.release() + + def connect_ex(self, address): + """Like connect(), but return any error that is raised. + Returns None if no error is raised.""" + try: self.connect(address) + except i2p.Error, e: return e + + # Don't implement fileno(), as we don't have a real file handle. + + def getpeername(self): + """Get the remote Destination associated with the socket. + This is equivalent to s.remotedest, and is provided for + compatibility with the Python socket module.""" + self._verify_connected() + return self.remotedest + + def getsockname(self): + """Get the local Destination associated with the socket. + This is equivalent to s.dest, and is provided for + compatibility with the Python socket module.""" + return self.dest + + def listen(self, backlog): + """Listen for connections made to the socket. + This method must be called before accept(). + The backlog argument specifies the maximum number of + queued incoming connections.""" + self._verify_open() + self._verify_not_connected() + self.sessobj.listen(backlog) + + def makefile(self, mode='r', bufsize=-1): + """Return a file object for the socket. + See socket.makefile() in the Python documentation for + more information.""" + self._verify_open() + self._verify_connected() + return pysocket._fileobject(self, mode, bufsize) + + def recv(self, bufsize, flags=0): + """Receive string data from the socket. + + The maximum amount of data to be received is given by + bufsize. If bufsize is zero, this function returns + an empty string immediately. If bufsize is nonzero, + this function blocks until at least one character is + available for reading. If the socket has been closed, + an empty string is returned as an end of file indicator. + + If recv() is called on a socket that is in non-blocking + mode or has a timeout, i2p.socket.BlockError or + i2p.socket.Timeout will be raised if data is not available + within the given timeframe. + + For a datagram or raw socket, the first bufsize characters + of the packet are read, and the remainder of the packet is + discarded. To read the entire packet, use bufsize = -1. + + For datagram and raw sockets, the packet may originate from + any Destination. Use recvfrom() with datagrams to determine + the Destination from which the packet was received. + + The flags argument can be a bitwise OR of MSG_PEEK, + MSG_WAITALL, and/or MSG_DONTWAIT. MSG_PEEK indicates that + any data read should not be removed from the socket's + incoming buffer. MSG_WAITALL indicates to wait for exactly + bufsize characters or an error. MSG_DONTWAIT indicates + that the recv() command should not block execution. + """ + + # FIXME: What about recv'ing if connected in asynchronous mode? + # It is acceptable to call recv() after a stream has closed + # gracefully. It is an error to call recv() after a stream has + # closed due to an I2P network error. + timeout = self.timeout + (peek, waitall, dontwait) = \ + (flags & MSG_PEEK, flags & MSG_WAITALL, flags & MSG_DONTWAIT) + if dontwait: timeout = 0.0 + + if self.type == SOCK_STREAM: + self._verify_connected() + return self.sessobj.recv(bufsize, timeout, peek, waitall) + else: + return self.recvfrom(bufsize, flags)[0] + + def recvfrom(self, bufsize, flags=0): + """Like recv(), but returns a tuple (data, remoteaddr), where + data is the string data received, and remoteaddr is the + remote Destination.""" + timeout = self.timeout + (peek, waitall, dontwait) = \ + (flags & MSG_PEEK, flags & MSG_WAITALL, flags & MSG_DONTWAIT) + if dontwait: timeout = 0.0 + + if self.type == SOCK_STREAM: + self._verify_connected() + if bufsize < 0: raise ValueError('bufsize must be >= 0') + return (self.sessobj.recv(bufsize, timeout, peek, waitall), \ + self.remotedest) + else: + return self.sessobj.recv(timeout, peek)[:bufsize] + + def send(self, string, flags=0): + """Sends string data to a remote Destination. + + For a stream, connect() must be called prior to send(). + Once close() is called, no further data can be sent, and + the stream cannot be re-opened. + + For datagram and raw sockets, connect() only specifies + a Destination to which packets are sent to. send() will + then send a packet to the given Destination. connect() + can be used multiple times. + + The send() command never blocks execution. The flags + argument is ignored. + """ + + self._verify_open() + if self.type == SOCK_DGRAM or self.type == SOCK_RAW: + if not hasattr(self, 'packet_dest'): + raise i2p.Error('use connect or sendto to specify a ' + + 'Destination') + self.sendto(string, flags, self.packet_dest) + return + + self._verify_connected() + if self.closed: + raise i2p.Error('send operation on closed socket') + # FIXME: What about send'ing if connected in asynchronous mode? + self.sessobj.send(string) + + def sendall(self, string, flags=0): + """Identical to send().""" + self.send(string) + + def sendto(self, string, flags, address): + """Send a packet to the given Destination. + + Only valid for datagram and raw sockets. The address + argument should be either a name from the hosts file, + or a base64 Destination. + + The sendto() command never blocks execution. The flags + argument is ignored. + """ + + self._verify_open() + if not self.type in [SOCK_DGRAM, SOCK_RAW]: + raise i2p.Error('operation not supported') + if self.closed: + raise i2p.Error('sendto operation on closed socket') + address = resolve(address, self.samaddr) + self.sessobj.send(string, address) + + def setblocking(self, flag): + """Set blocking or non-blocking mode for the socket. + + If flag is True, any method called on the socket will + hang until the method has completed. If flag is False, + all methods will raise i2p.socket.BlockError() if they + cannot complete instantly. + + s.setblocking(False) is equivalent to s.settimeout(0); + s.setblocking(True) is equivalent to s.settimeout(None). + """ + if flag: self.timeout = None + else: self.timeout = 0.0 + + def settimeout(self, value): + """Set a timeout for the socket. + + The value argument should be a timeout value in seconds, + or None. None is equivalent to an infinite timeout. + + A socket operation will raise a i2p.socket.Timeout if + the operation cannot complete within in the specified + time limit. + """ + self.timeout = value + + def gettimeout(self): + """Get the timeout value.""" + return self.timeout + + def __copy__(self): + """Returns the original object.""" + return self + + def __deepcopy__(self, memo): + """Returns the original object.""" + return self + +def resolve(host, samaddr=samaddr): + """Resolve I2P host name --> I2P Destination. + Returns the same string if host is already a Destination.""" + if host.find('http://') == 0: host = host[len('http://'):] + host = host.rstrip('/') + if len(host) >= 256: return host + S = samclasses.BaseSession(samaddr) + ans = S._namelookup(host) + S.close() + return ans + +# -------------------------------------------------- +# End of File +# -------------------------------------------------- diff --git a/apps/sam/python/src/i2p/test/test_eep.py b/apps/sam/python/src/i2p/test/test_eep.py index 34b11f07838c884f2d40c05000a2c88a1a726006..030becc8e03c9a051650a9fbd8e46173b68bc366 100644 --- a/apps/sam/python/src/i2p/test/test_eep.py +++ b/apps/sam/python/src/i2p/test/test_eep.py @@ -7,7 +7,7 @@ import sys; sys.path += ['../../'] import traceback, sys -from i2p import eep, sam, samclasses +from i2p import eep def verify_html(s): """Raise an error if s does not end with </html>""" diff --git a/apps/sam/python/src/i2p/test/test_samclasses.py b/apps/sam/python/src/i2p/test/test_samclasses.py index 7609dd3810f5d8bec5d6750ac6b75858105788b2..fb4245aee9080bfe3fc3b194a59ca06bbd233ad3 100644 --- a/apps/sam/python/src/i2p/test/test_samclasses.py +++ b/apps/sam/python/src/i2p/test/test_samclasses.py @@ -7,7 +7,7 @@ import sys; sys.path += ['../../'] import traceback, time, thread, threading, random -from i2p import eep, sam, samclasses +from i2p import eep, socket, samclasses def test_passed(s, msg='OK'): """Notify user that the given unit test passed.""" @@ -17,18 +17,6 @@ def verify_html(s): """Raise an error if s does not end with </html>""" assert s.strip().lower()[-7:] == '</html>' -def resolve_test(name='duck.i2p'): - """Unit test for resolve.""" - try: - rname = sam.resolve(name) - except: - print 'Unit test failed for sam.resolve' - traceback.print_exc(); sys.exit() - - test_passed('sam.resolve', 'See below') - print ' Use hosts.txt to verify that ' + name + '=' + \ - rname[:15] + '...' - def raw_test1(): """Unit test for samclasses.RawSession.""" @@ -103,7 +91,7 @@ def stream_test1(): """Unit test for samclasses.StreamSession.connect.""" try: - dest = sam.resolve('duck.i2p') + dest = socket.resolve('duck.i2p') S = samclasses.StreamSession('Bob') verify_html(stream_http_get(S, dest)) verify_html(stream_http_get(S, dest)) @@ -225,7 +213,7 @@ def multithread_packet_test(raw=True): time.sleep(0.01*random.uniform(0.0,1.0)) # Read any available packets. try: (p, fromaddr) = C.recv(timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None and not raw: assert fromaddr == D.dest __lock.acquire() @@ -233,7 +221,7 @@ def multithread_packet_test(raw=True): __lock.release() try: (p, fromaddr) = D.recv(timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None and not raw: assert fromaddr == C.dest __lock.acquire() @@ -256,13 +244,13 @@ def multithread_packet_test(raw=True): while time.clock() < end_time: # Read any available packets. try: (p, fromaddr) = C.recv(timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None and not raw: assert fromaddr == D.dest if p != None: C_got += [p] try: (p, fromaddr) = D.recv(timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None and not raw: assert fromaddr == C.dest if p != None: D_got += [p] @@ -357,13 +345,13 @@ def multithread_stream_test(): __lock.acquire() try: p = Cin.recv(100000, timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None: C_got += [p] __lock.release() __lock.acquire() try: p = Din.recv(100000, timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None: D_got += [p] __lock.release() @@ -383,11 +371,11 @@ def multithread_stream_test(): while time.clock() < end_time: # Read any available string data, non-blocking. try: p = Cin.recv(100000, timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None: C_got += [p] try: p = Din.recv(100000, timeout=0.0) - except sam.BlockError: p = None + except socket.BlockError: p = None if p != None: D_got += [p] if len(''.join(C_got)) == len(''.join(C_recv)) and \ @@ -428,7 +416,6 @@ def test(): print print 'Testing:' - resolve_test() raw_test1() datagram_test1() stream_test1() @@ -437,7 +424,7 @@ def test(): multithread_stream_test() # Note: The datagram unit test fails, but it's apparently I2P's - # fault (the code is the same as for raw packets, and the sam + # fault (the code is the same as for raw packets, and the SAM # bridge is sent all the relevant data). # Code: multithread_packet_test(raw=False) diff --git a/apps/sam/python/src/i2p/test/test_socket.py b/apps/sam/python/src/i2p/test/test_socket.py new file mode 100644 index 0000000000000000000000000000000000000000..987e5ec21452f935685b2dfe43b6fffec109b156 --- /dev/null +++ b/apps/sam/python/src/i2p/test/test_socket.py @@ -0,0 +1,427 @@ + +# -------------------------------------------------------- +# test_socket.py: Unit tests for socket, select. +# -------------------------------------------------------- + +# Make sure we can import i2p +import sys; sys.path += ['../../'] + +import traceback, time, thread, threading, random, copy +from i2p import socket, select + +def test_passed(s, msg='OK'): + """Notify user that the given unit test passed.""" + print ' ' + (s + ':').ljust(50) + msg + +def verify_html(s): + """Raise an error if s does not end with </html>""" + assert s.strip().lower()[-7:] == '</html>' + +def resolve_test(name='duck.i2p'): + """Unit test for resolve.""" + try: + rname = socket.resolve(name) + except: + print 'Unit test failed for socket.resolve' + traceback.print_exc(); sys.exit() + + test_passed('socket.resolve', 'See below') + print ' Use hosts.txt to verify that ' + name + '=' + \ + rname[:15] + '...' + +def stream_client(dest): + """Sub-unit test for socket.socket in SOCK_STREAM mode.""" + S = socket.socket('Alice', socket.SOCK_STREAM) + S.connect(dest) + S.send('GET / HTTP/1.0\r\n\r\n') # Send request + f = S.makefile() # File object + + while True: # Read header + line = f.readline().strip() # Read a line + if line == '': break # Content begins + + s = f.read() # Get content + f.close() + S.close() + +def stream_client_test(): + """Unit test for socket.socket in SOCK_STREAM mode.""" + url = 'duck.i2p' + stream_client('http://' + url + '/') + stream_client(url) + stream_client(url + '/') + stream_client('http://' + url) + stream_client(socket.resolve('http://' + url + '/')) + test_passed('socket.socket stream client') + +def packet_test(raw=True): + """Unit test for socket.socket in SOCK_DGRAM or SOCK_RAW modes.""" + + try: + multithread_wait_time = 500.0 + may_need_increase = False + + kwargs = {'in_depth': 0, 'out_depth': 0} + if raw: + C = socket.socket('Carola', socket.SOCK_RAW, **kwargs) + D = socket.socket('Davey', socket.SOCK_RAW, **kwargs) + else: + C = socket.socket('Carol', socket.SOCK_DGRAM, **kwargs) + D = socket.socket('Dave', socket.SOCK_DGRAM, **kwargs) + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # Packets C *should* receive + D_recv = [] # Packets D *should* receive + C_got = [] # Packets C actually got + D_got = [] # Packets D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m packets + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + C.sendto(s, 0, D.dest) + __lock.acquire() + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + D.sendto(s, 0, C.dest) + __lock.acquire() + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available packets. + try: (p, fromaddr) = C.recvfrom(1000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + __lock.acquire() + if p != None: C_got += [p] + __lock.release() + + try: (p, fromaddr) = D.recvfrom(1000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + __lock.acquire() + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received packets. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available packets. + try: (p, fromaddr) = C.recvfrom(1000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None and not raw: assert fromaddr == D.dest + + if p != None: C_got += [p] + + try: (p, fromaddr) = D.recvfrom(1000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None and not raw: assert fromaddr == C.dest + + if p != None: D_got += [p] + if len(C_got) == len(C_recv) and len(D_got) == len(D_recv): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got.sort() + D_got.sort() + C_recv.sort() + D_recv.sort() + + assert C_got == C_recv + assert D_got == D_recv + + C.close() + D.close() + except: + if raw: + print 'Unit test failed for socket.socket (SOCK_RAW).' + print 'Raw packets are not reliable.' + else: + print 'Unit test failed for socket.socket (SOCK_DGRAM).' + print 'Datagram packets are not reliable.' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + + if raw: + test_passed('socket.socket (SOCK_RAW)') + else: + test_passed('socket.socket (SOCK_DGRAM)') + +def stream_test(): + """Multithreaded unit test for socket.socket (SOCK_STREAM).""" + + try: + multithread_wait_time = 200.0 + may_need_increase = False + + kwargs = {'in_depth':0, 'out_depth':0} + C = socket.socket('Carolic', socket.SOCK_STREAM, **kwargs) + D = socket.socket('David', socket.SOCK_STREAM, **kwargs) + Cout = socket.socket('Carolic', socket.SOCK_STREAM, **kwargs) + Dout = socket.socket('David', socket.SOCK_STREAM, **kwargs) + + assert C.dest == Cout.dest + assert D.dest == Dout.dest + + C.listen(5) + D.listen(5) + Cout.connect(D.dest) + Dout.connect(C.dest) + (Cin, ignoredest) = C.accept() + (Din, ignoredest) = D.accept() + + global C_recv, D_recv, C_got, D_got, __lock + C_recv = [] # String data C *should* receive + D_recv = [] # String data D *should* receive + C_got = [] # String data C actually got + D_got = [] # String data D actually got + + n = 50 # Create n threads + m = 40 # Each thread sends m strings + + global __done_count + __done_count = 0 + __lock = threading.Lock() + + # Use C and D to send and read in many different threads. + def f(): + # This code is run in each separate thread + global C_recv, D_recv, C_got, D_got, __lock, __done_count + for i in range(m): + # Random binary string of length 2-80. + index_list = range(random.randrange(2, 80)) + s = ''.join([chr(random.randrange(256)) for j in index_list]) + if random.randrange(2) == 0: + # Send packet from C to D, and log it. + __lock.acquire() + Cout.send(s) + D_recv += [s] + __lock.release() + else: + # Send packet from D to C, and log it. + __lock.acquire() + Dout.send(s) + C_recv += [s] + __lock.release() + time.sleep(0.01*random.uniform(0.0,1.0)) + # Read any available string data, non-blocking. + + __lock.acquire() + try: p = Cin.recv(100000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None: C_got += [p] + __lock.release() + + __lock.acquire() + try: p = Din.recv(100000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None: D_got += [p] + __lock.release() + + __lock.acquire() + __done_count += 1 + __lock.release() + + # Create n threads. + for i in range(n): + threading.Thread(target=f).start() + + # Wait for them to finish. + while __done_count < n: time.sleep(0.01) + + # Read any left-over received string data. + end_time = time.clock() + multithread_wait_time + while time.clock() < end_time: + # Read any available string data, non-blocking. + try: p = Cin.recv(100000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None: C_got += [p] + + try: p = Din.recv(100000, socket.MSG_DONTWAIT) + except socket.BlockError: p = None + if p != None: D_got += [p] + + if len(''.join(C_got)) == len(''.join(C_recv)) and \ + len(''.join(D_got)) == len(''.join(D_recv)): + break + + if time.clock() >= end_time: + may_need_increase = True + + C_got = ''.join(C_got) + D_got = ''.join(D_got) + C_recv = ''.join(C_recv) + D_recv = ''.join(D_recv) + assert C_got == C_recv + assert D_got == D_recv + + Cin.close() + Din.close() + Cout.close() + Dout.close() + C.close() + D.close() + except: + print 'Unit test failed for socket.socket ' + \ + '(SOCK_STREAM, multithreaded).' + + if may_need_increase: + print 'Try increasing multithread_wait_time.' + + traceback.print_exc(); sys.exit() + + test_passed('socket.socket (SOCK_STREAM, multithreaded)') + + +def noblock_stream_test(): + """Unit test for non-blocking stream commands and listen.""" + + kwargs = {'in_depth': 0, 'out_depth': 0} + serv = socket.socket('Allison',socket.SOCK_STREAM,**kwargs) + serv.setblocking(False) + serv.listen(100) + assert serv.gettimeout() == 0.0 + + msg_to_client = 'Hi, client!!!!' + msg_to_server = 'Hi, server!' + + nconnects = 5 + + global server_done, client_count, client_lock + server_done = False + client_count = 0 + client_lock = threading.Lock() + + def serv_func(n = nconnects): + while True: + try: + (C, ignoredest) = serv.accept() + C.send(msg_to_client) + rmsg = C.recv(len(msg_to_server), socket.MSG_WAITALL) + if rmsg != msg_to_server: + raise ValueError('message should have been: ' + + repr(msg_to_server) + ' was: ' + repr(rmsg)) + C.close() + n -= 1 + if n == 0: break + except socket.BlockError: + pass + time.sleep(0.01) + global server_done + server_done = True + + def client_func(): + # FIXME: i2p.socket.NetworkError('TIMEOUT', '') errors are produced + # for our streams if we use '' for all clients. Why? + C = socket.socket('Bobb', socket.SOCK_STREAM, **kwargs) + C.setblocking(False) + try: + C.connect(serv.dest) + except socket.BlockError: + # One could also use timeout=0.1 and loop + (Rlist, Wlist, Elist) = select.select([C], [C], [C]) + if len(Elist) > 0: + assert Elist[0] == C + raise Elist[0].sessobj.err + C.send(msg_to_server) + C.setblocking(True) + rmsg = C.recv(len(msg_to_client), socket.MSG_WAITALL) + if rmsg != msg_to_client: + raise ValueError('message should have been: ' + + repr(msg_to_client) + ' was: ' + repr(rmsg)) + C.close() + global client_count, client_lock + + # Synchronized + client_lock.acquire() + try: client_count += 1 + finally: client_lock.release() + + + thread.start_new_thread(serv_func, ()) + + for i in range(nconnects): + thread.start_new_thread(client_func, ()) + + while True: + if server_done and client_count == nconnects: break + time.sleep(0.01) + + test_passed('socket.listen (SOCK_STREAM), and non-blocking IO') + +def multi_stream_test(n): + """See if we can have n streams open at once.""" + server = None + client = [None] * n + + kwargs = {'in_depth': 0, 'out_depth': 0} + server = socket.socket('Aligi',socket.SOCK_STREAM,**kwargs) + server.listen(n) + + for i in range(n): + client[i] = socket.socket('Bobo', socket.SOCK_STREAM, \ + in_depth=0, out_depth=0) + + for i in range(n): + client[i].connect(server.dest) + client[i].send('Hi') + + for i in range(n): + client[i].close() + server.close() + + test_passed(str(n) + ' streams open at once') + + +# Todo: +# select, poll +# More nonblocking unit tests + + +def test(): + print 'Testing:' + print "Comment and uncomment tests manually, if they don't finish." + + resolve_test() + noblock_stream_test() + stream_client_test() + packet_test(raw=True) + packet_test(raw=False) + stream_test() + multi_stream_test(200) + +if __name__ == '__main__': + test() diff --git a/apps/sam/python/src/i2p/test/test_tunnel.py b/apps/sam/python/src/i2p/test/test_tunnel.py new file mode 100644 index 0000000000000000000000000000000000000000..e218ecb2fda478083b20a2bb8dc8481ef644178e --- /dev/null +++ b/apps/sam/python/src/i2p/test/test_tunnel.py @@ -0,0 +1,42 @@ + +# -------------------------------------------------------- +# test_tunnel.py: Demos for tunnel (unit tests needed). +# -------------------------------------------------------- + +# Make sure we can import i2p +import sys; sys.path += ['../../'] + +import time +from i2p import tunnel + +def tunnel_server_demo(): + """Demo for tunnel.TunnelServer.""" + + T = tunnel.TunnelServer('Alisick', 8080, in_depth=0, out_depth=0) + + print 'Server ready at:' + print T.dest + while True: + time.sleep(0.01) + +def tunnel_client_demo(): + """Demo for tunnel.TunnelClient.""" + + T = tunnel.TunnelClient('Alliaha', 8001, 'duck.i2p', \ + in_depth=0, out_depth=0) + + print 'Serving up duck.i2p at http://127.0.0.1:8001/' + while True: + time.sleep(0.01) + + + +def test(): + print 'Demo:' + +# Demos: +# tunnel_server_demo() + tunnel_client_demo() + +if __name__ == '__main__': + test() diff --git a/apps/sam/python/src/i2p/tunnel.py b/apps/sam/python/src/i2p/tunnel.py new file mode 100644 index 0000000000000000000000000000000000000000..438c4de8369fccec66f953828f67d5e812021917 --- /dev/null +++ b/apps/sam/python/src/i2p/tunnel.py @@ -0,0 +1,228 @@ + +# ------------------------------------------------------------- +# tunnel.py: Python SAM Tunnel classes +# ------------------------------------------------------------- + +"""Exchange data between I2P and regular TCP sockets.""" + +import time, threading, sys + +import i2p +import i2p.socket +import i2p.select +from i2p.pylib import socket as pysocket # Import Python socket + +def _exchange_data(A, B): + """Exchanges data A <-> B between open stream sockets A and B.""" + # FIXME: There's recv errors that we should be shutting + # down sockets for, but this seems to work OK. + err = None + try: + # Send data from A -> B while available. + while True: + # A -> B. + A.setblocking(False) + try: s = A.recv(1024) + except Exception, e: s = None + if s == '': raise i2p.socket.ClosedError + if s == None: + # No data available. Stop sending A -> B. + break + else: + B.setblocking(True) + B.sendall(s) + except Exception, e: + err = e + + try: + # Send data from B -> A while available. + while True: + # B -> A. + B.setblocking(False) + try: s = B.recv(1024) + except Exception, e: s = None + if s == '': raise i2p.socket.ClosedError + if s == None: + # No data available. Stop sending B -> A. + break + else: + A.setblocking(True) + A.sendall(s) + except Exception, e: + err = e + + # Re-raise error after finishing communications both ways. + if err != None: raise err + +def _test_connected(B): + """Raises an error if socket B is not yet connected.""" + [Rlist, Wlist, Elist] = i2p.select.select([B], [B], [B], 0.0) + if len(Wlist) == 0: + raise ValueError('socket not yet connected') + +class Tunnel: + def __init__(self, receive, make_send, nconnect=-1, timeout=60.0): + """A Tunnel relays connections from a 'receive' socket to one + or more 'send' sockets. The receive socket must be bound + and listening. For each incoming connection, a new send + socket is created by calling make_send(). Data is then + exchanged between the created streams until one socket is + closed. nconnect is the maximum number of simultaneous + connections (-1 for infinite), and timeout is the time that + a single connection can last for (None allows a connection + to last forever). + + Sockets must accept stream traffic and support the Python + socket interface. A separate daemonic thread is created to + manage the tunnel. For high performance, make_send() should + make a socket and connect in non-blocking mode (you should + catch and discard the i2p.socket.BlockError or socket.error + due to executing connect on a non-blocking socket). + + Security Note: + A firewall is needed to maintain the end user's anonymity. + An attacker could keep a tunnel socket open by pinging it + regularly. The accepted sockets from 'receive' must prevent + this by closing down eventually. + + Socket errors do not cause the Tunnel to shut down. + """ + + self.receive = receive + self.make_send = make_send + self.receive.setblocking(False) + self.alive = True + self.nconnect = nconnect + self.timeout = timeout + T = threading.Thread(target=self._run, args=()) + T.setDaemon(True) + T.start() + + def _run(self): + """Manage the tunnel in a separate thread.""" + tunnels = [] + + while True: + # Look for a new connection + if self.nconnect < 0 or len(tunnels) < self.nconnect: + (A, B) = (None, None) + try: + (A, ignoredest) = self.receive.accept() + A.setblocking(False) + B = self.make_send() + B.setblocking(False) + if self.timeout != None: t = time.time() + self.timeout + else: t = None + tunnels.append((A, B, t)) + except Exception, e: + try: + if A != None: + A.setblocking(False); A.close() + except Exception, e: pass + try: + if B != None: + B.setblocking(False); B.close() + except Exception, e: pass + + # Send data between existing connections + new_tunnels = [] + for (A, B, t) in tunnels: + # For each connection pair, send data. + try: + if t != None: assert time.time() <= t + # Test whether B is successfully connected + _test_connected(B) + + # Send A <-> B. + _exchange_data(A, B) + + if self.timeout != None: t = time.time() + self.timeout + else: t = None + new_tunnels.append((A, B, t)) + except Exception, e: + # Catch errors. Kill the connection if it's been at + # least timeout seconds since last non-erroneous call + # to _exchange_data, or if stream was closed. This + # allows stream-not-finished-connecting errors to be + # dropped within the timeout. + time_ok = True + if self.timeout != None: + if time.time() > t: time_ok = False + if time_ok and not isinstance(e, i2p.socket.ClosedError): + # Don't kill connection yet + new_tunnels.append((A, B, t)) + else: + # We've only gotten errors for 'timeout' s. + # Drop the connection. + try: A.setblocking(False); A.close() + except Exception, e: pass + try: B.setblocking(False); B.close() + except Exception, e: pass + tunnels = new_tunnels + time.sleep(0.01) + + # Shut down all connections if self.close() was called. + if not self.alive: + for (A, B, t) in tunnels: + try: A.setblocking(False); A.close() + except: pass + try: B.setblocking(False); B.close() + except: pass + break + + def close(self): + """Close all connections made for this tunnel.""" + self.alive = False + +class TunnelServer(Tunnel): + dest = property(doc='I2P Destination of server.') + session = property(doc='Session name for server.') + def __init__(self, session, port, samaddr=i2p.socket.samaddr, + nconnect=-1, timeout=None, **kwargs): + """Tunnels incoming SAM streams --> localhost:port. + + nconnect and timeout are the maximum number of connections + and maximum time per connection. All other arguments are + passed to i2p.socket.socket(). This call blocks until the + tunnel is ready.""" + S = i2p.socket.socket(session, i2p.socket.SOCK_STREAM, samaddr, + **kwargs) + S.listen(64) + self.session = session + self.dest = S.dest + def make_send(): + C = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + C.setblocking(False) + try: C.connect(('127.0.0.1', port)) + except: pass # Ignore 'would have blocked' error + return C + Tunnel.__init__(self, S, make_send, nconnect, timeout) + +class TunnelClient(Tunnel): + remotedest = property(doc='Remote Destination.') + dest = property('Local Destination used for routing.') + session = property('Session name for local Destination.') + def __init__(self, session, port, dest, samaddr=i2p.socket.samaddr, + nconnect=-1, timeout=None, **kwargs): + """Tunnels localhost:port --> I2P Destination dest. + + A session named 'session' is created locally, for purposes + of routing to 'dest'. nconnect and timeout are the maximum + number of connections and maximum time per connection. All + other arguments are passed to i2p.socket.socket(). This call + blocks until the tunnel is ready.""" + S = pysocket.socket(pysocket.AF_INET, pysocket.SOCK_STREAM) + S.bind(('', port)) + S.listen(4) + obj = i2p.socket.socket(session, i2p.socket.SOCK_STREAM, samaddr, + **kwargs) + self.session = session + self.dest = obj.dest + def make_send(): + C = i2p.socket.socket(session, i2p.socket.SOCK_STREAM, samaddr, + **kwargs) + C.setblocking(False) + try: C.connect(dest) + except: pass # Ignore 'would have blocked' error + return C + Tunnel.__init__(self, S, make_send, nconnect, timeout)