Kademlia
    @file stasher.py
        explanatory comments; imports; constants; globals; Exceptions
        Mixins: class KBase
        Main Engine: class KCore - attributes, __init__, subscribe, unsubscribe,
            threadRxPackets, threadHousekeeping, nodeWhichOwnsSock, cycle, run,
            stop, runClient, select; create instance
        Basic Classes
            Node-local Storage
                class KStorageBase - __init__, putRefs, getRefs, putKey, getKey;
                    private methods: _expandRefsList
                class KStorageFile - __init__, putRefs, getRefs, putKey, getKey
            class KHash - __init__, __str__, asHex, distance, rawdistance, operators
            class KBucket - __init__, justSeenPeer, __iter__
            class KPeer - __init__, send_ping, send_store, send_findNode,
                send_findData, send_reply, send_raw, justSeen;
                lowlevel: __str__, __repr__, __eq__, __ne__
        RPC Classes
            class KRpc - attribs, __init__, __del__, __str__, __repr__,
                bindPeerReply, unbindPeerReply, unbindAll, start, execute,
                terminate, returnValue, on_reply, on_tick
            PING: class KRpcPing - attribs, __init__, start, on_reply, on_tick
            FIND_NODE
                class KPeerQueryRecord - __init__, hasTimedOut, __cmp__/__lt__ etc,
                    isCloserThanAllOf, isCloserThanOneOf
                class KPeerQueryTable - __init__, setlist, getExpired, purgeExpired,
                    sort, select, count, changeState, filter, purge, chooseN,
                    __str__, newtable, dump; list-like methods: extend, append,
                    remove, __getitem__, __len__, __getslice__, __iter__, __add__,
                    __contains__
                class KRpcFindNode - spec info comments, attribs, __init__, start,
                    sendSomeQueries, sendOneQuery, findClosestPeersInitial,
                    addPeerIfCloser, isCloserThanQueried, on_reply, on_tick,
                    checkEndOfRound, gotAnyCloser, returnTheBestWeGot, returnValue,
                    reportStats
            FIND_DATA: class KRpcFindData - attribs, start, on_reply, on_gotValue,
                on_gotChunk, returnValue
            STORE: class KRpcStore - attribs, __init__, start, storeSplit,
                on_doneChunkManifest, on_doneChunk, returnValue, on_doneFindNode,
                on_reply, on_tick
            PINGALL: class KRpcPingAll - attribs, __init__, start, on_reply,
                on_tick, returnValue
        Node Socket Server
            class KNodeServer - __init__, serve_forever
            class KNodeReqHandler - handle, finish
            class KNodeClient - __init__, hello, connect, close, get, put, addref,
                getref, pingall, kill, __getitem__, __setitem__
        NODE: class KNode
            attributes, __init__, __del__
            application-level: start, stop, get, put, addref, __getitem__, __setitem__
            peer/rpc methods: _ping, _pingall, _findnode, _finddata, _store, _findPeer
            comms methods: _sendRaw
            engine: _threadRx, _doChug, _doRx, _doHousekeeping
            event handling: _on_ping, _on_findNode, _on_findData, _on_store,
                _on_reply, _on_unknown
        Socket Client Server: serve
        lowlevel stuff: __str__, __repr__, _msgIdAlloc, _normalisePeer, __del__
        funcs: userI2PDir, nodePidfile, messageEncode, messageDecode, shahash,
            log, logexc, spawnproc, killproc, i2psocket, usage, err, main
        MAINLINE: mainline
    Deployment
        @file-nosent stasher - contents
        @file-nosent stasher-launch.py - contents
        @file release.sh
        @file setup-stasher.py
        @file-nosent README.txt - installing-from-cvs, doze warning, alpha warning, using
        @file-nosent README-tarball.txt - installing as tarball, alpha warning, using
    Testing
        @file ktest.py - imports, constants
            TEST NETWORK: class KTestSocket, class KTestMap, class KCore, class KNode
            class KTestNetwork - attribs, __init__, __del__, __getitem__, __len__,
                connect, start, stop, dump, dumplong, findpath, testconnectivity,
                purge, whohas, whocanfind, findnode, closestto, getPeer, getPeerIdx,
                getPeerName, dumpids, __str__, __repr__
            Funcs: test, test1, debug, doput
            MAINLINE: mainline
        @file node1.sh
        @file node2.sh
    Utility
        @file pybloom.py - imports, globals, mixarray_init
            class Bloom - attribs, __init__, _make_array, _hashfunc, insert, __contains__
            class CountedBloom - __init__, insert, __contains__, __delitem__
            mainline
        @file-nosent hashcash.py - imports, globals
            class HashCash - attributes, __init__, generate, verify, _checkBase64,
                _enc64, _dec64, binify, intify, _randomString
            psyco, test, ctest, ntest, mainline
    JUNK
        Findnode RPC - on_reply, on_reply
        class KPendingResultBase - __init__, append, wait, check, destroySelf,
            on_tick, on_packet, __cmp__
        class KPendingResultPing - __init__, on_tick, on_packet
@first #! /usr/bin/env python
"""
A simple implementation of the
U{Kademlia<http://www.infoanarchy.org/wiki/wiki.pl?Kademlia>}
P2P distributed storage and retrieval protocol, designed to utilise
the U{I2P<http://www.i2p.net>} stealth network as its transport.

Most application developers will only need to know about the L{KNode} class
"""
# I strongly recommend that when editing this file, you use the Leo
# outlining and literate programming editor - http://leo.sf.net
# If Leo doesn't agree with your religion, please try to leave the markups intact
@others
# define our exceptions
class KValueTooLarge(Exception):
    """
    Trying to insert a value of excessive size into the network.
    Maximum value size is L{maxValueSize}
    """
class KBadHash(Exception):
    """
    Invalid hash string
    """
class KNotImplemented(Exception):
    """
    A required method was not implemented
    """
class KBadNode(Exception):
    """
    Invalid Node object
    """
class KBadPeer(Exception):
    """
    Invalid Peer object - should be a KPeer
    """
class KBadDest(Exception):
    """Invalid I2P Node Dest"""
class KHash(KBase):
    """
    Wraps 160-bit hashes as abstract objects, on which operations
    such as xor, <, >, etc can be performed.
    Kademlia node ids and keys are held as objects of this class.
    Internally, hashes are stored as python long ints
    """
    @others
    def __init__(self, val=None, **kw):
        """
        Create a new hash object.

        val can be one of the following:
            - None (default) - a random value will be created
            - long int - this will be used as the raw hash
            - string - the string will be hashed and stored
            - another KHash object - its value will be taken
            - a KNode or KPeer object - its hash will be taken

        If val is not given, a raw hash value can be passed in with
        the keyword 'raw'. Such a value must be a python long int
        or a 20-char string
        """
        self.value = 0L
        if val:
            if isinstance(val, KHash):
                self.value = val.value
            elif type(val) in [type(0), type(0L)]:
                self.value = long(val)
            elif isinstance(val, KNode) or isinstance(val, KPeer):
                self.value = val.id.value
            else:
                raw = self.raw = shahash(val, bin=1)
                for c in raw:
                    self.value = self.value * 256 + ord(c)
        else:
            rawval = kw.get('raw', None)
            if rawval == None:
                # generate random
                random.seed()
                for i in range(20):
                    # build 20 random bytes - note that randint's bounds are
                    # inclusive, so the upper bound must be 255, not 256
                    self.value = self.value * 256 + random.randint(0, 255)
            elif type(rawval) in [type(0), type(0L)]:
                self.value = long(rawval)
            elif type(rawval) == type(""):
                if len(rawval) == 20:
                    for i in rawval:
                        self.value = self.value * 256 + ord(i)
                elif len(rawval) == 40:
                    try:
                        self.value = long(rawval, 16)
                    except:
                        raise KBadHash(rawval)
                else:
                    raise KBadHash(rawval)
            else:
                print "rawval=%s %s %s" % (type(rawval), rawval.__class__, repr(rawval))
                raise KBadHash(rawval)
import sys, os, types, sha, random, threading, thread, traceback, Queue
import time, math, pickle, getopt, re
import signal
# some windows-specifics (yggghh)
if sys.platform == 'win32':
    try:
        import win32api
        import win32process
        import _winreg
    except:
        print "Python win32 extensions not installed."
print "Please go to http://sourceforge.net/project/showfiles.php?group_id=78018" print "and download/install the file pywin32-202.win32-py%s.%s.exe" % \ sys.version_info[:2] sys.exit(1) from StringIO import StringIO from pdb import set_trace try: import bencode except: print "The bencode module is missing from your python installation." print "Are you sure you installed Stasher correctly?" sys.exit(1) try: import i2p.socket import i2p.select import i2p.pylib SocketServer = i2p.pylib.SocketServer socket = i2p.pylib.socket except: print "You don't appear to have the I2P Python modules installed." print "Not good. Stasher totally needs them." print "Please to to i2p/apps/sam/python in your I2P cvs tree, and" print "install the core I2P python modules first" sys.exit(1) class KPeer(KBase): """ Encapsulates a peer node of a L{KNode}, storing its ID and contact info """ @others def __init__(self, node, dest): """ Create a ref to a kademlia peer node Arguments: - node - reference to node which has the relationship to this peer - dest - the peer's I2P destination, as base64 """ if not isinstance(node, KNode): raise KBadNode(node) if not isinstance(dest, str): raise KBadDest(dest) self.node = node self.dest = dest self.id = KHash(dest) self.justSeen() def send_ping(self, **kw): """ Sends a ping to remote peer """ self.send_raw(type="ping", **kw) def send_store(self, **kw): """ sends a store command to peer """ self.log(4, "\npeer %s\ndest %s...\nsending store cmd: %s" % (self, self.dest[:12], repr(kw))) self.send_raw(type="store", **kw) def send_findNode(self, hash, **kw): """ sends a findNode command to peer """ if not isinstance(hash, KHash): raise KBadHash self.log(5, "\nquerying peer %s\ntarget hash %s" % (self, hash)) self.send_raw(type="findNode", hash=hash.value, **kw) def send_findData(self, hash, **kw): """ sends a findData command to peer """ if not isinstance(hash, KHash): raise KBadHash self.log(5, "\nquerying peer %s\ntarget hash %s" % (self, hash)) self.send_raw(type="findData", hash=hash.value, **kw) class KNode(KBase): """ B{Public API to this Kademlia implementation} You should not normally need to use, or even be aware of, any of the other classes And in this class, the only methods you need to worry about are: - L{start} - starts the node running - L{stop} - stops the node - L{get} - retrieve a key value - L{put} - stores a key value - L{addref} - imports a noderef This class implements a single kademlia node. Within a single process, you can create as many nodes as you like. """ @others def start(self, doPings=True): """ Starts the node running """ # barf if already running if self.isRunning: self.log(3, "node %s is already running!" 
% self.name) return self.log(3, "starting node %s" % self.name) # first step - ping all our peers if doPings: for peer in self.peers: self.log(3, "doing initial ping\n%s\n%s" % (self, peer)) KRpcPing(self, peer=peer) # first step - do a findNode against our own node id, and ping our # neighbours if greetPeersOnStartup: neighbours = KRpcFindNode(self, hash=self.id).execute() self.log(3, "neighbours=%s" % repr([n[:10] for n in neighbours])) for n in neighbours: n = self._normalisePeer(n) KRpcPing(self, peer=n) # note now that we're running self.isRunning = True # and enlist with the core if runCore: core.subscribe(self) else: # central core disabled, run our own receiver thread instead thread.start_new_thread(self._threadRx, ()) def stop(self): """ Shuts down the node """ self.isRunning = 0 if runCore: try: core.unsubscribe(self) except: pass def get(self, item, callback=None, **kw): """ Attempts to retrieve data from the network Arguments: - item - the key we desire - callback - optional - if given, the get will be performed asynchronously, and callback will be invoked upon completion, with the result as first argument Keywords: - local - optional - if True, limits this search to this local node default is False Returns: - if no callback - the item value if the item was found, or None if not - if callback, None is returned """ def processResult(r): if isinstance(r, str): return r return None if callback: # create a func to process callback result def onCallback(res): callback(processResult(res)) self._finddata(item, onCallback, **kw) else: return processResult(self._finddata(item, **kw)) def put(self, key, value, callback=None, **kw): """ Inserts a named key into the network Arguments: - key - one of: - None - a secure key will be generated and used - a KHash object - a raw string which will be hashed into a KHash object - val - a string, the value associated with the key Keywords: - local - default False - if True, limits the insert to the local node If the value is larger than L{maxValueSize}, a L{KValueTooLarge} exception will occur. """ return self._store(key, value, callback, **kw) def __getitem__(self, item): """ Allows dict-like accesses on the node object """ return self.get(item) def __setitem__(self, item, val): """ Allows dict-like key setting on the node object """ self.put(item, val) def __str__(self): return "<KHash: 0x%x>" % self.value def __repr__(self): return str(self) def __eq__(self, other): #log(2, "KHash: comparing %s to %s" % (self, other)) res = self.value == getattr(other, 'value', None) #self.log(2, "KHash: res = %s" % repr(res)) return res def __ne__(self, other): return not (self == other) def __lt__(self, other): return self.value < other.value def __gt__(self, other): return self.value > other.value def __le__(self, other): return self.value <= other.value def __ge__(self, other): return self.value >= other.value def __ne__(self, other): return self.value != other.value def __xor__(self, other): return self.value ^ other.value def send_raw(self, **kw): """ Sends a raw datagram to peer No arguments - just keywords, all of which must be strings or other objects which can be bencoded """ self.node._sendRaw(self, **kw) def __init__(self, name, **kw): """ Creates a kademlia node of name 'name'. 
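A typical usage sketch (names and values here are illustrative only,
and assume a running I2P SAM bridge on the configured samAddr):

    node = KNode("mynode")      # create (or reconnect to) the node's session
    node.start()                # fire up the engine
    node.put("spam", "eggs")    # store a value, keyed by the hash of "spam"
    print node.get("spam")      # -> "eggs" (or None if not found)
    node.stop()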
Name is mandatory, because each name is permanently written to the SAM bridge's store I thought of supporting random name generation, but went off this idea because names get permanently stored to SAM bridge's file Arguments: - name - mandatory - a short text name for the node, should be alphanumerics, '-', '.', '_' This name is used for the SAM socket session. Keywords: - storage - optional - an instance of L{KStorageBase} or one of its subclasses. If not given, default action is to instantiate a L{KStorageFile} object against the given node name """ # remember who we are self.name = name # not running yet, will launch when explicitly started, or implicitly # when the first operation gets done self.isRunning = False # create socket and get its dest, and determine our node id self.id = KHash("<NONE>") self.log(5, "creating socket for node %s" % name) self.log(5, "socket for node %s created" % name) if self.SocketFactory == None: self.SocketFactory = i2p.socket.socket self.sock = self.SocketFactory( "stashernode-"+name, i2p.socket.SOCK_DGRAM, samaddr=samAddr, **kw) #self.sockLock = threading.Lock() # prevents socket API reentrance self.sock.setblocking(0) self.dest = self.sock.dest self.id = KHash(self.dest) # create our buckets self.buckets = [] for i in range(160): self.buckets.append(KBucket()) # create our storage object, default to new instance of KStorageFile self.storage = kw.get('storage', KStorageFile(self)) # dig out all previously known nodes self.peers = self.storage.getRefs() # set up dict of callers awaiting replies # keys are (peerobj, msgId) tuples, values are Queue.Queue objects self.pendingPings = {} # mapping of (peer, msgId) to RPC object, so when RPC replies come in, # they can be passed directly to the RPC object concerned self.rpcBindings = {} # KRpc objects waiting for peer replies - used for checking for timeouts self.rpcPending = [] # miscellaneous shit self._msgIdNext = 0 #self._msgIdLock = threading.Lock() # register in global map _nodes[name] = self class KStorageBase(KBase): """ Base class for node storage objects This needs to be overridden by implementation-specific solutions. """ @others def __init__(self, node, *args, **kw): """ Override this method First argument should be a node instance """ raise KNotImplemented def putRefs(self, *refs): """ Saves one or more noderefs Arguments: - zero or more KPeer objects, or lists or tuples of objects """ raise KNotImplemented def getRefs(self): """ Returns a list of KPeer objects, comprising refs of peers known to this node """ raise KNotImplemented def putKey(self, key, value): """ Stores value, a string, into the local storage under key 'key' """ raise KNotImplemented def getKey(self, key): """ Attempts to retrieve item from node's local, which was stored with key 'key'. Returns value as a string if found, or None if not present """ raise KNotImplemented class KStorageFile(KStorageBase): """ Implements node-local storage, using the local filesystem, with the following hierarchy: - HOME ( ~ in linux, some other shit for windows) - .i2pkademlia - <nodename> - noderefs - <node1 base64 hash> - contains node dest, and other shit - ... - keys - <keyname1> - contains raw key value - ... This is one ugly sukka, perhaps a db4, mysql etc implementation would be better. 
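For reference, each file under noderefs/ is named with the SHA hash of
the peer's dest, and holds a bencoded dict - minimally
{'dest': '<peer dest base64>'} - see L{putRefs} and L{getRefs} below.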
""" @others def __init__(self, node, storeDir=None): """ Creates a persistent storage object for node 'nodeName', based at directory 'storeDir' (default is nodeDir """ self.node = node self.nodeName = node.name if storeDir == None: # work out local directory self.topDir = userI2PDir() # add node dir and subdirs self.nodeDir = userI2PDir(self.nodeName) self.refsDir = os.path.join(self.nodeDir, "noderefs") if not os.path.isdir(self.refsDir): os.makedirs(self.refsDir) self.keysDir = os.path.join(self.nodeDir, "keys") if not os.path.isdir(self.keysDir): os.makedirs(self.keysDir) class KBucket(KBase): """ Implements the 'k-bucket' object as required in Kademlia spec """ @others def __init__(self): """ Creates a single k-bucket """ # list of known nodes # order is least recently seen at head, most recently seen at tail self.nodes = [] # list of death-row records # refer spec section 2.1, paragraph 2 # we deviate a little: # when we hear from a new peer, and the bucket is full, # we temporarily displace the old peer, and stick the new # peer at end of list, then send out a ping # If we hear from the old peer within a reasonable time, # the new peer gets evicted and replaced with the old peer # # this list holds 2-tuples (oldpeer, newpeer), where # oldpeer is the least-recently-seen peer that we displaced, and # newpeer is the new peer we just heard from. self.deathrow = [] # -------------------------------------------- # START USER-CONFIGURABLE CONSTANTS # -------------------------------------------- # host:port to connect to I2P SAM Bridge samAddr = i2p.socket.samaddr # host:port to listen on for command line client clientAddr = "127.0.0.1:7659" defaultNodename = "0" # will be prefixed by 'stashernode' # maximum size of each stored item maxValueSize = 30000 # maximum number of noderefs that can be stored in a bucket # (refer spec section 2.1, first paragraph) maxBucketSize = 20 # number of peers to return from a search numSearchPeers = 3 # maximum number of concurrent queries per findnode/finddata rpc maxConcurrentQueries = 10 # number of peers to store onto numStorePeers = 10 # Logger settings logFile = None logVerbosity = 2 # data directory location - set to a path to override the default # which is the user's home dir dataDir = None # whether a node, on startup, should do a findnode on itself to # locate its closest neighbours greetPeersOnStartup = False #greetPeersOnStartup = True # multi-purpose testing flag testing = False #testing = True tunnelDepth = 0 # set to True to enable single handler thread that manages all nodes, # or False to make each node run its own handler thread #runCore = False runCore = True # timeouts - calibrate as needed timeout = { 'ping' : 120, 'findNode' : 120, 'findData' : 120, 'store' : 120, } logToSocket = None desperatelyDebugging = False if desperatelyDebugging: runCoreInBackground = False else: runCoreInBackground = True # -------------------------------------------- # END OF USER-CONFIGURABLE CONSTANTS # -------------------------------------------- # ---------------------------------------------- # hack anything below this line at your own risk def justSeenPeer(self, peer): """ Tells the bucket that we've just seen a given node """ nodes = self.nodes if not isinstance(peer, KPeer): raise KBadNode try: idx = nodes.index(peer) except: idx = -1 if idx >= 0: del nodes[idx] nodes.append(peer) else: nodes.append(peer) # might at some time need to implement death-row logic # when we set a bucket size limit - refer __init__ def addref(self, peer, doPing=False): """ 
Given a peer node's destination, add it to our buckets and internal data store Arguments: - peer - one of: - the I2P destination of the peer node, as a base64 string - a KNode object - a KPeer object - doPing - ping this node automatically (default False) """ peer = self._normalisePeer(peer) # remember peer if not already known if peer.dest == self.dest: self.log(3, "node %s, trying to add ref to ourself???" % self.name) return peer elif not self._findPeer(peer.dest): self.peers.append(peer) self.storage.putRefs(peer) else: self.log(4, "node %s, trying to add duplicate noderef %s" % ( self.name, peer)) return peer # update our KBucket dist = self.id.distance(peer.id) self.buckets[dist].justSeenPeer(peer) if doPing: self.log(4, "doing initial ping\n%s\n%s" % (self, peer)) KRpcPing(self, peer=peer) return peer def _sendRaw(self, peer, **kw): """ Serialises keywords passed, and sends this as a datagram to node 'peer' """ # update our KBucket dist = self.id.distance(peer.id) self.buckets[dist].justSeenPeer(peer) # save ref to this peer self.addref(peer) params = dict(kw) msgId = params.get('msgId', None) if msgId == None: msgId = params['msgId'] = self._msgIdAlloc() objenc = messageEncode(params) self.log(5, "node %s waiting for send lock" % self.name) #self.sockLock.acquire() self.log(5, "node %s got send lock" % self.name) try: self.sock.sendto(objenc, 0, peer.dest) except: traceback.print_exc() #self.sockLock.release() self.log(5, "node %s released send lock" % self.name) self.log(4, "node %s sent %s to peer %s" % (self.name, params, peer.dest)) return msgId def _threadRx(self): """ Thread which listens for incoming datagrams and actions accordingly """ self.log(3, "starting receiver thread for node %s" % self.name) try: # loop to drive the node while self.isRunning: self._doChug() except: traceback.print_exc() self.log(3, "node %s - THREAD CRASH!" % self.name) self.log(3, "receiver thread for node %s terminated" % self.name) @others # keep a dict of existing nodes, so we can prevent # client progs from creating 2 nodes of the same name _nodes = {} version = "0.0.1" logLock = threading.Lock() def log(verbosity, msg, nPrev=0, clsname=None): global logToSocket, logFile # create logfile if not exists if logFile == None: logFile = os.path.join(userI2PDir(), "stasher.log") # rip the stack caller = traceback.extract_stack()[-(2+nPrev)] path, line, func = caller[:3] path = os.path.split(path)[1] #print "func is type %s, val %s" % (type(func), repr(func)) #if hasattr(func, "im_class"): # func = if clsname: func = clsname + "." + func #msg = "%s:%s:%s(): %s" % ( # path, # line, # func, # msg.replace("\n", "\n + ")) msg = "%s():%s: %s" % ( func, line, msg.replace("\n", "\n + ")) # do better logging later if verbosity > logVerbosity: return if logToSocket: try: if isinstance(logToSocket, int): portnum = logToSocket logToSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connected = 0 while 1: try: logToSocket.connect(("localhost", portnum)) break except socket.error: print "Please open an xterm/nc listening on %s" % logToSocket time.sleep(1) logToSocket.send(msg+"\n") except: traceback.print_exc() else: print msg logLock.acquire() file(logFile, "a+").write(msg + "\n") logLock.release() def _findPeer(self, dest): """ Look up our table of current peers for a given dest. 
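(Implementation note: this is a simple linear scan of self.peers, which
is fine for the small peer lists a node keeps in practice.)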
If dest is found, return its object, otherwise return None """ for peerObj in self.peers: if peerObj.dest == dest: return peerObj return None def putRefs(self, *args): """ Saves one or more noderefs into filesystem Arguments: - zero or more KPeer objects, or lists or tuples of objects """ lst = self._expandRefsList(args) for item in lst: b64hash = shahash(item.dest) itemPath = os.path.join(self.refsDir, b64hash) itemDict = {'dest':item.dest} # might need to expand later itemPickle = bencode.bencode(itemDict) file(itemPath, "wb").write(itemPickle) pass @others def _expandRefsList(self, args, lst=None): """ Takes a sequence of args, each of which can be a KPeer object, or a list or tuple of KPeer objects, and expands this into a flat list """ if lst == None: lst = [] for item in args: if type(item) in [type(()), type([])]: self._expandRefsList(item, lst) else: lst.append(item) return lst def getRefs(self): """ Returns a list of KPeer objects, comprising refs of peers known to this node These are read from the directory self.refsDir. Any that can't be unpickled and instantiated are dropped, but logged """ peers = [] for f in os.listdir(self.refsDir): path = os.path.join(self.refsDir, f) pickled = file(path, "rb").read() try: d = bencode.bdecode(pickled) except: self.log(3, "node %s, bad pickle ref file %s" % ( self.nodeName, f)) continue # instantiate a peer object try: peer = KPeer(self.node, d['dest']) except: self.log(3, "node %s, bad unpickled ref file %s" % ( self.nodeName, f)) continue # success peers.append(peer) return peers @others def test(numnodes=10): global n, n0, res stasher.logVerbosity = 3 stasher.core = KCore(1) os.system("rm -rf ~/.i2pkademlia") n = KTestNetwork(10, purge=1) n.connect() n.start() n0 = n[0] return if 0: core.fg = True n = KTestNetwork(10, purge=1) n.connect() n.start() core.cycle() print "about to insert" set_trace() n[0].put('roses', 'red') print "insert completed" #set_trace() return #set_trace() #n[0].put('roses', 'red') #print n[0].get('roses') if 0: successes = [] failures = [] for idx in range(numnodes): # build test network of known topology print "Building test network" n = KTestNetwork(numnodes, purge=1) n.connect() n.start() core.n = n n.trigger = 0 if 0: print n[0]._findnode('roses') break if 1: # store something print "storing something" n[0].put('roses', 'red') # try to retrieve it from node i print "trying to retrieve it from node %s" % idx if n[idx].get('roses') != None: print "Node %s retrieved ok" % idx successes.append(idx) else: print "Node %s failed to retrieve" % idx failures.append(idx) del n print "Successful nodes: %s" % " ".join([str(x) for x in successes]) print "Failed nodes: %s" % " ".join([str(x) for x in failures]) if 0 and desperatelyDebugging: while not n.trigger: time.sleep(1) n.trigger = 0 n[0].put('roses', 'red') while not n.trigger: time.sleep(1) print "retrieving 'roses'" print n[0].get('roses') @others def __str__(self): return "<KNode:%s=0x%s...>" % ( self.name, ("%x" % self.id.value)[:8], ) def __repr__(self): return str(self) @others def __str__(self): return "<KPeer:%s=>0x%s... 
dest %s...>" % ( self.node.name, ("%x" % self.id.value)[:8], self.dest[:8]) def __repr__(self): return str(self) @others def _on_ping(self, peer, msgId, **kw): """ Handler for ping received events """ KRpcPing(self, (peer, msgId), local=True, **kw) return # old stuff self.log(3, "\nnode %s\nfrom %s\nreceived:\n%s" % (self.name, peer, kw)) # randomly cause ping timeouts if testing if testing: howlong = random.randint(0, 5) self.log(3, "deliberately pausing for %s seconds" % howlong) time.sleep(howlong) # pong back to node peer.send_reply(msgId=msgId) def _on_unknown(self, peer, msgId, **kw): """ Handler for unknown events """ self.log(3, "node %s from %s received msgId=%s:\n%s" % ( self.name, peer, msgId, kw)) def _msgIdAlloc(self): """ issue a new and unique message id """ #self._msgIdLock.acquire() msgId = self._msgIdNext self._msgIdNext += 1 #self._msgIdLock.release() return msgId def _normalisePeer(self, peer): """ Takes either a b64 dest string, a KPeer object or a KNode object, and returns a KPeer object """ # act according to whatever type we're given if isinstance(peer, KPeer): return peer # already desired format elif isinstance(peer, KNode): return KPeer(self, peer.dest) elif isinstance(peer, str) and len(peer) > 256: return KPeer(self, peer) else: self.log(3, "node %s, trying to add invalid noderef %s" % ( self.name, peer)) raise KBadNode(peer) def _ping(self, peer=None, callback=None, **kw): """ Sends a ping to remote peer, and awaits response Not of much real use to application level, except perhaps for testing If the argument 'peer' is not given, the effect is to 'ping the local node', which I guess might be a bit silly The second argument 'callback' is a callable, which if given, makes this an asynchronous (non-blocking) call, in which case the callback will be invoked upon completion (or timeout). If the keyword 'cbArgs' is given in addition to the callback, the callback will fire with the results as first argument and this value as second arg """ if callback: KRpcPing(self, callback, peer=peer, **kw) else: return KRpcPing(self, peer=peer).execute() def distance(self, other): """ calculates the 'distance' between this hash and another hash, and returns it as i (where distance = 2^i, and 0 <= i < 160) """ #log(4, "comparing: %s\nwith %s" % (self.value, other.value)) rawdistance = self.value ^ other.value if not rawdistance: return 0 return int(math.log(rawdistance, 2)) def _on_findNode(self, peer, msgId, **kw): """ Handles incoming findNode command """ KRpcFindNode(self, (peer, msgId), local=True, **kw) def _on_findData(self, peer, msgId, **kw): """ Handles incoming findData command """ KRpcFindData(self, (peer, msgId), local=True, **kw) def justSeen(self): self.timeLastSeen = time.time() class KCore(KBase): """ Singleton class which performs all the needed background processing. By scheduling all processing through this object, we eliminate the need to create threads on a per-node basis, and also make this thing far easier to debug. The core launches only two background threads: - L{threadRxPackets} - listen for incoming packets bound for any node running within a single process - L{threadHousekeeping} - periodically invoke maintenance methods of each node, so the node can check for timeout conditions and other untoward happenings These threads start up when the first node in this process is created, and stop when the last node ceases to exist. Upon first import, the L{stasher} module creates one instance of this class. 
Upon creation, L{KNode} objects register themselves with this core.
"""
@others
def __init__(self, bg=True):
    """
    Creates the I2P Kademlia core object
    """
    self.bg = bg
    self.fg = False
    # subscribed nodes
    self.nodes = []
    #self.nodesLock = threading.Lock()
    self.isRunning = False
    self.isRunning_rx = False
def subscribe(self, node):
    """
    Called by a node to 'subscribe' for background processing
    If this is the first node, starts the handler thread
    """
    #self.nodesLock.acquire()
    try:
        nodes = self.nodes
        if node in nodes:
            self.log(2, "duhhh! node %s already subscribed" % repr(node))
            return
        nodes.append(node)
        if not self.isRunning:
            self.isRunning = True
            if self.bg and not self.fg:
                self.log(3, "First node subscribing, launching threads")
                thread.start_new_thread(self.threadRxPackets, ())
                thread.start_new_thread(self.threadHousekeeping, ())
    except:
        traceback.print_exc()
        self.log(2, "exception")
    #self.nodesLock.release()
def unsubscribe(self, node):
    """
    Unsubscribes a node from the core
    If this was the last node, stops the handler thread
    """
    #self.nodesLock.acquire()
    try:
        nodes = self.nodes
        if node not in nodes:
            self.log(4, "duhhh! node %s was not subscribed" % repr(node))
            return
        self.log(2, "trying to unsubscribe node %s" % node.name)
        nodes.remove(node)
        if len(nodes) == 0:
            self.isRunning = False
    except:
        traceback.print_exc()
        self.log(2, "exception")
    #self.nodesLock.release()
def threadRxPackets(self):
    """
    Sits on a select() loop, processing incoming datagrams
    and actioning them appropriately.
    """
    self.isRunning_rx = True
    self.log(3, "KCore packet receiver thread running")
    try:
        while self.isRunning:
            socks = [node.sock for node in self.nodes]
            if desperatelyDebugging:
                set_trace()
            try:
                inlist, outlist, errlist = self.select(socks, [], [], 1)
            except KeyboardInterrupt:
                self.isRunning = 0
                return
            self.log(5, "\ninlist=%s" % repr(inlist))
            if inlist:
                self.log(5, "got one or more sockets with inbound data")
                #self.nodesLock.acquire()
                for sock in inlist:
                    node = self.nodeWhichOwnsSock(sock)
                    if node != None:
                        node._doRx()
                #self.nodesLock.release()
            elif self.fg:
                return
            else:
                time.sleep(0.1)
    except:
        #self.nodesLock.release()
        traceback.print_exc()
        self.log(1, "core handler thread crashed")
    self.isRunning_rx = False
    self.log(3, "core handler thread terminated")
# create an instance of _KCore
core = KCore()
def __del__(self):
    """
    Clean up on delete
    """
    self.log(3, "node dying: %s" % self.name)
    try:
        del _nodes[self.name]
    except:
        pass
    self.stop()
def _doRx(self):
    """
    Receives and handles one incoming packet

    Returns True if a packet got handled, or False if timeout
    """
    # get next packet
    self.log(5, "%s seeking socket lock" % self.name)
    #self.sockLock.acquire()
    self.log(5, "%s got socket lock" % self.name)
    try:
        item = self.sock.recvfrom(-1)
    except i2p.socket.BlockError:
        #self.sockLock.release()
        self.log(5, "%s released socket lock after timeout" % self.name)
        if not runCore:
            time.sleep(0.1)
        return False
    except:
        traceback.print_exc()
        self.log(5, "%s released socket lock after exception" % self.name)
        #self.sockLock.release()
        return True
    #self.sockLock.release()
    self.log(5, "%s released socket lock normally" % self.name)
    try:
        (data, dest) = item
    except ValueError:
        self.log(3, "node %s: recvfrom returned no dest, possible spoof" % self.name)
        data = item[0]
        dest = None
    # try to decode
    try:
        d = messageDecode(data)
    except:
        traceback.print_exc()
        self.log(3, "failed to unpickle incoming data for node %s" % self.name)
        return True
    # ditch if not a dict
    if type(d) != type({}):
        self.log(3, "node %s: decoded packet is not a dict" % self.name)
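        # note: a malformed packet still counts as 'handled' - returning
        # True (just below) keeps _doChug's receive loop draining the socket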
return True # temporary workaround for sam socket bug if dest == None: if hasattr(d, 'has_key') and d.has_key('dest'): dest = d['dest'] # try to find it in our store peerObj = self._findPeer(dest) if peerObj == None: # previously unknown peer - add it to our store peerObj = self.addref(dest) else: peerObj.justSeen() # already exists - refresh its timestamp self.addref(peerObj.dest) # drop packet if no msgId msgId = d.get('msgId', None) if msgId == None: self.log(3, "no msgId, dropping") return True del d['msgId'] msgType = d.get('type', 'unknown') if desperatelyDebugging: pass #set_trace() # if a local RPC is awaiting this message, fire its callback item = self.rpcBindings.get((peerObj.dest, msgId), None) if item: rpc, peer = item try: rpc.unbindPeerReply(peerObj, msgId) if desperatelyDebugging: set_trace() rpc.on_reply(peerObj, msgId, **d) except: traceback.print_exc() self.log(2, "unhandled exception in RPC on_reply") else: # find a handler, fallback on 'unknown' self.log(5, "\nnode %s\ngot msg id %s type %s:\n%s" % ( self.name, msgId, msgType, d)) hdlrName = d.get('type', 'unknown') hdlr = getattr(self, "_on_"+hdlrName) try: if desperatelyDebugging: set_trace() hdlr(peerObj, msgId, **d) except: traceback.print_exc() self.log(2, "unhandled exception in unbound packet handler %s" % hdlrName) return True def nodeWhichOwnsSock(self, sock): """ returns ref to node which owns a socket """ for node in self.nodes: if node.sock == sock: return node return None class KTestNetwork(stasher.KBase): """ Builds and runs a variable-sized test network """ @others def __init__(self, numnodes=numTestNodes, doPings=True, purge=False): """ Builds the test network """ global Socket, select self.trigger = 0 stasher.core.n = self # for convenience while debugging if purge: self.purge() if 0: if KTestNetwork.aNetworkExists: raise Exception("A test network already exists, may not create another") KTestNetwork.aNetworkExists = True self.nodes = [] for i in range(numnodes): nodeName = "kademlia-testnode-%s" % i self.log(3, "Creating test node %s" % nodeName) node = KNode(nodeName, in_depth=0, out_depth=0) node.idx = i node.n = self self.nodes.append(node) #print node.peers self.log(3, "test network successfully created") def __getitem__(self, num): """ Allows test network to be treated as array, returns the 'num'th node """ return self.nodes[num] def __len__(self): return len(self.nodes) def start(self, doPings=True): """ Starts up the test network """ for node in self.nodes: self.log(3, "starting node %s" % node.name) node.start(doPings) def stop(self): """ Stops (or tries to stop) the test network """ for node in self.nodes: self.log(3, "stopping node %s" % node.name) node.stop() aNetworkExists = False def _doHousekeeping(self): """ Performs periodical housekeeping on this node. Activities include: - checking pending records for timeouts """ now = time.time() # DEPRECATED - SWITCH TO RPC-based # check for expired pings for msgId, (dest, q, pingDeadline) in self.pendingPings.items(): if pingDeadline > now: # not timed out, leave in pending continue # ping has timed out del self.pendingPings[msgId] q.put(False) # check for timed-out RPCs for rpc in self.rpcPending[:]: if rpc.nextTickTime != None and now >= rpc.nextTickTime: try: rpc.on_tick() except: traceback.print_exc() self.log(2, "unhandled exception in RPC on_tick") class KPendingResultBase: """ Class which holds the details of an RPC sent to another node. 
""" @others def __init__(self, node, typ, **kw): """ Creates a pending result object Arguments: - node - node which is waiting for this result - typ - operation type on which we're awaiting a response, one of 'ping', 'findNode', 'findData', 'store' Keywords: - gotta think about this """ self.node = node self.typ = type # dict of msgId, peer pairs self.msgIds = {} # indicates when to timeout and return the best available result self.deadline = time.time() + timeout[typ] # add ourself to node self.node.pendingResults.append(self) def _doChug(self): """ Do what's needed to drive the node. Handle incoming packets Check on and action timeouts """ # handle all available packets while self._doRx(): pass # do maintenance - eg processing timeouts self._doHousekeeping() def __cmp__(self, other): # for sorting pending results by deadline return cmp(self.deadline, other.deadline) def append(self, peer, msgId): """ Adds a (peer, msgId) pair, to watch out for """ peer = self.node._normalisePeer(peer) self.msgIds[msgId] = peer self.node.awaitingMsgIds[msgId] = self def wait(self): """ Called by application-level routines to wait on and return some kind of result """ return self.queue.get() def check(self, now=None): """ Checks for a timeout condition, which if one occurs, sticks the best available result onto the queue to be picked up by the caller """ if now == None: now = time.time() if now > self.deadline: self.queue.put(self.bestAvailableResult()) def on_tick(self): """ Override this in subclasses. If a timeout occurs, this routine gets called, and should return the 'best available result' to be delivered back to synchronous caller """ raise KNotImplemented def on_packet(self, msgId, **details): """ Called when a packet of id msgId arrives Should return True if this packet was for us, or False if not """ raise KNotImplemented class KPendingResultPing(KPendingResultBase): """ for managing synchronous pings """ @others def __init__(self, node): KPendingResultBase.__init__(self, node, 'ping') def on_tick(self): """ Handle synchronous ping timeouts """ return False def on_packet(self, msgId, **details): """ Must have got back a ping reply """ self.destroySelf() self.queue.put(True) return True def destroySelf(self): """ Remove ourself from node """ self.node.pendingResults.remove(self) def __eq__(self, other): #self.log(2, "KPeer: comparing %s to %s (%s to %s)" % (self, other, self.__class__, other.__class__)) res = self.id == getattr(other, 'id', None) #self.log(2, "KPeer: res=%s" % res) return res def __ne__(self, other): return not (self == other) def execute(self): """ Only for synchronous (application-level) execution. 
Wait for the RPC to complete (or time out) and return whatever it came up with """ if core.fg: print "servicing background thread" while self.queue.empty(): core.cycle() return self.queue.get() def bindPeerReply(self, peer, msgId): """ Sets up the node to give us a callback when a reply comes in from downstream peer 'peer' with msg id 'msgId' """ self.localNode.rpcBindings[(peer.dest, msgId)] = (self, peer) def unbindPeerReply(self, peer, msgId): """ Disables the callback from node for replies from peer 'peer' with msgId 'msgId' """ bindings = self.localNode.rpcBindings peerdest = peer.dest if bindings.has_key((peerdest, msgId)): del bindings[(peerdest, msgId)] def unbindAll(self): """ Remove all reply bindings """ bindings = self.localNode.rpcBindings self.log(5, "node bindings before: %s" % bindings) for k,v in bindings.items(): if v[0] == self: del bindings[k] self.log(5, "node bindings after: %s" % bindings) class KRpc(KBase): """ Base class for RPCs between nodes. Refer subclasses """ @others def __init__(self, localNode, client=None, **kw): """ Holds all the information for an RPC Arguments: - localNode - the node from which this RPC is being driven - client - a representation of who is initiating this rpc, one of: - None - an API caller, which is to be blocked until the RPC completes or times out - (upstreamPeer, upstreamMsgId) - an upstream peer - callable object - something which requires a callback upon completion in which case the callable will be invoked with the RPC results as the first argument Keywords: - cbArgs - optional - if given, and if client is a callback, the callback will be invoked with the results as first argument, and this object as second argument """ self.localNode = localNode if client == None: # an api client self.isLocal = True self.queue = Queue.Queue() self.callback = None elif callable(client): self.isLocal = False self.callback = client elif isinstance(client, tuple): # we're doing the RPC on behalf of an upstream peer upstreamPeer, upstreamMsgId = client upstreamPeer = localNode._normalisePeer(upstreamPeer) self.isLocal = False self.upstreamPeer = upstreamPeer self.upstreamMsgId = upstreamMsgId self.callback = None # save keywords self.__dict__.update(kw) # set time for receiving a tick. # if this is set to an int absolute time value, the on_tick method # will be called as soon as possible after that time self.nextTickTime = None # and register with node as a pending command self.localNode.rpcPending.append(self) # now start up the request self.start() def start(self): """ Start the RPC running. 
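A typical implementation allocates a msgId, binds for replies with
L{bindPeerReply}, sends its initial query datagrams, and sets
self.nextTickTime if it wants timeout ticks (see L{KRpcPing.start}).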
Override this in subclasses """ raise KNotImplemented def on_reply(self, peer, msgId, **details): """ Callback which fires when a downstream peer replies Override this in subclasses """ raise KNotImplemented def on_tick(self): """ Callback which fires if the whole RPC times out, in which case the RPC should return whatever it can Override in subclasses """ self.localNode.rpcPending.remove(self) class KRpcPing(KRpc): """ Implements the PING rpc as per Kademlia spec """ @others def __init__(self, localNode, client=None, **kw): """ Creates and performs a PING RPC Arguments: - localNode - the node performing this RPC - upstreamPeer - if given, the peer wanting a reply - upstreamMsgId - if upstreamPeer is given, this is the msgId of the RPC message from the upstream peer Keywords: - peer - the peer to ping - default is local node """ peer = kw.get('peer', None) if peer != None: peer = localNode._normalisePeer(peer) self.peerToPing = peer if kw.has_key('cbArgs'): KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) else: KRpc.__init__(self, localNode, client) def start(self): """ Sends out the ping """ peer = self.peerToPing # are we ourselves being pinged? if peer == None: # yes, just reply self.returnValue(True) return # no - we need to ping a peer thisNode = self.localNode msgId = thisNode.msgId = thisNode._msgIdAlloc() # bind for peer response self.bindPeerReply(peer, msgId) # and send it off self.log(3, "node %s sending ping" % self.localNode.name) peer.send_ping(msgId=msgId) # and set a reply timeout self.nextTickTime = time.time() + timeout['ping'] def on_reply(self, peer, msgId, **details): """ Callback for PING reply """ self.log(3, "got ping reply from %s" % peer) self.returnValue(True) def terminate(self): """ Clean up after ourselves. Mainly involves removing ourself from local node """ self.unbindAll() try: self.localNode.rpcPending.remove(self) except: #traceback.print_exc() pass def messageEncode(params): """ Serialise the dict 'params' for sending Temporarily using bencode - replace later with a more efficient struct-based impl. """ try: return bencode.bencode(params) except: log(1, "encoder failed to encode: %s" % repr(params)) raise def messageDecode(raw): return bencode.bdecode(raw) def on_tick(self): """ 'tick' handler. 
For PING RPC, the only time we should get a tick is when the ping has timed out """ self.log(3, "timeout awaiting ping reply from %s" % self.peerToPing) self.returnValue(False) def _on_reply(self, peer, msgId, **kw): """ This should never happen """ self.log(4, "got unhandled reply:\npeer=%s\nmsgId=%s\nkw=%s" % ( peer, msgId, kw)) def send_reply(self, **kw): """ Sends an RPC reply back to upstream peer """ self.log(5, "\nnode %s\nreplying to peer %s:\n%s" % ( self.node, self, kw)) self.send_raw(type="reply", **kw) def threadHousekeeping(self): """ Periodically invoke nodes' housekeeping """ self.log(3, "\nnode housekeeping thread running") try: while self.isRunning: #self.log(4, "calling nodes' housekeeping methods") #self.nodesLock.acquire() for node in self.nodes: node._doHousekeeping() #self.nodesLock.release() time.sleep(1) self.log(3, "\nnode housekeeping thread terminated") except: #self.nodesLock.release() traceback.print_exc() self.log(1, "\nnode housekeeping thread crashed") class KRpcFindNode(KRpc): """ Implements the FIND_NODE rpc as per Kademlia spec """ @others def __init__(self, localNode, client=None, **kw): """ Creates and launches the findNode rpc Arguments: - localNode - the node performing this RPC - client - see KRpc.__init__ Keywords: - hash - a string, long int or KHash object representing what we're looking for. treatment depends on type: - KHash object - used as is - string - gets wrapped into a KHash object - long int - wrapped into a KHash object refer KHash.__init__ - raw - whether 'hash' is already a hash, default True - local - True/False - whether to only search local store, or pass on the query to the network, default True """ kw = dict(kw) if kw.get('raw', False): h = kw['hash'] del kw['hash'] kw['raw'] = h self.hashWanted = KHash(**kw) else: self.hashWanted = KHash(kw['hash'], **kw) self.isLocalOnly = kw.get('local', True) self.numQueriesPending = 0 self.numRounds = 0 # count number of rounds self.numReplies = 0 # number of query replies received self.numQueriesSent = 0 self.numPeersRecommended = 0 # whichever mode we're called from, we gotta find the k closest peers self.localNode = localNode self.peerTab = self.findClosestPeersInitial() self.log(4, "KRpcFindNode: isLocalOnly=%s" % self.isLocalOnly) if kw.has_key('cbArgs'): KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) else: KRpc.__init__(self, localNode, client) @ Tech overview: - this implementation creates each Node ID as an SHA1 hash of the node's 'destination' - the string which constitutes its address as an I2P endpoint. Datagram formats: - each datagram sent from one node to another is a python dict object, encoded and decoded with the 'bencode' object serialisation module. - we use bencode because regular Python pickle is highly insecure, allowing crackers to create malformed pickles which can have all manner of detrimental effects, including execution of arbitrary code. - the possible messages are listed below, along with their consituent dictionary keys: 1. ping: - msgId - a message identifier guaranteed to be unique with respect to the sending node 2. findNode: - msgId - unique message identifier - hash - the hash we're looking for - initiator - True/False, according to whether this node should initiate/perform the findNode, or whether this rpc is coming from another seeking node 3. 
findData: - msgId - unique message identifier - hash - the exact key hash of the data we want to retrieve - initiator - True/False, according to whether this node should initiate/perform the findNode, or whether this rpc is coming from another seeking node 4. store: - msgId - unique message identifier - hash - the exact key hash of the data we want to store - data - the data we want to store 5. reply: - msgId - the original msgId we're replying to The other items in a reply message depend on what kind of message we're replying to, listed below: 1. ping - no additional data 2. findNode: - nodes - a list of dests nearest the given hash 3. findData: - nodes - as for findNode, OR - data - the retrieved data, or None if not found 4. store: - status - True or False according to whether the store operation was successful def _findnode(self, something=None, callback=None, **kw): """ Mainly for testing - does a findNode query on the network Arguments: - something - one of: - plain string - the string gets hashed and used for the search - int or long int - this gets used as the raw hash - a KHash object - that's what gets used - None - the value of the 'raw' keyword will be used instead - callback - optional - if given, a callable object which will be called upon completion, with the result as argument Keywords: - local - optional - if True, only returns the closest peers known to node. if False, causes node to query other nodes. default is False - raw - one of: - 20-byte string - this gets used as a binary hash - 40-byte string - this gets used as a hex hash """ if not kw.has_key('local'): kw = dict(kw) kw['local'] = False self.log(3, "about to instantiate findnode rpc") if callback: KRpcFindNode(self, callback, hash=something, **kw) self.log(3, "asynchronously invoked findnode, expecting callback") else: lst = KRpcFindNode(self, hash=something, **kw).execute() self.log(3, "back from findnode rpc") res = [self._normalisePeer(p) for p in lst] # wrap in KPeer objects return res def start(self): """ Kicks off this RPC """ # if we're being called by an upstream initiator, just return the peer list if self.isLocalOnly: peerDests = [peer.dest for peer in self.peerTab] self.log(5, "findNode: local only: returning to upstream with %s" % repr(peerDests)) self.returnValue(peerDests) return # just return nothing if we don't have any peers if len(self.peerTab) == 0: self.returnValue([]) return # send off first round of queries self.sendSomeQueries() return def on_reply(self, peer, msgId, **details): """ Callback for FIND_NODE reply """ # shorthand peerTab = self.peerTab self.numReplies += 1 # ------------------------------------------------------------ # determine who replied, and get the raw dests sent back try: peerRec = peerTab[peer] except: traceback.print_exc() self.log(3, "discarding findNode reply from unknown peer %s %s, discarding" % ( peer, details)) return # one less query to wait for self.numQueriesPending -= 1 # ---------------------------------------------------------- # peerRec is the peer that replied # peers is a list of raw dests # save ref to this peer, it's seemingly good self.localNode.addref(peerRec.peer) # mark it as having replied if peerRec.state != 'queried': self.log(2, "too weird - got a reply from a peer we didn't query") peerRec.state = 'replied' # wrap the returned peers as KPeer objects peersReturned = details.get('result', []) peersReturned = [self.localNode._normalisePeer(p) for p in peersReturned] self.numPeersRecommended += len(peersReturned) # and add them to table in state 
'recommended' for p in peersReturned: peerTab.append(p, 'recommended') # try to fire off more queries self.sendSomeQueries() # and check for and action possible end of query round self.checkEndOfRound() def on_tick(self): """ Callback for FIND_NODE reply timeout """ # check for timeouts, and update offending peers now = time.time() for peerRec in self.peerTab: if peerRec.hasTimedOut(now): peerRec.state = 'timeout' # makes room for more queries self.sendSomeQueries() # possible end of round self.checkEndOfRound() # schedule next tick self.nextTickTime = time.time() + 5 @ Verbatim extract from original Kademlia paper follows: The lookup initiator starts by picking x nodes from its closest non-empty k-bucket (or, if that bucket has fewer than x entries, it just takes the closest x nodes it knows of). The initiator then sends parallel, asynchronous FIND NODE RPCs to the x nodes it has chosen. x is a system-wide concurrency parameter, such as 3. In the recursive step, the initiator resends the FIND NODE to nodes it has learned about from previous RPCs. [Paraphrased - in the recursive step, the initiator sends a FIND_NODE to each of the nodes that were returned as results of these previous FIND_NODE RPCs.] (This recursion can begin before all of the previous RPCs have returned). Of the k nodes the initiator has heard of closest to the target, it picks x that it has not yet queried and resends the FIND_NODE RPC to them. Nodes that fail to respond quickly are removed from consideration until and unless they do respond. If a round of FIND_NODEs fails to return a node any closer than the closest already seen, the initiator resends the FIND NODE to all of the k closest nodes it has not already queried. The lookup terminates when the initiator has queried and gotten responses from the k closest nodes it has seen. 
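# The paragraph above is the whole lookup algorithm, but the flattened code
# of KRpcFindNode spreads it over several methods (start, sendSomeQueries,
# on_reply, checkEndOfRound...). Here is a compact standalone sketch of the
# same loop. It is NOT part of stasher: plain ints stand in for KHash
# values, a dict stands in for the network, and the 'RPCs' are synchronous,
# so the 'queried' and 'replied' states collapse into one.
def iterative_find_node(network, start_ids, target, k=20, alpha=3):
    """network maps node id -> list of node ids that node knows about"""
    dist = lambda a: a ^ target             # Kademlia XOR metric
    shortlist = sorted(set(start_ids), key=dist)[:k]
    queried = set()
    while True:
        # of the k closest seen so far, pick up to alpha not yet queried
        pending = [n for n in shortlist if n not in queried][:alpha]
        if not pending:
            # all of the k closest have been queried - lookup terminates
            return shortlist
        for n in pending:
            queried.add(n)
            # fold each peer 'recommended' by the reply into the shortlist
            for rec in network.get(n, []):
                if rec not in shortlist:
                    shortlist.append(rec)
        # keep only the k closest; anything farther drops off the list
        shortlist = sorted(shortlist, key=dist)[:k]
# e.g. iterative_find_node({1: [2, 3], 2: [4], 3: [4, 5], 4: [6], 5: [6],
#                           6: []}, [1], target=7, k=3, alpha=2)
# returns [6, 5, 4] - the three ids closest to 7 under the XOR metric.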
class KRpcFindData(KRpcFindNode): """ variant of KRpcFindNode which returns key value if found """ @others class KRpcStore(KRpc): """ Implements key storage """ @others def _finddata(self, something=None, callback=None, **kw): """ As for findnode, but if data is found, return the data instead """ if not kw.has_key('local'): kw = dict(kw) kw['local'] = False self.log(3, "about to instantiate finddata rpc") if callback: KRpcFindData(self, callback, hash=something, **kw) self.log(3, "asynchronously invoked finddata, expecting callback") else: res = KRpcFindData(self, hash=something, **kw).execute() self.log(3, "back from finddata rpc") if not isinstance(res, str): self.log(4, "findData RPC returned %s" % repr(res)) res = [self._normalisePeer(p) for p in res] # wrap in KPeer objects return res def _store(self, key, value, callback=None, **kw): """ Performs a STORE rpc Arguments: - key - string - text name of key - value - string - value to store Keywords: - local - if given and true, only store value onto local store """ if not kw.has_key('local'): kw = dict(kw) kw['local'] = False key = shahash(key) if callback: KRpcStore(self, callback, key=key, value=value, **kw) self.log(3, "asynchronously invoked findnode, expecting callback") else: res = KRpcStore(self, key=key, value=value, **kw).execute() return res type = 'unknown' # override in subclass type = 'ping' type = 'findNode' type = 'findData' type = 'store' @ignore @language python """ Bloom filters in Python Adam Langley <agl@imperialviolet.org> """ @others import array import struct __all__ = ['Bloom'] mixarray = array.array ('B', '\x00' * 256) # The mixarray is based on RC4 and used as diffusion in the hashing function def mixarray_init (mixarray): for i in range (256): mixarray[i] = i k = 7 for j in range (4): for i in range (256): s = mixarray[i] k = (k + s) % 256 mixarray[i] = mixarray[k] mixarray[k] = s mixarray_init(mixarray) class Bloom (object): """ Bloom filters provide a fast and compact way of checking set membership. They do this by introducing a risk of a false positive (but there are no false negatives). For more information see: - http://www.cs.wisc.edu/~cao/papers/summary-cache/node8.html """ @others def __init__ (self, bytes, hashes): ''' bytes is the size of the bloom filter in 8-bit bytes and hashes is the number of hash functions to use. Consult the web page linked above for values to use. 
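As a rough guide (the standard Bloom filter estimate, not from the
original comments): with m = bytes*8 bit positions, n inserted elements
and 'hashes' = k, the false-positive rate is about (1 - e**(-k*n/m))**k,
so bytes = num_elements with k = 4 gives (1 - e**-0.5)**4, a shade
under 2.5%.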
If in doubt, bytes = num_elements and hashes = 4
'''
self.hashes = hashes
self.bytes = bytes
self.a = self._make_array (bytes)
def _make_array (self, size):
    a = array.array ('B')
    # stupidly, there's no good way that I can see of
    # resizing an array without allocing a huge string to do so
    # thus I use this, slightly odd, method:
    blocklen = 256
    arrayblock = array.array ('B', '\x00' * blocklen)
    todo = size
    while (todo >= blocklen):
        a.extend (arrayblock)
        todo -= blocklen
    if todo:
        a.extend (array.array ('B', '\x00' * todo))
    # now a is of the right length
    return a
def _hashfunc (self, n, val):
    '''Apply the nth hash function'''
    global mixarray
    b = [ord(x) for x in struct.pack ('I', val)]
    c = array.array ('B', [0, 0, 0, 0])
    for i in range (4):
        c[i] = mixarray[(b[i] + n) % 256]
    return struct.unpack ('I', c.tostring())[0]
def insert(self, val):
    for i in range(self.hashes):
        n = self._hashfunc(i, val) % (self.bytes * 8)
        self.a[n // 8] |= self.bitmask[n % 8]
def __contains__ (self, val):
    for i in range (self.hashes):
        n = self._hashfunc (i, val) % (self.bytes * 8)
        if not self.a[n // 8] & self.bitmask[n % 8]:
            return 0
    return 1
class CountedBloom (Bloom):
    """
    Just like a Bloom filter, but provides counting (e.g. you can delete as
    well). This uses 4 bits per bucket, so is generally four times larger than
    the same non-counted bloom filter.
    """
    @others
    def __init__ (self, buckets, hashes):
        '''
        Please note that @buckets must be even. Also note that with a Bloom
        object you give the number of *bytes* and each byte is 8 buckets.
        Here you're giving the number of buckets.
        '''
        assert buckets % 2 == 0
        self.hashes = hashes
        self.buckets = buckets
        self.a = self._make_array (buckets // 2)
def insert (self, val):
    masks = [(0x0f, 0xf0), (0xf0, 0x0f)]
    shifts = [4, 0]
    for i in range (self.hashes):
        n = self._hashfunc (i, val) % self.buckets
        byte = n // 2
        bucket = n % 2
        (notmask, mask) = masks[bucket]
        shift = shifts[bucket]
        bval = ((self.a[byte] & mask) >> shift)
        if bval < 15:
            # we shouldn't increment it if it's at the maximum
            bval += 1
        self.a[byte] = (self.a[byte] & notmask) | (bval << shift)
def __contains__ (self, val):
    masks = [(0x0f, 0xf0), (0xf0, 0x0f)]
    shifts = [4, 0]
    for i in range (self.hashes):
        n = self._hashfunc (i, val) % self.buckets
        byte = n // 2
        bucket = n % 2
        (notmask, mask) = masks[bucket]
        shift = shifts[bucket]
        bval = ((self.a[byte] & mask) >> shift)
        if bval == 0:
            return 0
    return 1
def __delitem__ (self, val):
    masks = [(0x0f, 0xf0), (0xf0, 0x0f)]
    shifts = [4, 0]
    for i in range (self.hashes):
        n = self._hashfunc (i, val) % self.buckets
        byte = n // 2
        bucket = n % 2
        (notmask, mask) = masks[bucket]
        shift = shifts[bucket]
        bval = ((self.a[byte] & mask) >> shift)
        if 0 < bval < 15:
            # we shouldn't decrement a saturated counter (its true count is
            # unknown), and we must never underflow past zero
            bval -= 1
        self.a[byte] = (self.a[byte] & notmask) | (bval << shift)
if __name__ == '__main__':
    print 'Testing bloom filter: there should be no assertion failures'
    a = Bloom (3, 4)
    a.insert (45)
    print a.a
    a.insert (17)
    print a.a
    a.insert (12)
    print a.a
    assert 45 in a
    assert 45 in a
    assert not 33 in a
    assert 45 in a
    assert 17 in a
    assert 12 in a
    c = 0
    for x in range (255):
        if x in a:
            c += 1
    print c
    print float(c)/255
    a = CountedBloom (24, 4)
    a.insert (45)
    print a.a
    a.insert (17)
    print a.a
    a.insert (12)
    print a.a
    assert 45 in a
    assert 45 in a
    assert not 33 in a
    assert 45 in a
    assert 17 in a
    assert 12 in a
    c = 0
    for x in range (255):
        if x in a:
            c += 1
    print c
    print float(c)/255
    del a[45]
    assert not 45 in a
# (Bloom class attribute, flattened out of sequence by the export:
# per-bit masks used by insert and __contains__)
bitmask = [0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01]
def __iter__(self):
    return
iter(self.nodes) def findClosestPeersInitial(self): """ Searches our k-buckets, and returns a table of k of peers closest to wanted hash into self.closestPeersInitial """ hashobj = self.hashWanted lst = [] buckets = self.localNode.buckets for bucket in buckets: for peer in bucket: lst.append(peer) table = KPeerQueryTable(lst, self.hashWanted, 'start') table.sort() return table[:maxBucketSize] def returnValue(self, res=None, **kw): """ Passes a return value back to the original caller, be it the local application, or an upstream peer Arguments: - just one - a result object to pass back, if this RPC was instigated by a local application call. Note that if this RPC was instigated by an upstream peer, this will be ignored. Keywords: - the items to return, in the case that this RPC was instigated by an upstream peer. Ignored if this RPC was instigated by a local application call. Note - the RPC invocation/reply dict keys are listed at the top of this source file. """ self.terminate() if self.callback: if hasattr(self, 'cbArgs'): self.callback(res, self.cbArgs) else: self.callback(res) elif self.isLocal: self.queue.put(res) else: self.upstreamPeer.send_reply(msgId=self.upstreamMsgId, **kw) def addPeerIfCloser(self, peer): """ Maintains the private .peersToQuery array. If the array is not yet maxed (ie, length < maxBucketSize), the peer is simply added. However, if the array is maxed, it finds the least-close peer, and replaces it with the given peer if closer. """ def isCloserThanQueried(self, peer): """ Test function which returns True if argument 'peer' is closer than all the peers in self.peersAlreadyQueried, or False if not """ for p in self.peersAlreadyQueried: if p.id.rawdistance(self.hashWanted) < peer.id.rawdistance(self.hashWanted): return False return True def __del__(self): #self.log(4, "\nRPC %s getting the chop" % (str(self))) pass def __str__(self): return "<%s on node %s>" % (self.__class__.__name__, self.localNode.name) def __repr__(self): return str(self) class KPeerQueryRecord(KBase): """ Keeps state information regarding a peer we're quering """ @others class KPeerQueryTable(KBase): """ Holds zero or more instances of KPeerQuery and presents/sorts table in different forms """ @others def __str__(self): return "<KTestNetwork: %d nodes>" % len(self.nodes) def __repr__(self): return str(self) def sendSomeQueries(self, **kw): """ First step of findNode Select alpha nodes that we haven't yet queried, and send them queries """ # bail if too busy if self.numQueriesPending >= maxConcurrentQueries: return # shorthand localNode = self.localNode hashWanted = self.hashWanted # randomly choose some peers #somePeerRecs = self.peerTab.chooseN(numSearchPeers) somePeerRecs = self.peerTab.select('start') # start our ticker self.nextTickTime = time.time() + timeout['findNode'] numQueriesSent = 0 # and send them findNode queries if len(somePeerRecs) > 0: for peerRec in somePeerRecs: self.log(3, "querying %s" % peerRec) if self.numQueriesPending < maxConcurrentQueries: self.sendOneQuery(peerRec) numQueriesSent += 1 else: break self.log(3, "%s queries sent, awaiting reply" % numQueriesSent) else: self.log(3, "no peer recs???") for peerRec in self.peerTab: self.log(4, "%s state=%s, dest=%s..." 
% (peerRec, peerRec.state, peerRec.dest[:12])) def returnTheBestWeGot(self): """ Returns the k closest nodes to the wanted hash that we have actually heard from """ # pick the peers which have replied to us closest = self.peerTab.select('closest') self.peerTab.dump() # add ourself to the list - we could easily be one of the best localNode = self.localNode selfDest = localNode._normalisePeer(localNode.dest) closest.append(selfDest, state='closest') # sort in order of least distance first closest.sort() # pick the best k of these #peersHeardFrom = peersHeardFrom[:maxBucketSize] #peersHeardFrom = peersHeardFrom[:numSearchPeers] # extract their dest strings peers = [p.peer.dest for p in closest] # pass these back self.returnValue(peers) # and we're done return def __init__(self, lst=None, sorthash=None, state=None, **kw): self.peers = [] if lst == None: lst = [] else: self.setlist(lst, state, **kw) self.sorthash = sorthash def setlist(self, lst, state=None, **kw): for item in lst: self.append(item, state, **kw) def extend(self, items, state=None, **kw): for item in items: self.append(item, state, **kw) def append(self, item, state=None, **kw): if isinstance(item, KPeerQueryRecord): self.log(5, "adding a KPeerQueryRecord, state=%s" % state) if state != None: item.state = state item.__dict__.update(kw) peerRec = item elif isinstance(item, KPeer): self.log(5, "adding a KPeer") peerRec = KPeerQueryRecord(item, self, state, **kw) else: self.log(2, "bad peer %s" % repr(item)) raise KBadPeer if peerRec not in self: self.log(5, "peerRec=%s list=%s" % (peerRec, self.peers)) self.peers.append(peerRec) else: self.log(5, "trying to append duplicate peer???") def remove(self, item): self.peers.remove(item) def getExpired(self): """ return a list of peers which have expired """ return KPeerQueryTable( filter(lambda item: item.hasTimedOut(), self.peers), self.sorthash ) def purgeExpired(self): """ Eliminate peers which have expired """ for peer in self.peers[:]: # iterate over a copy, since we remove in place if peer.hasTimedOut(): self.peers.remove(peer) def __getitem__(self, idx): """ Allow the table to be indexed by any of: - KPeerQueryRecord - integer index - long string - treated as dest - short string - treated as peer id hash string - KHash - finds peer with that id - KPeer - returns peer with that peer """ if type(idx) == type(0): return self.peers[idx] elif isinstance(idx, KPeer): for peer in self.peers: if peer.peer == idx: return peer raise IndexError("Query table has no peer %s" % idx) elif isinstance(idx, str): if len(idx) > 512: for peer in self.peers: if peer.peer.dest == idx: return peer raise IndexError("No peer with dest %s" % idx) else: for peer in self.peers: if peer.peer.id.value == idx: return peer raise IndexError("No peer with dest hash %s" % idx) elif isinstance(idx, KHash): for peer in self.peers: if peer.peer.id == idx: return peer raise IndexError("No peer with id %s" % idx) else: raise IndexError("Invalid selector %s" % repr(idx)) def __len__(self): return len(self.peers) def __getslice__(self, fromidx, toidx): return KPeerQueryTable(self.peers[fromidx:toidx], self.sorthash) def __iter__(self): return iter(self.peers) def sort(self): """ Sort the table in order of increasing distance from self.sorthash """ self.peers.sort() def select(self, criterion): """ Returns a table of items for which criterion(item) returns True Otherwise, if 'criterion' is a string, returns the items whose state == criterion.
Otherwise, if 'criterion' is a list or tuple, return the items whose state is one of the elements in criterion """ if callable(criterion): func = criterion elif type(criterion) in [type(()), type([])]: func = lambda p: p.state in criterion else: func = lambda p: p.state == criterion recs = [] for peerRec in self.peers: if func(peerRec): recs.append(peerRec) return self.newtable(recs) def filter(self, func): """ Eliminate, in place, all items where func(item) returns False """ for peerRec in self.peers[:]: # iterate over a copy, since we remove in place if not func(peerRec): self.peers.remove(peerRec) def purge(self, func): """ Eliminate, in place, all items where func(item) returns True """ if 0 and desperatelyDebugging: set_trace() for peerRec in self.peers[:]: # iterate over a copy, since we remove in place if func(peerRec): self.peers.remove(peerRec) def chooseN(self, n): """ Randomly select n peer query records """ candidates = self.peers[:] self.log(3, "candidates = %s" % repr(candidates)) chosen = [] i = 0 if len(candidates) <= n: chosen = candidates else: while i < n: try: peer = random.choice(candidates) except: self.log(2, "failed to choose one of %s" % repr(candidates)) raise chosen.append(peer) candidates.remove(peer) i += 1 return self.newtable(chosen) def __str__(self): return "<KPeerQueryTable: %d peers>" % len(self) #.peers) def __repr__(self): return str(self) def returnValue(self, items): """ override with a nicer call sig """ # a hack for testing - save this RPC object into the node # so we can introspect it self.localNode.lastrpc = self items = items[:maxBucketSize] self.reportStats() KRpc.returnValue(self, items, result=items) def newtable(self, items, state=None, **kw): """ Returns a new KPeerQueryTable object, based on this one, but containing 'items' """ tab = KPeerQueryTable(items, sorthash=self.sorthash, state=state, **kw) return tab def __add__(self, other): self.extend(other) def __init__(self, peer, table, state=None, **kw): self.peer = peer self.dest = peer.dest self.deadline = time.time() + timeout['findNode'] self.table = table # state is always one of: # - 'start' - have not yet sent query to peer # - 'recommended' - peer was recommended by another peer, no query sent # - 'queried' - sent query, awaiting reply or timeout # - 'replied' - this peer has replied to our query # - 'timeout' - timed out waiting for peer reply # - 'toofar' - too far away to be of interest # - 'closest' - this peer is one of the closest so far if state == None: state = 'start' if not isinstance(state, str): raise Exception("Invalid state %s" % state) self.state = state self.__dict__.update(kw) def hasTimedOut(self, now=None): if now == None: now = time.time() return self.state == 'queried' and now > self.deadline def __cmp__(self, other): return cmp(self.peer.id.rawdistance(self.table.sorthash), other.peer.id.rawdistance(self.table.sorthash)) def __lt__(self, other): return (cmp(self, other) < 0) def __le__(self, other): return (cmp(self, other) <= 0) def __gt__(self, other): return (cmp(self, other) > 0) def __ge__(self, other): return (cmp(self, other) >= 0) def isCloserThanAllOf(self, tab): """ returns True if this peerRec is closer to the desired hash than all of the peerRecs in table 'tab' """ if not isinstance(tab, KPeerQueryTable): self.log(2, "invalid qtable %s" % repr(tab)) raise Exception("invalid qtable %s" % repr(tab)) for rec in tab: if self > rec: return False return True def isCloserThanOneOf(self, tab): """ returns True if this peerRec is closer to the desired hash than one or more of the peerRecs in table 'tab' """ if not isinstance(tab, KPeerQueryTable):
self.log(2, "invalid qtable %s" % repr(tab)) raise Exception("invalid qtable %s" % repr(tab)) for rec in tab: if self < rec: return True return False def __contains__(self, other): self.log(5, "testing if %s is in %s" % (other, self.peers)) for peerRec in self.peers: if peerRec.peer.dest == other.peer.dest: return True return False def sendOneQuery(self, peerRec): """ Sends off a query to a single peer """ if peerRec.state != 'start': self.log(2, "duh!! peer state %s:\n%s" % (peerRec.state, peerRec)) return msgId = self.localNode._msgIdAlloc() self.bindPeerReply(peerRec.peer, msgId) peerRec.msgId = msgId if self.type == 'findData': peerRec.peer.send_findData(hash=self.hashWanted, msgId=msgId) else: peerRec.peer.send_findNode(hash=self.hashWanted, msgId=msgId) peerRec.state = 'queried' self.numQueriesPending += 1 self.numQueriesSent += 1 def run(self, func=None): """ Runs the core in foreground, with the client func in background """ if func==None: func = test self.bg = False thread.start_new_thread(self.runClient, (func,)) set_trace() self.threadRxPackets() def stop(self): self.isRunning = False def runClient(self, func): self.log(3, "Core: running client func") try: func() except: traceback.print_exc() self.log(3, "Core: client func exited") self.stop() def putKey(self, key, val, keyIsHashed=False): """ Stores a string into this storage under the key 'key' Returns True if key was saved successfully, False if not """ try: if keyIsHashed: keyHashed = key else: keyHashed = shahash(key) keyHashed = keyHashed.lower() keyPath = os.path.join(self.keysDir, keyHashed) file(keyPath, "wb").write(val) self.log(4, "stored key: '%s'\nunder hash '%s'\n(keyIsHashed=%s)" % ( key, keyHashed, keyIsHashed)) return True except: traceback.print_exc() self.log(3, "failed to store key") return False def shahash(somestr, bin=False): shaobj = sha.new(somestr) if bin: return shaobj.digest() else: return shaobj.hexdigest() def getKey(self, key, keyIsHashed=False): """ Attempts to retrieve item from node's local file storage, which was stored with key 'key'. 
Returns value as a string if found, or None if not present """ try: if keyIsHashed: keyHashed = key else: keyHashed = shahash(key) keyHashed = keyHashed.lower() self.log(4, "key=%s, keyHashed=%s, keyIsHashed=%s" % (key, keyHashed, keyIsHashed)) keyPath = os.path.join(self.keysDir, keyHashed) if os.path.isfile(keyPath): return file(keyPath, "rb").read() else: return None except: traceback.print_exc() self.log(3, "error retrieving key '%s'" % key) return None def __init__(self, localNode, client=None, **kw): """ Creates and launches a STORE rpc Arguments: - localNode - the node performing this RPC - client - see KRpc.__init__ Keywords: - key - the key under which we wish to save the data - value - the value we wish to save - local - True/False: - if True, only save in local store - if False, do a findNode to find the nodes to save the key to, and tell them to save it default is True """ self.key = kw['key'] #self.keyHashed = shahash(self.key) self.keyHashed = self.key self.value = kw['value'] self.isLocalOnly = kw.get('local', True) # set 'splitting' flag to indicate if we need to insert as splitfiles self.splitting = len(self.value) > maxValueSize self.log(4, "isLocalOnly=%s" % self.isLocalOnly) if kw.has_key('cbArgs'): KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs']) else: KRpc.__init__(self, localNode, client) def start(self): """ Kicks off this RPC """ # if too big, then break up into <30k chunks if self.splitting: self.storeSplit() return # not too big - prefix a 0 chunk count, and go ahead as a single entity self.value = "chunks:0\n" + self.value # if local only, or no peers, just save locally if self.isLocalOnly or len(self.localNode.peers) == 0: result = self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True) if result: result = 1 else: result = 0 self.returnValue(result) return # no - we have to find peers to store the key to, and tell them to # store the key # launch a findNode rpc, continue in our callback KRpcFindNode(self.localNode, self.on_doneFindNode, hash=self.keyHashed, raw=True, local=False) return def on_doneFindNode(self, lst): """ Receive a callback from findNode Send STORE command to each node that comes back """ localNode = self.localNode # normalise results normalisePeer = localNode._normalisePeer peers = [normalisePeer(p) for p in lst] # wrap in KPeer objects self.log(2, "STORE RPC findNode - got peers %s" % repr(peers)) i = 0 self.numPeersSucceeded = 0 self.numPeersFailed = 0 self.numPeersFinished = 0 # and fire off store messages for each peer for peer in peers: if peer.dest == localNode.dest: self.log(3, "storing to ourself") localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True) self.numPeersSucceeded += 1 self.numPeersFinished += 1 else: msgId = self.localNode._msgIdAlloc() self.log(4, "forwarding store cmd to peer:\npeer=%s\nmsgId=%s" % (peer, msgId)) self.bindPeerReply(peer, msgId) peer.send_store(key=self.keyHashed, value=self.value, msgId=msgId) i += 1 if i >= numStorePeers: break self.nextTickTime = time.time() + timeout['store'] self.log(2, "Sent store cmd to %s peers, awaiting responses" % i) self.numPeersToStore = i def returnValue(self, result): """ an override with a nicer call sig """ # a hack for testing - save this RPC object into the node # so we can introspect it self.localNode.lastrpc = self try: KRpc.returnValue(self, result, status=result) except: traceback.print_exc() self.log(3, "Failed to return %s" % repr(result)) KRpc.returnValue(self, 0, status=0) def on_reply(self, peer, msgId, **details):
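# (note: only successful STORE replies arrive here - peers which never
# answer are handled by the on_tick timeout handler below)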
""" callback which fires when we get a reply from a STORE we sent to a peer """ self.numPeersSucceeded += 1 self.numPeersFinished += 1 if self.numPeersFinished == self.numPeersToStore: # rpc is finished self.returnValue(True) def on_tick(self): self.log(3, "Timeout awaiting store reply from %d out of %d peers" % ( self.numPeersToStore - self.numPeersSucceeded, self.numPeersToStore)) if self.numPeersSucceeded == 0: self.log(3, "Store timeout - no peers replied, storing locally") self.localNode.storage.putKey(self.keyHashed, self.value, keyIsHashed=True) self.returnValue(True) def _on_store(self, peer, msgId, **kw): """ Handles incoming STORE command """ self.log(4, "got STORE rpc from upstream:\npeer=%s\nmsgId=%s\nkw=%s" % (peer, msgId, kw)) KRpcStore(self, (peer, msgId), local=True, **kw) def start(self): """ Kicks off the RPC. If requested key is stored locally, simply returns it. Otherwise, falls back on parent method """ # if we posses the data, just return the data value = self.localNode.storage.getKey(self.hashWanted.asHex(), keyIsHashed=True) if value != None: self.log(4, "Found required value in local storage") self.log(4, "VALUE='%s'" % value) self.on_gotValue(value, self.hashWanted.asHex()) return # no such luck - pass on to parent KRpcFindNode.start(self) def asHex(self): return ("%040x" % self.value).lower() def on_reply(self, peer, msgId, **details): """ Callback for FIND_NODE reply """ res = details.get('result', None) if isinstance(res, str): self.on_gotValue(res, self.hashWanted.asHex()) else: KRpcFindNode.on_reply(self, peer, msgId, **details) def __del__(self): """ Cleanup """ @first #! /usr/bin/env python """ A simple Hashcash implementation Visit U{http://www.hashcash.org} for more info about the theory and usage of hashcash. Run this module through epydoc to get pretty doco. Overview: - implements a class L{HashCash}, with very configurable parameters - offers two convenience wrapper functions, L{generate} and L{verify}, for those who can't be bothered instantiating a class - given a string s, genToken produces a hashcash token string t, as binary or base64 - generating t consumes a lot of cpu time - verifying t against s is almost instantaneous - this implementation produces clusters of tokens, to even out the token generation time Performance: - this implementation is vulnerable to: - people with lots of computers, especially big ones - people writing bruteforcers in C (python is way slow) - even with the smoothing effect of creating token clusters, the time taken to create a token can vary by a factor of 7 Theory of this implementation: - a hashcash token is created by a brute-force algorithm of finding an n-bit partial hash collision - given a string s, and a quality level q, generate a 20-byte string h, such that: 1. h != s 2. len(h) == 20 3. ((sha(s) xor sha(h)) and (2 ^ q - 1)) == 0 - in other words, hash(h) and hash(s) have q least significant bits in common If you come up with a faster, but PURE PYTHON implementation, using only modules included in standard python distribution, please let me know so I can upgrade mine or link to yours. Written by David McNab, August 2004 Released to the public domain. """ @others import sha, array, random, base64, math from random import randint shanew = sha.new def generate(value, quality, b64=False): """ Generates a hashcash token This is a convenience wrapper function which saves you from having to instantiate a HashCash object. 
Arguments: - value - a string against which to generate token - quality - an int from 1 to 160 - typically values are 16 to 30 - b64 - if True, return the token as base64 (suitable for email, news, and other text-based contexts), otherwise return a binary string Quality values for desktop PC usage should typically be between 16 and 30. Too low, and it makes an attacker's life easy. Too high, and it makes life painful for the user. """ if b64: format = 'base64' else: format = 'binary' h = HashCash(quality=quality, format=format) return h.generate(value) def binify(L): """ Convert a python long int into a binary string """ res = [] while L: res.append(chr(L & 0xFF)) L >>= 8 res.reverse() return "".join(res) def intify(s): """ Convert a binary string to a python long int """ n = 0L for c in s: n = (n << 8) | ord(c) return n # your own config settings - set these to get a good trade-off between # token size and uniformity of time taken to generate tokens # # the final token size will be tokenSize * chunksPerToken for binary # tokens, or ceil(4/3 * tokenSize * chunksPerToken) for base64 tokens # # the reason for building a token out of multiple token chunks is to # try to even out the time taken for token generation # # without this, token generation time is very random, with some tokens # generating almost instantaneously, and other tokens taking ages defaultChunkSize = 3 # size of each chunk in a token defaultNumChunks = 12 # number of chunks in each token defaultQuality = 12 # number of partial hash collision bits required defaultFormat = 'base64' # by default, return tokens in base64 format defaultVerbosity = 0 # increase this to get more verbose output def verify(value, quality, token): """ Verifies a hashcash token. This is a convenience wrapper function which saves you from having to instantiate a HashCash object. 
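Note that verification is cheap - it hashes each token chunk just once - whereas generation has to brute-force its way to a partial hash collision.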
Arguments: - value - the string against which to check the hashcash token - quality - the number of bits of token quality we require - token - a hashcash token string """ h = HashCash(quality=quality) return h.verify(value, token) def test(nbits=14): """ Basic test function - perform encoding and decoding, in plain and base64 formats, using the wrapper functions """ print "Test, using wrapper functions" value = _randomString() print "Generated random string\n%s" % value print print "Generating plain binary %s-bit token for:\n%s" % (nbits, value) tok = generate(value, nbits) print "Got token %s, now verifying" % repr(tok) result = verify(value, nbits, tok) print "Verify = %s" % repr(result) print print "Now generating base64 %s-bit token for:\n%s" % (nbits, value) tok = generate(value, nbits, True) print "Got base64 token %s, now verifying" % repr(tok) result = verify(value, nbits, tok) print "Verify = %s" % repr(result) # get a boost of speed if psyco is available on target machine try: import psyco psyco.bind(generate) # (was genToken - a stale name from an older version) psyco.bind(binify) psyco.bind(intify) except: pass class HashCash: """ Class for creating/verifying hashcash tokens Feel free to subclass this, overriding the default attributes: - chunksize - numchunks - quality - format - verbosity """ @others def __init__(self, **kw): """ Create a HashCash object Keywords: - chunksize - size of each token chunk - numchunks - number of chunks per token - quality - strength of token, in bits: - legal values are 1 to 160 - typical values are 10 to 30, larger values taking much longer to generate - format - 'base64' to output tokens in base64 format; any other value causes tokens to be generated in binary string format - verbosity - verbosity of output messages: - 0 = silent - 1 = critical only - 2 = noisy """ for key in ['chunksize', 'numchunks', 'quality', 'format', 'verbosity']: if kw.has_key(key): setattr(self, key, kw[key]) self.b64ChunkLen = int(math.ceil(self.chunksize * 4.0 / 3)) def generate(self, value): """ Generate a hashcash token against string 'value' """ quality = self.quality mask = 2 ** quality - 1 hV = sha.new(value).digest() nHV = intify(hV) maxTokInt = 2 ** (self.chunksize * 8) tokenChunks = [] chunksPerToken = self.numchunks # loop around generating random strings until we find one whose hash, # when xor'ed with the hash of 'value', has the least significant n bits # all zero while 1: nTok = randint(0, maxTokInt) sNTok = binify(nTok) hSNTok = shanew(sNTok).digest() nHSNTok = intify(hSNTok) if (nHV ^ nHSNTok) & mask == 0: # got a good token if self.format == 'base64': if not self._checkBase64(sNTok): # chunk fails to encode/decode base64 if self.verbosity >= 2: print "Ditching bad candidate token" continue bSNTok = self._enc64(sNTok) if self.verbosity >= 2: print "encoded %s to %s, expect chunklen %s" % ( repr(sNTok), repr(bSNTok), self.b64ChunkLen) sNTok = bSNTok # got something that works, add it to chunks, return if we got enough chunks if sNTok in tokenChunks: continue # already got this one tokenChunks.append(sNTok) if len(tokenChunks) == chunksPerToken: return "".join(tokenChunks) def verify(self, value, token): """ Verifies a hashcash token against string 'value' """ if self.verbosity >= 2: print "Verify: checking token %s (len %s) against %s" % (token, len(token), value) # mask is an int with least-significant 'q' bits set to 1 mask = 2 ** self.quality - 1 # breaking up token into its constituent chunks chunks = [] # verify token size if len(token) != self.chunksize * self.numchunks: # try base64 try: for i
in range(0, self.numchunks): b64chunk = token[(i * self.b64ChunkLen) : ((i + 1) * self.b64ChunkLen)] chunk = self._dec64(b64chunk) if len(chunk) != self.chunksize: if self.verbosity >= 2: print "Bad chunk length in decoded base64, wanted %s, got %s" % ( self.chunksize, len(chunk)) return False chunks.append(chunk) except: if self.verbosity >= 2: print "Base64 decode failed" return False else: # break up token into its chunks for i in range(0, self.numchunks): chunks.append(token[(i * self.chunksize) : ((i + 1) * self.chunksize)]) # produce hash string and hash int for input string hV = sha.new(value).digest() nHv = intify(hV) # test each chunk if self.verbosity >= 2: print "chunks = %s" % repr(chunks) while chunks: chunk = chunks.pop() # defeat duplicate chunks if chunk in chunks: if self.verbosity >= 2: print "Rejecting token chunk - duplicate exists" return False # hash the string and the token hTok = sha.new(chunk).digest() # defeat the obvious attack if hTok == hV: if self.verbosity >= 2: print "Rejecting token chunk - equal to token" return False # test if these hashes have the least significant n bits in common nHTok = intify(hTok) if (nHTok ^ nHv) & mask != 0: # chunk failed if self.verbosity >= 2: print "Rejecting token chunk %s - hash test failed" % repr(chunk) return False # pass return True def ctest(quality=14): """ Basic test function - perform token generation and verify, against a random string. Instantiate a HashCash class instead of just using the wrapper funcs. """ print "Test using HashCash class" value = _randomString() print "Generated random string\n%s" % value print hc = HashCash(quality=quality, format='base64') print "Generating base64 %s-bit token for:\n%s" % (quality, value) tok = hc.generate(value) print "Got token %s, now verifying" % repr(tok) result = hc.verify(value, tok) print "Verify = %s" % repr(result) print if __name__ == '__main__': test() def ntest(): """ This function does 256 key generations in a row, and dumps some statistical results """ # adjust these as desired chunksize=3 numchunks=32 quality=6 numIterations = 256 import time try: import stats except: print "This test requires the stats module" print "Get it (and its dependencies) from:" print "http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python.html" return print "Thrash test" times = [] # create a hashcash token generator object hc = HashCash( chunksize=chunksize, numchunks=numchunks, quality=quality ) # 256 times, create a random string and a matching hashcash token for i in range(numIterations): value = _randomString() # measure time for a single token generation then = time.time() tok = hc.generate(value) now = time.time() times.append(now - then) # sanity check, make sure it's valid result = hc.verify(value, tok) if not result: print "Verify failed, token length=%s" % len(tok) return print "Generated %s of %s tokens" % (i+1, numIterations) print "---------------------------------" print "Thrash test performance results" print "Token quality: %s bits" % quality print "Min=%.3f max=%.3f max/min=%.3f mean=%.3f, median=%.3f, stdev=%.3f" % ( min(times), max(times), max(times)/min(times), stats.lmean(times), stats.lmedian(times), stats.lstdev(times) ) def _checkBase64(self, item): """ Ensures the item correctly encodes then decodes to/from base64 """ #if self.verbose: # print "Checking candidate token" enc = self._enc64(item) if len(enc) != self.b64ChunkLen: if self.verbosity >= 1: print "Bad candidate token" return False return
self._dec64(enc) == item def _enc64(self, item): """ Base64-encode a string, remove padding """ enc = base64.encodestring(item).strip() while enc[-1] == '=': enc = enc[:-1] return enc def _dec64(self, item): """ Base64-decode a string """ dec = base64.decodestring(item+"====") return dec def _randomString(): """ For our tests below. Generates a random-length human-readable random string, between 16 and 80 chars """ chars = [] slen = randint(16, 80) for i in range(slen): chars.append(chr(randint(32, 126))) # printable ASCII only value = "".join(chars) return value # override these at your pleasure chunksize = defaultChunkSize numchunks = defaultNumChunks quality = defaultQuality format = defaultFormat verbosity = defaultVerbosity def dump(self, detailed=0): """ Outputs a list of nodes and their connections """ if detailed: self.dumplong() return for node in self.nodes: print node.name + ":" print " " + ", ".join([self.getPeerName(peer) for peer in node.peers]) def getPeerName(self, peer): for n in self.nodes: if n.dest == peer.dest: return n.name return "<??%s>" % peer.dest[:8] def whohas(self, key): print "Nodes having key %s:" % key h = KHash(key) def hcmp(n1, n2): res = cmp(h.rawdistance(n1.id), h.rawdistance(n2.id)) #print "compared: %s %s = %s" % (n1.idx, n2.idx, res) return res i = 0 nodes = self.nodes[:] nodes.sort(hcmp) for node in nodes: if node.storage.getKey(key) != None: i += 1 print "%3s" % node.idx, if i % 16 == 0: print def whocanfind(self, key): """ Produces a list of nodes which can find key 'key' """ successes = [] failures = [] print "Nodes which can find key %s" % key for i in range(len(self.nodes)): node = self.nodes[i] if node.get(key) != None: print " %s found it" % node.name successes.append(i) else: print " %s failed" % node.name failures.append(i) print "Successful finds: %s" % repr(successes) print "Failed finds: %s" % repr(failures) def closestto(self, key): """ Outputs a list of node names, in order of increasing 'distance' from key """ key = KHash(key) def nodecmp(node1, node2): #self.log(3, "comparing node %s with %s" % (node1, node2)) return cmp(node1.id.rawdistance(key), node2.id.rawdistance(key)) nodes = self.nodes[:] nodes.sort(nodecmp) print "Nodes, in order of increasing distance from '%s'" % key i = 0 for node in nodes: i += 1 print "%3s" % node.idx, if i % 16 == 0: print def findnode(self, idx, key): """ does a findnode on peer 'idx' against key 'key' """ peers = self.nodes[idx]._findnode(key) print "Findnode on node %s (%s) returned:" % (self.nodes[idx].name, key) peers = [self.getPeerIdx(p) for p in peers] i = 0 for p in peers: print "%3d" % p, i += 1 if i % 16 == 0: print def dumpids(self): print "Nodes listed by name and id" for i in range(len(self.nodes)): node = self.nodes[i] print "%s: %s (%s...)" % (i, node.name, node.id.asHex()[:10]) def dumplong(self): """ Outputs a list of nodes and their connections """ for node in self.nodes: print "%s: id=%s dest=%s" % (node.name, node.id.asHex()[:8], node.dest[:8]) for peer in node.peers: npeer = self.getPeer(peer) print " %s: id=%s dest=%s" % (npeer.name, npeer.id.asHex()[:8], npeer.dest[:8]) def getPeer(self, peer): for n in self.nodes: if n.dest == peer.dest: return n return None def logexc(verbosity, msg, nPrev=0, clsname=None): fd = StringIO("%s\n" % msg) traceback.print_exc(file=fd) log(verbosity, fd.getvalue(), nPrev, clsname) class KTestSocket(stasher.KBase): """ Emulates an I2P Socket for testing """ # class-global mapping of b64 dests to sockets opensocks = {} totalQueuedItems = 0 def __init__(self, sessname, *args,
**kw): self.log(4, "creating simulated i2p socket %s" % sessname) # that'll do for pseudo-random dests self.dest = stasher.shahash(sessname) + "0" * 256 # set up our inbound queue self.queue = Queue.Queue() # register ourself self.opensocks[self.dest] = self def __del__(self): # deregister ourself del self.opensocks[self.dest] def sendto(self, data, flags, dest): self.opensocks[dest].queue.put((data, self.dest)) KTestSocket.totalQueuedItems += 1 def recvfrom(self, *args): KTestSocket.totalQueuedItems -= 1 return self.queue.get() def select(inlist, outlist, errlist, timeout=0): log = stasher.log log(5, "fake select called") deadline = time.time() + timeout while (time.time() < deadline) and KTestSocket.totalQueuedItems == 0: time.sleep(0.1) if KTestSocket.totalQueuedItems == 0: return [], [], [] socksWithData = [] for sock in inlist: if not sock.queue.empty(): socksWithData.append(sock) log(5, "fake select returning %s" % repr(socksWithData)) return socksWithData, [], [] select = staticmethod(select) def setblocking(self, val): self.blocking = val class KBase: """ A mixin which adds a class-specific logger """ def log(self, verbosity, msg): log(verbosity, msg, 1, self.__class__.__name__) def logexc(self, verbosity, msg): logexc(verbosity, msg, 1, self.__class__.__name__) def debug(): """ Alternative testing entry point which runs the test() function in background, and the engine in foreground, allowing the engine to be stepped through with a debugger """ global desperatelyDebugging desperatelyDebugging = True core.run() def on_reply(self, peer, msgId, **details): """ Callback for FIND_NODE reply """ # shorthand peerRecs = self.peerRecs # who replied? try: peerRec = peerRecs[peer] except: traceback.print_exc() self.log(3, "discarding findNode reply from unknown peer %s %s, discarding" % ( peer, details)) return if logVerbosity >= 3: try: dests = "\n".join([d[:6] for d in details['nodes']]) except: logexc(4, "*** find-node rpc reply = %s" % details) self.log(3, "got findNode reply from %s:\n%s" % (peer, details)) self.unbindPeerReply(peer, msgId) # save ref to this peer, it's seemingly good self.localNode.addref(peerRec.peer) # mark it as having replied peerRec.state = 'replied' # one less query to wait for self.numQueriesPending -= 1 # save these as 'fromReply' peers peersReturned = details.get('nodes', []) peersReturned = [self.localNode._normalisePeer(p) for p in peersReturned] peerRecsReturned = peerRecs.newtable(peersReturned, 'fromReply') peerRecsReturned.sort() peerRecsReturned.purge(lambda p:p in peerRecs or p.peer.dest == self.localNode.dest) # update our node's KBucket for peerObj in peersReturned: dist = self.localNode.id.distance(peerObj.id) self.localNode.buckets[dist].justSeenPeer(peerObj) self.log(5, "peerRecsReturned = %s" % repr(peerRecsReturned)) if len(peerRecsReturned) > 0: peerRecs.extend(peerRecsReturned, 'fromReply') if desperatelyDebugging: print "TRACING???" set_trace() # are there any peers we're still waiting to hear from? 
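# (ie, no queries outstanding, and no peers left sitting in 'idle' or
# 'fromQuery' state awaiting a query)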
if self.numQueriesPending == 0 and len(peerRecs.select(('idle', 'fromQuery'))) == 0: # query round is finished - see how good the results are repliedPeers = peerRecs.select('replied') self.log(3, "====== query round finished, got %s" % repr(repliedPeers)) # if this round returned any peers better than the ones we've already # heard from, then launch another round of queries candidates = peerRecs.select('candidate') ncandidates = len(candidates) # test all returned peers, and see if one or more is nearer than # our candidates closerPeers = [] gotNearer = 0 for rec in peerRecs.select('fromReply'): if ncandidates == 0 or rec.isCloserThanOneOf(candidates): self.log(3, "Got a closer peer (or no candidates yet)") gotNearer = 1 if gotNearer: # mark replied peers as candidates for rec in peerRecs.select('replied'): rec.state = 'candidate' pass else: # no - all queries are exhausted - it's the end of this round self.log(3, "Query round returned no closer peers") self.returnTheBestWeGot() self.sendSomeQueries() def count(self, *args): """ returns the number of records whose state is one of args """ count = 0 for rec in self.peers: if rec.state in args: count += 1 return count def changeState(self, oldstate, newstate): """ for all recs of state 'oldstate', change their state to 'newstate' """ for p in self.peers: if p.state == oldstate: p.state = newstate def checkEndOfRound(self): """ Checks if we've hit the end of a query round. If so, and if either: - we've got some closer peers, OR - we've heard from fewer than maxBucketSize peers, fire off more queries Otherwise, return the best available """ peerTab = self.peerTab if core.fg: set_trace() # has this query round ended? if peerTab.count('start', 'queried') > 0: # not yet return self.log(2, "********** query round ended") # ------------------------------------ # end of round processing self.numRounds += 1 # did we get any closer to required hash? if self.type == 'findData' \ or self.gotAnyCloser() \ or peerTab.count('closest') < maxBucketSize: # yes - save these query results self.log(4, "starting another round") peerTab.changeState('replied', 'closest') peerTab.changeState('recommended', 'start') # cull the shortlist self.log(2, "culling to k peers") if peerTab.count('closest') > maxBucketSize: peerTab.sort() excess = peerTab.select('closest')[maxBucketSize:] excess.changeState('closest', 'toofar') pass # and start up another round self.sendSomeQueries() # did anything launch?
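# (if no peers are now in 'start' or 'queried' state, the new round
# failed to launch any queries, so settle for the best peers seen so far)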
if peerTab.count('start', 'queried') == 0: # no - we're screwed self.returnTheBestWeGot() # done for now return def gotAnyCloser(self): """ Tests if any peer records in state 'recommended' or 'replied' are nearer than the records in state 'closest' """ peerTab = self.peerTab # get current closest peers closest = peerTab.select('closest') # if none yet, then this was just end of first round if len(closest) == 0: return True # get the peers we're considering #candidates = peerTab.select(('recommended', 'replied')) candidates = peerTab.select('recommended') # now test them gotOneCloser = False for c in candidates: #if c.isCloserThanOneOf(closest): if c.isCloserThanAllOf(closest): return True # none were closer return False def doput(): n[0].put('roses', 'red') def dump(self): c = self.count self.log(2, "PeerQueryTable stats:\n" "start: %s\n" "recommended: %s\n" "queried: %s\n" "replied: %s\n" "timeout: %s\n" "closest: %s\n" "toofar: %s\n" "TOTAL: %s\n" % ( c('start'), c('recommended'), c('queried'), c('replied'), c('timeout'), c('closest'), c('toofar'), len(self.peers))) #states = [p.state for p in self.peers] #self.log(3, "PeerQueryTable states:\n%s" % states) class KTestMap(stasher.KBase): """ Creates a random set of interconnections between nodes in a test network """ path = "testnet.topology-%s" def __init__(self, numnodes): path = self.path % numnodes if os.path.isfile(path): self.load(numnodes) return self.log(2, "Creating new test topology of %s nodes" % numnodes) self.refs = {} # key is nodenum, val is list of ref nodenums self.numnodes = numnodes i = 0 for i in range(1, numnodes): # get a random number not equal to i while 1: ref = random.randrange(0, i) if ref != i: break # add it self.refs[i] = ref # now point node 0 somewhere self.refs[0] = random.randrange(1, i) # and save it self.save() def __getitem__(self, idx): """ Returns the ref num for node index idx """ return self.refs[idx] def dump(self): nodes = self.refs.keys() nodes.sort() for n in nodes: print ("%2s -> %2s" % (n, self.refs[n])), if (n + 1) % 8 == 0: print else: print "|", def load(self, numnodes): path = self.path % numnodes encoded = file(path, "rb").read() decoded = pickle.loads(encoded) self.numnodes, self.refs = decoded self.log(2, "Restored existing topology of %s nodes" % numnodes) def save(self): path = self.path % self.numnodes encoded = pickle.dumps((self.numnodes, self.refs)) file(path, "wb").write(encoded) def connect(self, topology=None): """ Connect these nodes together """ if topology: self.map = topology else: self.map = KTestMap(len(self.nodes)) nodeIdxs = self.map.refs.keys() nodeIdxs.sort() for idx in nodeIdxs: #print "Node %s, adding ref to node %s" % (idx, self.map[idx]) self[idx].addref(self[self.map[idx]]) def purge(self): os.system("rm -rf ~/.i2pkademlia") if __name__ == '__main__': main() def __del__(self): pass #KTestNetwork.aNetworkExists = False def cycle(self): self.fg = True self.threadRxPackets() def findpath(self, i0, i1): """ Tries to find a path from idx0 to idx1, printing out the nodes along the way """ def _findpath(self, idx0, idx1, tried): #print "seeking path from %s to %s" % (idx0, idx1) n0 = self.nodes[idx0] n1 = self.nodes[idx1] n0peers = [self.getPeer(p) for p in n0.peers] n0peerIdxs = [self.nodes.index(p) for p in n0peers] possibles = [] for idx in n0peerIdxs: if idx == idx1: # success return [idx1] if idx not in tried: possibles.append(idx) if possibles == []: return None #print " possibles = %s" % repr(possibles) for idx in possibles: tried.append(idx) res = 
_findpath(self, idx, idx1, tried) if isinstance(res, list): res.insert(0, idx) return res return None res = _findpath(self, i0, i1, [i0]) if isinstance(res, list): res.insert(0, i0) return res def testconnectivity(self): """ Ensures that every peer can reach every other peer """ nNodes = len(self.nodes) for i in range(nNodes): for j in range(nNodes): if i != j and not self.findpath(i, j): print "No route from node %s to node %s" % (i, j) return False print "Every node can reach every other node" return True def getPeerIdx(self, peer): for i in range(len(self.nodes)): n = self.nodes[i] if n.dest == peer.dest: return i return None def rawdistance(self, other): """ calculates the 'distance' between this hash and another hash, and returns it raw as this xor other """ return self.value ^ other.value def reportStats(self): """ Logs a stat dump of query outcome """ if self.isLocalOnly: return self.log(2, "query terminated after %s rounds, %s queries, %s replies, %s recommendations" % ( (self.numRounds+1), self.numQueriesSent, (self.numReplies+1), self.numPeersRecommended ) ) @first #! /usr/bin/env python """ Implements a simulated kademlia test network """ @others @others if __name__ == '__main__': print "starting test" pass test() import sys, os, Queue, pickle, time from pdb import set_trace import stasher class KCore(stasher.KCore): """ Override kademlia core to use simulated I2P sockets """ def select(self, inlist, outlist, errlist, timeout): #print "dummy select" return KTestSocket.select(inlist, outlist, errlist, timeout) class KNode(stasher.KNode): """ Override kademlia node class to use simulated test socket """ SocketFactory = KTestSocket # number of nodes to build in test network numTestNodes = 100 stasher.logToSocket = 19199 SocketFactory = None # defaults to I2P socket def select(self, inlist, outlist, errlist, timeout): return i2p.select.select(inlist, outlist, errlist, timeout) def main(): """ Command line interface """ global samAddr, clientAddr, logVerbosity, dataDir argv = sys.argv argc = len(argv) try: opts, args = getopt.getopt(sys.argv[1:], "h?vV:S:C:sd:fl", ['help', 'version', 'samaddr=', 'clientaddr=', 'verbosity=', 'status', 'datadir=', 'foreground', 'shortversion', 'localonly', ]) except: traceback.print_exc(file=sys.stdout) usage("You entered an invalid option") daemonise = True verbosity = 2 debug = False foreground = False localOnly = False for opt, val in opts: if opt in ['-h', '-?', '--help']: usage(True) elif opt in ['-v', '--version']: print "Stasher version %s" % version sys.exit(0) elif opt in ['-V', '--verbosity']: logVerbosity = int(val) elif opt in ['-f', '--foreground']: foreground = True elif opt in ['-S', '--samaddr']: samAddr = val elif opt in ['-C', '--clientaddr']: clientAddr = val elif opt in ['-s', '--status']: dumpStatus() elif opt in ['-d', '--datadir']: dataDir = val elif opt == '--shortversion': sys.stdout.write("%s" % version) sys.stdout.flush() sys.exit(0) elif opt in ['-l', '--localonly']: localOnly = True #print "Debug - bailing" #print repr(opts) #print repr(args) #sys.exit(0) # Barf if no command given if len(args) == 0: err("No command given") usage(0, 1) cmd = args.pop(0) argc = len(args) #print "cmd=%s, args=%s" % (repr(cmd), repr(args)) if cmd not in ['help', '_start', 'start', 'stop', 'hello', 'get', 'put', 'addref', 'getref', 'pingall']: err("Illegal command '%s'" % cmd) usage(0, 1) if cmd == 'help': usage() # dirty hack if foreground and cmd == 'start': cmd = '_start' # magic undocumented command name - starts node, launches its client 
server, # this should only happen if we're spawned from a 'start' command if cmd == '_start': if argc not in [0, 1]: err("start: bad argument count") usage() if argc == 0: nodeName = defaultNodename else: nodeName = args[0] # create and serve a node #set_trace() node = KNode(nodeName) node.start() log(3, "Node %s launched, dest = %s" % (node.name, node.dest)) node.serve() sys.exit(0) if cmd == 'start': if argc not in [0, 1]: err("start: bad argument count") usage() if argc == 0: nodeName = defaultNodename else: nodeName = args[0] pidFile = nodePidfile(nodeName) if os.path.exists(pidFile): err(("Stasher node '%s' seems to be already running. If you are\n" % nodeName) +"absolutely sure it's not running, please remove its pidfile:\n" +pidFile+"\n") sys.exit(1) # spawn off a node import stasher pid = spawnproc(sys.argv[0], "-S", samAddr, "-C", clientAddr, "_start", nodeName) file(pidFile, "wb").write("%s" % pid) print "Launched stasher node as pid %s" % pid print "Pidfile is %s" % pidFile sys.exit(0) if cmd == 'stop': if argc not in [0, 1]: err("stop: bad argument count") usage() if argc == 0: nodeName = defaultNodename else: nodeName = args[0] pidFile = nodePidfile(nodeName) if not os.path.isfile(pidFile): err("Stasher node '%s' is not running - cannot kill\n" % nodeName) sys.exit(1) pid = int(file(pidFile, "rb").read()) try: killproc(pid) print "Killed stasher node (pid %s)" % pid except: print "Failed to kill node (pid %s)" % pid os.unlink(pidFile) sys.exit(0) try: client = KNodeClient() except: traceback.print_exc() err("Node doesn't seem to be up, or reachable on %s" % clientAddr) return if cmd == 'hello': err("Node seems fine") sys.exit(0) elif cmd == 'get': if argc not in [1, 2]: err("get: bad argument count") usage() key = args[0] if argc == 2: # try to open output file path = args[1] try: outfile = file(path, "wb") except: err("Cannot open output file %s" % repr(path)) usage(0, 1) else: outfile = sys.stdout if logVerbosity >= 3: sys.stderr.write("Searching for key - may take up to %s seconds or more\n" % ( timeout['findData'])) res = client.get(key, local=localOnly) if res == None: err("Failed to retrieve '%s'" % key) sys.exit(1) else: outfile.write(res) outfile.flush() outfile.close() sys.exit(0) elif cmd == 'put': if argc not in [1, 2]: err("put: bad argument count") usage() key = args[0] if argc == 2: # try to open input file path = args[1] try: infile = file(path, "rb") except: err("Cannot open input file %s" % repr(path)) usage(0, 1) else: infile = sys.stdin val = infile.read() if len(val) > maxValueSize: err("File is too big - please trim to %s" % maxValueSize) if logVerbosity >= 3: sys.stderr.write("Inserting key - may take up to %s seconds\n" % ( timeout['findNode'] + timeout['store'])) res = client.put(key, val, local=localOnly) if res == None: err("Failed to insert '%s'" % key) sys.exit(1) else: sys.exit(0) elif cmd == 'addref': if argc not in [0, 1]: err("addref: bad argument count") usage() if argc == 1: # try to open input file path = args[0] try: infile = file(path, "rb") except: err("Cannot open input file %s" % repr(path)) usage(0, 1) else: infile = sys.stdin ref = infile.read() res = client.addref(ref) if res == None: err("Failed to add ref") sys.exit(1) else: sys.exit(0) elif cmd == 'getref': if argc not in [0, 1]: err("getref: bad argument count") usage() res = client.getref() if argc == 1: # try to open output file path = args[0] try: outfile = file(path, "wb") except: err("Cannot open output file %s" % repr(path)) usage(0, 1) else: outfile = sys.stdout if res ==
None: err("Failed to retrieve node ref") sys.exit(1) else: outfile.write(res) outfile.flush() outfile.close() sys.exit(0) elif cmd == 'pingall': if logVerbosity > 2: print "Pinging all peers, waiting %s seconds for results" % timeout['ping'] res = client.pingall() print res sys.exit(0) def test1(): """ A torturous test """ def usage(detailed=False, ret=0): print "Usage: %s <options> [<command> [<args>...]]" % sys.argv[0] if not detailed: print "Type %s -h for help" % sys.argv[0] sys.exit(ret) print "This is stasher, a distributed file storage network that runs" print "atop the anonymising I2P network (http://www.i2p.net)" print "Written by aum - August 2004" print print "Options:" print " -h, --help - display this help" print " -v, --version - print program version" print " -V, --verbosity=n - verbosity, default 1, 1=quiet ... 4=noisy" print " -S, --samaddr=host:port - host:port of I2P SAM port, " print " default %s" % i2p.socket.samaddr print " -C, --clientaddr=host:port - host:port for socket interface to listen on" print " for clients, default %s" % clientAddr print " -d, --datadir=dir - directory in which stasher files get written" print " default is ~/.stasher" print " -f, --foreground - only valid for 'start' cmd - runs the node" print " in foreground without spawning - for debugging" print " -l, --localonly - only valid for get/put - restricts the get/put" print " operation to the local node only" print print "Commands:" print " start [<nodename>]" print " - launches a single node, which forks off and runs in background" print " nodename is a short unique nodename, default is '%s'" % defaultNodename print " stop [<nodename>]" print " - terminates running node <nodename>" print " get <keyname> [<file>]" print " - attempts to retrieve key <keyname> from the network, saving" print " to file <file> if given, or to stdout if not" print " put <keyname> [<file>]" print " - inserts key <keyname> into the network, taking its content" print " from file <file> if given, otherwise reads content from stdin" print " addref <file>" print " - adds a new noderef to the node, taking the base64 noderef" print " from file <file> if given, or from stdin" print " (if you don't have any refs, visit http://stasher.i2p, or use" print " the dest in the file aum.stasher in cvs)" print " getref <file>" print " - uplifts the running node's dest as base64, writing it to file" print " <file> if given, or to stdout" print " hello" print " - checks that local node is running" print " pingall" print " - diagnostic tool - pings all peers, waits for replies or timeouts," print " reports results" print " help" print " - display this help" print sys.exit(0) class KNodeServer(KBase, SocketServer.ThreadingMixIn, SocketServer.TCPServer): """ Listens for incoming socket connections """ @others class KNodeReqHandler(KBase, SocketServer.StreamRequestHandler): """ Manages a single client connection """ @others def handle(self): """ Conducts all conversation for a single req """ req = self.request client = self.client_address server = self.server node = self.server.node read = self.rfile.read readline = self.rfile.readline write = self.wfile.write flush = self.wfile.flush finish = self.finish # start with a greeting write("Stasher version %s ready\n" % version) # get the command line = readline().strip() try: cmd, args = re.split("\\s+", line, 1) except: cmd = line args = '' self.log(3, "cmd=%s args=%s" % (repr(cmd), repr(args))) if cmd in ["get", "getlocal"]: isLocal = cmd == "getlocal" value = node.get(args, local=isLocal) if
value == None: write("notfound\n") else: write("ok\n%s\n%s" % (len(value), value)) flush() time.sleep(2) finish() return elif cmd in ["put", "putlocal"]: isLocal = cmd == "putlocal" try: size = int(readline()) value = read(size) res = node.put(args, value, local=isLocal) if res: write("ok\n") else: write("failed\n") flush() except: traceback.print_exc() write("exception\n") finish() return elif cmd == 'addref': try: res = node.addref(args, True) if res: write("ok\n") else: write("failed\n") flush() except: traceback.print_exc() write("exception\n") finish() return elif cmd == 'getref': res = node.dest write("ok\n") write("%s\n" % res) flush() time.sleep(1) finish() return elif cmd == 'pingall': res = node._pingall() write(res+"\n") finish() return elif cmd == "die": server.isRunning = False write("server terminated\n") finish() else: write("unrecognisedcommand\n") finish() return def __init__(self, node, addr=None): if addr == None: addr = clientAddr self.isRunning = True self.node = node listenHost, listenPort = addr.split(":") listenPort = int(listenPort) self.listenPort = listenPort SocketServer.TCPServer.__init__(self, (listenHost, listenPort), KNodeReqHandler) def serve(self): """ makes this node listen on socket for incoming client connections, and services these connections """ server = KNodeServer(self) server.serve_forever() def serve_forever(self): print "awaiting client connections on port %s" % self.listenPort while self.isRunning: self.handle_request() class KNodeClient(KBase): """ Talks to a KNodeServer over a socket Subclass this to implement Stasher clients in Python """ @others def __init__(self, address=clientAddr): if type(address) in [type(()), type([])]: self.host, self.port = clientAddr else: self.host, self.port = clientAddr.split(":") self.port = int(self.port) self.hello() def connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.rfile = self.sock.makefile("rb") self.read = self.rfile.read self.readline = self.rfile.readline self.wfile = self.sock.makefile("wb") self.write = self.wfile.write self.flush = self.wfile.flush # read greeting greeting = self.readline() parts = re.split("\\s+", greeting) if parts[0] != "Stasher": self.close() raise Exception("Not connected to valid stasher interface") def get(self, key, **kw): """ sends a get command to stasher socket, and retrieves and interprets result Arguments: - key - key to retrieve Keywords: - local - default False - if True, only looks in local storage Returns key's value if found, or None if key not found """ if kw.get('local', False): cmd = 'getlocal' else: cmd = 'get' self.connect() self.write("%s %s\n" % (cmd, key)) self.flush() #print "waiting for resp line" res = self.readline().strip() if res == "ok": size = int(self.readline()) val = self.read(size) self.close() return val else: self.close() return None def close(self): self.rfile.close() #self.wfile.close() self.sock.close() def hello(self): self.connect() self.close() def put(self, key, val, **kw): """ Tells remote stasher port to insert a file into the network Arguments: - key - key to insert under - val - value to insert under this key Keywords: - local - default False - if True, only looks in local storage """ if kw.get('local', False): cmd = 'putlocal' else: cmd = 'put' self.connect() self.write("%s %s\n" % (cmd, key)) self.write("%s\n" % len(val)) self.write(val) self.flush() res = self.readline().strip() self.close() if res == "ok": return True else: print repr(res) return False 
def __getitem__(self, item): return self.get(item) def __setitem__(self, item, val): if not self.put(item, val): raise Exception("Failed to insert") def kill(self): """ Tells remote server to fall on its sword """ try: while 1: self.connect() self.write("die\n") self.flush() self.close() except: pass def finish(self): SocketServer.StreamRequestHandler.finish(self) def err(msg): sys.stderr.write(msg+"\n") def addref(self, ref): """ Passes a new noderef to node """ self.connect() self.write("addref %s\n" % ref) self.flush() res = self.readline().strip() self.close() if res == "ok": return True else: print repr(res) return False def userI2PDir(nodeName=None): """ Returns a directory under user's home dir into which stasher files can be written If nodename is given, a subdirectory will be found/created Return value is toplevel storage dir if nodename not given, otherwise absolute path including node dir """ if dataDir != None: if not os.path.isdir(dataDir): os.makedirs(dataDir) return dataDir if sys.platform == 'win32': home = os.getenv("APPDATA") if home: topDir = os.path.join(home, "stasher") else: topDir = os.path.join(os.getcwd(), "stasher") else: #return os.path.dirname(__file__) topDir = os.path.join(os.path.expanduser('~'), ".stasher") if not os.path.isdir(topDir): os.makedirs(topDir) if nodeName == None: return topDir else: nodeDir = os.path.join(topDir, nodeName) if not os.path.isdir(nodeDir): os.makedirs(nodeDir) return nodeDir def nodePidfile(nodename): return os.path.join(userI2PDir(nodename), "node.pid") def spawnproc(*args, **kw): """ Spawns a process and returns its PID VOMIT! I have to do a pile of odious hacks for the win32 side Returns a usable PID Keywords: - priority - priority at which to spawn - default 20 (highest) """ # get priority, convert to a unix 'nice' value priority = 20 - kw.get('priority', 20) if sys.platform != 'win32': # *nix - easy #print "spawnproc: launching %s" % repr(args) # insert nice invocation args = ['/usr/bin/nice', '-n', str(priority)] + list(args) return os.spawnv(os.P_NOWAIT, args[0], args) else: # just close your eyes here and pretend this abomination isn't happening!
        args = list(args)
        args.insert(0, sys.executable)
        cmd = " ".join(args)
        #print "spawnproc: launching %s" % repr(cmd)
        if 0:
            try:
                c = _winreg.ConnectRegistry(None, _winreg.HKEY_LOCAL_MACHINE)
                c1 = _winreg.OpenKey(c, "SOFTWARE")
                c2 = _winreg.OpenKey(c1, "Microsoft")
                c3 = _winreg.OpenKey(c2, "Windows NT")
                c4 = _winreg.OpenKey(c3, "CurrentVersion")
                supportsBelowNormalPriority = 1
            except:
                supportsBelowNormalPriority = 0
        else:
            if sys.getwindowsversion()[3] != 2:
                supportsBelowNormalPriority = 0
            else:
                supportsBelowNormalPriority = 1

        # frig the priority into a windows value
        if supportsBelowNormalPriority:
            if priority < 7:
                pri = win32process.IDLE_PRIORITY_CLASS
            elif priority < 14:
                pri = 0x4000   # BELOW_NORMAL_PRIORITY_CLASS
            else:
                pri = win32process.NORMAL_PRIORITY_CLASS
        else:
            if priority < 11:
                pri = win32process.IDLE_PRIORITY_CLASS
            else:
                pri = win32process.NORMAL_PRIORITY_CLASS

        print "spawnproc: launching %s" % repr(args)
        si = win32process.STARTUPINFO()
        hdl = win32process.CreateProcess(
            sys.executable, # lpApplicationName
            cmd,            # lpCommandLine
            None,           # lpProcessAttributes
            None,           # lpThreadAttributes
            0,              # bInheritHandles
            pri,            # dwCreationFlags - the priority class chosen above
            None,           # lpEnvironment
            None,           # lpCurrentDirectory
            si,             # lpStartupInfo
            )
        pid = hdl[2]
        #print "spawnproc: pid=%s" % pid
        return pid

def killproc(pid):
    if sys.platform == 'win32':
        print repr(pid)
        handle = win32api.OpenProcess(1, 0, pid)
        print "pid %s -> %s" % (pid, repr(handle))
        #return (0 != win32api.TerminateProcess(handle, 0))
        win32process.TerminateProcess(handle, 0)
    else:
        return os.kill(pid, signal.SIGKILL)

def i2psocket(self, *args, **kw):
    return i2p.socket.socket(*args, **kw)

def getref(self):
    """
    Uplifts node's own ref
    """
    self.connect()
    self.write("getref\n")
    self.flush()
    res = self.readline().strip()
    if res == "ok":
        ref = self.readline().strip()
        self.close()
        return ref
    else:
        self.close()
        return "failed"

@first #! /usr/bin/env python
@others

@first #! /bin/sh
export I2PPYDIR=/main/i2p/cvs/i2p/apps/stasher/python
export WEBDIR=/main/i2p/services/stasher.i2p
export CVSDIR=/main/i2p/cvs/i2p/apps/stasher/python

cp README.txt $I2PPYDIR
cp stasher.py $I2PPYDIR/src
cp bencode.py $I2PPYDIR/src
cp code.leo $I2PPYDIR/src
cp *.stasher $I2PPYDIR/noderefs
cp *.stasher $WEBDIR/noderefs
cp stasher $I2PPYDIR/scripts
cp stasher-launch.py $I2PPYDIR/scripts/stasher.py
cp setup-stasher.py $I2PPYDIR/setup.py

# generate API doco
epydoc -n "Stasher Python API" -o api stasher.py

# make a release tarball
rm -rf release/*
export STVERSION=`./stasher.py --shortversion`
export TARDIR=release/stasher-$STVERSION
export DIRNAME=stasher-$STVERSION
export TARBALLNAME=stasher.tar.gz
mkdir $TARDIR
cp -a /main/i2p/cvs/i2p/apps/sam/python/i2p $TARDIR
cp -a code.leo stasher stasher.py bencode.py api $TARDIR
cp README-tarball.txt $TARDIR/README.txt
mkdir $TARDIR/noderefs
cp *.stasher $TARDIR/noderefs
cd release
tar cfz $TARBALLNAME $DIRNAME

# copy tarball and doco to websites
cp $TARBALLNAME $WEBDIR
cd ..
cp -a api $WEBDIR

# last but not least, commit to cvs
cp stasher.py $CVSDIR/src
cp *.stasher $CVSDIR/noderefs
cd $CVSDIR
cvs commit

@others

@first #! /usr/bin/env python
# wrapper script to run stasher node

# set this to the directory where you've installed stasher
stasherDir = "/path/to/my/stasher/dir"

import sys
sys.path.append(stasherDir)
import stasher
stasher.main()

@first #! /bin/sh
rm -rf /tmp/node1
stasher -V4 -Slocalhost:7656 -Clocalhost:7659 -d/tmp/node1 _start node1

@first #! /bin/sh
rm -rf /tmp/node2
stasher -V4 -Slocalhost:17656 -Clocalhost:17659 -d/tmp/node2 _start node2
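# ---------------------------------------------------------------------
# A small sketch (not part of the distribution) showing how the two
# test nodes launched by node1.sh and node2.sh above can be exercised
# from Python via KNodeClient, using the client ports those scripts
# pass with -C. It assumes both nodes are running and already know
# about each other (e.g. via 'stasher addref').

from stasher import KNodeClient

def twoNodeSmokeTest():
    node1 = KNodeClient("localhost:7659")    # -Clocalhost:7659 in node1.sh
    node2 = KNodeClient("localhost:17659")   # -Clocalhost:17659 in node2.sh

    node1["mykey"] = "hello from node1"      # __setitem__ -> put
    value = node2["mykey"]                   # __getitem__ -> get
    print "node2 got %s" % repr(value)
    return value == "hello from node1"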
@first #! /usr/bin/env python
"""
This is the installation script for Stasher,
a distributed file storage framework for I2P.
"""

import sys, os
from distutils.core import setup

oldcwd = os.getcwd()
os.chdir("src")

if sys.platform == 'win32':
    stasherScript = "..\\scripts\\stasher.py"
else:
    stasherScript = "../scripts/stasher"

try:
    import i2p
    import i2p.socket
    import i2p.select
except:
    print "Sorry, but you don't seem to have the core I2P"
    print "python library modules installed."
    print "If you're installing from cvs, please go to"
    print "i2p/apps/sam/python, become root, and type:"
    print "    python setup.py install"
    print "Then, retry this installation."
    sys.exit(1)

setup(name="Stasher",
      version="0.0",
      description="Kademlia-based P2P distributed file storage app for I2P",
      author="aum",
      author_email="aum_i2p@hotmail.com",
      url="http://stasher.i2p",
      py_modules = ['stasher', 'bencode'],
      scripts = [stasherScript],
      )

@nocolor
STASHER README
@others

-----------------------
INSTALLING STASHER

Prerequisite:

Before you can install/run Stasher, you will first need to have
installed the I2P Python modules - available in cvs at
i2p/apps/sam/python.

To install stasher, just make sure you've got the latest cvs,
then type:
    python setup.py install
as root.

This installs the stasher engine, plus a wrapper client script
called 'stasher', which setup.py will install into your execution
path.

If you don't like the thought of becoming root, you could just put
stasher.py on your execution path, and/or create a symlink called
'stasher'.

Test your installation by typing 'stasher -h' - this should
display a help message.

------------------------
DOZE USERS PLEASE NOTE

You'll need to watch and see where the stasher.py wrapper script
gets installed. On my box, it ends up on d:\python23\scripts, but
on your box it'll likely go into c:\python23\scripts.

You may either update your system PATH environment variable to
include your python scripts directory, OR you can copy stasher.py
to anywhere that's on your path.

In the explanations below, note that wherever I say to type
'stasher', you'll need to type 'stasher.py' instead.

------------------------
WARNING

This is a very early pre-alpha test version of stasher. It is only
capable of storing or retrieving files of less than 29k in size.

Also, files are totally insecure - anyone can overwrite any keys
you insert, and vice versa. I'll be adding support for CHK-like
and SSK-like keys in due course.

------------------------
USING STASHER

To see stasher's options, type:
    stasher -h
This should dump out a verbose help message.

To start a stasher node, type:
    stasher start

To shut down a stasher node, type:
    stasher stop

To insert a file into stasher, type:
    stasher put mykey myfile
Note, if you don't supply a filename, stasher can read the file
from standard input.

To retrieve a file from stasher, type:
    stasher get mykey

@nocolor
STASHER README
@others

------------------------
INSTALLING STASHER FROM THE TARBALL

For regular users:

1. Crack this tarball, and copy the 'stasher-n.n.n' directory
   somewhere safe, such as your home directory

2. Edit the small 'stasher' script therein, and set the variable
   'stasherDir' as directed

3. Either put this directory onto your PATH, or create a symlink
   called 'stasher' within any of your PATH dirs, pointing to the
   stasher script

4. Test your installation by typing:
    stasher -v

For windows users:

1. Make sure you have python2.3 or later installed

2. Untar this tarball, and copy the directory into
   C:\Program Files, or wherever you like to put your appz
3. Wherever you put the directory, add that to your system-wide
   PATH environment variable

4. Test your installation by opening up an ugly black DOS window,
   and typing:
    stasher.py -v

5. Note - in the USAGE directions below, instead of typing
   'stasher', you'll need to type 'stasher.py'

class KRpcPingAll(KRpc):
    """
    Pings all peers
    """
    @others
    type = 'pingall'

def __init__(self, localNode, client=None, **kw):
    """
    Creates and launches a PINGALL rpc

    Arguments:
        - localNode - the node performing this RPC
        - client - see KRpc.__init__

    Keywords: none
    """
    if kw.has_key('cbArgs'):
        KRpc.__init__(self, localNode, client, cbArgs=kw['cbArgs'])
    else:
        KRpc.__init__(self, localNode, client)

def start(self):
    """
    Kicks off this RPC
    """
    # launch a ping rpc against each of our peers
    peers = self.localNode.peers
    self.numSent = self.numPending = len(peers)
    self.numReplied = self.numFailed = 0
    for peer in peers:
        KRpcPing(self.localNode, self.on_reply, peer=peer)
    return

def returnValue(self, result):
    """
    an override with a nicer call sig
    """
    # a hack for testing - save this RPC object into the node
    # so we can introspect it
    self.localNode.lastrpc = self
    try:
        KRpc.returnValue(self, result, status=result)
    except:
        traceback.print_exc()
        self.log(3, "Failed to return %s" % repr(result))
        KRpc.returnValue(self, 0, status=0)

def on_reply(self, result):
    """
    callback which fires when we get a reply from a PING we sent to a peer
    """
    log(3, "got %s" % repr(result))
    if result:
        self.numReplied += 1
    else:
        self.numFailed += 1
    self.numPending -= 1
    if self.numPending <= 0:
        res = "pinged:%s replied:%s timeout:%s" % (
            self.numSent, self.numReplied, self.numFailed)
        self.log(3, res)
        self.returnValue(res)

def on_tick(self):
    self.log(3, "this shouldn't have happened")
    self.returnValue(False)

def _pingall(self, callback=None, **kw):
    """
    Sends a ping to all peers, returns text string on replies/failures
    """
    if callback:
        KRpcPingAll(self, callback, **kw)
    else:
        return KRpcPingAll(self).execute()

def pingall(self):
    """
    Asks the remote node to ping all of its peers, and returns
    the summary string of replies/timeouts
    """
    self.connect()
    self.write("pingall\n")
    self.flush()
    res = self.readline().strip()
    self.close()
    return res

def returnValue(self, items):
    """
    override with a nicer call sig
    """
    # a hack for testing - save this RPC object into the node
    # so we can introspect it
    self.localNode.lastrpc = self

    # another debugging hack
    self.reportStats()

    KRpc.returnValue(self, items, result=items)

def storeSplit(self):
    """
    Gets called if we're splitting a big file into smaller chunks

    Here, we:
        - break the file up into chunks
        - build a manifest
        - launch store RPCs to store each chunk, where the key is SHA(chunk)
        - launch a store RPC to store the 'manifest' (noting that if the
          manifest is too big, it'll get recursively inserted as a
          splitfile as well)
    """
    # break up into chunks
    chunks = []
    hashes = []
    size = len(self.value)
    i = 0
    self.nchunks = 0
    while i < size:
        chunks.append(self.value[i:i+maxValueSize])
        hashes.append(shahash(chunks[-1]))
        i += maxValueSize
        self.nchunks += 1

    # build the manifest
    manifest = "chunks:%s\n%s\n" % (self.nchunks, "\n".join(hashes))

    # set progress attributes
    self.chunkManifestInserted = False
    self.chunksInserted = 0

    # launch nested Store RPCs for manifest, and each chunk
    KRpcStore(self.localNode, self.on_doneChunkManifest,
              local=self.isLocalOnly, key=self.key, value=manifest)
    i = 0
    while i < self.nchunks:
        KRpcStore(self.localNode, self.on_doneChunk,
                  local=self.isLocalOnly, key=hashes[i], value=chunks[i])
        i += 1

    # now sit back and wait for the callbacks
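# ---------------------------------------------------------------------
# A standalone sketch of the manifest format storeSplit produces, with
# no network traffic. '_buildManifest' is a hypothetical helper, but
# 'shahash' and 'maxValueSize' are this module's own; the manifest
# layout mirrors the code above:
#
#     "chunks:<n>\n<hexhash-of-chunk1>\n...\n<hexhash-of-chunkN>\n"

def _buildManifest(value, chunkSize=None):
    """
    Split value into chunkSize pieces and return (manifest, chunkStore),
    where chunkStore maps hexhash -> chunk, mirroring what storeSplit
    feeds its nested KRpcStore calls.
    """
    if chunkSize is None:
        chunkSize = maxValueSize
    chunks = [value[i:i+chunkSize] for i in range(0, len(value), chunkSize)]
    hashes = [shahash(c) for c in chunks]    # 40-char hex digests
    manifest = "chunks:%s\n%s\n" % (len(chunks), "\n".join(hashes))
    return manifest, dict(zip(hashes, chunks))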
def on_doneChunkManifest(self, result):
    """
    Callback which fires when a manifest insert succeeds/fails
    """
    # the chunk callback handles all
    self.on_doneChunk(result, isManifest=True)

def on_doneChunk(self, result, isManifest=False):
    """
    Callback which fires when a single chunk insert succeeds/fails
    """
    # a failure either way means the whole RPC has failed
    if not result:
        # one huge fuck-up
        self.returnValue(False)
        return

    # update our tally
    if isManifest:
        self.chunkManifestInserted = True
    else:
        self.chunksInserted += 1

    # finished?
    if self.chunkManifestInserted and (self.chunksInserted == self.nchunks):
        # yep - success
        self.returnValue(True)

def on_gotValue(self, value, hash=None):
    """
    Callback which fires when we get the value stored under a key

    Value is either the real value, or a splitfile manifest

    If a real value, just return it. If a splitfile manifest, launch
    nested findValue RPCs to get each chunk
    """
    nchunks = 0
    try:
        firstline, rest = value.split("\n", 1)
        firstline = firstline.strip()
        kwd, str_nchunks = firstline.split(":")
        if kwd != 'chunks':
            raise hell
        nchunks = int(str_nchunks)
        value = rest
    except:
        pass # in this case, hell hath no fury at all

    if nchunks == 0:
        self.returnValue(value)
        return

    # now we get to the hard bit - we have to set up nested findData RPCs to
    # get all the chunks and reassemble them
    hashes = rest.strip().split("\n")

    # do sanity checks
    hashesAllValid = [len(h) == 40 for h in hashes]
    if len(hashes) != nchunks:
        self.log(
            2,
            "Splitfile retrieval failure\nmanifest contains %s hashes, should have been %s" % (
                len(hashes), nchunks))
        self.returnValue(None)
        return
    if False in hashesAllValid:
        self.log(2, "Splitfile retrieval failure - one or more invalid hashes")
        self.returnValue(None)
        return

    # now this is a bit weird - we need to bind each chunk to its hash, so we
    # create a class which produces callables which fire our on_gotChunk callback
    class ChunkNotifier:
        def __init__(me, h, cb):
            me.h = h
            me.cb = cb
        def __call__(me, val):
            me.cb(me.h, val)

    # now launch the chunk retrieval RPCs
    # result is that for each retrieved chunk, our on_gotChunk callback will
    # be invoked with the arguments (hash, value), so we can tick them off
    self.numChunks = nchunks
    self.numChunksReceived = 0
    self.chunkHashes = hashes
    self.chunks = dict.fromkeys(hashes)
    for h in hashes:
        KRpcFindData(self.localNode, h, ChunkNotifier(h, self.on_gotChunk))

    # now, we can sit back and receive the chunks

def on_gotChunk(self, hexhash, value):
    """
    Callback which fires when a nested chunk findData returns
    """
    if value == None:
        self.log(2, "Chunk retrieval failed, fatal to this findData")
        self.returnValue(None)
        return

    # got a value - vet it against hash
    if shahash(value) != hexhash:
        self.log(2, "Got a chunk, but it doesn't hash right - fatal to this findData")
        self.returnValue(None)
        return

    # it's valid - stash it
    self.chunks[hexhash] = value
    self.numChunksReceived += 1

    # have we finished yet?
    if self.numChunksReceived < self.numChunks:
        # no
        self.log(4, "Received chunk %s of %s" % (self.numChunksReceived, self.numChunks))
        return

    # maybe we have
    self.log(4, "We appear to have all chunks, checking further")

    # sanity check
    if None in self.chunks.values():
        self.log(2, "Fatal - reached chunk count, but chunks still missing")
        self.returnValue(None)
        return

    # finally done - got all chunks, hashes are valid, reassemble in order
    allChunks = [self.chunks[h] for h in self.chunkHashes]
    reassembled = "".join(allChunks)
    self.log(4, "Reassembled all %s chunks, SUCCESS" % self.numChunks)
    self.returnValue(reassembled)
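# ---------------------------------------------------------------------
# The matching reassembly sketch for on_gotValue/on_gotChunk above.
# '_reassemble' is a hypothetical helper applying the same parsing and
# per-chunk hash checks, but against a local dict instead of nested
# KRpcFindData calls; it pairs with the _buildManifest sketch earlier.

def _reassemble(manifest, chunkStore):
    """
    Parse a splitfile manifest and reassemble the original value from
    chunkStore (a dict mapping hexhash -> chunk). Returns None on any
    of the failures that on_gotValue/on_gotChunk treat as fatal.
    """
    try:
        firstline, rest = manifest.split("\n", 1)
        kwd, str_nchunks = firstline.strip().split(":")
        if kwd != 'chunks':
            return manifest              # not a manifest - plain value
        nchunks = int(str_nchunks)
    except:
        return manifest                  # ditto - treat as a plain value
    hashes = rest.strip().split("\n")
    if len(hashes) != nchunks:
        return None                      # manifest is internally inconsistent
    parts = []
    for h in hashes:
        chunk = chunkStore.get(h)
        if chunk is None or shahash(chunk) != h:
            return None                  # missing or corrupt chunk
        parts.append(chunk)
    return "".join(parts)

# round-trip check:
#   m, store = _buildManifest("x" * 100000)
#   assert _reassemble(m, store) == "x" * 100000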