forked from I2P_Developers/i2p.i2p
according to whether we're running on *nix or windoze. build.xml should now work on your platform, as long as you have jython installed and jython is on your execution path. Got SAM STREAMs working - test code added to i2psamclient.py as function demoSTREAM()
1335 lines
37 KiB
Python
1335 lines
37 KiB
Python
#!/usr/bin/env python
|
|
#@+leo-ver=4
|
|
#@+node:@file python/src/i2psamclient.py
|
|
#@@first
|
|
"""
|
|
Implements a client API for I2CP messaging via SAM
|
|
|
|
Very simple I2P messaging interface, which should prove easy
|
|
to reimplement in your language of choice
|
|
|
|
This module can be used from cpython or jython
|
|
|
|
Run this module without arguments to see a demo in action
|
|
(requires SAM server to be already running)
|
|
"""
|
|
#@+others
|
|
#@+node:imports
|
|
import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
|
|
|
|
from pdb import set_trace
|
|
|
|
#@-node:imports
|
|
#@+node:globals
|
|
# -----------------------------------------
|
|
# server access settings
|
|
|
|
i2psamhost = '127.0.0.1'
|
|
i2psamport = 7656
|
|
|
|
# ------------------------------------------
|
|
# logging settings
|
|
|
|
# 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful
|
|
verbosity = 5
|
|
|
|
# change to a filename to log there instead
|
|
logfile = sys.stdout
|
|
|
|
# when set to 1, and when logfile != sys.stdout, log msgs are written
|
|
# both to logfile and console stdout
|
|
log2console = 1
|
|
|
|
# don't touch this!
|
|
loglock = threading.Lock()
|
|
|
|
#@-node:globals
|
|
#@+node:exceptions
|
|
class I2PServerFail(Exception):
|
|
"""
|
|
A failure in connecting to the I2CP server
|
|
"""
|
|
|
|
class I2PCommandFail(Exception):
|
|
"""
|
|
A failure in an I2CP command
|
|
"""
|
|
pass
|
|
|
|
class I2PStreamClosed(Exception):
|
|
"""
|
|
Stream is not open
|
|
"""
|
|
#@-node:exceptions
|
|
#@+node:class I2PSamClient
|
|
class I2PSamClient:
|
|
"""
|
|
Implements a reference client for accessing I2CP via i2psam
|
|
|
|
Connects to i2psam's I2PSamServer, sends commands
|
|
and receives results
|
|
|
|
The primitives should be reasonably self-explanatory
|
|
|
|
Usage summary:
|
|
1. create one or more I2PSamClient instances per process (1 should be fine)
|
|
2. invoke the L{genkeys} method to create destination keypairs
|
|
3. create sessions objects via the L{createSession} method
|
|
4. use these session objects to send and receive data
|
|
5. destroy the session objects when you're done
|
|
|
|
Refer to the function L{demo} for a simple example
|
|
"""
|
|
#@ @+others
|
|
#@+node:attributes
|
|
# server host/port settings exist here in case you might
|
|
# have a reason for overriding in a subclass
|
|
|
|
host = i2psamhost
|
|
port = i2psamport
|
|
|
|
i2cpHost = None
|
|
i2cpPort = None
|
|
|
|
#@-node:attributes
|
|
#@+node:__init__
|
|
def __init__(self, **kw):
|
|
"""
|
|
Creates a client connection to i2psam listener
|
|
|
|
Keywords:
|
|
- host - host to connect to (default 127.0.0.1)
|
|
- port - port to connect to (default 7656)
|
|
"""
|
|
# get optional host/port
|
|
log(4, "entered")
|
|
|
|
self.host = kw.get('host', self.host)
|
|
self.port = int(kw.get('port', self.port))
|
|
|
|
self.cmdLock = threading.Lock()
|
|
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
self.lockHello = threading.Lock()
|
|
self.sendLock = threading.Lock()
|
|
self.qNewDests = Queue.Queue()
|
|
self.qSession = Queue.Queue()
|
|
self.qDatagrams = Queue.Queue()
|
|
self.qRawMessages = Queue.Queue()
|
|
|
|
self.namingReplies = {}
|
|
self.namingCache = {}
|
|
|
|
self.streams = {} # currently open streams, keyed by id
|
|
self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
|
|
self.qNewStreams = Queue.Queue() # incoming connections
|
|
|
|
self.samNextIdLock = threading.Lock()
|
|
self.samNextId = 1
|
|
|
|
self.isRunning = 1
|
|
|
|
|
|
log(4, "trying connection to SAM server...")
|
|
try:
|
|
self.sock.connect((self.host, self.port))
|
|
except:
|
|
raise I2PServerFail(
|
|
"Connection to i2psam server failed\n"
|
|
"(are you sure your I2P router is running, and\n"
|
|
"listening for I2CP connections on %s:%s?)" % (self.host, self.port)
|
|
)
|
|
|
|
# fire up receiver thread
|
|
thread.start_new_thread(self.threadRx, ())
|
|
|
|
# ping the server
|
|
try:
|
|
log(4, "trying to ping SAM server...")
|
|
self.samHello()
|
|
except:
|
|
logException(4, "Exception on handshaking")
|
|
raise I2PServerFail("Failed to handshake with i2psam server")
|
|
|
|
# connected fine
|
|
log(2, "I2CP Client successfully connected")
|
|
#@-node:__init__
|
|
#@+node:createSession
|
|
def createSession(self, privdest):
|
|
"""
|
|
DEPRECATED - use sam* methods instead!
|
|
|
|
Creates a session using private destkey
|
|
"""
|
|
#3. createsession:
|
|
# - client->server:
|
|
# - createsession <base64private>\n
|
|
# - server->client:
|
|
# - ok\n OR
|
|
# - error[ <reason>]\n
|
|
|
|
self.cmdLock.acquire()
|
|
try:
|
|
self._sendline("createsession %s" % privdest)
|
|
respitems = self._recvline().split(" ", 1)
|
|
if respitems[0] == 'ok':
|
|
res = None
|
|
else:
|
|
res = respitems[1]
|
|
except:
|
|
logException(2, "createsession fail")
|
|
self.cmdLock.release()
|
|
raise
|
|
|
|
self.cmdLock.release()
|
|
|
|
if res:
|
|
raise I2PCommandFail("createsession fail: "+res)
|
|
|
|
return I2PRemoteSession(self, privdest)
|
|
|
|
#@-node:createSession
|
|
#@+node:destroySession
|
|
def destroySession(self, privdest):
|
|
"""
|
|
DEPRECATED - use sam* methods instead!
|
|
|
|
Destrlys a session using private destkey
|
|
"""
|
|
#4. destroysession:
|
|
# - client->server:
|
|
# - destroysession <base64private>\n
|
|
# - server->client:
|
|
# - ok\n OR
|
|
# - error[ <reason>]\n
|
|
|
|
self.cmdLock.acquire()
|
|
try:
|
|
self._sendline("destroysession %s" % privdest)
|
|
respitems = self._recvline().split(" ", 1)
|
|
if respitems[0] == 'ok':
|
|
res = None
|
|
else:
|
|
res = respitems[1]
|
|
except:
|
|
logException(2, "destroysession fail")
|
|
self.cmdLock.release()
|
|
raise
|
|
|
|
self.cmdLock.release()
|
|
|
|
if res:
|
|
raise I2PCommandFail("destroysession fail: " + res)
|
|
|
|
return res
|
|
|
|
#@-node:destroySession
|
|
#@+node:send
|
|
def send(self, privdest, peerdest, msg):
|
|
"""
|
|
DEPRECATED - use sam* methods instead!
|
|
|
|
Sends a block of data from local dest to remote dest
|
|
"""
|
|
#5. send:
|
|
# - client->server:
|
|
# - send <size> <localbase64private> <remotebase64dest>\ndata
|
|
# - server->client:
|
|
# - ok\n OR
|
|
# - error[ <reason>]\n
|
|
|
|
self.cmdLock.acquire()
|
|
try:
|
|
self._sendline("send %s %s %s" % (len(msg), privdest, peerdest))
|
|
self._sendbytes(msg)
|
|
line = self._recvline()
|
|
#print "** %s" % line
|
|
respitems = line.split(" ", 1)
|
|
if respitems[0] == 'ok':
|
|
res = None
|
|
else:
|
|
res = " ".join(respitems[1:])
|
|
except:
|
|
logException(2, "send fail")
|
|
self.cmdLock.release()
|
|
raise
|
|
|
|
self.cmdLock.release()
|
|
|
|
if res:
|
|
raise I2PCommandFail("send fail: " + res)
|
|
|
|
return res
|
|
|
|
#@-node:send
|
|
#@+node:receive
|
|
def receive(self, privdest):
|
|
"""
|
|
DEPRECATED - use sam* methods instead!
|
|
|
|
receives a block of data, returning string, or None if no data available
|
|
"""
|
|
#6. receive:
|
|
# - client->server:
|
|
# - receive <localbase64private>\n
|
|
# - server->client:
|
|
# - ok <size>\ndata OR
|
|
# - error[ <reason>]\n
|
|
|
|
self.cmdLock.acquire()
|
|
try:
|
|
self._sendline("receive %s" % privdest)
|
|
respitems = self._recvline().split(" ", 1)
|
|
if respitems[0] == 'ok':
|
|
res = None
|
|
size = int(respitems[1])
|
|
msg = self._recvbytes(size)
|
|
res = None
|
|
else:
|
|
res = respitems[1]
|
|
except:
|
|
logException(2, "receive fail")
|
|
self.cmdLock.release()
|
|
raise
|
|
|
|
self.cmdLock.release()
|
|
|
|
if res:
|
|
raise I2PCommandFail("destroysession fail: " + res)
|
|
|
|
return msg
|
|
#@-node:receive
|
|
#@+node:samHello
|
|
def samHello(self):
|
|
"""
|
|
Sends a quick HELLO PING to SAM server and awaits response
|
|
Arguments:
|
|
- none
|
|
|
|
Keywords:
|
|
- none
|
|
|
|
Returns:
|
|
- nothing (None) if ping sent and pong received, or raises an exception if
|
|
failed
|
|
"""
|
|
self.lockHello.acquire()
|
|
self.samSend("HELLO", "PING")
|
|
self.lockHello.acquire()
|
|
self.lockHello.release()
|
|
#@-node:samHello
|
|
#@+node:samSessionCreate
|
|
def samSessionCreate(self, style, dest, **kw):
|
|
"""
|
|
Creates a SAM session
|
|
|
|
Arguments:
|
|
- style - one of 'STREAM', 'DATAGRAM' or 'RAW'
|
|
- dest - base64 private destination
|
|
|
|
Keywords:
|
|
- direction - only used for STREAM sessions, can be RECEIVE,
|
|
CREATE or BOTH (default BOTH)
|
|
- i2cphost - hostname for the SAM bridge to contact i2p router on
|
|
- i2cpport - port for the SAM bridge to contact i2p router on
|
|
|
|
Returns:
|
|
- 'OK' if session was created successfully, or a tuple
|
|
(keyword, message) if not
|
|
"""
|
|
kw1 = dict(kw)
|
|
kw1['STYLE'] = self.samStyle = style
|
|
kw1['DESTINATION'] = dest
|
|
if style == 'STREAM':
|
|
direction = kw.get('direction', 'BOTH')
|
|
kw1['DIRECTION'] = direction
|
|
if direction == 'BOTH':
|
|
self.canAccept = 1
|
|
self.canConnect = 1
|
|
elif direction == 'RECEIVE':
|
|
self.canAccept = 1
|
|
self.canConnect = 0
|
|
elif direction == 'CREATE':
|
|
self.canAccept = 0
|
|
self.canConnect = 1
|
|
else:
|
|
raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
|
|
|
|
# stick in i2cp host/port if specified
|
|
if kw.has_key('i2cphost'):
|
|
kw1['I2CP.HOST'] = kw['i2cphost']
|
|
if kw.has_key('i2cpport'):
|
|
kw1['I2CP.PORT'] = kw['i2cpport']
|
|
|
|
self.samSend("SESSION", "CREATE",
|
|
**kw1)
|
|
subtopic, args = self.qSession.get()
|
|
|
|
if args['RESULT'] == 'OK':
|
|
return 'OK'
|
|
else:
|
|
return (args['RESULT'], args['MESSAGE'])
|
|
#@-node:samSessionCreate
|
|
#@+node:samDestGenerate
|
|
def samDestGenerate(self):
|
|
"""
|
|
Creates a whole new dest and returns an tuple pub, priv as
|
|
base64 public and private destination keys
|
|
"""
|
|
self.samSend("DEST", "GENERATE")
|
|
pub, priv = self.qNewDests.get()
|
|
return pub, priv
|
|
#@-node:samDestGenerate
|
|
#@+node:samRawSend
|
|
def samRawSend(self, peerdest, msg):
|
|
"""
|
|
Sends a raw anon message to another peer
|
|
|
|
peerdest is the public base64 destination key of the peer
|
|
"""
|
|
self.samSend("RAW", "SEND", msg,
|
|
DESTINATION=peerdest,
|
|
)
|
|
#@-node:samRawSend
|
|
#@+node:samRawCheck
|
|
def samRawCheck(self):
|
|
"""
|
|
Returns 1 if there are received raw messages available, 0 if not
|
|
"""
|
|
return not self.qRawMessages.empty()
|
|
#@-node:samRawCheck
|
|
#@+node:samRawReceive
|
|
def samRawReceive(self, blocking=1):
|
|
"""
|
|
Returns the next raw message available,
|
|
blocking if none is available and the blocking arg is set to 0
|
|
|
|
If blocking is 0, and no messages are available, returns None.
|
|
|
|
Remember that you can check for availability with
|
|
the .samRawCheck() method
|
|
"""
|
|
if not blocking:
|
|
if self.qRawMessages.empty():
|
|
return None
|
|
return self.qRawMessages.get()
|
|
|
|
#@nonl
|
|
#@-node:samRawReceive
|
|
#@+node:samDatagramSend
|
|
def samDatagramSend(self, peerdest, msg):
|
|
"""
|
|
Sends a repliable datagram message to another peer
|
|
|
|
peerdest is the public base64 destination key of the peer
|
|
"""
|
|
self.samSend("DATAGRAM", "SEND", msg,
|
|
DESTINATION=peerdest,
|
|
)
|
|
#@-node:samDatagramSend
|
|
#@+node:samDatagramCheck
|
|
def samDatagramCheck(self):
|
|
"""
|
|
Returns 1 if there are datagram messages received messages available, 0 if not
|
|
"""
|
|
return not self.qDatagrams.empty()
|
|
#@-node:samDatagramCheck
|
|
#@+node:samDatagramReceive
|
|
def samDatagramReceive(self, blocking=1):
|
|
"""
|
|
Returns the next datagram message available,
|
|
blocking if none is available.
|
|
|
|
If blocking is set to 0, and no messages are available,
|
|
returns None.
|
|
|
|
Remember that you can check for availability with
|
|
the .samRawCheck() method
|
|
|
|
Returns 2-tuple: dest, msg
|
|
where dest is the base64 destination of the peer from
|
|
whom the message was received
|
|
"""
|
|
if not blocking:
|
|
if self.qDatagrams.empty():
|
|
return None
|
|
return self.qDatagrams.get()
|
|
#@-node:samDatagramReceive
|
|
#@+node:samStreamConnect
|
|
def samStreamConnect(self, dest):
|
|
"""
|
|
Makes a STREAM connection to a remote dest
|
|
|
|
STREAM STATUS
|
|
RESULT=$result
|
|
ID=$id
|
|
[MESSAGE=...]
|
|
"""
|
|
# need an ID
|
|
id = self.samAllocId()
|
|
|
|
# create queue for connect reply
|
|
q = self.streamConnectReplies[id] = Queue.Queue()
|
|
|
|
# send req
|
|
self.samSend("STREAM", "CONNECT",
|
|
ID=id,
|
|
DESTINATION=dest,
|
|
)
|
|
|
|
# await reply - comes back as a dict
|
|
resp = q.get()
|
|
|
|
# ditch queue
|
|
del self.streamConnectReplies[id]
|
|
del q
|
|
|
|
# check out response
|
|
result = resp['RESULT']
|
|
if result == 'OK':
|
|
conn = I2PSAMStream(self, id, dest)
|
|
self.streams[id] = conn
|
|
return conn
|
|
else:
|
|
msg = resp.get('MESSAGE', '')
|
|
raise I2PCommandFail(result, msg, "STREAM CONNECT")
|
|
|
|
#@-node:samStreamConnect
|
|
#@+node:samStreamAccept
|
|
def samStreamAccept(self):
|
|
"""
|
|
Waits for an incoming connection, returning a wrapped conn obj
|
|
"""
|
|
log(4, "waiting for connection")
|
|
conn = self.qNewStreams.get()
|
|
log(4, "got connection")
|
|
return conn
|
|
#@-node:samStreamAccept
|
|
#@+node:samStreamSend
|
|
def samStreamSend(self, conn, data):
|
|
"""
|
|
DO NOT CALL THIS DIRECTLY
|
|
|
|
Invoked by an I2PSAMStream object to transfer data
|
|
Use the object's .send() method instead.
|
|
|
|
conn is the I2PSAMStream
|
|
|
|
STREAM SEND
|
|
ID=$id
|
|
SIZE=$numBytes\n[$numBytes of data]
|
|
"""
|
|
# dispatch
|
|
self.samSend("STREAM", "SEND", data, ID=conn.id)
|
|
|
|
# useless, but mimics socket paradigm
|
|
return len(data)
|
|
|
|
#@-node:samStreamSend
|
|
#@+node:samStreamClose
|
|
def samStreamClose(self, conn):
|
|
"""
|
|
DO NOT CALL DIRECTLY
|
|
|
|
Invoked by I2PSAMStream to close stream
|
|
Use the object's .send() method instead.
|
|
|
|
STREAM CLOSE
|
|
ID=$id
|
|
"""
|
|
self.samSend("STREAM", "CLOSE", ID=conn.id)
|
|
del self.streams[conn.id]
|
|
|
|
#@-node:samStreamClose
|
|
#@+node:samNamingLookup
|
|
def samNamingLookup(self, host):
|
|
"""
|
|
Looks up a host in hosts.txt
|
|
"""
|
|
# try the cache first
|
|
if self.namingCache.has_key(host):
|
|
log(4, "found host %s in cache" % host)
|
|
return self.namingCache[host]
|
|
|
|
# make a queue for reply
|
|
q = self.namingReplies[host] = Queue.Queue()
|
|
|
|
# send off req
|
|
self.samSend("NAMING", "LOOKUP",
|
|
NAME=host,
|
|
)
|
|
|
|
# get resp
|
|
resp = q.get()
|
|
|
|
result = resp.get('RESULT', 'none')
|
|
if result == 'OK':
|
|
log(4, "adding host %s to cache" % host)
|
|
val = resp['VALUE']
|
|
self.namingCache[host] = val
|
|
return val
|
|
else:
|
|
raise I2PCommandFail("Error looking up '%s': %s %s" % (
|
|
host, result, resp.get('MESSAGE', '')))
|
|
|
|
#@-node:samNamingLookup
|
|
#@+node:samParse
|
|
def samParse(self, flds):
|
|
"""
|
|
carves up a SAM command, returns it as a 3-tuple:
|
|
- cmd - command string
|
|
- subcmd - subcommand string
|
|
- dargs - dict of args
|
|
"""
|
|
cmd = flds[0]
|
|
subcmd = flds[1]
|
|
args = flds[2:]
|
|
|
|
dargs = {}
|
|
for arg in args:
|
|
try:
|
|
name, val = arg.split("=", 1)
|
|
except:
|
|
logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
|
|
raise
|
|
dargs[name] = val
|
|
|
|
# read and add data if any
|
|
if dargs.has_key('SIZE'):
|
|
size = dargs['SIZE'] = int(dargs['SIZE'])
|
|
dargs['DATA'] = self._recvbytes(size)
|
|
|
|
#log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v[:40])) for k,v in dargs.items()]))
|
|
log(4, "\n".join([cmd+" "+subcmd] + [("%s=%s (...)" % (k,v)) for k,v in dargs.items()]))
|
|
|
|
return cmd, subcmd, dargs
|
|
|
|
|
|
|
|
|
|
|
|
#@-node:samParse
|
|
#@+node:samSend
|
|
def samSend(self, topic, subtopic, data=None, **kw):
|
|
"""
|
|
Sends a SAM message (reply?) back to client
|
|
|
|
Arguments:
|
|
- topic - the first word in the reply, eg 'STREAM'
|
|
- subtopic - the second word of the reply, eg 'CONNECTED'
|
|
- data - a string of raw data to send back (optional)
|
|
Keywords:
|
|
- extra 'name=value' items to pass back.
|
|
|
|
Notes:
|
|
1. SIZE is not required. If sending back data, it will
|
|
be sized and a SIZE arg inserted automatically.
|
|
2. a dict of values can be passed to the 'args' keyword, in lieu
|
|
of direct keywords. This allows for cases where arg names would
|
|
cause python syntax clashes, eg 'tunnels.depthInbound'
|
|
"""
|
|
items = [topic, subtopic]
|
|
|
|
# stick in SIZE if needed
|
|
if data is not None:
|
|
kw['SIZE'] = str(len(data))
|
|
else:
|
|
data = '' # for later
|
|
|
|
self.samCreateArgsList(kw, items)
|
|
|
|
# and whack it together
|
|
buf = " ".join(items) + '\n' + data
|
|
|
|
# and ship it
|
|
self.sendLock.acquire()
|
|
try:
|
|
self._sendbytes(buf)
|
|
except:
|
|
self.sendLock.release()
|
|
raise
|
|
self.sendLock.release()
|
|
|
|
#@-node:samSend
|
|
#@+node:samCreateArgsList
|
|
def samCreateArgsList(self, kw1, lst):
|
|
for k,v in kw1.items():
|
|
if k == 'args':
|
|
self.samCreateArgsList(v, lst)
|
|
else:
|
|
lst.append("=".join([str(k), str(v)]))
|
|
#@-node:samCreateArgsList
|
|
#@+node:threadRx
|
|
def threadRx(self):
|
|
"""
|
|
Handles all incoming stuff from SAM, storing in
|
|
local queues as appropriate
|
|
"""
|
|
while self.isRunning:
|
|
try:
|
|
log(4, "Awaiting next message from server")
|
|
line = self._recvline()
|
|
if line == '':
|
|
log(3, "I2P server socket closed")
|
|
return
|
|
flds = line.split(" ")
|
|
topic, subtopic, args = self.samParse(flds)
|
|
log(4, "Got %s %s %s" % (topic, subtopic, args))
|
|
handleMsg = getattr(self, "on_"+topic, None)
|
|
if handleMsg:
|
|
handleMsg(topic, subtopic, args)
|
|
else:
|
|
log(2, "No handler for '%s' message" % topic)
|
|
except:
|
|
#logException(3, "Exception handling %s %s\n%s" % (topic, subtopic, args))
|
|
logException(3, "Exception handling %s" % repr(line))
|
|
#@-node:threadRx
|
|
#@+node:on_HELLO
|
|
def on_HELLO(self, topic, subtopic, args):
|
|
"""
|
|
Handles HELLO PONG messages from server
|
|
"""
|
|
# just wake up the caller
|
|
log(4, "got HELLO")
|
|
self.lockHello.release()
|
|
#@-node:on_HELLO
|
|
#@+node:on_SESSION
|
|
def on_SESSION(self, topic, subtopic, args):
|
|
"""
|
|
Handles SESSION messages from server
|
|
"""
|
|
# just stick whatever on the queue and wake up the caller
|
|
res = subtopic, args
|
|
self.qSession.put(res)
|
|
#@-node:on_SESSION
|
|
#@+node:on_STREAM
|
|
def on_STREAM(self, topic, subtopic, args):
|
|
"""
|
|
Handles STREAM messages from server
|
|
|
|
STREAM STATUS
|
|
RESULT=$result
|
|
ID=$id
|
|
[MESSAGE=...]
|
|
|
|
STREAM CONNECTED
|
|
DESTINATION=$base64key
|
|
ID=$id
|
|
|
|
STREAM RECEIVED
|
|
ID=$id
|
|
SIZE=$numBytes\n[$numBytes of data]
|
|
|
|
STREAM CLOSED
|
|
RESULT=$result
|
|
ID=$id
|
|
[MESSAGE=...]
|
|
"""
|
|
log(4, "got %s %s %s" % (topic, subtopic, args))
|
|
|
|
# which stream?
|
|
id = int(args['ID'])
|
|
|
|
# result of prior connection attempt
|
|
if subtopic == 'STATUS':
|
|
# stick it on the queue that the caller is waiting on and let the
|
|
# caller interpret the result
|
|
self.streamConnectReplies[id].put(args)
|
|
return
|
|
|
|
# notice of incoming connection
|
|
if subtopic == 'CONNECTED':
|
|
|
|
# grab details
|
|
dest = args['DESTINATION']
|
|
|
|
# wrap it in a stream obj
|
|
conn = I2PSAMStream(self, id, dest)
|
|
self.streams[id] = conn
|
|
|
|
# and put it there for anyone calling samStreamAccept()
|
|
self.qNewStreams.put(conn)
|
|
|
|
# done
|
|
return
|
|
|
|
# notice of received data
|
|
elif subtopic == 'RECEIVED':
|
|
# grab details
|
|
data = args['DATA']
|
|
|
|
# lookup the connection
|
|
conn = self.streams.get(id, None)
|
|
if not conn:
|
|
# conn not known, just ditch
|
|
log(2, "got data, but don't recall any conn with id %s" % id)
|
|
return
|
|
|
|
# and post the received data
|
|
conn._notifyIncomingData(data)
|
|
|
|
log(4, "wrote data to conn's inbound queue")
|
|
|
|
# done
|
|
return
|
|
|
|
elif subtopic == 'CLOSED':
|
|
# lookup the connection
|
|
conn = self.streams.get(id, None)
|
|
if not conn:
|
|
# conn not known, just ditch
|
|
return
|
|
|
|
# mark conn as closed and forget it
|
|
conn._notifyIncomingData("") # special signal to close
|
|
conn.isOpen = 0
|
|
del self.streams[id]
|
|
|
|
# done
|
|
return
|
|
|
|
|
|
|
|
#@-node:on_STREAM
|
|
#@+node:on_DATAGRAM
|
|
def on_DATAGRAM(self, topic, subtopic, args):
|
|
"""
|
|
Handles DATAGRAM messages from server
|
|
"""
|
|
remdest = args['DESTINATION']
|
|
data = args['DATA']
|
|
|
|
self.qDatagrams.put((remdest, data))
|
|
#@-node:on_DATAGRAM
|
|
#@+node:on_RAW
|
|
def on_RAW(self, topic, subtopic, args):
|
|
"""
|
|
Handles RAW messages from server
|
|
"""
|
|
data = args['DATA']
|
|
|
|
log(3, "Got anonymous datagram %s" % repr(data))
|
|
self.qRawMessages.put(data)
|
|
#@-node:on_RAW
|
|
#@+node:on_NAMING
|
|
def on_NAMING(self, topic, subtopic, args):
|
|
"""
|
|
Handles NAMING messages from server
|
|
"""
|
|
# just find out hostname, and stick it on resp q
|
|
host = args['NAME']
|
|
self.namingReplies[host].put(args)
|
|
#@-node:on_NAMING
|
|
#@+node:on_DEST
|
|
def on_DEST(self, topic, subtopic, args):
|
|
"""
|
|
Handles DEST messages from server
|
|
"""
|
|
pubkey = args['PUB']
|
|
privkey = args['PRIV']
|
|
res = pubkey, privkey
|
|
self.qNewDests.put(res)
|
|
#@-node:on_DEST
|
|
#@+node:samAllocId
|
|
def samAllocId(self):
|
|
"""
|
|
Allocates a new unique id as required by SAM protocol
|
|
"""
|
|
self.samNextIdLock.acquire()
|
|
id = self.samNextId
|
|
self.samNextId += 1
|
|
self.samNextIdLock.release()
|
|
return id
|
|
#@-node:samAllocId
|
|
#@+node:_recvline
|
|
def _recvline(self):
|
|
"""
|
|
Guaranteed read of a full line
|
|
"""
|
|
chars = []
|
|
while 1:
|
|
c = self.sock.recv(1)
|
|
if c in ['', '\n']:
|
|
break
|
|
chars.append(c)
|
|
return "".join(chars)
|
|
#@-node:_recvline
|
|
#@+node:_recvbytes
|
|
def _recvbytes(self, num):
|
|
"""
|
|
Guaranteed read of num bytes
|
|
"""
|
|
if num <= 0:
|
|
return ""
|
|
|
|
reqd = num
|
|
chunks = []
|
|
while reqd > 0:
|
|
chunk = self.sock.recv(reqd)
|
|
if not chunk:
|
|
raise I2PServerFail("Buffer read fail")
|
|
chunks.append(chunk)
|
|
reqd -= len(chunk)
|
|
return "".join(chunks)
|
|
#@-node:_recvbytes
|
|
#@+node:_sendbytes
|
|
def _sendbytes(self, buf):
|
|
"""
|
|
Guaranteed complete send of a buffer
|
|
"""
|
|
reqd = len(buf)
|
|
while reqd > 0:
|
|
nsent = self.sock.send(buf)
|
|
if nsent == 0:
|
|
raise I2PServerFail("Send to server failed")
|
|
buf = buf[nsent:]
|
|
reqd -= nsent
|
|
#@-node:_sendbytes
|
|
#@+node:_sendline
|
|
def _sendline(self, line):
|
|
"""
|
|
just tacks on a newline and sends
|
|
"""
|
|
self._sendbytes(line+"\n")
|
|
#@-node:_sendline
|
|
#@-others
|
|
#@-node:class I2PSamClient
|
|
#@+node:class I2PSAMStream
|
|
class I2PSAMStream:
|
|
"""
|
|
Wrapper for a stream object
|
|
"""
|
|
#@ @+others
|
|
#@+node:__init__
|
|
def __init__(self, client, id, dest):
|
|
"""
|
|
"""
|
|
self.client = client
|
|
self.id = id
|
|
self.dest = dest
|
|
|
|
self.qIncomingData = Queue.Queue()
|
|
|
|
self.inbuf = ''
|
|
self.isOpen = 1
|
|
#@-node:__init__
|
|
#@+node:send
|
|
def send(self, data):
|
|
"""
|
|
Sends data to a stream connection
|
|
"""
|
|
# barf if stream not open
|
|
if not self.isOpen:
|
|
raise I2PStreamClosed
|
|
|
|
# can send
|
|
return self.client.samStreamSend(self, data)
|
|
#@-node:send
|
|
#@+node:recv
|
|
def recv(self, size):
|
|
"""
|
|
Retrieves n bytes from peer
|
|
"""
|
|
chunks = []
|
|
|
|
while self.isOpen and size > 0:
|
|
# try internal buffer first
|
|
if self.inbuf:
|
|
chunk = self.inbuf[:size]
|
|
chunklen = len(chunk)
|
|
self.inbuf = self.inbuf[chunklen:]
|
|
chunks.append(chunk)
|
|
size -= chunklen
|
|
else:
|
|
# replenish input buffer
|
|
log(4, "I2PSAMStream.recv: replenishing input buffer")
|
|
buf = self.qIncomingData.get()
|
|
if buf == '':
|
|
# connection closed by peer
|
|
self.isOpen = 0
|
|
break
|
|
else:
|
|
# got more data
|
|
log(4, "I2PSAMStream: queue returned %s" % repr(buf))
|
|
self.inbuf += buf
|
|
|
|
# return whatever we've got, hopefully all
|
|
return "".join(chunks)
|
|
|
|
|
|
#@-node:recv
|
|
#@+node:readline
|
|
def readline(self):
|
|
"""
|
|
Read a line of text from stream, return the line without trailing newline
|
|
|
|
This method really shouldn't exist in a class that's trying to look a bit
|
|
like a socket object, but what the hell!
|
|
"""
|
|
chars = []
|
|
while 1:
|
|
char = self.recv(1)
|
|
if char in ['', '\n']:
|
|
break
|
|
chars.append(char)
|
|
return "".join(chars)
|
|
#@-node:readline
|
|
#@+node:close
|
|
def close(self):
|
|
"""
|
|
close this stream connection
|
|
"""
|
|
log(4, "closing stream")
|
|
self.client.samStreamClose(self)
|
|
log(4, "stream closed")
|
|
self.isOpen = 0
|
|
|
|
# and just to make sure...
|
|
self.qIncomingData.put("") # busts out of recv() loops
|
|
|
|
#@-node:close
|
|
#@+node:__del__
|
|
def __del__(self):
|
|
"""
|
|
Dropping last ref to this object closes stream
|
|
"""
|
|
self.close()
|
|
#@-node:__del__
|
|
#@+node:_notifyIncomingData
|
|
def _notifyIncomingData(self, data):
|
|
"""
|
|
Called by client receiver to notify incoming data
|
|
"""
|
|
log(4, "got %s" % repr(data))
|
|
self.qIncomingData.put(data)
|
|
#@-node:_notifyIncomingData
|
|
#@-others
|
|
#@-node:class I2PSAMStream
|
|
#@+node:class I2PRemoteSession
|
|
class I2PRemoteSession:
|
|
"""
|
|
DEPRECATED
|
|
|
|
Wrapper for I2CP connections
|
|
|
|
Do not instantiate this directly - it gets created by
|
|
I2PSamClient.createSession()
|
|
"""
|
|
#@ @+others
|
|
#@+node:__init__
|
|
def __init__(self, client, dest):
|
|
"""
|
|
Do not instantiate this directly
|
|
"""
|
|
self.client = client
|
|
self.dest = dest
|
|
#@-node:__init__
|
|
#@+node:send
|
|
def send(self, peerdest, msg):
|
|
"""
|
|
"""
|
|
return self.client.send(self.dest, peerdest, msg)
|
|
#@-node:send
|
|
#@+node:recv
|
|
def receive(self):
|
|
|
|
return self.client.receive(self.dest)
|
|
#@-node:recv
|
|
#@+node:destroy
|
|
def destroy(self):
|
|
|
|
return self.client.destroySession(self.dest)
|
|
|
|
#@-node:destroy
|
|
#@-others
|
|
#@-node:class I2PRemoteSession
|
|
#@+node:log
|
|
def log(level, msg, nPrev=0):
|
|
|
|
# ignore messages that are too trivial for chosen verbosity
|
|
if level > verbosity:
|
|
return
|
|
|
|
loglock.acquire()
|
|
try:
|
|
# rip the stack
|
|
caller = traceback.extract_stack()[-(2+nPrev)]
|
|
path, line, func = caller[:3]
|
|
path = os.path.split(path)[1]
|
|
full = "%s:%s:%s():\n* %s" % (
|
|
path,
|
|
line,
|
|
func,
|
|
msg.replace("\n", "\n + "))
|
|
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
|
msg = "%s %s\n" % (now, full)
|
|
|
|
if logfile == sys.stdout:
|
|
print msg
|
|
else:
|
|
file(logfile, "a").write(msg+"\n")
|
|
except:
|
|
s = StringIO.StringIO()
|
|
traceback.print_exc(file=s)
|
|
print s.getvalue()
|
|
print "Logger crashed"
|
|
loglock.release()
|
|
#@nonl
|
|
#@-node:log
|
|
#@+node:logException
|
|
def logException(level, msg=''):
|
|
s = StringIO.StringIO()
|
|
traceback.print_exc(file=s)
|
|
log(level, "%s\n%s" % (s.getvalue(), msg), 1)
|
|
#@-node:logException
|
|
#@+node:demoNAMING
|
|
def demoNAMING():
|
|
"""
|
|
Demonstrates the NAMING service
|
|
"""
|
|
print "Starting SAM NAMING demo..."
|
|
print
|
|
|
|
print "Instantiating client connection..."
|
|
c0 = I2PSamClient()
|
|
print "Client connection created"
|
|
|
|
for host in ['duck.i2p', 'nonexistent.i2p']:
|
|
print "Sending query for host '%s'..." % host
|
|
try:
|
|
res = c0.samNamingLookup(host)
|
|
print "query for %s returned:" % host
|
|
print repr(res)
|
|
except I2PCommandFail, e:
|
|
print "got exception: %s" % repr(e.args)
|
|
|
|
print
|
|
print "---------------------------------"
|
|
print "NAMING service tests succeeded"
|
|
print "---------------------------------"
|
|
print
|
|
|
|
|
|
#@-node:demoNAMING
|
|
#@+node:demoRAW
|
|
def demoRAW():
|
|
"""
|
|
Runs a demo of SAM RAW messaging
|
|
"""
|
|
print "Starting SAM RAW demo..."
|
|
print
|
|
|
|
print "Instantiating client connections..."
|
|
c1 = I2PSamClient()
|
|
c2 = I2PSamClient()
|
|
|
|
print "Creating dests via SAM"
|
|
pub1, priv1 = c1.samDestGenerate()
|
|
pub2, priv2 = c2.samDestGenerate()
|
|
print "SAM Dests generated ok"
|
|
|
|
print "Creating SAM RAW SESSION on connection c1..."
|
|
res = c1.samSessionCreate("RAW", priv1)
|
|
if res != 'OK':
|
|
print "Failed to create session on connection c1: %s" % repr(res)
|
|
return
|
|
print "Session on connection c1 created successfully"
|
|
|
|
print "Creating SAM SESSION on connection c2..."
|
|
res = c2.samSessionCreate("RAW", priv2)
|
|
if res != 'OK':
|
|
print "Failed to create session on connection c2: %s" % repr(res)
|
|
return
|
|
print "Session on connection c2 created successfully"
|
|
|
|
msg = "Hi there!"
|
|
print "sending from c1 to c2: %s" % repr(msg)
|
|
c1.samRawSend(pub2, msg)
|
|
|
|
print "now try to receive from c2 (will block)..."
|
|
msg1 = c2.samRawReceive()
|
|
print "Connection c2 got %s" % repr(msg1)
|
|
|
|
print
|
|
print "---------------------------------"
|
|
print "RAW data transfer tests succeeded"
|
|
print "---------------------------------"
|
|
print
|
|
|
|
#@-node:demoRAW
|
|
#@+node:demoDATAGRAM
|
|
def demoDATAGRAM():
|
|
"""
|
|
Runs a demo of SAM DATAGRAM messaging
|
|
"""
|
|
print "Starting SAM DATAGRAM demo..."
|
|
print
|
|
|
|
print "Instantiating 2 more client connections..."
|
|
c3 = I2PSamClient()
|
|
c4 = I2PSamClient()
|
|
|
|
print "Creating more dests via SAM"
|
|
pub3, priv3 = c3.samDestGenerate()
|
|
pub4, priv4 = c4.samDestGenerate()
|
|
|
|
print "Creating SAM DATAGRAM SESSION on connection c3..."
|
|
res = c3.samSessionCreate("DATAGRAM", priv3)
|
|
if res != 'OK':
|
|
print "Failed to create DATAGRAM session on connection c3: %s" % repr(res)
|
|
return
|
|
print "DATAGRAM Session on connection c3 created successfully"
|
|
|
|
print "Creating SAM DATAGRAM SESSION on connection c4..."
|
|
res = c4.samSessionCreate("DATAGRAM", priv4)
|
|
if res != 'OK':
|
|
print "Failed to create DATAGRAM session on connection c4: %s" % repr(res)
|
|
return
|
|
print "Session on connection c4 created successfully"
|
|
|
|
msg = "Hi there, this is a datagram!"
|
|
print "sending from c3 to c4: %s" % repr(msg)
|
|
c3.samDatagramSend(pub4, msg)
|
|
|
|
print "now try to receive from c4 (will block)..."
|
|
remdest, msg1 = c4.samDatagramReceive()
|
|
print "Connection c4 got %s from %s..." % (repr(msg1), repr(remdest))
|
|
|
|
|
|
print
|
|
print "--------------------------------------"
|
|
print "DATAGRAM data transfer tests succeeded"
|
|
print "--------------------------------------"
|
|
print
|
|
|
|
#@-node:demoDATAGRAM
|
|
#@+node:demoSTREAM
|
|
def demoSTREAM():
|
|
"""
|
|
Runs a demo of SAM STREAM messaging
|
|
"""
|
|
print "Starting SAM STREAM demo..."
|
|
print
|
|
|
|
print "Instantiating client c6..."
|
|
c6 = I2PSamClient()
|
|
|
|
print "Creating dest for c6"
|
|
pub6, priv6 = c6.samDestGenerate()
|
|
|
|
print "Creating SAM STREAM SESSION on connection c6..."
|
|
res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
|
|
if res != 'OK':
|
|
print "Failed to create STREAM session on connection c6: %s" % repr(res)
|
|
return
|
|
print "STREAM Session on connection c6 created successfully"
|
|
|
|
print "Launching acceptor thread..."
|
|
thread.start_new_thread(demoSTREAM_thread, (c6,))
|
|
|
|
#print "sleep a while and give the server a chance..."
|
|
#time.sleep(10)
|
|
|
|
print "----------------------------------------"
|
|
|
|
print "Instantiating client c5..."
|
|
c5 = I2PSamClient()
|
|
|
|
print "Creating dest for c5"
|
|
pub5, priv5 = c5.samDestGenerate()
|
|
|
|
print "Creating SAM STREAM SESSION on connection c5..."
|
|
res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
|
|
if res != 'OK':
|
|
print "Failed to create STREAM session on connection c5: %s" % repr(res)
|
|
return
|
|
print "STREAM Session on connection c5 created successfully"
|
|
|
|
print "----------------------------------------"
|
|
|
|
print "Making connection from c5 to c6..."
|
|
|
|
#set_trace()
|
|
|
|
try:
|
|
conn_c5 = c5.samStreamConnect(pub6)
|
|
except:
|
|
print "Stream Connection failed"
|
|
return
|
|
print "Stream connect succeeded"
|
|
|
|
print "Receiving from c5..."
|
|
buf = conn_c5.readline()
|
|
print "Got %s" % repr(buf)
|
|
|
|
#print "Try to accept connection on c6..."
|
|
#conn_c6 = c6.sam
|
|
|
|
print
|
|
print "--------------------------------------"
|
|
print "DATAGRAM data transfer tests succeeded"
|
|
print "--------------------------------------"
|
|
print
|
|
|
|
|
|
|
|
|
|
|
|
#@-node:demoSTREAM
|
|
#@+node:demoSTREAM_thread
|
|
def demoSTREAM_thread(sess):
|
|
|
|
while 1:
|
|
sock = sess.samStreamAccept()
|
|
log(4, "got incoming connection")
|
|
|
|
print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
|
|
|
|
time.sleep(10)
|
|
|
|
sock.send("Hi there, what do you want?\n")
|
|
|
|
print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
|
|
time.sleep(300)
|
|
print "**ACCEPTOR CLOSING STREAM"
|
|
|
|
sock.close()
|
|
|
|
#@-node:demoSTREAM_thread
|
|
#@+node:demo
|
|
def demo():
|
|
"""
|
|
This is a simple and straightforward demo of talking to
|
|
the i2psam server socket via the I2PSamClient class.
|
|
|
|
Read the source, Luke, it's never been so easy...
|
|
"""
|
|
print
|
|
print "-----------------------------------------"
|
|
print "Running i2psamclient demo..."
|
|
print "-----------------------------------------"
|
|
print
|
|
|
|
#demoNAMING()
|
|
#demoRAW()
|
|
#demoDATAGRAM()
|
|
demoSTREAM()
|
|
|
|
print
|
|
print "-----------------------------------------"
|
|
print "Demo Finished"
|
|
print "-----------------------------------------"
|
|
|
|
return
|
|
#@-node:demo
|
|
#@+node:MAINLINE
|
|
if __name__ == '__main__':
|
|
|
|
demo()
|
|
#@-node:MAINLINE
|
|
#@-others
|
|
|
|
#@-node:@file python/src/i2psamclient.py
|
|
#@-leo
|