diff --git a/apps/sam/python/README.stasher.txt b/apps/sam/python/README.stasher.txt deleted file mode 100644 index 218f6a4a0adff24cf693f611d05891477b0ffa63..0000000000000000000000000000000000000000 --- a/apps/sam/python/README.stasher.txt +++ /dev/null @@ -1,68 +0,0 @@ -STASHER README - ------------------------ -INSTALLING STASHER - -To install stasher, just make sure you've got the latest cvs, then type - python setup.py install - -This installs the stasher engine, plus a wrapper client script called -'stasher', which setup.py will install into your execution path. - -Test your installation by typing 'stasher -h' - this should display -a help message. - ------------------------- -DOZE USERS PLEASE NOTE - -You'll need to watch and see where the stasher.py -wrapper script gets installed. On my box, it ends up on d:\python23\scripts, -but on your box it'll likely go into c:\python23\scripts. - -You may either update your system PATH environment variable to include your -python scripts directory, OR, you can copy stasher.py to anywhere that's -on your path. - -In the explanations below, note that wherever I say to type 'stasher', you'll -need to type 'stasher.py' instead. - ------------------------- -WARNING - -This is a very early pre-alpha test version of stasher. -It is only capable of storing or retrieving files of -less than 29k in size. - -Also, files are totally insecure - anyone can overwrite any keys you -insert, and vice versa. - -I'll be adding support for CHK-like and SSK-like keys in due course. - ------------------------- -USING STASHER - -To see stasher's options, type: - - stasher -h - -This should dump out a verbose help message. - -To start a stasher node, type: - - stasher start - -To shut down a stasher node, type: - - stasher stop - -To insert a file into stasher, type: - - stasher put mykey myfile - -Note, if you don't supply a filename, stasher can read -the file from standard input. - -To retrieve a file from stasher, type: - - stasher get mykey - diff --git a/apps/sam/python/aum.stasher b/apps/sam/python/aum.stasher deleted file mode 100644 index 61e19af7c69eb824e8697b809007dcb4daa63861..0000000000000000000000000000000000000000 --- a/apps/sam/python/aum.stasher +++ /dev/null @@ -1 +0,0 @@ -Yg1FeoQi9GEsby~t~IClgWYc8AQoxBcvuXR9HY~cwc21y5-NVeAnhrfOiF6HWGhHUywh4eCINE52Zh28Jgn2wMVPqbr-r48ikU3oSXQO8BoPyGLz43x~UqR3Vw4mR9jtTq-9TQ70LxuYPNKPvpkM3C~d8wx7VX42KaiNlQlBzvC34MwivrP0esHst6RfNf11lTRrdQbQYMXqLzjdVs6Kl5jeniTwFodqb5yG1O4~hjXasVtIFoepzL6y3x1TyKQonzTGftkd3r4Weh3jh9w6yyRabTAPFMjeXKjMfjpKDHl6C0C62MdjxKvNos5JYYzXZG6BeSoeZP-8c6wndfzuYV3QRR~M61fgXcQq~EVuy8fDpdhfuv8ZRXeOZIp07hQkpMxSNP43Rk1afPRv7QLzuS2UCDQe2WGhp3DQB0dSOmxImdNRmNCbeF8r~bKpgM~okOUC0xwzXfJFxWvqB0IsmNp8KVeoTGjSJOZHDdGDKaBwxHxxYg4PQWo7FuXQ5FihAAAA \ No newline at end of file diff --git a/apps/sam/python/setup.py b/apps/sam/python/setup.py index d22318d7a68ad8b9144351eb826c1c92b4dcd6f8..02622c5d4abb6ec429681bb7e26fbc50ff519692 100644 --- a/apps/sam/python/setup.py +++ b/apps/sam/python/setup.py @@ -5,18 +5,11 @@ import os, sys os.chdir('./src') -if sys.platform == 'win32': - stasherFrontEnd = "..\\stasher.py" -else: - stasherFrontEnd = "../stasher" - setup(name="Python I2P API", version="0.91", description="Python Interface to I2P", - author="Connelly Barnes + Aum", + author="Connelly Barnes", author_email="'Y29ubmVsbHliYXJuZXNAeWFob28uY29t\n'.decode('base64')", url="http://www.i2p.net/", packages=['i2p', 'i2p.pylib'], - py_modules=['bencode'], - scripts=[stasherFrontEnd], ) diff --git a/apps/sam/python/src/bencode.py b/apps/sam/python/src/bencode.py deleted file mode 100644 index 93af40744a6365578f7aa4b1589a98c451fe49f4..0000000000000000000000000000000000000000 --- a/apps/sam/python/src/bencode.py +++ /dev/null @@ -1,254 +0,0 @@ -# Written by Petru Paler -# see LICENSE.txt for license information - -from types import IntType, LongType, StringType, ListType, TupleType, DictType -import re -from cStringIO import StringIO - -int_filter = re.compile('(0|-?[1-9][0-9]*)e') - -def decode_int(x, f): - m = int_filter.match(x, f) - if m is None: - raise ValueError - return (long(m.group(1)), m.end()) - -string_filter = re.compile('(0|[1-9][0-9]*):') - -def decode_string(x, f): - m = string_filter.match(x, f) - if m is None: - raise ValueError - l = int(m.group(1)) - s = m.end() - return (x[s:s+l], s + l) - -def decode_list(x, f): - r = [] - while x[f] != 'e': - v, f = bdecode_rec(x, f) - r.append(v) - return (r, f + 1) - -def decode_dict(x, f): - r = {} - lastkey = None - while x[f] != 'e': - k, f = decode_string(x, f) - if lastkey is not None and lastkey >= k: - raise ValueError - lastkey = k - v, f = bdecode_rec(x, f) - r[k] = v - return (r, f + 1) - -def bdecode_rec(x, f): - t = x[f] - if t == 'i': - return decode_int(x, f + 1) - elif t == 'l': - return decode_list(x, f + 1) - elif t == 'd': - return decode_dict(x, f + 1) - else: - return decode_string(x, f) - -def bdecode(x): - try: - r, l = bdecode_rec(x, 0) - except IndexError: - raise ValueError - if l != len(x): - raise ValueError - return r - -def test_bdecode(): - try: - bdecode('0:0:') - assert 0 - except ValueError: - pass - try: - bdecode('ie') - assert 0 - except ValueError: - pass - try: - bdecode('i341foo382e') - assert 0 - except ValueError: - pass - assert bdecode('i4e') == 4L - assert bdecode('i0e') == 0L - assert bdecode('i123456789e') == 123456789L - assert bdecode('i-10e') == -10L - try: - bdecode('i-0e') - assert 0 - except ValueError: - pass - try: - bdecode('i123') - assert 0 - except ValueError: - pass - try: - bdecode('') - assert 0 - except ValueError: - pass - try: - bdecode('i6easd') - assert 0 - except ValueError: - pass - try: - bdecode('35208734823ljdahflajhdf') - assert 0 - except ValueError: - pass - try: - bdecode('2:abfdjslhfld') - assert 0 - except ValueError: - pass - assert bdecode('0:') == '' - assert bdecode('3:abc') == 'abc' - assert bdecode('10:1234567890') == '1234567890' - try: - bdecode('02:xy') - assert 0 - except ValueError: - pass - try: - bdecode('l') - assert 0 - except ValueError: - pass - assert bdecode('le') == [] - try: - bdecode('leanfdldjfh') - assert 0 - except ValueError: - pass - assert bdecode('l0:0:0:e') == ['', '', ''] - try: - bdecode('relwjhrlewjh') - assert 0 - except ValueError: - pass - assert bdecode('li1ei2ei3ee') == [1, 2, 3] - assert bdecode('l3:asd2:xye') == ['asd', 'xy'] - assert bdecode('ll5:Alice3:Bobeli2ei3eee') == [['Alice', 'Bob'], [2, 3]] - try: - bdecode('d') - assert 0 - except ValueError: - pass - try: - bdecode('defoobar') - assert 0 - except ValueError: - pass - assert bdecode('de') == {} - assert bdecode('d3:agei25e4:eyes4:bluee') == {'age': 25, 'eyes': 'blue'} - assert bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee') == {'spam.mp3': {'author': 'Alice', 'length': 100000}} - try: - bdecode('d3:fooe') - assert 0 - except ValueError: - pass - try: - bdecode('di1e0:e') - assert 0 - except ValueError: - pass - try: - bdecode('d1:b0:1:a0:e') - assert 0 - except ValueError: - pass - try: - bdecode('d1:a0:1:a0:e') - assert 0 - except ValueError: - pass - try: - bdecode('i03e') - assert 0 - except ValueError: - pass - try: - bdecode('l01:ae') - assert 0 - except ValueError: - pass - try: - bdecode('9999:x') - assert 0 - except ValueError: - pass - try: - bdecode('l0:') - assert 0 - except ValueError: - pass - try: - bdecode('d0:0:') - assert 0 - except ValueError: - pass - try: - bdecode('d0:') - assert 0 - except ValueError: - pass - -def bencode_rec(x, b): - t = type(x) - if t in (IntType, LongType): - b.write('i%de' % x) - elif t is StringType: - b.write('%d:%s' % (len(x), x)) - elif t in (ListType, TupleType): - b.write('l') - for e in x: - bencode_rec(e, b) - b.write('e') - elif t is DictType: - b.write('d') - keylist = x.keys() - keylist.sort() - for k in keylist: - assert type(k) is StringType - bencode_rec(k, b) - bencode_rec(x[k], b) - b.write('e') - else: - assert 0 - -def bencode(x): - b = StringIO() - bencode_rec(x, b) - return b.getvalue() - -def test_bencode(): - assert bencode(4) == 'i4e' - assert bencode(0) == 'i0e' - assert bencode(-10) == 'i-10e' - assert bencode(12345678901234567890L) == 'i12345678901234567890e' - assert bencode('') == '0:' - assert bencode('abc') == '3:abc' - assert bencode('1234567890') == '10:1234567890' - assert bencode([]) == 'le' - assert bencode([1, 2, 3]) == 'li1ei2ei3ee' - assert bencode([['Alice', 'Bob'], [2, 3]]) == 'll5:Alice3:Bobeli2ei3eee' - assert bencode({}) == 'de' - assert bencode({'age': 25, 'eyes': 'blue'}) == 'd3:agei25e4:eyes4:bluee' - assert bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}) == 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee' - try: - bencode({1: 'foo'}) - assert 0 - except AssertionError: - pass - diff --git a/apps/sam/python/src/i2p/stasher.py b/apps/sam/python/src/i2p/stasher.py deleted file mode 100644 index cadb482a86f5dbb52af4bea72b7695bd5c9cccdd..0000000000000000000000000000000000000000 --- a/apps/sam/python/src/i2p/stasher.py +++ /dev/null @@ -1,3977 +0,0 @@ -#! /usr/bin/env python -#@+leo-ver=4 -#@+node:@file stasher.py -#@@first -""" -Indroduction: - - A simple implementation of the - U{Kademlia<http://www.infoanarchy.org/wiki/wiki.pl?Kademlia>} - P2P distributed storage and retrieval protocol, designed to - utilise the U{I2P<http://www.i2p.net>} stealth network as its transport. - -I strongly recommend that when editing this file, you use the Leo -outlining and literate programming editor - http://leo.sf.net - -If Leo doesn't agree with your religion, please try to leave the markups intact -""" -#@+others -#@+node:explanatory comments -#@+at -# Tech overview: -# - this implementation creates each Node ID as an SHA1 hash of -# the node's 'destination' - the string which constitutes its -# address as an I2P endpoint. -# -# Datagram formats: -# - each datagram sent from one node to another is a python dict object, -# encoded and decoded with the 'bencode' object serialisation module. -# - we use bencode because regular Python pickle is highly insecure, -# allowing crackers to create malformed pickles which can have all -# manner of detrimental effects, including execution of arbitrary code. -# - the possible messages are listed below, along with their consituent -# dictionary keys: -# 1. ping: -# - msgId - a message identifier guaranteed to be unique -# with respect to the sending node -# 2. findNode: -# - msgId - unique message identifier -# - hash - the hash we're looking for -# - initiator - True/False, according to whether this node -# should initiate/perform the findNode, or whether this -# rpc is coming from another seeking node -# 3. findData: -# - msgId - unique message identifier -# - hash - the exact key hash of the data we want to retrieve -# - initiator - True/False, according to whether this node -# should initiate/perform the findNode, or whether this -# rpc is coming from another seeking node -# 4. store: -# - msgId - unique message identifier -# - hash - the exact key hash of the data we want to store -# - data - the data we want to store -# 5. reply: -# - msgId - the original msgId we're replying to. The other items in -# a reply message depend on what kind of message we're replying to, -# listed below: -# 1. ping - no additional data -# 2. findNode: -# - nodes - a list of dests nearest the given hash -# 3. findData: -# - nodes - as for findNode, OR -# - data - the retrieved data, or None if not found -# 4. store: -# - status - True or False according to whether -# the store operation was successful -# -#@-at -#@-node:explanatory comments -#@+node:imports -import sys, os, types, sha, random, threading, thread, traceback, Queue -import time, math, random, pickle, getopt, re -import signal - -# some windows-specifics (yggghh) -if sys.platform == 'win32': - try: - import win32api - import win32process - import _winreg - except: - print "Python win32 extensions not installed." - print "Please go to http://sourceforge.net/project/showfiles.php?group_id=78018" - print "and download/install the file pywin32-202.win32-py%s.%s.exe" % \ - sys.version_info[:2] - sys.exit(1) - -from StringIO import StringIO -from pdb import set_trace - -import bencode - -import i2p.socket -import i2p.select - -import i2p.pylib -SocketServer = i2p.pylib.SocketServer -socket = i2p.pylib.socket - -#@-node:imports -#@+node:constants - -# -------------------------------------------- -# START USER-CONFIGURABLE CONSTANTS -# -------------------------------------------- - -# host:port to connect to I2P SAM Bridge -samAddr = i2p.socket.samaddr - -# host:port to listen on for command line client -clientAddr = "127.0.0.1:7659" - -defaultNodename = "0" # will be prefixed by 'stashernode' - -# maximum size of each stored item -maxValueSize = 30000 - -# maximum number of noderefs that can be stored in a bucket -# (refer spec section 2.1, first paragraph) -maxBucketSize = 20 - -# number of peers to return from a search -numSearchPeers = 3 - -# maximum number of concurrent queries per findnode/finddata rpc -maxConcurrentQueries = 10 - -# number of peers to store onto -numStorePeers = 10 - -# Logger settings -logFile = None -logVerbosity = 2 - -# data directory location - set to a path to override the default -# which is the user's home dir -dataDir = None - -# whether a node, on startup, should do a findnode on itself to -# locate its closest neighbours -greetPeersOnStartup = False -#greetPeersOnStartup = True - -# multi-purpose testing flag -testing = False -#testing = True - -tunnelDepth = 0 - -# set to True to enable single handler thread that manages all nodes, -# or False to make each node run its own handler thread -#runCore = False -runCore = True - -# timeouts - calibrate as needed -timeout = { - 'ping' : 120, - 'findNode' : 120, - 'findData' : 120, - 'store' : 120, - } - -logToSocket = None - -desperatelyDebugging = False - -if desperatelyDebugging: - runCoreInBackground = False -else: - runCoreInBackground = True - -# -------------------------------------------- -# END OF USER-CONFIGURABLE CONSTANTS -# -------------------------------------------- - -# ---------------------------------------------- -# hack anything below this line at your own risk -#@-node:constants -#@+node:globals -# keep a dict of existing nodes, so we can prevent -# client progs from creating 2 nodes of the same name -_nodes = {} - -version = "0.0" - -#@-node:globals -#@+node:Exceptions -# define our exceptions - -class KValueTooLarge(Exception): - """ - Trying to insert a value of excessive size into the network. - Maximum key size is L{maxValueSize} - """ - -class KBadHash(Exception): - """ - Invalid hash string - """ - -class KNotImplemented(Exception): - """ - A required method was not implemented - """ - -class KBadNode(Exception): - """ - Invalid Node object - """ - -class KBadPeer(Exception): - """ - Invalid Peer object - should be a KPeer - """ - -class KBadDest(Exception): - """Invalid I2P Node Dest""" - -#@-node:Exceptions -#@+node:Mixins -#@+node:class KBase -class KBase: - """ - A mixin which adds a class-specific logger - """ - def log(self, verbosity, msg): - - log(verbosity, msg, 1, self.__class__.__name__) - - def logexc(self, verbosity, msg): - - logexc(verbosity, msg, 1, self.__class__.__name__) - -#@-node:class KBase -#@-node:Mixins -#@+node:Main Engine -#@+node:class KCore -class KCore(KBase): - """ - Singleton class which performs all the needed background processing. - - By scheduling all processing through this object, we eliminate the - need to create threads on a per-node basis, and also make this thing - far easier to debug. - - The core launches only two background threads: - - L{threadRxPackets} - listen for incoming packets bound for - any node running within a single process - - L{threadHousekeeping} - periodically invoke maintenance methods - of each node, so the node can check for timeout conditions and - other untoward happenings - - These threads start up when the first node in this process is created, - and stop when the last node ceases to exist. - - Upon first import, the L{kademlia} module creates one instance of this - class. Upon creation, L{KNode} objects register themselves with this core. - """ - #@ @+others - #@+node:attributes - #@-node:attributes - #@+node:__init__ - def __init__(self, bg=True): - """ - Creates the I2P Kademlia core object - """ - self.bg = bg - self.fg = False - - # subscribed nodes - self.nodes = [] - #self.nodesLock = threading.Lock() - - self.isRunning = False - self.isRunning_rx = False - - #@-node:__init__ - #@+node:subscribe - def subscribe(self, node): - """ - Called by a node to 'subscribe' for background processing - If this is the first node, starts the handler thread - """ - #self.nodesLock.acquire() - try: - nodes = self.nodes - - if node in nodes: - self.log(2, "duhhh! node already subscribed" % repr(node)) - return - - nodes.append(node) - - if not self.isRunning: - self.isRunning = True - if self.bg and not self.fg: - self.log(3, "First node subscribing, launching threads") - thread.start_new_thread(self.threadRxPackets, ()) - thread.start_new_thread(self.threadHousekeeping, ()) - except: - traceback.print_exc() - self.log(2, "exception") - - #self.nodesLock.release() - - #@-node:subscribe - #@+node:unsubscribe - def unsubscribe(self, node): - """ - Unsubscribes a node from the core - - If this was the last node, stops the handler thread - """ - #self.nodesLock.acquire() - try: - nodes = self.nodes - - if node not in nodes: - self.log(4, "duhhh! node %s was not subscribed" % repr(node)) - return - - self.log(2, "trying to unsubscribe node %s" % node.name) - nodes.remove(node) - - if len(nodes) == 0: - self.isRunning = False - except: - traceback.print_exc() - self.log(2, "exception") - - #self.nodesLock.release() - - #@-node:unsubscribe - #@+node:threadRxPackets - def threadRxPackets(self): - """ - Sits on a select() loop, processing incoming datagrams - and actioning them appropriately. - """ - self.isRunning_rx = True - self.log(3, "KCore packet receiver thread running") - try: - while self.isRunning: - socks = [node.sock for node in self.nodes] - if desperatelyDebugging: - set_trace() - try: - inlist, outlist, errlist = self.select(socks, [], [], 1) - except KeyboardInterrupt: - self.isRunning = 0 - return - - self.log(5, "\ninlist=%s" % repr(inlist)) - if inlist: - self.log(5, "got one or more sockets with inbound data") - #self.nodesLock.acquire() - for sock in inlist: - node = self.nodeWhichOwnsSock(sock) - if node != None: - node._doRx() - #self.nodesLock.release() - - elif self.fg: - return - - else: - time.sleep(0.1) - except: - #self.nodesLock.release() - traceback.print_exc() - self.log(1, "core handler thread crashed") - self.isRunning_rx = False - self.log(3, "core handler thread terminated") - - #@-node:threadRxPackets - #@+node:threadHousekeeping - def threadHousekeeping(self): - """ - Periodically invoke nodes' housekeeping - """ - self.log(3, "\nnode housekeeping thread running") - try: - while self.isRunning: - #self.log(4, "calling nodes' housekeeping methods") - #self.nodesLock.acquire() - for node in self.nodes: - node._doHousekeeping() - #self.nodesLock.release() - time.sleep(1) - self.log(3, "\nnode housekeeping thread terminated") - except: - #self.nodesLock.release() - traceback.print_exc() - self.log(1, "\nnode housekeeping thread crashed") - - #@-node:threadHousekeeping - #@+node:nodeWhichOwnsSock - def nodeWhichOwnsSock(self, sock): - """ - returns ref to node which owns a socket - """ - for node in self.nodes: - if node.sock == sock: - return node - return None - #@-node:nodeWhichOwnsSock - #@+node:cycle - def cycle(self): - - self.fg = True - self.threadRxPackets() - - #@-node:cycle - #@+node:run - def run(self, func=None): - """ - Runs the core in foreground, with the client func in background - """ - if func==None: - func = test - - self.bg = False - - thread.start_new_thread(self.runClient, (func,)) - - set_trace() - - self.threadRxPackets() - - #@-node:run - #@+node:stop - def stop(self): - self.isRunning = False - - #@-node:stop - #@+node:runClient - def runClient(self, func): - - self.log(3, "Core: running client func") - try: - func() - except: - traceback.print_exc() - self.log(3, "Core: client func exited") - self.stop() - #@-node:runClient - #@+node:select - def select(self, inlist, outlist, errlist, timeout): - - return i2p.select.select(inlist, outlist, errlist, timeout) - - #@-node:select - #@-others - -#@-node:class KCore -#@+node:create instance -# create an instance of _KCore -core = KCore() - -#@-node:create instance -#@-node:Main Engine -#@+node:Basic Classes -#@+node:Node-local Storage -#@+node:class KStorageBase -class KStorageBase(KBase): - """ - Base class for node storage objects - - This needs to be overridden by implementation-specific - solutions. - """ - #@ @+others - #@+node:__init__ - def __init__(self, node, *args, **kw): - """ - Override this method - - First argument should be a node instance - """ - raise KNotImplemented - - #@-node:__init__ - #@+node:putRefs - def putRefs(self, *refs): - """ - Saves one or more noderefs - - Arguments: - - zero or more KPeer objects, or lists or tuples of objects - """ - raise KNotImplemented - #@-node:putRefs - #@+node:getRefs - def getRefs(self): - """ - Returns a list of KPeer objects, comprising refs - of peers known to this node - """ - raise KNotImplemented - - #@-node:getRefs - #@+node:putKey - def putKey(self, key, value): - """ - Stores value, a string, into the local storage - under key 'key' - """ - raise KNotImplemented - - #@-node:putKey - #@+node:getKey - def getKey(self, key): - """ - Attempts to retrieve item from node's local, which was - stored with key 'key'. - - Returns value as a string if found, or None if not present - """ - raise KNotImplemented - #@-node:getKey - #@+node:private methods - #@+others - #@+node:_expandRefsList - def _expandRefsList(self, args, lst=None): - """ - Takes a sequence of args, each of which can be a KPeer - object, or a list or tuple of KPeer objects, and expands - this into a flat list - """ - if lst == None: - lst = [] - for item in args: - if type(item) in [type(()), type([])]: - self._expandRefsList(item, lst) - else: - lst.append(item) - return lst - - #@-node:_expandRefsList - #@-others - #@-node:private methods - #@-others -#@-node:class KStorageBase -#@+node:class KStorageFile -class KStorageFile(KStorageBase): - """ - Implements node-local storage, using the local filesystem, - with the following hierarchy: - - - HOME ( ~ in linux, some other shit for windows) - - .i2pkademlia - - <nodename> - - noderefs - - <node1 base64 hash> - - contains node dest, and other shit - - ... - - keys - - <keyname1> - - contains raw key value - - ... - - This is one ugly sukka, perhaps a db4, mysql etc implementation - would be better. - """ - #@ @+others - #@+node:__init__ - def __init__(self, node, storeDir=None): - """ - Creates a persistent storage object for node - 'nodeName', based at directory 'storeDir' (default - is nodeDir - """ - self.node = node - self.nodeName = node.name - - if storeDir == None: - # work out local directory - self.topDir = userI2PDir() - - # add node dir and subdirs - self.nodeDir = userI2PDir(self.nodeName) - - self.refsDir = os.path.join(self.nodeDir, "noderefs") - if not os.path.isdir(self.refsDir): - os.makedirs(self.refsDir) - - self.keysDir = os.path.join(self.nodeDir, "keys") - if not os.path.isdir(self.keysDir): - os.makedirs(self.keysDir) - - #@-node:__init__ - #@+node:putRefs - def putRefs(self, *args): - """ - Saves one or more noderefs into filesystem - - Arguments: - - zero or more KPeer objects, or lists or tuples of objects - """ - lst = self._expandRefsList(args) - for item in lst: - b64hash = shahash(item.dest) - itemPath = os.path.join(self.refsDir, b64hash) - itemDict = {'dest':item.dest} # might need to expand later - itemPickle = bencode.bencode(itemDict) - file(itemPath, "wb").write(itemPickle) - pass - #@-node:putRefs - #@+node:getRefs - def getRefs(self): - """ - Returns a list of KPeer objects, comprising refs - of peers known to this node - - These are read from the directory self.refsDir. - Any that can't be unpickled and instantiated are dropped, but logged - """ - peers = [] - for f in os.listdir(self.refsDir): - - path = os.path.join(self.refsDir, f) - pickled = file(path, "rb").read() - try: - d = bencode.bdecode(pickled) - except: - self.log(3, "node %s, bad pickle ref file %s" % ( - self.nodeName, f)) - continue - - # instantiate a peer object - try: - peer = KPeer(self.node, d['dest']) - except: - self.log(3, "node %s, bad unpickled ref file %s" % ( - self.nodeName, f)) - continue - - # success - peers.append(peer) - - return peers - - #@-node:getRefs - #@+node:putKey - def putKey(self, key, val, keyIsHashed=False): - """ - Stores a string into this storage under the key 'key' - - Returns True if key was saved successfully, False if not - """ - try: - if keyIsHashed: - keyHashed = key - else: - keyHashed = shahash(key) - keyHashed = keyHashed.lower() - keyPath = os.path.join(self.keysDir, keyHashed) - file(keyPath, "wb").write(val) - self.log(4, "stored key: '%s'\nunder hash '%s'\n(keyIsHashed=%s)" % ( - key, keyHashed, keyIsHashed)) - return True - except: - traceback.print_exc() - self.log(3, "failed to store key") - return False - - #@-node:putKey - #@+node:getKey - def getKey(self, key, keyIsHashed=False): - """ - Attempts to retrieve item from node's local file storage, which was - stored with key 'key'. - - Returns value as a string if found, or None if not present - """ - try: - if keyIsHashed: - keyHashed = key - else: - keyHashed = shahash(key) - - keyHashed = keyHashed.lower() - self.log(4, "key=%s, keyHashed=%s, keyIsHashed=%s" % (key, keyHashed, keyIsHashed)) - - keyPath = os.path.join(self.keysDir, keyHashed) - - if os.path.isfile(keyPath): - return file(keyPath, "rb").read() - else: - return None - except: - traceback.print_exc() - self.log(3, "error retrieving key '%s'" % key) - return None - - #@-node:getKey - #@-others -#@-node:class KStorageFile -#@-node:Node-local Storage -#@+node:class KHash -class KHash(KBase): - """ - Wraps 160-bit hashes as abstract objects, on which - operations such as xor, <, >, etc can be performed. - - Kademlia node ids and keys are held as objects - of this class. - - Internally, hashes are stored as python long ints - """ - #@ @+others - #@+node:__init__ - def __init__(self, val=None, **kw): - """ - Create a new hash object. - - val can be one of the following: - - None (default) - a random value will be created - - long int - this will be used as the raw hash - - string - the string will be hashed and stored - - another KHash object - its value will be taken - - a KNode or KPeer object - its hash will be taken - - If val is not given, a raw hash value can be passed in - with the keyword 'raw'. Such value must be a python long int - or a 20-char string - """ - self.value = 0L - if val: - if isinstance(val, KHash): - self.value = val.value - elif type(val) in [type(0), type(0L)]: - self.value = long(val) - elif isinstance(val, KNode) or isinstance(val, KPeer): - self.value = val.id.value - else: - raw = self.raw = shahash(val, bin=1) - for c in raw: - self.value = self.value * 256 + ord(c) - else: - rawval = kw.get('raw', None) - if rawval == None: - # generate random - random.seed() - for i in range(20): - self.value = self.value * 256 + random.randint(0, 256) - elif type(rawval) in [type(0), type(0L)]: - self.value = long(rawval) - elif type(rawval) == type(""): - if len(rawval) == 20: - for i in rawval: - self.value = self.value * 256 + ord(i) - elif len(rawval) == 40: - try: - self.value = long(rawval, 16) - except: - raise KBadHash(rawval) - else: - raise KBadHash(rawval) - else: - print "rawval=%s %s %s" % (type(rawval), rawval.__class__, repr(rawval)) - raise KBadHash(rawval) - - #@-node:__init__ - #@+node:__str__ - def __str__(self): - return "<KHash: 0x%x>" % self.value - - def __repr__(self): - return str(self) - - #@-node:__str__ - #@+node:asHex - def asHex(self): - return ("%040x" % self.value).lower() - - #@-node:asHex - #@+node:distance - def distance(self, other): - """ - calculates the 'distance' between this hash and another hash, - and returns it as i (where distance = 2^i, and 0 <= i < 160) - """ - - #log(4, "comparing: %s\nwith %s" % (self.value, other.value)) - - rawdistance = self.value ^ other.value - if not rawdistance: - return 0 - - return int(math.log(rawdistance, 2)) - - #@-node:distance - #@+node:rawdistance - def rawdistance(self, other): - """ - calculates the 'distance' between this hash and another hash, - and returns it raw as this xor other - """ - return self.value ^ other.value - - #@-node:rawdistance - #@+node:operators - def __eq__(self, other): - #log(2, "KHash: comparing %s to %s" % (self, other)) - res = self.value == getattr(other, 'value', None) - #self.log(2, "KHash: res = %s" % repr(res)) - return res - - def __ne__(self, other): - return not (self == other) - - def __lt__(self, other): - return self.value < other.value - - def __gt__(self, other): - return self.value > other.value - - def __le__(self, other): - return self.value <= other.value - - def __ge__(self, other): - return self.value >= other.value - - def __ne__(self, other): - return self.value != other.value - - def __xor__(self, other): - return self.value ^ other.value - - #@-node:operators - #@-others -#@-node:class KHash -#@+node:class KBucket -class KBucket(KBase): - """ - Implements the 'k-bucket' object as required in Kademlia spec - """ - #@ @+others - #@+node:__init__ - def __init__(self): - """ - Creates a single k-bucket - """ - # list of known nodes - # order is least recently seen at head, most recently seen at tail - self.nodes = [] - - # list of death-row records - # refer spec section 2.1, paragraph 2 - # we deviate a little: - # when we hear from a new peer, and the bucket is full, - # we temporarily displace the old peer, and stick the new - # peer at end of list, then send out a ping - # If we hear from the old peer within a reasonable time, - # the new peer gets evicted and replaced with the old peer - # - # this list holds 2-tuples (oldpeer, newpeer), where - # oldpeer is the least-recently-seen peer that we displaced, and - # newpeer is the new peer we just heard from. - self.deathrow = [] - - #@-node:__init__ - #@+node:justSeenPeer - def justSeenPeer(self, peer): - """ - Tells the bucket that we've just seen a given node - """ - nodes = self.nodes - - if not isinstance(peer, KPeer): - raise KBadNode - - try: - idx = nodes.index(peer) - except: - idx = -1 - if idx >= 0: - del nodes[idx] - nodes.append(peer) - else: - nodes.append(peer) - - # might at some time need to implement death-row logic - # when we set a bucket size limit - refer __init__ - #@-node:justSeenPeer - #@+node:__iter__ - def __iter__(self): - return iter(self.nodes) - #@-node:__iter__ - #@-others -#@-node:class KBucket -#@+node:class KPeer -class KPeer(KBase): - """ - Encapsulates a peer node of a L{KNode}, - storing its ID and contact info - """ - #@ @+others - #@+node:__init__ - def __init__(self, node, dest): - """ - Create a ref to a kademlia peer node - - Arguments: - - node - reference to node which has the relationship - to this peer - - dest - the peer's I2P destination, as base64 - """ - if not isinstance(node, KNode): - raise KBadNode(node) - if not isinstance(dest, str): - raise KBadDest(dest) - - self.node = node - self.dest = dest - self.id = KHash(dest) - - self.justSeen() - - #@-node:__init__ - #@+node:send_ping - def send_ping(self, **kw): - """ - Sends a ping to remote peer - """ - self.send_raw(type="ping", **kw) - #@-node:send_ping - #@+node:send_store - def send_store(self, **kw): - """ - sends a store command to peer - """ - self.log(4, "\npeer %s\ndest %s...\nsending store cmd: %s" % (self, self.dest[:12], repr(kw))) - - self.send_raw(type="store", **kw) - - #@-node:send_store - #@+node:send_findNode - def send_findNode(self, hash, **kw): - """ - sends a findNode command to peer - """ - if not isinstance(hash, KHash): - raise KBadHash - - self.log(5, "\nquerying peer %s\ntarget hash %s" % (self, hash)) - - self.send_raw(type="findNode", hash=hash.value, **kw) - - #@-node:send_findNode - #@+node:send_findData - def send_findData(self, hash, **kw): - """ - sends a findData command to peer - """ - if not isinstance(hash, KHash): - raise KBadHash - - self.log(5, "\nquerying peer %s\ntarget hash %s" % (self, hash)) - - self.send_raw(type="findData", hash=hash.value, **kw) - - #@-node:send_findData - #@+node:send_reply - def send_reply(self, **kw): - """ - Sends an RPC reply back to upstream peer - """ - self.log(5, "\nnode %s\nreplying to peer %s:\n%s" % ( - self.node, self, kw)) - self.send_raw(type="reply", **kw) - - #@-node:send_reply - #@+node:send_raw - def send_raw(self, **kw): - """ - Sends a raw datagram to peer - - No arguments - just keywords, all of which must be strings or - other objects which can be bencoded - """ - self.node._sendRaw(self, **kw) - #@-node:send_raw - #@+node:justSeen - def justSeen(self): - self.timeLastSeen = time.time() - - #@-node:justSeen - #@+node:lowlevel - #@+others - #@+node:__str__ - def __str__(self): - - return "<KPeer:%s=>0x%s...>" % ( - self.node.name, ("%x" % self.id.value)[:8]) - - #@-node:__str__ - #@+node:__repr__ - def __repr__(self): - - return str(self) - - #@-node:__repr__ - #@+node:__eq__ - def __eq__(self, other): - - #self.log(2, "KPeer: comparing %s to %s (%s to %s)" % (self, other, self.__class__, other.__class__)) - res = self.id == getattr(other, 'id', None) - #self.log(2, "KPeer: res=%s" % res) - return res - - #@-node:__eq__ - #@+node:__ne__ - def __ne__(self, other): - return not (self == other) - #@-node:__ne__ - #@-others - #@-node:lowlevel - #@-others -#@-node:class KPeer -#@-node:Basic Classes -#@+node:RPC Classes -#@+node:class KRpc -class KRpc(KBase): - """ - Base class for RPCs between nodes. - Refer subclasses - """ - #@ @+others - #@+node:attribs - type = 'unknown' # override in subclass - - #@-node:attribs - #@+node:__init__ - def __init__(self, localNode, client=None, **kw): - """ - Holds all the information for an RPC - - Arguments: - - localNode - the node from which this RPC is being driven - - client - a representation of who is initiating this rpc, one of: - - None - an API caller, which is to be blocked until the RPC completes - or times out - - (upstreamPeer, upstreamMsgId) - an upstream peer - - callable object - something which requires a callback upon completion - in which case the callable will be invoked with the RPC results as the - first argument - - Keywords: - - cbArgs - optional - if given, and if client is a callback, the callback - will be invoked with the results as first argument, and this object as - second argument - """ - self.localNode = localNode - - if client == None: - # an api client - self.isLocal = True - self.queue = Queue.Queue() - self.callback = None - elif callable(client): - self.isLocal = False - self.callback = client - elif isinstance(client, tuple): - # we're doing the RPC on behalf of an upstream peer - upstreamPeer, upstreamMsgId = client - upstreamPeer = localNode._normalisePeer(upstreamPeer) - self.isLocal = False - self.upstreamPeer = upstreamPeer - self.upstreamMsgId = upstreamMsgId - self.callback = None - - # save keywords - self.__dict__.update(kw) - - # set time for receiving a tick. - # if this is set to an int absolute time value, the on_tick method - # will be called as soon as possible after that time - self.nextTickTime = None - - # and register with node as a pending command - self.localNode.rpcPending.append(self) - - # now start up the request - self.start() - - #@-node:__init__ - #@+node:__del__ - def __del__(self): - - #self.log(4, "\nRPC %s getting the chop" % (str(self))) - pass - - #@-node:__del__ - #@+node:__str__ - def __str__(self): - - return "<%s on node %s>" % (self.__class__.__name__, self.localNode.name) - - #@-node:__str__ - #@+node:__repr__ - def __repr__(self): - return str(self) - #@-node:__repr__ - #@+node:bindPeerReply - def bindPeerReply(self, peer, msgId): - """ - Sets up the node to give us a callback when a reply - comes in from downstream peer 'peer' with msg id 'msgId' - """ - self.localNode.rpcBindings[(peer.dest, msgId)] = (self, peer) - - #@-node:bindPeerReply - #@+node:unbindPeerReply - def unbindPeerReply(self, peer, msgId): - """ - Disables the callback from node for replies - from peer 'peer' with msgId 'msgId' - """ - bindings = self.localNode.rpcBindings - peerdest = peer.dest - if bindings.has_key((peerdest, msgId)): - del bindings[(peerdest, msgId)] - - #@-node:unbindPeerReply - #@+node:unbindAll - def unbindAll(self): - """ - Remove all reply bindings - """ - bindings = self.localNode.rpcBindings - self.log(5, "node bindings before: %s" % bindings) - for k,v in bindings.items(): - if v[0] == self: - del bindings[k] - self.log(5, "node bindings after: %s" % bindings) - - #@-node:unbindAll - #@+node:start - def start(self): - """ - Start the RPC running. - Override this in subclasses - """ - raise KNotImplemented - - #@-node:start - #@+node:execute - def execute(self): - """ - Only for synchronous (application-level) execution. - Wait for the RPC to complete (or time out) and return - whatever it came up with - """ - if core.fg: - print "servicing background thread" - while self.queue.empty(): - core.cycle() - - return self.queue.get() - - #@-node:execute - #@+node:terminate - def terminate(self): - """ - Clean up after ourselves. - Mainly involves removing ourself from local node - """ - self.unbindAll() - try: - self.localNode.rpcPending.remove(self) - except: - #traceback.print_exc() - pass - - #@-node:terminate - #@+node:returnValue - def returnValue(self, result=None, **kw): - """ - Passes a return value back to the original caller, be it - the local application, or an upstream peer - - Arguments: - - just one - a result object to pass back, if this RPC - was instigated by a local application call. - Note that if this RPC was instigated by an upstream - peer, this will be ignored. - - Keywords: - - the items to return, in the case that this RPC was - instigated by an upstream peer. Ignored if this - RPC was instigated by a local application call. - Note - the RPC invocation/reply dict keys are - listed at the top of this source file. - """ - self.terminate() - if self.callback: - if hasattr(self, 'cbArgs'): - self.callback(result, self.cbArgs) - else: - self.callback(result) - elif self.isLocal: - self.queue.put(result) - else: - self.upstreamPeer.send_reply(msgId=self.upstreamMsgId, - **kw) - #@-node:returnValue - #@+node:on_reply - def on_reply(self, peer, msgId, **details): - """ - Callback which fires when a downstream peer replies - - Override this in subclasses - """ - raise KNotImplemented - - #@-node:on_reply - #@+node:on_tick - def on_tick(self): - """ - Callback which fires if the whole RPC times out, in which - case the RPC should return whatever it can - - Override in subclasses - """ - self.localNode.rpcPending.remove(self) - - #@-node:on_tick - #@-others -#@-node:class KRpc -#@+node:PING -#@+node:class KRpcPing -class KRpcPing(KRpc): - """ - Implements the PING rpc as per Kademlia spec - """ - #@ @+others - #@+node:attribs - type = 'ping' - - #@-node:attribs - #@+node:__init__ - def __init__(self, localNode, client=None, **kw): - """ - Creates and performs a PING RPC - - Arguments: - - localNode - the node performing this RPC - - upstreamPeer - if given, the peer wanting a reply - - upstreamMsgId - if upstreamPeer is given, this is the msgId - of the RPC message from the upstream peer - - Keywords: - - peer - the peer to ping - default is local node - """ - peer = kw.get('peer', None) - if peer != None: - peer = localNode._normalisePeer(peer) - self.peerToPing = peer - - if kw.has_key('cbArgs'): - KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) - else: - KRpc.__init__(self, localNode, client) - - #@-node:__init__ - #@+node:start - def start(self): - """ - Sends out the ping - """ - peer = self.peerToPing - - # are we ourselves being pinged? - if peer == None: - # yes, just reply - self.returnValue(True) - return - - # no - we need to ping a peer - thisNode = self.localNode - - msgId = thisNode.msgId = thisNode._msgIdAlloc() - - # bind for peer response - self.bindPeerReply(peer, msgId) - - # and send it off - self.log(3, "node %s sending ping" % self.localNode.name) - peer.send_ping(msgId=msgId) - - # and set a reply timeout - self.nextTickTime = time.time() + timeout['ping'] - - #@-node:start - #@+node:on_reply - def on_reply(self, peer, msgId, **details): - """ - Callback for PING reply - """ - self.log(3, "got ping reply from %s" % peer) - self.returnValue(True) - - #@-node:on_reply - #@+node:on_tick - def on_tick(self): - """ - 'tick' handler. - - For PING RPC, the only time we should get a tick is when the ping - has timed out - """ - self.log(3, "timeout awaiting ping reply from %s" % self.peerToPing) - self.returnValue(False) - - #@-node:on_tick - #@-others -#@-node:class KRpcPing -#@-node:PING -#@+node:FIND_NODE -#@+node:class KPeerQueryRecord -class KPeerQueryRecord(KBase): - """ - Keeps state information regarding a peer we're quering - """ - #@ @+others - #@+node:__init__ - def __init__(self, peer, table, state=None, **kw): - - self.peer = peer - self.dest = peer.dest - self.deadline = time.time() + timeout['findNode'] - self.table = table - - # state is always one of: - # - 'start' - have not yet sent query to peer - # - 'recommended' - peer was recommended by another peer, no query sent - # - 'queried' - sent query, awaiting reply or timeout - # - 'replied' - this peer has replied to our query - # - 'timeout' - timed out waiting for peer reply - # - 'toofar' - too far away to be of interest - # - 'closest' - this peer is one of the closest so far - - if state == None: - state = 'start' - if not isinstance(state, str): - raise Exception("Invalid state %s" % state) - - self.state = state - - self.__dict__.update(kw) - - #@-node:__init__ - #@+node:hasTimedOut - def hasTimedOut(self, now=None): - if now == None: - now = time.time() - return self.state == 'queried' and now > self.deadline - - #@-node:hasTimedOut - #@+node:__cmp__ - def __cmp__(self, other): - - return cmp(self.peer.id.rawdistance(self.table.sorthash), - other.peer.id.rawdistance(self.table.sorthash)) - - #@-node:__cmp__ - #@+node:__lt__ etc - def __lt__(self, other): - return (cmp(self, other) < 0) - - def __le__(self, other): - return (cmp(self, other) <= 0) - - def __gt__(self, other): - return (cmp(self, other) > 0) - - def __ge__(self, other): - return (cmp(self, other) <= 0) - - #@-node:__lt__ etc - #@+node:isCloserThanAllOf - def isCloserThanAllOf(self, tab): - """ - returns True if this peerRec is closer to the desired hash - than all of the peerRecs in table 'tab' - """ - if not isinstance(tab, KPeerQueryTable): - self.log(2, "invalid qtable %s" % repr(tab)) - raise Exception("invalid qtable %s" % repr(tab)) - - for rec in tab: - if self > rec: - return False - return True - - #@-node:isCloserThanAllOf - #@+node:isCloserThanOneOf - def isCloserThanOneOf(self, tab): - """ - returns True if this peerRec is closer to the desired hash - than one or more of of the peerRecs in table 'tab' - """ - if not isinstance(tab, KPeerQueryTable): - self.log(2, "invalid qtable %s" % repr(tab)) - raise Exception("invalid qtable %s" % repr(tab)) - - for rec in tab: - if self < rec: - return True - return False - - #@-node:isCloserThanOneOf - #@-others -#@-node:class KPeerQueryRecord -#@+node:class KPeerQueryTable -class KPeerQueryTable(KBase): - """ - Holds zero or more instances of KPeerQuery and - presents/sorts table in different forms - """ - #@ @+others - #@+node:__init__ - def __init__(self, lst=None, sorthash=None, state=None, **kw): - self.peers = [] - if lst == None: - lst = [] - else: - self.setlist(lst, state, **kw) - self.sorthash = sorthash - - #@-node:__init__ - #@+node:setlist - def setlist(self, lst, state=None, **kw): - for item in lst: - self.append(item, state, **kw) - - #@-node:setlist - #@+node:getExpired - def getExpired(self): - """ - return a list of peers which have expired - """ - return KPeerQueryTable( - filter(lambda item: item.hasTimedOut(), self.peers), - self.sorthash - ) - - #@-node:getExpired - #@+node:purgeExpired - def purgeExpired(self): - """ - Eliminate peers which have expired - """ - for peer in self.peers: - if peer.hasTimedOut(): - self.peers.remove(peer) - - #@-node:purgeExpired - #@+node:sort - def sort(self): - """ - Sort the table in order of increasing distance from self.sorthash - """ - self.peers.sort() - - #@-node:sort - #@+node:select - def select(self, criterion): - """ - Returns a table of items for which criterion(item) returns True - Otherwise, if 'criterion' is a string, returns the items whose - state == criterion. - Otherwise, if 'criterion' is a list or tuple, return the items - whose state is one of the elements in criterion - """ - if callable(criterion): - func = criterion - elif type(criterion) in [type(()), type([])]: - func = lambda p: p.state in criterion - else: - func = lambda p: p.state == criterion - - recs = [] - for peerRec in self.peers: - if func(peerRec): - recs.append(peerRec) - return self.newtable(recs) - - #@-node:select - #@+node:count - def count(self, *args): - """ - returns the number of records whose state is one of args - """ - count = 0 - for rec in self.peers: - if rec.state in args: - count += 1 - return count - - #@-node:count - #@+node:changeState - def changeState(self, oldstate, newstate): - """ - for all recs of state 'oldstate', change their - state to 'newstate' - """ - for p in self.peers: - if p.state == oldstate: - p.state = newstate - #@-node:changeState - #@+node:filter - def filter(self, func): - """ - Eliminate, in place, all items where func(item) returns False - """ - for peerRec in self.peers: - if not func(peerRec): - self.peers.remove(peerRec) - - #@-node:filter - #@+node:purge - def purge(self, func): - """ - Eliminate, in place, all items where func(item) returns True - """ - if 0 and desperatelyDebugging: - set_trace() - for peerRec in self.peers: - if func(peerRec): - self.peers.remove(peerRec) - - #@-node:purge - #@+node:chooseN - def chooseN(self, n): - """ - Randomly select n peer query records - """ - candidates = self.peers[:] - - self.log(3, "candidates = %s" % repr(candidates)) - - chosen = [] - i = 0 - - if len(candidates) <= n: - chosen = candidates - else: - while i < n: - try: - peer = random.choice(candidates) - except: - self.log(2, "failed to choose one of %s" % repr(candidates)) - raise - chosen.append(peer) - candidates.remove(peer) - i += 1 - - return self.newtable(chosen) - - #@-node:chooseN - #@+node:__str__ - def __str__(self): - return "<KPeerQueryTable: %d peers>" % len(self) #.peers) - - def __repr__(self): - return str(self) - - #@-node:__str__ - #@+node:newtable - def newtable(self, items, state=None, **kw): - """ - Returns a new KPeerQueryTable object, based on this - one, but containing 'items' - """ - tab = KPeerQueryTable(items, sorthash=self.sorthash, state=state, **kw) - return tab - - #@-node:newtable - #@+node:dump - def dump(self): - - c = self.count - self.log(2, - "PeerQueryTable stats:\n" - "start: %s\n" - "recommended: %s\n" - "queried: %s\n" - "replied: %s\n" - "timeout: %s\n" - "closest: %s\n" - "toofar: %s\n" - "TOTAL: %s\n" % ( - c('start'), - c('recommended'), - c('queried'), - c('replied'), - c('timeout'), - c('closest'), - c('toofar'), - len(self.peers))) - - #states = [p.state for p in self.peers] - #self.log(3, "PeerQueryTable states:\n%s" % states) - - #@-node:dump - #@+node:list-like methods - #@+node:extend - def extend(self, items, state, **kw): - for item in items: - self.append(item, state, **kw) - - #@-node:extend - #@+node:append - def append(self, item, state=None, **kw): - - if isinstance(item, KPeerQueryRecord): - self.log(5, "adding a KPeerQueryRecord, state=%s" % state) - if state != None: - item.state = state - item.__dict__.update(kw) - peerRec = item - - elif isinstance(item, KPeer): - self.log(5, "adding a KPeer") - peerRec = KPeerQueryRecord(item, self, state, **kw) - - else: - self.log(2, "bad peer %s" % repr(item)) - raise KBadPeer - - if peerRec not in self: - self.log(5, "peerRec=%s list=%s" % (peerRec, self.peers)) - self.peers.append(peerRec) - else: - self.log(5, "trying to append duplicate peer???") - - #@-node:append - #@+node:remove - def remove(self, item): - self.peers.remove(item) - - #@-node:remove - #@+node:__getitem__ - def __getitem__(self, idx): - """ - Allow the table to be indexed by any of: - - KPeerQueryRecord - - integer index - - long string - treated as dest - - short string - treated as peer id hash string - - KHash - finds peer with that id - - KPeer - returns peer with that peer - """ - if type(idx) == type(0): - return self.peers[idx] - elif isinstance(idx, KPeer): - for peer in self.peers: - if peer.peer == idx: - return peer - raise IndexError("Query table has no peer %s" % idx) - elif isinstance(idx, str): - if len(str) > 512: - for peer in self.peers: - if peer.peer.dest == idx: - return peer - raise IndexError("No peer with dest %s" % idx) - else: - for peer in self.peers: - if peer.peer.id.value == idx: - return peer - raise IndexError("No peer with dest hash %s" % idx) - elif isinstance(idx, KHash): - for peer in self.peers: - if peer.peer.id == idx: - return peer - raise IndexError("No peer with id %s" % idx) - else: - raise IndexError("Invalid selector %s" % repr(idx)) - - #@-node:__getitem__ - #@+node:__len__ - def __len__(self): - return len(self.peers) - - #@-node:__len__ - #@+node:__getslice__ - def __getslice__(self, fromidx, toidx): - return KPeerQueryTable(self.peers[fromidx:toidx], self.sorthash) - - #@-node:__getslice__ - #@+node:__iter__ - def __iter__(self): - return iter(self.peers) - - #@-node:__iter__ - #@+node:__add__ - def __add__(self, other): - self.extend(other) - - #@-node:__add__ - #@+node:__contains__ - def __contains__(self, other): - self.log(5, "testing if %s is in %s" % (other, self.peers)) - for peerRec in self.peers: - if peerRec.peer.dest == other.peer.dest: - return True - return False - - #@-node:__contains__ - #@-node:list-like methods - #@-others -#@-node:class KPeerQueryTable -#@+node:class KRpcFindNode -class KRpcFindNode(KRpc): - """ - Implements the FIND_NODE rpc as per Kademlia spec - """ - #@ @+others - #@+node:spec info comments - #@+at - # Verbatim extract from original Kademlia paper follows: - # - # The lookup initiator starts by picking x nodes from its closest - # non-empty k-bucket (or, if that bucket has fewer than x - # entries, it just takes the closest x nodes it knows of). - # - # The initiator then sends parallel, asynchronous - # FIND NODE RPCs to the x nodes it has chosen. - # x is a system-wide concurrency parameter, such as 3. - # - # In the recursive step, the initiator resends the - # FIND NODE to nodes it has learned about from previous RPCs. - # - # [Paraphrased - in the recursive step, the initiator sends a FIND_NODE to - # each of the nodes that were returned as results of these previous - # FIND_NODE RPCs.] - # - # (This recursion can begin before all of the previous RPCs have - # returned). - # - # Of the k nodes the initiator has heard of closest to - # the target, it picks x that it has not yet queried and resends - # the FIND_NODE RPC to them. - # - # Nodes that fail to respond quickly are removed from consideration - # until and unless they do respond. - # - # If a round of FIND_NODEs fails to return a node any closer - # than the closest already seen, the initiator resends - # the FIND NODE to all of the k closest nodes it has - # not already queried. - # - # The lookup terminates when the initiator has queried and gotten - # responses from the k closest nodes it has seen. - #@-at - #@-node:spec info comments - #@+node:attribs - type = 'findNode' - #@-node:attribs - #@+node:__init__ - def __init__(self, localNode, client=None, **kw): - """ - Creates and launches the findNode rpc - - Arguments: - - localNode - the node performing this RPC - - client - see KRpc.__init__ - - Keywords: - - hash - a string, long int or KHash object representing - what we're looking for. treatment depends on type: - - KHash object - used as is - - string - gets wrapped into a KHash object - - long int - wrapped into a KHash object - refer KHash.__init__ - - raw - whether 'hash' is already a hash, default True - - local - True/False - whether to only search local store, - or pass on the query to the network, default True - """ - kw = dict(kw) - if kw.get('raw', False): - h = kw['hash'] - del kw['hash'] - kw['raw'] = h - self.hashWanted = KHash(**kw) - else: - self.hashWanted = KHash(kw['hash'], **kw) - self.isLocalOnly = kw.get('local', True) - - self.numQueriesPending = 0 - - self.numRounds = 0 # count number of rounds - self.numReplies = 0 # number of query replies received - self.numQueriesSent = 0 - self.numPeersRecommended = 0 - - # whichever mode we're called from, we gotta find the k closest peers - self.localNode = localNode - self.peerTab = self.findClosestPeersInitial() - - self.log(4, "KRpcFindNode: isLocalOnly=%s" % self.isLocalOnly) - - if kw.has_key('cbArgs'): - KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) - else: - KRpc.__init__(self, localNode, client) - - #@-node:__init__ - #@+node:start - def start(self): - """ - Kicks off this RPC - """ - # if we're being called by an upstream initiator, just return the peer list - if self.isLocalOnly: - peerDests = [peer.dest for peer in self.peerTab] - self.log(5, "findNode: local only: returning to upstream with %s" % repr(peerDests)) - self.returnValue(peerDests) - return - - # just return nothing if we don't have any peers - if len(self.peerTab) == 0: - self.returnValue([]) - return - - # send off first round of queries - self.sendSomeQueries() - - return - - #@-node:start - #@+node:sendSomeQueries - def sendSomeQueries(self, **kw): - """ - First step of findNode - - Select alpha nodes that we haven't yet queried, and send them queries - """ - # bail if too busy - if self.numQueriesPending >= maxConcurrentQueries: - return - - # shorthand - localNode = self.localNode - hashWanted = self.hashWanted - - # randomly choose some peers - #somePeerRecs = self.peerTab.chooseN(numSearchPeers) - somePeerRecs = self.peerTab.select('start') - - # start our ticker - self.nextTickTime = time.time() + timeout['findNode'] - - # and send them findNode queries - if len(somePeerRecs) > 0: - for peerRec in somePeerRecs: - self.log(3, "querying %s" % peerRec) - if self.numQueriesPending < maxConcurrentQueries: - self.sendOneQuery(peerRec) - else: - break - self.log(3, "queries sent, awaiting reply") - else: - self.log(3, "no peer recs???") - for peerRec in self.peerTab: - self.log(4, "%s state=%s" % (peerRec, peerRec.state)) - - #@-node:sendSomeQueries - #@+node:sendOneQuery - def sendOneQuery(self, peerRec): - """ - Sends off a query to a single peer - """ - if peerRec.state != 'start': - self.log(2, "duh!! peer state %s:\n%s" % (peerRec.state, peerRec)) - return - - msgId = self.localNode._msgIdAlloc() - self.bindPeerReply(peerRec.peer, msgId) - peerRec.msgId = msgId - - if self.type == 'findData': - peerRec.peer.send_findData(hash=self.hashWanted, msgId=msgId) - else: - peerRec.peer.send_findNode(hash=self.hashWanted, msgId=msgId) - - peerRec.state = 'queried' - - self.numQueriesPending += 1 - - self.numQueriesSent += 1 - - #@-node:sendOneQuery - #@+node:findClosestPeersInitial - def findClosestPeersInitial(self): - """ - Searches our k-buckets, and returns a table of k of - peers closest to wanted hash into self.closestPeersInitial - """ - hashobj = self.hashWanted - - lst = [] - buckets = self.localNode.buckets - for bucket in buckets: - for peer in bucket: - lst.append(peer) - - table = KPeerQueryTable(lst, self.hashWanted, 'start') - table.sort() - - return table[:maxBucketSize] - - #@-node:findClosestPeersInitial - #@+node:addPeerIfCloser - def addPeerIfCloser(self, peer): - """ - Maintains the private .peersToQuery array. - If the array is not yet maxed (ie, length < maxBucketSize), - the peer is simply added. - However, if the array is maxed, it finds the least-close peer, - and replaces it with the given peer if closer. - """ - #@-node:addPeerIfCloser - #@+node:isCloserThanQueried - def isCloserThanQueried(self, peer): - """ - Test function which returns True if argument 'peer' - is closer than all the peers in self.peersAlreadyQueried, - or False if not - """ - for p in self.peersAlreadyQueried: - if p.id.rawdistance(self.hashWanted) < peer.id.rawdistance(self.hashWanted): - return False - return True - - #@-node:isCloserThanQueried - #@+node:on_reply - def on_reply(self, peer, msgId, **details): - """ - Callback for FIND_NODE reply - """ - # shorthand - peerTab = self.peerTab - - self.numReplies += 1 - - # ------------------------------------------------------------ - # determine who replied, and get the raw dests sent back - try: - peerRec = peerTab[peer] - except: - traceback.print_exc() - self.log(3, "discarding findNode reply from unknown peer %s %s, discarding" % ( - peer, details)) - return - - # one less query to wait for - self.numQueriesPending -= 1 - - # ---------------------------------------------------------- - # peerRec is the peer that replied - # peers is a list of raw dests - - # save ref to this peer, it's seemingly good - self.localNode.addref(peerRec.peer) - - # mark it as having replied - if peerRec.state != 'queried': - self.log(2, "too weird - got a reply from a peer we didn't query") - peerRec.state = 'replied' - - # wrap the returned peers as KPeer objects - peersReturned = details.get('nodes', []) - peersReturned = [self.localNode._normalisePeer(p) for p in peersReturned] - - self.numPeersRecommended += len(peersReturned) - - # and add them to table in state 'recommended' - for p in peersReturned: - peerTab.append(p, 'recommended') - - # try to fire off more queries - self.sendSomeQueries() - - # and check for and action possible end of query round - self.checkEndOfRound() - - #@-node:on_reply - #@+node:on_tick - def on_tick(self): - """ - Callback for FIND_NODE reply timeout - """ - # check for timeouts, and update offending peers - now = time.time() - for peerRec in self.peerTab: - if peerRec.hasTimedOut(now): - peerRec.state = 'timeout' - - # makes room for more queries - self.sendSomeQueries() - - # possible end of round - self.checkEndOfRound() - - # schedule next tick - self.nextTickTime = time.time() + 5 - - #@-node:on_tick - #@+node:checkEndOfRound - def checkEndOfRound(self): - """ - Checks if we've hit the end of a query round. - If so, and if either: - - we've got some closer peers, OR - - we've heard from less than maxBucketSize peers, - fire off more queries - - Otherwise, return the best available - """ - peerTab = self.peerTab - - if core.fg: - set_trace() - - # has this query round ended? - if peerTab.count('start', 'queried') > 0: - # not yet - return - - self.log(2, "********** query round ended") - - # ------------------------------------ - # end of round processing - - self.numRounds += 1 - - # did we get any closer to required hash? - if self.type == 'findData' \ - or self.gotAnyCloser() \ - or peerTab.count('closest') < maxBucketSize: - - # yes - save these query results - self.log(4, "starting another round") - peerTab.changeState('replied', 'closest') - peerTab.changeState('recommended', 'start') - - # cull the shortlist - self.log(2, "culling to k peers") - if peerTab.count('closest') > maxBucketSize: - peerTab.sort() - excess = peerTab.select('closest')[maxBucketSize:] - excess.changeState('closest', 'toofar') - pass - - # and start up another round - self.sendSomeQueries() - - # did anything launch? - if peerTab.count('start', 'queried') == 0: - # no - we're screwed - self.returnTheBestWeGot() - - # done for now - return - - #@-node:checkEndOfRound - #@+node:gotAnyCloser - def gotAnyCloser(self): - """ - Tests if any peer records in state 'recommended' or 'replied' - are nearer than the records in state 'closest' - """ - peerTab = self.peerTab - - # get current closest peers - closest = peerTab.select('closest') - - # if none yet, then this was just end of first round - if len(closest) == 0: - return True - - # get the peers we're considering - #candidates = peerTab.select(('recommended', 'replied')) - candidates = peerTab.select('recommended') - - # now test them - gotOneCloser = False - for c in candidates: - #if c.isCloserThanOneOf(closest): - if c.isCloserThanAllOf(closest): - return True - - # none were closer - return False - - #@-node:gotAnyCloser - #@+node:returnTheBestWeGot - def returnTheBestWeGot(self): - """ - Returns the k closest nodes to the wanted hash that we have - actually heard from - """ - # pick the peers which have replied to us - closest = self.peerTab.select('closest') - - self.peerTab.dump() - - # add ourself to the list - we could easily be one of the best - localNode = self.localNode - selfDest = localNode._normalisePeer(localNode.dest) - closest.append(selfDest, state='closest') - - # sort in order of least distance first - closest.sort() - - # pick the best k of these - #peersHeardFrom = peersHeardFrom[:maxBucketSize] - #peersHeardFrom = peersHeardFrom[:numSearchPeers] - - # extract their dest strings - peers = [p.peer.dest for p in closest] - - # pass these back - self.returnValue(peers) - - # and we're done - return - - #@-node:returnTheBestWeGot - #@+node:returnValue - def returnValue(self, items): - """ - override with a nicer call sig - """ - # a hack for testing - save this RPC object into the node - # so we can introspect it - self.localNode.lastrpc = self - - items = items[:maxBucketSize] - - self.reportStats() - - KRpc.returnValue(self, items, nodes=items) - - #@-node:returnValue - #@+node:reportStats - def reportStats(self): - """ - Logs a stat dump of query outcome - """ - if self.isLocalOnly: - return - self.log(2, - "query terminated after %s rounds, %s queries, %s replies, %s recommendations" % ( - (self.numRounds+1), - self.numQueriesSent, - (self.numReplies+1), - self.numPeersRecommended - ) - ) - #@-node:reportStats - #@-others -#@-node:class KRpcFindNode -#@-node:FIND_NODE -#@+node:FIND_DATA -#@+node:class KRpcFindData -class KRpcFindData(KRpcFindNode): - """ - variant of KRpcFindNode which returns key value if found - """ - #@ @+others - #@+node:attribs - type = 'findData' - #@-node:attribs - #@+node:start - def start(self): - """ - Kicks off the RPC. - If requested key is stored locally, simply returns it. - Otherwise, falls back on parent method - """ - # if we posses the data, just return the data - value = self.localNode.storage.getKey(self.hashWanted.asHex(), keyIsHashed=True) - if value != None: - self.log(4, "Found required value in local storage") - self.returnValue(value) - return - - # no such luck - pass on to parent - KRpcFindNode.start(self) - - #@-node:start - #@+node:on_reply - def on_reply(self, peer, msgId, **details): - """ - Callback for FIND_NODE reply - """ - res = details.get('nodes', None) - if isinstance(res, str): - self.returnValue(res) - else: - KRpcFindNode.on_reply(self, peer, msgId, **details) - - #@-node:on_reply - #@-others - -#@-node:class KRpcFindData -#@-node:FIND_DATA -#@+node:STORE -#@+node:class KRpcStore -class KRpcStore(KRpc): - """ - Implements key storage - """ - #@ @+others - #@+node:attribs - type = 'store' - #@-node:attribs - #@+node:__init__ - def __init__(self, localNode, client=None, **kw): - """ - Creates and launches a STORE rpc - - Arguments: - - localNode - the node performing this RPC - - client - see KRpc.__init__ - - Keywords: - - key - the key under which we wish to save the data - - value - the value we wish to save - - local - True/False: - - if True, only save in local store - - if False, do a findNode to find the nodes to save the - key to, and tell them to save it - default is True - """ - self.key = kw['key'] - #self.keyHashed = shahash(self.key) - self.keyHashed = self.key - self.value = kw['value'] - self.isLocalOnly = kw.get('local', True) - - self.log(4, "isLocalOnly=%s" % self.isLocalOnly) - - if kw.has_key('cbArgs'): - KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) - else: - KRpc.__init__(self, localNode, client) - - #@-node:__init__ - #@+node:start - def start(self): - """ - Kicks off this RPC - """ - # if local only, or no peers, just save locally - if self.isLocalOnly or len(self.localNode.peers) == 0: - result = self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True) - if result: - result = 1 - else: - result = 0 - self.returnValue(result) - return - - # no - se have to find peers to store the key to, and tell them to - # store the key - - # launch a findNode rpc, continue in our callback - KRpcFindNode(self.localNode, self.on_doneFindNode, - hash=self.keyHashed, raw=True, local=False) - return - - #@-node:start - #@+node:returnValue - def returnValue(self, result): - """ - an override with a nicer call sig - """ - # a hack for testing - save this RPC object into the node - # so we can introspect it - self.localNode.lastrpc = self - - try: - KRpc.returnValue(self, result, status=result) - except: - traceback.print_exc() - self.log(3, "Failed to return %s" % repr(result)) - KRpc.returnValue(self, 0, status=0) - - #@-node:returnValue - #@+node:on_doneFindNode - def on_doneFindNode(self, lst): - """ - Receive a callback from findNode - - Send STORE command to each node that comes back - """ - localNode = self.localNode - - # normalise results - normalisePeer = localNode._normalisePeer - peers = [normalisePeer(p) for p in lst] # wrap in KPeer objects - - self.log(2, "STORE RPC findNode - got peers %s" % repr(peers)) - - self.numPeersToStore = min(len(peers), numStorePeers) - self.numPeersSucceeded = 0 - self.numPeersFailed = 0 - self.numPeersFinished = 0 - - i = 0 - - # and fire off store messages for each peer - for peer in peers: - - if peer.dest == localNode.dest: - self.log(3, "storing to ourself") - localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True) - self.numPeersSucceeded += 1 - self.numPeersFinished += 1 - else: - msgId = self.localNode._msgIdAlloc() - self.log(4, "forwarding store cmd to peer:\npeer=%s\nmsgId=%s" % (peer, msgId)) - self.bindPeerReply(peer, msgId) - peer.send_store(key=self.keyHashed, value=self.value, msgId=msgId) - i += 1 - if i >= numStorePeers: - break - - self.log(2, "Sent store cmd to %s peers, awaiting responses" % i) - - #@-node:on_doneFindNode - #@+node:on_reply - def on_reply(self, peer, msgId, **details): - """ - callback which fires when we get a reply from a STORE we sent to a - peer - """ - self.numPeersSucceeded += 1 - self.numPeersFinished += 1 - - if self.numPeersFinished == self.numPeersToStore: - # rpc is finished - self.returnValue(True) - - #@-node:on_reply - #@+node:on_tick - def on_tick(self): - self.log(3, "got a timeout tick, what should we do??") - - self.nextTickTime = time.time() + 3 - - #@-node:on_tick - #@-others -#@-node:class KRpcStore -#@-node:STORE -#@-node:RPC Classes -#@+node:Node Socket Server -#@+node:class KNodeServer -class KNodeServer(KBase, SocketServer.ThreadingMixIn, SocketServer.TCPServer): - """ - Listens for incoming socket connections - """ - #@ @+others - #@+node:__init__ - def __init__(self, node, addr=None): - - if addr == None: - addr = clientAddr - - self.isRunning = True - - self.node = node - - listenHost, listenPort = addr.split(":") - listenPort = int(listenPort) - self.listenPort = listenPort - SocketServer.TCPServer.__init__(self, (listenHost, listenPort), KNodeReqHandler) - - #@-node:__init__ - #@+node:serve_forever - def serve_forever(self): - - print "awaiting client connections on port %s" % self.listenPort - while self.isRunning: - self.handle_request() - - #@-node:serve_forever - #@-others -#@-node:class KNodeServer -#@+node:class KNodeReqHandler -class KNodeReqHandler(KBase, SocketServer.StreamRequestHandler): - """ - Manages a single client connection - """ - #@ @+others - #@+node:handle - def handle(self): - """ - Conducts all conversation for a single req - """ - req = self.request - client = self.client_address - server = self.server - node = self.server.node - - read = self.rfile.read - readline = self.rfile.readline - write = self.wfile.write - flush = self.wfile.flush - - finish = self.finish - - # start with a greeting - write("Stasher version %s ready\n" % version) - - # get the command - line = readline().strip() - - try: - cmd, args = re.split("\\s+", line, 1) - except: - cmd = line - args = '' - - self.log(3, "cmd=%s args=%s" % (repr(cmd), repr(args))) - - if cmd == "get": - value = node.get(args) - if value == None: - write("notfound\n") - else: - write("ok\n%s\n%s" % (len(value), value)) - flush() - finish() - return - - elif cmd == "put": - try: - size = int(readline()) - value = read(size) - res = node.put(args, value) - if res: - write("ok\n") - else: - write("failed\n") - flush() - except: - traceback.print_exc() - write("exception\n") - finish() - return - - elif cmd == 'addref': - try: - res = node.addref(args, True) - if res: - write("ok\n") - else: - write("failed\n") - flush() - except: - traceback.print_exc() - write("exception\n") - finish() - return - - elif cmd == 'getref': - res = node.dest - write("ok\n") - write("%s\n" % res) - finish() - return - - elif cmd == "die": - server.isRunning = False - write("server terminated\n") - finish() - - else: - write("unrecognisedcommand\n") - finish() - return - - #@-node:handle - #@+node:finish - def finish(self): - - SocketServer.StreamRequestHandler.finish(self) - self.connection.close() - - #@-node:finish - #@-others -#@-node:class KNodeReqHandler -#@+node:class KNodeClient -class KNodeClient(KBase): - """ - Talks to a KNodeServer over a socket - - Subclass this to implement Stasher clients in Python - """ - #@ @+others - #@+node:__init__ - def __init__(self, address=clientAddr): - - if type(address) in [type(()), type([])]: - self.host, self.port = clientAddr - else: - self.host, self.port = clientAddr.split(":") - self.port = int(self.port) - - self.hello() - - #@-node:__init__ - #@+node:hello - def hello(self): - - self.connect() - self.close() - #@-node:hello - #@+node:connect - def connect(self): - - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((self.host, self.port)) - self.rfile = self.sock.makefile("rb") - self.read = self.rfile.read - self.readline = self.rfile.readline - self.wfile = self.sock.makefile("wb") - self.write = self.wfile.write - self.flush = self.wfile.flush - - # read greeting - greeting = self.readline() - parts = re.split("\\s+", greeting) - if parts[0] != "Stasher": - self.close() - raise Exception("Not connected to valid stasher interface") - - #@-node:connect - #@+node:close - def close(self): - - self.rfile.close() - self.wfile.close() - self.sock.close() - - #@-node:close - #@+node:get - def get(self, key): - """ - sends a get command to stasher socket, and retrieves - and interprets result - """ - self.connect() - - self.write("get %s\n" % key) - self.flush() - - #print "waiting for resp line" - res = self.readline().strip() - - if res == "ok": - size = int(self.readline()) - val = self.read(size) - self.close() - return val - else: - self.close() - return None - - #@-node:get - #@+node:put - def put(self, key, val): - """ - Tells remote stasher port to insert a file into the network - """ - self.connect() - self.write("put %s\n" % key) - self.write("%s\n" % len(val)) - self.write(val) - self.flush() - - res = self.readline().strip() - - self.close() - - if res == "ok": - return True - else: - print repr(res) - return False - - #@-node:put - #@+node:addref - def addref(self, ref): - """ - Passes a new noderef to node - """ - self.connect() - self.write("addref %s\n" % ref) - self.flush() - - res = self.readline().strip() - - self.close() - - if res == "ok": - return True - else: - print repr(res) - return False - - #@-node:addref - #@+node:getref - def getref(self): - """ - Uplifts node's own ref - """ - self.connect() - self.write("getref\n") - self.flush() - - res = self.readline().strip() - - self.close() - - if res == "ok": - ref = self.readline().strip() - return ref - else: - return None - - #@-node:getref - #@+node:kill - def kill(self): - """ - Tells remote server to fall on its sword - """ - try: - while 1: - self.connect() - self.write("die\n") - self.flush() - self.close() - except: - pass - - #@-node:kill - #@+node:__getitem__ - def __getitem__(self, item): - - return self.get(item) - - #@-node:__getitem__ - #@+node:__setitem__ - def __setitem__(self, item, val): - - if not self.put(item, val): - raise Exception("Failed to insert") - - #@-node:__setitem__ - #@-others - -#@-node:class KNodeClient -#@-node:Node Socket Server -#@+node:NODE -#@+node:class KNode -class KNode(KBase): - """ - B{Public API to this Kademlia implementation} - - You should not normally need to use, or even be aware of, - any of the other classes - - And in this class, the only methods you need to worry about are: - - L{start} - starts the node running - - L{stop} - stops the node - - L{get} - retrieve a key value - - L{put} - stores a key value - - This class implements a single kademlia node. - Within a single process, you can create as many nodes as you like. - """ - #@ @+others - #@+node:attributes - SocketFactory = None # defaults to I2P socket - - #@-node:attributes - #@+node:__init__ - def __init__(self, name, **kw): - """ - Creates a kademlia node of name 'name'. - - Name is mandatory, because each name is permanently written - to the SAM bridge's store - - I thought of supporting random name generation, but went off this - idea because names get permanently stored to SAM bridge's file - - Arguments: - - name - mandatory - a short text name for the node, should - be alphanumerics, '-', '.', '_' - This name is used for the SAM socket session. - - Keywords: - - storage - optional - an instance of L{KStorageBase} or one of - its subclasses. If not given, default action is to instantiate - a L{KStorageFile} object against the given node name - """ - # remember who we are - self.name = name - - # not running yet, will launch when explicitly started, or implicitly - # when the first operation gets done - self.isRunning = False - - # create socket and get its dest, and determine our node id - self.id = KHash("<NONE>") - self.log(5, "creating socket for node %s" % name) - self.log(5, "socket for node %s created" % name) - if self.SocketFactory == None: - self.SocketFactory = i2p.socket.socket - self.sock = self.SocketFactory( - "stashernode-"+name, - i2p.socket.SOCK_DGRAM, - samaddr=samAddr, - **kw) - #self.sockLock = threading.Lock() # prevents socket API reentrance - self.sock.setblocking(0) - self.dest = self.sock.dest - self.id = KHash(self.dest) - - # create our buckets - self.buckets = [] - for i in range(160): - self.buckets.append(KBucket()) - - # create our storage object, default to new instance of KStorageFile - self.storage = kw.get('storage', KStorageFile(self)) - - # dig out all previously known nodes - self.peers = self.storage.getRefs() - - # set up dict of callers awaiting replies - # keys are (peerobj, msgId) tuples, values are Queue.Queue objects - self.pendingPings = {} - - # mapping of (peer, msgId) to RPC object, so when RPC replies come in, - # they can be passed directly to the RPC object concerned - self.rpcBindings = {} - - # KRpc objects waiting for peer replies - used for checking for timeouts - self.rpcPending = [] - - # miscellaneous shit - self._msgIdNext = 0 - #self._msgIdLock = threading.Lock() - - # register in global map - _nodes[name] = self - - - #@-node:__init__ - #@+node:__del__ - def __del__(self): - """ - Cleanup - """ - - #@-node:__del__ - #@+node:application-level - #@+node:start - def start(self, doPings=True): - """ - Starts the node running - """ - # barf if already running - if self.isRunning: - self.log(3, "node %s is already running!" % self.name) - return - - self.log(3, "starting node %s" % self.name) - - # first step - ping all our peers - if doPings: - for peer in self.peers: - self.log(3, "doing initial ping\n%s\n%s" % (self, peer)) - KRpcPing(self, peer=peer) - - # first step - do a findNode against our own node id, and ping our - # neighbours - if greetPeersOnStartup: - neighbours = KRpcFindNode(self, hash=self.id).execute() - self.log(3, "neighbours=%s" % repr([n[:10] for n in neighbours])) - for n in neighbours: - n = self._normalisePeer(n) - KRpcPing(self, peer=n) - - # note now that we're running - self.isRunning = True - - # and enlist with the core - if runCore: - core.subscribe(self) - else: - # central core disabled, run our own receiver thread instead - thread.start_new_thread(self._threadRx, ()) - #@-node:start - #@+node:stop - def stop(self): - """ - Shuts down the node - """ - self.isRunning = 0 - if runCore: - try: - core.unsubscribe(self) - except: - pass - #@-node:stop - #@+node:get - def get(self, item, callback=None, **kw): - """ - Attempts to retrieve data from the network - - Arguments: - - item - the key we desire - - callback - optional - if given, the get will be performed - asynchronously, and callback will be invoked upon completion, with - the result as first argument - Keywords: - - local - optional - if True, limits this search to this local node - default is False - - Returns: - - if no callback - the item value if the item was found, or None if not - - if callback, None is returned - """ - def processResult(r): - if isinstance(r, str): - return r - return None - - if callback: - # create a func to process callback result - def onCallback(res): - callback(processResult(res)) - - self._finddata(item, onCallback, **kw) - else: - return processResult(self._finddata(item, **kw)) - - #@-node:get - #@+node:put - def put(self, key, value, callback=None, **kw): - """ - Inserts a named key into the network - - Arguments: - - key - one of: - - None - a secure key will be generated and used - - a KHash object - - a raw string which will be hashed into a KHash object - - val - a string, the value associated with the key - - If the value is larger than L{maxValueSize}, a L{KValueTooLarge} - exception will occur. - """ - return self._store(key, value, callback, **kw) - - #@-node:put - #@+node:addref - def addref(self, peer, doPing=False): - """ - Given a peer node's destination, add it to our - buckets and internal data store - - Arguments: - - peer - one of: - - the I2P destination of the peer node, as - a base64 string - - a KNode object - - a KPeer object - - doPing - ping this node automatically (default False) - """ - peer = self._normalisePeer(peer) - - # remember peer if not already known - if peer.dest == self.dest: - self.log(3, "node %s, trying to add ref to ourself???" % self.name) - return peer - elif not self._findPeer(peer.dest): - self.peers.append(peer) - self.storage.putRefs(peer) - else: - self.log(4, "node %s, trying to add duplicate noderef %s" % ( - self.name, peer)) - return peer - - # update our KBucket - dist = self.id.distance(peer.id) - self.buckets[dist].justSeenPeer(peer) - - if doPing: - self.log(4, "doing initial ping\n%s\n%s" % (self, peer)) - KRpcPing(self, peer=peer) - - return peer - - #@-node:addref - #@+node:__getitem__ - def __getitem__(self, item): - """ - Allows dict-like accesses on the node object - """ - return self.get(item) - #@-node:__getitem__ - #@+node:__setitem__ - def __setitem__(self, item, val): - """ - Allows dict-like key setting on the node object - """ - self.put(item, val) - - #@-node:__setitem__ - #@-node:application-level - #@+node:peer/rpc methods - #@+node:_ping - def _ping(self, peer=None, callback=None, **kw): - """ - Sends a ping to remote peer, and awaits response - - Not of much real use to application level, except - perhaps for testing - - If the argument 'peer' is not given, the effect is to 'ping the - local node', which I guess might be a bit silly - - The second argument 'callback' is a callable, which if given, makes this - an asynchronous (non-blocking) call, in which case the callback will be - invoked upon completion (or timeout). - - If the keyword 'cbArgs' is given in addition to the callback, the callback - will fire with the results as first argument and this value as second arg - """ - if callback: - KRpcPing(self, callback, peer=peer, **kw) - else: - return KRpcPing(self, peer=peer).execute() - - #@-node:_ping - #@+node:_findnode - def _findnode(self, something=None, callback=None, **kw): - """ - Mainly for testing - does a findNode query on the network - - Arguments: - - something - one of: - - plain string - the string gets hashed and used for the search - - int or long int - this gets used as the raw hash - - a KHash object - that's what gets used - - None - the value of the 'raw' keyword will be used instead - - callback - optional - if given, a callable object which will be - called upon completion, with the result as argument - - Keywords: - - local - optional - if True, only returns the closest peers known to - node. if False, causes node to query other nodes. - default is False - - raw - one of: - - 20-byte string - this gets used as a binary hash - - 40-byte string - this gets used as a hex hash - """ - if not kw.has_key('local'): - kw = dict(kw) - kw['local'] = False - - self.log(3, "about to instantiate findnode rpc") - if callback: - KRpcFindNode(self, callback, hash=something, **kw) - self.log(3, "asynchronously invoked findnode, expecting callback") - else: - lst = KRpcFindNode(self, hash=something, **kw).execute() - self.log(3, "back from findnode rpc") - res = [self._normalisePeer(p) for p in lst] # wrap in KPeer objects - return res - - #@-node:_findnode - #@+node:_finddata - def _finddata(self, something=None, callback=None, **kw): - """ - As for findnode, but if data is found, return the data instead - """ - if not kw.has_key('local'): - kw = dict(kw) - kw['local'] = False - - self.log(3, "about to instantiate finddata rpc") - if callback: - KRpcFindData(self, callback, hash=something, **kw) - self.log(3, "asynchronously invoked finddata, expecting callback") - else: - res = KRpcFindData(self, hash=something, **kw).execute() - self.log(3, "back from finddata rpc") - if not isinstance(res, str): - self.log(4, "findData RPC returned %s" % repr(res)) - res = [self._normalisePeer(p) for p in res] # wrap in KPeer objects - return res - - #@-node:_finddata - #@+node:_store - def _store(self, key, value, callback=None, **kw): - """ - Performs a STORE rpc - - Arguments: - - key - string - text name of key - - value - string - value to store - - Keywords: - - local - if given and true, only store value onto local store - """ - if not kw.has_key('local'): - kw = dict(kw) - kw['local'] = False - - key = shahash(key) - if callback: - KRpcStore(self, callback, key=key, value=value, **kw) - self.log(3, "asynchronously invoked findnode, expecting callback") - else: - res = KRpcStore(self, key=key, value=value, **kw).execute() - return res - - #@-node:_store - #@+node:_findPeer - def _findPeer(self, dest): - """ - Look up our table of current peers for a given dest. - - If dest is found, return its object, otherwise return None - """ - for peerObj in self.peers: - if peerObj.dest == dest: - return peerObj - return None - - #@-node:_findPeer - #@-node:peer/rpc methods - #@+node:comms methods - #@+node:_sendRaw - def _sendRaw(self, peer, **kw): - """ - Serialises keywords passed, and sends this as a datagram - to node 'peer' - """ - # update our KBucket - dist = self.id.distance(peer.id) - self.buckets[dist].justSeenPeer(peer) - - # save ref to this peer - self.addref(peer) - - params = dict(kw) - msgId = params.get('msgId', None) - if msgId == None: - msgId = params['msgId'] = self._msgIdAlloc() - - objenc = messageEncode(params) - self.log(5, "node %s waiting for send lock" % self.name) - #self.sockLock.acquire() - self.log(5, "node %s got send lock" % self.name) - try: - self.sock.sendto(objenc, 0, peer.dest) - except: - traceback.print_exc() - #self.sockLock.release() - self.log(5, "node %s released send lock" % self.name) - - self.log(4, "node %s sent %s to peer %s" % (self.name, params, peer.dest)) - return msgId - - #@-node:_sendRaw - #@-node:comms methods - #@+node:engine - #@+node:_threadRx - def _threadRx(self): - """ - Thread which listens for incoming datagrams and actions - accordingly - """ - self.log(3, "starting receiver thread for node %s" % self.name) - - try: - # loop to drive the node - while self.isRunning: - self._doChug() - except: - traceback.print_exc() - self.log(3, "node %s - THREAD CRASH!" % self.name) - - self.log(3, "receiver thread for node %s terminated" % self.name) - - #@-node:_threadRx - #@+node:_doChug - def _doChug(self): - """ - Do what's needed to drive the node. - Handle incoming packets - Check on and action timeouts - """ - # handle all available packets - while self._doRx(): - pass - - # do maintenance - eg processing timeouts - self._doHousekeeping() - - #@-node:_doChug - #@+node:_doRx - def _doRx(self): - """ - Receives and handles one incoming packet - - Returns True if a packet got handled, or False if timeout - """ - # get next packet - self.log(5, "%s seeking socket lock" % self.name) - #self.sockLock.acquire() - self.log(5, "%s got socket lock" % self.name) - try: - item = self.sock.recvfrom(-1) - except i2p.socket.BlockError: - #self.sockLock.release() - self.log(5, "%s released socket lock after timeout" % self.name) - if not runCore: - time.sleep(0.1) - return False - except: - traceback.print_exc() - self.log(5, "%s released socket lock after exception" % self.name) - #self.sockLock.release() - return True - #self.sockLock.release() - self.log(5, "%s released socket lock normally" % self.name) - - try: - (data, dest) = item - except ValueError: - self.log(3, "node %s: recvfrom returned no dest, possible spoof" \ - % self.name) - data = item[0] - dest = None - - # try to decode - try: - d = messageDecode(data) - except: - traceback.print_exc() - self.log(3, "failed to unpickle incoming data for node %s" % \ - self.name) - return True - - # ditch if not a dict - if type(d) != type({}): - self.log(3, "node %s: decoded packet is not a dict" % self.name) - return True - - # temporary workaround for sam socket bug - if dest == None: - if hasattr(d, 'has_key') and d.has_key('dest'): - dest = d['dest'] - - # try to find it in our store - peerObj = self._findPeer(dest) - if peerObj == None: - # previously unknown peer - add it to our store - peerObj = self.addref(dest) - else: - peerObj.justSeen() # already exists - refresh its timestamp - self.addref(peerObj.dest) - - # drop packet if no msgId - msgId = d.get('msgId', None) - if msgId == None: - self.log(3, "no msgId, dropping") - return True - del d['msgId'] - - msgType = d.get('type', 'unknown') - - if desperatelyDebugging: - pass - #set_trace() - - # if a local RPC is awaiting this message, fire its callback - item = self.rpcBindings.get((peerObj.dest, msgId), None) - if item: - rpc, peer = item - try: - rpc.unbindPeerReply(peerObj, msgId) - if desperatelyDebugging: - set_trace() - rpc.on_reply(peerObj, msgId, **d) - - except: - traceback.print_exc() - self.log(2, "unhandled exception in RPC on_reply") - else: - # find a handler, fallback on 'unknown' - self.log(5, "\nnode %s\ngot msg id %s type %s:\n%s" % ( - self.name, msgId, msgType, d)) - hdlrName = d.get('type', 'unknown') - hdlr = getattr(self, "_on_"+hdlrName) - try: - if desperatelyDebugging: - set_trace() - hdlr(peerObj, msgId, **d) - except: - traceback.print_exc() - self.log(2, "unhandled exception in unbound packet handler %s" % hdlrName) - - return True - - #@-node:_doRx - #@+node:_doHousekeeping - def _doHousekeeping(self): - """ - Performs periodical housekeeping on this node. - - Activities include: - - checking pending records for timeouts - """ - now = time.time() - - # DEPRECATED - SWITCH TO RPC-based - # check for expired pings - for msgId, (dest, q, pingDeadline) in self.pendingPings.items(): - - if pingDeadline > now: - # not timed out, leave in pending - continue - - # ping has timed out - del self.pendingPings[msgId] - q.put(False) - - # check for timed-out RPCs - for rpc in self.rpcPending[:]: - if now >= rpc.nextTickTime: - try: - rpc.on_tick() - except: - traceback.print_exc() - self.log(2, "unhandled exception in RPC on_tick") - - #@-node:_doHousekeeping - #@-node:engine - #@+node:event handling - #@+others - #@+node:_on_ping - def _on_ping(self, peer, msgId, **kw): - """ - Handler for ping received events - """ - KRpcPing(self, (peer, msgId), local=True, **kw) - return - - # old stuff - - self.log(3, "\nnode %s\nfrom %s\nreceived:\n%s" % (self.name, peer, kw)) - - # randomly cause ping timeouts if testing - if testing: - howlong = random.randint(0, 5) - self.log(3, "deliberately pausing for %s seconds" % howlong) - time.sleep(howlong) - - # pong back to node - peer.send_reply(msgId=msgId) - - - #@nonl - #@-node:_on_ping - #@+node:_on_findNode - def _on_findNode(self, peer, msgId, **kw): - """ - Handles incoming findNode command - """ - KRpcFindNode(self, (peer, msgId), local=True, **kw) - - #@-node:_on_findNode - #@+node:_on_findData - def _on_findData(self, peer, msgId, **kw): - """ - Handles incoming findData command - """ - KRpcFindData(self, (peer, msgId), local=True, **kw) - - #@-node:_on_findData - #@+node:_on_store - def _on_store(self, peer, msgId, **kw): - """ - Handles incoming STORE command - """ - self.log(4, "got STORE rpc from upstream:\npeer=%s\nmsgId=%s\nkw=%s" % (peer, msgId, kw)) - - KRpcStore(self, (peer, msgId), local=True, **kw) - - #@-node:_on_store - #@+node:_on_reply - def _on_reply(self, peer, msgId, **kw): - """ - This should never happen - """ - self.log(4, "got unhandled reply:\npeer=%s\nmsgId=%s\nkw=%s" % ( - peer, msgId, kw)) - - #@-node:_on_reply - #@+node:_on_unknown - def _on_unknown(self, peer, msgId, **kw): - """ - Handler for unknown events - """ - self.log(3, "node %s from %s received msgId=%s:\n%s" % ( - self.name, peer, msgId, kw)) - - #@-node:_on_unknown - #@-others - #@-node:event handling - #@+node:Socket Client Server - #@+node:serve - def serve(self): - """ - makes this node listen on socket for incoming client - connections, and services these connections - """ - server = KNodeServer(self) - server.serve_forever() - - #@-node:serve - #@-node:Socket Client Server - #@+node:lowlevel stuff - #@+others - #@+node:__str__ - def __str__(self): - return "<KNode:%s=0x%s...>" % ( - self.name, - ("%x" % self.id.value)[:8], - ) - #@-node:__str__ - #@+node:__repr__ - def __repr__(self): - return str(self) - - #@-node:__repr__ - #@+node:_msgIdAlloc - def _msgIdAlloc(self): - """ - issue a new and unique message id - """ - #self._msgIdLock.acquire() - msgId = self._msgIdNext - self._msgIdNext += 1 - #self._msgIdLock.release() - return msgId - #@-node:_msgIdAlloc - #@+node:_normalisePeer - def _normalisePeer(self, peer): - """ - Takes either a b64 dest string, a KPeer object or a KNode object, - and returns a KPeer object - """ - # act according to whatever type we're given - if isinstance(peer, KPeer): - return peer # already desired format - elif isinstance(peer, KNode): - return KPeer(self, peer.dest) - elif isinstance(peer, str) and len(peer) > 256: - return KPeer(self, peer) - else: - self.log(3, "node %s, trying to add invalid noderef %s" % ( - self.name, peer)) - raise KBadNode(peer) - - #@-node:_normalisePeer - #@+node:__del__ - def __del__(self): - """ - Clean up on delete - """ - self.log(3, "node dying: %s" % self.name) - - try: - del _nodes[self.name] - except: - pass - - self.stop() - - #@-node:__del__ - #@-others - #@-node:lowlevel stuff - #@-others -#@-node:class KNode -#@-node:NODE -#@+node:funcs -#@+others -#@+node:userI2PDir -def userI2PDir(nodeName=None): - """ - Returns a directory under user's home dir into which - stasher files can be written - - If nodename is given, a subdirectory will be found/created - - Return value is toplevel storage dir if nodename not given, - otherwise absolute path including node dir - """ - if dataDir != None: - if not os.path.isdir(dataDir): - os.makedirs(dataDir) - return dataDir - - if sys.platform == 'win32': - home = os.getenv("APPDATA") - if home: - topDir = os.path.join(home, "stasher") - else: - topDir = os.path.join(os.getcwd(), "stasher") - else: - #return os.path.dirname(__file__) - topDir = os.path.join(os.path.expanduser('~'), ".stasher") - - if not os.path.isdir(topDir): - os.makedirs(topDir) - if nodeName == None: - return topDir - else: - nodeDir = os.path.join(topDir, nodeName) - if not os.path.isdir(nodeDir): - os.makedirs(nodeDir) - return nodeDir - -#@-node:userI2PDir -#@+node:nodePidfile -def nodePidfile(nodename): - return os.path.join(userI2PDir(nodename), "node.pid") - -#@-node:nodePidfile -#@+node:messageEncode -def messageEncode(params): - """ - Serialise the dict 'params' for sending - - Temporarily using bencode - replace later with a more - efficient struct-based impl. - """ - try: - return bencode.bencode(params) - except: - log(1, "encoder failed to encode: %s" % repr(params)) - raise - -#@-node:messageEncode -#@+node:messageDecode -def messageDecode(raw): - return bencode.bdecode(raw) -#@-node:messageDecode -#@+node:shahash -def shahash(somestr, bin=False): - shaobj = sha.new(somestr) - if bin: - return shaobj.digest() - else: - return shaobj.hexdigest() - -#@-node:shahash -#@+node:log -logLock = threading.Lock() - -def log(verbosity, msg, nPrev=0, clsname=None): - - global logToSocket, logFile - - # create logfile if not exists - if logFile == None: - logFile = os.path.join(userI2PDir(), "stasher.log") - - # rip the stack - caller = traceback.extract_stack()[-(2+nPrev)] - path, line, func = caller[:3] - path = os.path.split(path)[1] - - #print "func is type %s, val %s" % (type(func), repr(func)) - - #if hasattr(func, "im_class"): - # func = - - if clsname: - func = clsname + "." + func - - #msg = "%s:%s:%s(): %s" % ( - # path, - # line, - # func, - # msg.replace("\n", "\n + ")) - - msg = "%s():%s: %s" % ( - func, - line, - msg.replace("\n", "\n + ")) - - # do better logging later - if verbosity > logVerbosity: - return - - if logToSocket: - try: - if isinstance(logToSocket, int): - portnum = logToSocket - logToSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connected = 0 - while 1: - try: - logToSocket.connect(("localhost", portnum)) - break - except socket.error: - print "Please open an xterm/nc listening on %s" % logToSocket - time.sleep(1) - - logToSocket.send(msg+"\n") - except: - traceback.print_exc() - else: - print msg - - logLock.acquire() - file(logFile, "a+").write(msg + "\n") - logLock.release() -#@-node:log -#@+node:logexc -def logexc(verbosity, msg, nPrev=0, clsname=None): - - fd = StringIO("%s\n" % msg) - traceback.print_exc(file=fd) - log(verbosity, fd.getvalue(), nPrev, clsname) - -#@-node:logexc -#@+node:spawnproc -def spawnproc(*args, **kw): - """ - Spawns a process and returns its PID - - VOMIT! - - I have to do a pile of odious for the win32 side - - Returns a usable PID - - Keywords: - - priority - priority at which to spawn - default 20 (highest) - """ - # get priority, convert to a unix 'nice' value - priority = 20 - kw.get('priority', 20) - - if sys.platform != 'win32': - # *nix - easy - #print "spawnproc: launching %s" % repr(args) - - # insert nice invocation - args = ['/usr/bin/nice', '-n', str(priority)] + list(args) - return os.spawnv(os.P_NOWAIT, args[0], args) - - else: - # just close your eyes here and pretend this abomination isn't happening! :(( - args = list(args) - args.insert(0, sys.executable) - cmd = " ".join(args) - #print "spawnproc: launching %s" % repr(cmd) - - if 0: - try: - c = _winreg.ConnectRegistry(None, _winreg.HKEY_LOCAL_MACHINE) - c1 = _winreg.OpenKey(c, "SOFTWARE") - c2 = _winreg.OpenKey(c1, "Microsoft") - c3 = _winreg.OpenKey(c2, "Windows NT") - c4 = _winreg.OpenKey(c3, "CurrentVersion") - supportsBelowNormalPriority = 1 - except: - supportsBelowNormalPriority = 0 - else: - if sys.getwindowsversion()[3] != 2: - supportsBelowNormalPriority = 0 - else: - supportsBelowNormalPriority = 1 - - # frig the priority into a windows value - if supportsBelowNormalPriority: - if priority < 7: - pri = win32process.IDLE_PRIORITY_CLASS - elif priority < 14: - pri = 0x4000 - else: - pri = win32process.NORMAL_PRIORITY_CLASS - else: - if priority < 11: - pri = win32process.IDLE_PRIORITY_CLASS - else: - pri = win32process.NORMAL_PRIORITY_CLASS - - print "spawnproc: launching %s" % repr(args) - si = win32process.STARTUPINFO() - hdl = win32process.CreateProcess( - sys.executable, # lpApplicationName - cmd, # lpCommandLine - None, # lpProcessAttributes - None, # lpThreadAttributes - 0, # bInheritHandles - 0, # dwCreationFlags - None, # lpEnvironment - None, # lpCurrentDirectory - si, # lpStartupInfo - ) - pid = hdl[2] - #print "spawnproc: pid=%s" % pid - return pid -#@-node:spawnproc -#@+node:killproc -def killproc(pid): - if sys.platform == 'win32': - print repr(pid) - handle = win32api.OpenProcess(1, 0, pid) - print "pid %s -> %s" % (pid, repr(handle)) - #return (0 != win32api.TerminateProcess(handle, 0)) - win32process.TerminateProcess(handle, 0) - else: - return os.kill(pid, signal.SIGKILL) -#@-node:killproc -#@+node:i2psocket -def i2psocket(self, *args, **kw): - return i2p.socket.socket(*args, **kw) - -#@-node:i2psocket -#@+node:usage -def usage(detailed=False, ret=0): - - print "Usage: %s <options> [<command> [<ars>...]]" % sys.argv[0] - if not detailed: - print "Type %s -h for help" % sys.argv[0] - sys.exit(ret) - - print "This is stasher, distributed file storage network that runs" - print "atop the anonymising I2P network (http://www.i2p.net)" - print "Written by aum - August 2004" - print - print "Options:" - print " -h, --help - display this help" - print " -v, --version - print program version" - print " -V, --verbosity=n - verbosity, default 1, 1=quiet ... 4=noisy" - print " -S, --samaddr=host:port - host:port of I2P SAM port, " - print " default %s" % i2p.socket.samaddr - print " -C, --clientaddr=host:port - host:port for socket interface to listen on" - print " for clients, default %s" % clientAddr - print " -d, --datadir=dir - directory in which stasher files get written" - print " default is ~/.i2pstasher" - print - print "Commands:" - print " start [<nodename>]" - print " - launches a single node, which forks off and runs in background" - print " nodename is a short unique nodename, default is '%s'" % defaultNodename - print " stop [<nodename>]" - print " - terminates running node <nodename>" - print " get <keyname> [<file>]" - print " - attempts to retrieve key <keyname> from the network, saving" - print " to file <file> if given, or to stdout if not" - print " put <keyname> [<file>]" - print " - inserts key <keyname> into the network, taking its content" - print " from file <file> if given, otherwise reads content from stdin" - print " addref <file>" - print " - adds a new noderef to the node, taking the base64 noderef" - print " from file <file> if given, or from stdin" - print " (if you don't have any refs, visit http://stasher.i2p, or use" - print " the dest in the file aum.stasher in cvs)" - print " getref <file>" - print " - uplifts the running node's dest as base64, writing it to file" - print " <file> if given, or to stdout" - print " status" - print " - do a status dump - connectivity, stats etc" - print " help" - print " - display this help" - print - - sys.exit(0) - -#@-node:usage -#@+node:err -def err(msg): - sys.stderr.write(msg+"\n") -#@-node:err -#@+node:main -def main(): - """ - Command line interface - """ - global samAddr, clientAddr, logVerbosity, dataDir - - argv = sys.argv - argc = len(argv) - - try: - opts, args = getopt.getopt(sys.argv[1:], - "h?vV:S:C:sd:", - ['help', 'version', 'samaddr=', 'clientaddr=', - 'verbosity=', 'status', 'datadir=', - ]) - except: - traceback.print_exc(file=sys.stdout) - usage("You entered an invalid option") - - daemonise = True - verbosity = 2 - debug = False - - for opt, val in opts: - - if opt in ['-h', '-?', '--help']: - usage(True) - - elif opt in ['-v', '--version']: - print "Stasher version %s" % version - sys.exit(0) - - elif opt in ['-V', '--verbosity']: - logVerbosity = int(val) - - elif opt in ['-S', '--samaddr']: - samAddr = val - - elif opt in ['-C', '--clientaddr']: - clientAddr = val - - elif opt in ['-s', '--status']: - dumpStatus() - - elif opt in ['-d', '--datadir']: - dataDir = val - - #print "Debug - bailing" - #print repr(opts) - #print repr(args) - #sys.exit(0) - - # Barf if no command given - if len(args) == 0: - err("No command given") - usage(0, 1) - - cmd = args.pop(0) - argc = len(args) - - print "cmd=%s, args=%s" % (repr(cmd), repr(args)) - - if cmd not in ['_start', 'start', 'stop', 'hello', 'get', 'put', 'addref', 'getref']: - err("Illegal command '%s'" % cmd) - usage(0, 1) - - # magic undocumented command name - starts node, launches its client server, - # this should only happen if we're spawned from a 'start' command - if cmd == '_start': - if argc not in [0, 1]: - err("start: bad argument count") - usage() - if argc == 0: - nodeName = defaultNodename - else: - nodeName = args[0] - - # create and serve a node - #set_trace() - node = KNode(nodeName) - node.start() - log(3, "Node %s launched, dest = %s" % (node.name, node.dest)) - node.serve() - sys.exit(0) - - if cmd == 'start': - if argc not in [0, 1]: - err("start: bad argument count") - usage() - if argc == 0: - nodeName = defaultNodename - else: - nodeName = args[0] - pidFile = nodePidfile(nodeName) - - if os.path.exists(pidFile): - err(("Stasher node '%s' seems to be already running. If you are\n" % nodeName) - +"absolutely sure it's not running, please remove its pidfile:\n" - +pidFile+"\n") - sys.exit(1) - - # spawn off a node - import stasher - pid = spawnproc(sys.argv[0], "-S", samAddr, "-C", clientAddr, "_start", nodeName) - file(pidFile, "wb").write("%s" % pid) - print "Launched stasher node as pid %s" % pid - print "Pidfile is %s" % pidFile - sys.exit(0) - - if cmd == 'stop': - if argc not in [0, 1]: - err("stop: bad argument count") - usage() - if argc == 0: - nodeName = defaultNodename - else: - nodename = args[0] - - pidFile = nodePidfile(nodeName) - - if not os.path.isfile(pidFile): - err("Stasher node '%s' is not running - cannot kill\n" % nodeName) - sys.exit(1) - - pid = int(file(pidFile, "rb").read()) - try: - killproc(pid) - print "Killed stasher node (pid %s)" % pid - except: - print "Failed to kill node (pid %s)" % pid - os.unlink(pidFile) - sys.exit(0) - - try: - client = KNodeClient() - except: - traceback.print_exc() - err("Node doesn't seem to be up, or reachable on %s" % clientAddr) - return - - - if cmd == 'hello': - err("Node seems fine") - sys.exit(0) - - elif cmd == 'get': - if argc not in [1, 2]: - err("get: bad argument count") - usage() - - key = args[0] - - if argc == 2: - # try to open output file - path = args[1] - try: - outfile = file(path, "wb") - except: - err("Cannot open output file %s" % repr(path)) - usage(0, 1) - else: - outfile = sys.stdout - - res = client.get(key) - if res == None: - err("Failed to retrieve '%s'" % key) - sys.exit(1) - else: - outfile.write(res) - outfile.flush() - outfile.close() - sys.exit(0) - - elif cmd == 'put': - if argc not in [1, 2]: - err("put: bad argument count") - usage() - - key = args[0] - - if argc == 2: - # try to open input file - path = args[1] - try: - infile = file(path, "rb") - except: - err("Cannot open input file %s" % repr(path)) - usage(0, 1) - else: - infile = sys.stdin - - val = infile.read() - if len(val) > maxValueSize: - err("File is too big - please trim to %s" % maxValueSize) - - res = client.put(key, val) - if res == None: - err("Failed to insert '%s'" % key) - sys.exit(1) - else: - sys.exit(0) - - elif cmd == 'addref': - if argc not in [0, 1]: - err("addref: bad argument count") - usage() - - if argc == 1: - # try to open input file - path = args[0] - try: - infile = file(path, "rb") - except: - err("Cannot open input file %s" % repr(path)) - usage(0, 1) - else: - infile = sys.stdin - - ref = infile.read() - - res = client.addref(ref) - if res == None: - err("Failed to add ref") - sys.exit(1) - else: - sys.exit(0) - - elif cmd == 'getref': - if argc not in [0, 1]: - err("getref: bad argument count") - usage() - - if argc == 1: - # try to open output file - path = args[0] - try: - outfile = file(path, "wb") - except: - err("Cannot open output file %s" % repr(path)) - usage(0, 1) - else: - outfile = sys.stdout - - res = client.getref() - if res == None: - err("Failed to retrieve node ref") - sys.exit(1) - else: - outfile.write(res) - outfile.flush() - outfile.close() - sys.exit(0) - - - -#@-node:main -#@-others -#@-node:funcs -#@+node:MAINLINE -#@+others -#@+node:mainline -if __name__ == '__main__': - - main() - -#@-node:mainline -#@-others -#@-node:MAINLINE -#@-others - -#@-node:@file stasher.py -#@-leo diff --git a/apps/sam/python/stasher b/apps/sam/python/stasher deleted file mode 100644 index ab98404979cd5fdeac1c5b92b91307574c28861e..0000000000000000000000000000000000000000 --- a/apps/sam/python/stasher +++ /dev/null @@ -1,4 +0,0 @@ -#! /usr/bin/env python -# wrapper script to run stasher node -import i2p.stasher -i2p.stasher.main() diff --git a/apps/sam/python/stasher.py b/apps/sam/python/stasher.py deleted file mode 100644 index 36c608a2894c69ee42c5ca2528d76301586ead75..0000000000000000000000000000000000000000 --- a/apps/sam/python/stasher.py +++ /dev/null @@ -1,3 +0,0 @@ -# wrapper script to run stasher node -import i2p.stasher -i2p.stasher.main()