Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Use the ZRE protocol (v3) as a discovery layer to build a CJDNS mesh.
- import traceback
- from time import sleep
- import cjdns
- import uuid as uuid
- import re
- from cjdns import key_utils
- import zmq
- import _thread
- import netifaces as ni
- from pyre import Pyre, zhelper
- import queue
- from queue import Queue
- import json
- import sys
- from os import urandom
- import salsa20
# Three-byte prefixes that mark what kind of payload a chat message carries.
PASS_EXCHANGE_HEAD = b"pe:"
KEY_NEGOTIATION_HEAD = b"kn:"
ENCRYPTED_MESSAGE_HEAD = b"em:"

# Codec used for the bytes <-> str conversions throughout this script.
Permissive_Encoding = 'cp437'

# When true, the start-up code also launches a second (mock) Pyre node.
debugging = False

# Keys into ``localdata`` and names of the ZRE headers published by this node.
ADDRESSES = 'addresses'
ADDRESS_LIST = 'ADDRESS_LIST'
PUBKEY = 'pub_key'

# ZRE groups joined at start-up.
DISCOVERY = b'DISCOVERY'
WORLD = b'WORLD'

# Random value drawn once per process and mixed into every key negotiation.
SESSION_PERSONAL_SECRET = urandom(32)

# Candidate cjdroute.conf locations.  The start-up loop takes them with
# ``pop()``, so the LAST entry is tried first.
cjdns_config_files = [
    "C:\\Program Files (x86)\\cjdns\\cjdroute.conf",
    "/etc/cjdns/cjdroute.conf",
    "/etc/cjdroute.conf",
]
# Parsed cjdns configuration; stays empty until a config file was decoded.
cjdns_config_object = dict()
def make_to_send(foreign_public_key, local_public_key):
    """Build the payload that follows the ``kn:`` header of a key negotiation.

    The integer product of both public keys (``make_shared_data``) is
    multiplied by this process's ``SESSION_PERSONAL_SECRET``.  Only the last
    64 characters of the ``hex()`` text of that number are returned, encoded
    with ``Permissive_Encoding``.
    """
    key_product = make_shared_data(foreign_public_key, local_public_key)
    session_secret_int = int(SESSION_PERSONAL_SECRET.hex(), 16)
    blinded_hex = hex(key_product * session_secret_int)
    return bytes(blinded_hex[-64:], Permissive_Encoding)
def make_shared_secret(received_hex):
    """Derive the 32-byte shared secret from a peer's ``kn:`` payload.

    ``received_hex`` is the hexadecimal text sent by the peer.  It is parsed
    as an integer and multiplied by ``SESSION_PERSONAL_SECRET``.  The result
    is the low 31 bytes of that product (big-endian, left-padded with zero
    bytes when the product is shorter) followed by one null byte.

    Returns ``None`` after printing a notice when the payload is not valid
    hexadecimal or does not yield a positive product.
    """
    try:
        secret_int = int(received_hex, 16) * int(SESSION_PERSONAL_SECRET.hex(), 16)
    except ValueError:
        print("bad key negotiation!")
        return None
    if secret_int <= 0:
        # A zero or negative product cannot come from a well-formed peer and
        # would give a predictable (or unrepresentable) key.
        print("bad key negotiation!")
        return None
    # currently, key negotiation must be broken, so only 31 bytes of the key
    # are useful. manually appending null byte.
    byte_count = max(31, (secret_int.bit_length() + 7) // 8)
    return secret_int.to_bytes(byte_count, "big")[-31:] + bytes(1)
def make_shared_data(foreign_public_key, local_public_key):
    """Return the product of both public keys taken as integers.

    Each key is converted with ``cjd_pubkey_asint``.
    """
    return cjd_pubkey_asint(foreign_public_key) * cjd_pubkey_asint(local_public_key)
def get_cjd_pubkey(nodestring):
    """Extract the ``<key>.k`` public key from a dot-separated node string.

    ``nodestring`` is a bytes value; the second-to-last ``.``-separated field
    is taken as the key and ``b".k"`` is appended to it.

    Returns ``None`` when the string has fewer than two fields.
    """
    fields = nodestring.split(sep=b".")
    if len(fields) < 2:
        return None
    return fields[-2] + b".k"
def cjd_pubkey_asint(pkey):
    """Return a cjdns public key as an integer, or 0 if conversion fails.

    The last two characters of ``pkey`` are dropped before the rest is handed
    to ``key_utils.base32.decode``.  The decoded value's hex text is parsed
    as a base-16 integer.  A ``ValueError`` anywhere in that chain yields 0.
    """
    encoded_part = pkey[:-2]
    try:
        return int(key_utils.base32.decode(encoded_part).hex(), 16)
    except ValueError:
        return 0
# Individual ZRE protocol frames in byte-array form, waiting to be processed.
message_queue = queue.Queue()
# Facilitates the exchange of IP data and thereafter triggers password negotiation.
# item: (cjdns address, pubkey, ip address list, ZRE node)
discovery_queue = queue.Queue()
# Miscellaneous chat messages; also the channel used for encrypted password negotiation.
# item: (cjdns address, pubkey, group name, msg(em: if encrypted?), return to)
chat_queue = queue.Queue()
# Encrypted password negotiation (first secret exchange, then password exchange).
# item: (cjdns address, pubkey, group name, msg(kn:key data or pe:encrypted pass), return to)
negotiation_queue = queue.Queue()
# Raw messages are parsed into 'actions' which, if relevant, are sorted into the queues above.
action_queue = queue.Queue()
# Layout of an action (a dict keyed by index):
#   0 - message type
#   1 - from node
#   2 - cjdns address
#   3 - message content; group name if SHOUT
#   4 - SHOUT message content
#   5 - cjdns public key
def queue_to_action_queue(inqueue, outqueue):
    """Group raw frames from ``inqueue`` into per-message dicts on ``outqueue``.

    Frames are collected into a dict keyed 0, 1, 2, ... in arrival order.
    The collected dict is flushed (a copy is queued, then it is emptied) when
    ``inqueue`` stays empty for one second, or when a frame is recognised as
    a message type.  Runs forever.

    NOTE(review): the type check decodes the frame to ``str`` before looking
    it up in ``action_processor_dict``, whose keys are ``bytes``, so it looks
    like it can never match and only the timeout separates messages.
    Confirm before changing it.
    """
    pending = {}

    def flush():
        # Hand over a snapshot, then reuse the same dict for the next message.
        if pending:
            outqueue.put_nowait(pending.copy())
            outqueue.task_done()
            pending.clear()

    while True:
        try:
            frame = inqueue.get(timeout=1)
        except queue.Empty:
            flush()
            continue
        if to_string(frame) in action_processor_dict:
            flush()
        # The next free index always equals the number of frames held so far.
        pending[len(pending)] = frame
def to_string(self):
    """Decode ``self`` with ``Permissive_Encoding``; return it unchanged on failure.

    Values without a usable ``decode`` method (for example ``None`` or a
    ``str``) come back as they were passed in.
    """
    try:
        return self.decode(Permissive_Encoding)
    except Exception:
        # Best-effort conversion: anything that cannot be decoded is passed
        # through, but interpreter-exit signals are no longer swallowed.
        return self
def parse_if_dict(self):
    """Return ``self`` if its first element can be read, otherwise ``None``.

    Empty values, ``None`` and anything else that cannot be indexed with
    ``[0]`` produce ``None``.

    NOTE(review): the JSON branch compares the first element to ``b"{"`` by
    identity, so it does not appear to be reachable for bytes input, and
    ``json.load`` expects a file object rather than bytes.  Callers slice
    the returned value, so enabling the branch needs a look at them too.
    """
    candidate = self
    try:
        looks_like_json = candidate[0] is b"{"
        return json.load(candidate) if looks_like_json else candidate
    except:
        return None
def process_action_queue(actions):
    """Normalise queued actions in place and dispatch them with ``do_action``.

    Runs forever.  Each action is a dict keyed by index:

        0 - message type
        1 - from node (raw bytes before, hex string after)
        2 - before: public key and cjdns protocol string; after: cjdns
            address, or None when no key was found or ``key_utils.to_ipv6``
            raised TypeError
        3 - message content; group name if SHOUT
        4 - SHOUT message content
        5 - before: non-existent; after: cjdns public key as text, or None

    An ``AttributeError`` while handling an action (for example a missing
    frame) is reported as a malformed message and the action is dropped.
    """
    while True:
        try:
            action = actions.get()
            action[0] = action.get(0)
            action[1] = action.get(1).hex()
            public_key = get_cjd_pubkey(action.get(2))
            # save the key for easier future use
            action[5] = to_string(public_key)
            if public_key is None:
                # incompatible peer: there is no key to derive an address from
                action[2] = None
            else:
                try:
                    action[2] = key_utils.to_ipv6(action[5])
                except TypeError:
                    action[2] = None
            action[3] = parse_if_dict(action.get(3))
            action[4] = parse_if_dict(action.get(4))
            do_action(action)
        except AttributeError:
            print("malformed message! ignoring.")
# Dispatch table: ZRE message type -> handler taking the processed action.
# Lambdas are used so the handlers, which are defined further down the file,
# are only looked up when a message is actually dispatched.
action_processor_dict = {
    b"EXIT": lambda action: close_cjdns_connection(action[1]),
    b"ENTER": lambda action: queue_shout(action),
    b"JOIN": lambda action: queue_shout(action),
    b"LEAVE": lambda action: close_cjdns_connection(action[1]),
    b"WHISPER": lambda action: queue_chat(action),
    b"SHOUT": lambda action: queue_shout(action),
}
def do_action(todo=None):
    """Run the handler registered in ``action_processor_dict`` for an action.

    ``todo`` is a processed action whose entry 0 is the message type.  An
    action without a type, or with a type that has no registered handler, is
    reported and skipped, so an unexpected message cannot stop the worker
    thread that calls this.
    """
    if todo is None:
        todo = {}
    try:
        message_type = todo[0]
    except (KeyError, IndexError):
        message_type = None
    handler = action_processor_dict.get(message_type)
    if handler is None:
        print("unhandled message type: " + str(message_type))
        return
    handler(todo)
def queue_shout(shout_action):
    """Route an ENTER, JOIN or SHOUT action to the matching queue.

    * JOIN/LEAVE: the message type is copied into the content slot (4).
    * ENTER: the group slot (3) is set to ``DISCOVERY``.
    * Group ``DISCOVERY``: queued on ``discovery_queue`` as
      (cjdns address, pubkey, content, node).
    * Anything else: queued as (cjdns address, pubkey, group name, msg,
      return node) on ``negotiation_queue`` when the content starts with the
      ``kn:`` or ``pe:`` header, otherwise on ``chat_queue``.

    The message type arrives as bytes; the text form is accepted as well.
    """
    message_type = shout_action[0]
    if message_type in (b"JOIN", "JOIN", b"LEAVE", "LEAVE"):
        shout_action[4] = message_type
    if message_type in (b"ENTER", "ENTER"):
        shout_action[3] = DISCOVERY
    if shout_action[3] == DISCOVERY:
        discovery_queue.put_nowait(
            (shout_action[2], shout_action[5], shout_action[4], shout_action[1])
        )
        return
    try:
        chat_header = shout_action[4][:3]
    except (TypeError, KeyError):
        # Content that cannot be sliced (e.g. None) carries no header.
        chat_header = None
    record = (shout_action[2], shout_action[5], shout_action[3], shout_action[4], shout_action[1])
    if chat_header == KEY_NEGOTIATION_HEAD or chat_header == PASS_EXCHANGE_HEAD:
        negotiation_queue.put_nowait(record)
    else:
        chat_queue.put_nowait(record)
def queue_chat(chat_action):
    """Route a WHISPER action to the negotiation or the chat queue.

    The queued record is (cjdns address, pubkey, group name (always None),
    msg, return node).  Messages whose first three bytes are the ``kn:`` or
    ``pe:`` header go to ``negotiation_queue``; everything else, including
    content that cannot be sliced, goes to ``chat_queue``.
    """
    message = chat_action[3]
    try:
        chat_header = message[:3]
    except TypeError:
        chat_header = None
    record = (chat_action[2], chat_action[5], None, message, chat_action[1])
    is_negotiation = chat_header == KEY_NEGOTIATION_HEAD or chat_header == PASS_EXCHANGE_HEAD
    target_queue = negotiation_queue if is_negotiation else chat_queue
    target_queue.put_nowait(record)
def process_chats(inqueue):
    """Print every chat message taken from ``inqueue``; runs forever.

    Items are (cjdns address, pubkey, group name, msg, return node).
    Messages starting with the ``em:`` header are decrypted with
    ``decrypt_data_from_node`` first; the ciphertext is everything after the
    three-byte header.  Messages that cannot be sliced are printed as
    ``null``.
    """
    while True:
        temp_in = inqueue.get()
        sender = str(node_or_ip(temp_in[4]))
        try:
            message = temp_in[3]
            if message[:3] != ENCRYPTED_MESSAGE_HEAD:
                text = str(message).strip()
            else:
                # The payload is already bytes; skip exactly the header.
                cipher_text = message[len(ENCRYPTED_MESSAGE_HEAD):]
                text = str(decrypt_data_from_node(temp_in[4], cipher_text)).strip()
        except TypeError:
            print(sender + ": null")
        else:
            print(sender + ": " + text)
def node_or_ip(node_i_d):
    """Return the address stored in ``verified_node_ips`` for a node.

    Falls back to ``node_i_d`` itself when the node has no entry.
    """
    try:
        return verified_node_ips[node_i_d]
    except KeyError:
        return node_i_d
def process_discovery(inqueue):
    """Start key negotiation with newly discovered peers; runs forever.

    Items are (cjdns address, pubkey, ip address list, ZRE node).  Every item
    is printed.  A ``kn:`` message is then sent to the node unless it already
    has a session in any state, in which case the discovery is ignored.
    """
    while True:
        announcement = inqueue.get()
        print("discovery: "+str(announcement))
        foreign_node_id = announcement[3]
        if is_active_session(foreign_node_id):
            # Already negotiating (or done) with this node.
            continue
        send_key_negotiation(foreign_node_id, announcement[1])
def send_key_negotiation(foreign_node_id, foreign_public_key):
    """Whisper a ``kn:`` key-negotiation message to a ZRE node.

    The payload comes from ``make_to_send`` using the peer's public key and
    the last 54 characters of the local ``PUBKEY`` entry.  The node is then
    marked in ``key_negotiation_sent``.
    """
    local_public_key = localdata[PUBKEY][-54:]
    payload = make_to_send(foreign_public_key, local_public_key)
    node.whisper(uuid.UUID(foreign_node_id), KEY_NEGOTIATION_HEAD + payload)
    key_negotiation_sent[foreign_node_id] = True
def send_encrypted_password(chat_node):
    """Encrypt this node's cjdns password for ``chat_node`` and whisper it.

    A password is created on first use: 16 random bytes followed by
    ``:<port>``, where the port is taken from the first ``UDPInterface``
    bind address in the cjdns configuration.  It is registered with cjdns
    and cached in ``node_local_pass``.

    If encrypting or sending fails, the password is revoked in cjdns and
    dropped from the cache (so the next attempt registers a fresh one), and
    a new key negotiation is attempted with the public key stored in the
    node's session.  The list of authorized passwords is printed at the end.
    """
    if not node_local_pass.get(chat_node):
        # make local cjdns password for this node
        bind_address = cjdns_config_object["interfaces"]["UDPInterface"][0]["bind"]
        port_suffix = ":" + str(bind_address.split(":")[-1])
        new_pass = urandom(16) + bytes(port_suffix, Permissive_Encoding)
        # NOTE(review): str() of a bytes object yields its "b'...'" repr text,
        # which is what gets registered here, while the peer receives the raw
        # bytes. Confirm which form cjdns is supposed to know.
        cjdadmin.AuthorizedPasswords_add(str(new_pass), chat_node)
        node_local_pass[chat_node] = new_pass
    try:
        to_send = encrypt(chat_node, node_local_pass[chat_node])
        node.whisper(uuid.UUID(chat_node), PASS_EXCHANGE_HEAD + to_send)
    except Exception as e0:
        print(e0)
        cjdadmin.AuthorizedPasswords_remove(chat_node)
        # The revoked password must not be reused from the cache.
        node_local_pass.pop(chat_node, None)
        try:
            print("attempting new key negotiation.")
            foreign_pubkey = get_from_sessions(chat_node)[0][1]
            send_key_negotiation(chat_node, foreign_pubkey)
            print("sent key negotiation.")
        except TypeError:
            print("no session found for this node!")
    print(cjdadmin.AuthorizedPasswords_list())
def node_endpoint(chat_node):
    """Return what ``node.peer_address`` reports for the given ZRE node id."""
    peer_uuid = uuid.UUID(chat_node)
    return node.peer_address(peer_uuid)
# IV handling without sharing extra information: the plaintext is encrypted
# with the first bytes of the secret appended and a (relatively small) random
# nonce; the receiver cycles through nonces until that marker shows up.
def encrypt(chat_node, data_in):
    """Encrypt ``data_in`` for ``chat_node`` with Salsa20.

    The verified secret of the node is used when there is one, the untrusted
    secret otherwise.  The first three bytes of the secret are appended to
    the plaintext, and the nonce is six zero bytes followed by two random
    bytes.  Returns the ciphertext.
    """
    nonce = bytes(6) + urandom(2)  # "small" (16 bit) random nonce
    try:
        secret = node_verified_secret[chat_node]
    except KeyError:
        secret = node_untrusted_secret[chat_node]
    return salsa20.Salsa20_xor(data_in + secret[:3], nonce, secret)
def decrypt_data_from_node(chat_node, data_in):
    """Decrypt ``data_in`` received from ``chat_node``.

    The verified secret of the node is used when there is one, the untrusted
    secret otherwise.  ``encrypt`` uses a nonce of six zero bytes plus two
    random bytes and appends the first three bytes of the secret to the
    plaintext, so every one of the 65536 possible nonces is tried until the
    decrypted text ends with that marker.

    Returns the plaintext without the three marker bytes.  Returns
    ``data_in`` unchanged when no secret is known for the node or when
    Salsa20 raises ``TypeError``/``ValueError``, and ``data_in.strip()``
    when no nonce matches.
    """
    print("attempting decryption.")
    if chat_node in node_verified_secret:
        secret = node_verified_secret[chat_node]
    elif chat_node in node_untrusted_secret:
        secret = node_untrusted_secret[chat_node]
    else:
        print("no shared secret for this node, cannot decrypt.")
        return data_in
    try:
        marker = secret[:3]
        for counter in range(0x10000):
            nonce = bytes(6) + counter.to_bytes(2, "big")
            clear_attempt = salsa20.Salsa20_xor(data_in, nonce, secret)
            if clear_attempt[-3:] == marker:
                print("decryption success.")
                # Strip exactly the three marker bytes that encrypt() added.
                return clear_attempt[:-3]
    except TypeError as e1:
        print(e1)
        return data_in
    except ValueError as e2:
        print(e2)
        return data_in
    print("nonce not found, aborting decryption.")
    return data_in.strip()
def attempt_cjdns_connection(attempt_pass, chat_node):
    """Try to open a cjdns UDP connection to the peer behind ``chat_node``.

    ``attempt_pass`` is bytes of the form ``<password>:<port>``; the text
    after the LAST colon is the port and everything before it is the
    password (which may itself contain colons).  The peer's IP is taken from
    its ZRE endpoint and its public key from its session.

    Returns the cjdns IPv6 address derived from the peer's public key when
    cjdns reports ``"none"`` as error, otherwise ``None``.
    """
    peer_endpoint = node_endpoint(chat_node)
    try:
        foreign_pubkey = get_from_sessions(chat_node)[0][1]
    except TypeError:
        print("no session found for this node!")
        return None
    try:
        password_bytes, _, port_bytes = attempt_pass.rpartition(b":")
        # Drop the scheme (first field) and the port (last field) of the endpoint.
        endpoint_fields = bytes(peer_endpoint, Permissive_Encoding).split(b":")
        ip = b"".join(endpoint_fields[1:-1]).strip(b'/')
        ip_string = str(ip + b":" + port_bytes, Permissive_Encoding)
        print(ip_string)
        print(foreign_pubkey)
        result = cjdadmin.UDPInterface_beginConnection(
            foreign_pubkey,
            ip_string,
            0,
            password=str(password_bytes, Permissive_Encoding)
        )
    except Exception as e0:
        result = {"error": e0}
        traceback.print_exc()
    if result["error"] == "none":
        return key_utils.to_ipv6(foreign_pubkey)
    print(result["error"])
    return None
def close_cjdns_connection(chat_node):
    """Revoke the local password of a ZRE node and disconnect its cjdns peer.

    The password registered for ``chat_node`` is removed from cjdns and from
    the ``node_local_pass`` cache, so a later negotiation creates and
    registers a new one.  The peer is then disconnected using the public key
    stored in the node's session; if there is no session a notice is printed.
    """
    # if this node had a local pass, disable it.
    cjdadmin.AuthorizedPasswords_remove(chat_node)
    node_local_pass.pop(chat_node, None)
    try:
        cjdadmin.InterfaceController_disconnectPeer(get_from_sessions(chat_node)[0][1])
    except TypeError:
        print("no open CJDNS connection to close!")
# Per-node negotiation state, all keyed by ZRE node id.
key_negotiation_sent = dict()   # node -> True once a kn: message was sent to it
node_verified_secret = dict()   # node -> shared secret of a verified session
node_verified_pass = dict()     # node -> peer password of a verified session
node_untrusted_pass = dict()    # node -> peer password not (or no longer) verified
node_untrusted_secret = dict()  # node -> shared secret not (or no longer) verified
node_local_pass = dict()        # node -> password this side created for the node
# session layout: (cjdns address, pubkey, group name, msg(kn:key data or pe:encrypted pass), return to)
def verified_session_handler(session):
    """Handle a negotiation message for a node whose session is verified.

    Only password exchanges (``pe:``) are acted upon; key negotiations are
    ignored.  The password is decrypted and a cjdns connection is attempted
    with it.  If that connection yields the address the session claims, the
    new password is stored as verified; otherwise the connection is closed
    and the session becomes untrusted.  A failed connection attempt never
    counts as a match, even when the session has no address.
    """
    message = session[3]
    chat_node = session[4]
    node_info = (chat_node, session)
    if message[:3] == PASS_EXCHANGE_HEAD:
        candidate_pass = decrypt_data_from_node(chat_node, message[3:])
        node_untrusted_pass[chat_node] = candidate_pass
        if candidate_pass:
            temp_ip = attempt_cjdns_connection(candidate_pass, chat_node)
            if temp_ip is not None and temp_ip == session[0]:
                verify_session(node_info)
                # verify_session may already have moved the entry out of
                # node_untrusted_pass, so the local copy is stored.
                node_verified_pass[chat_node] = candidate_pass
            else:
                close_cjdns_connection(chat_node)
                untrust_session(node_info)
    print("verified: "+str(session))
def temporary_session_handler(session):
    """Handle a negotiation message for a node whose session is temporary.

    ``session`` is (cjdns address, pubkey, group name, msg, return node).

    * ``kn:``: answer with our own key negotiation if none was sent to this
      node yet (using the peer's public key, entry 1 of the session), store
      the derived secret as untrusted and send our encrypted password.
    * ``pe:``: decrypt the password and attempt a cjdns connection with it.
      If the connection yields the address the session claims, the session
      is verified and the password stored as verified; otherwise the
      connection is closed and the session becomes untrusted.  A failed
      connection attempt never counts as a match.
    """
    message = session[3]
    chat_header = message[:3]
    chat_data = message[3:]
    chat_node = session[4]
    node_info = (chat_node, session)
    if chat_header == KEY_NEGOTIATION_HEAD:
        if key_negotiation_sent.get(chat_node) is None:
            send_key_negotiation(chat_node, session[1])
        node_untrusted_secret[chat_node] = make_shared_secret(chat_data)
        send_encrypted_password(chat_node)
    if chat_header == PASS_EXCHANGE_HEAD:
        candidate_pass = decrypt_data_from_node(chat_node, chat_data)
        node_untrusted_pass[chat_node] = candidate_pass
        if candidate_pass:
            temp_ip = attempt_cjdns_connection(candidate_pass, chat_node)
            if temp_ip is not None and temp_ip == session[0]:
                verify_session(node_info)
                # verify_session moves the entry out of node_untrusted_pass,
                # so the local copy is stored instead of reading it back.
                node_verified_pass[chat_node] = candidate_pass
            else:
                close_cjdns_connection(chat_node)
                untrust_session(node_info)
    print("temporary: "+str(session))
def untrusted_session_handler(session):
    """Handle a negotiation message for a node whose session is untrusted.

    ``session`` is (cjdns address, pubkey, group name, msg, return node).
    Only key negotiations (``kn:``) are accepted: a ``kn:`` message is sent
    back using the peer's public key (entry 1 of the session), the derived
    secret is stored as untrusted and the session is moved back to
    temporary, without sending a password.  Password exchanges and any other
    messages are ignored.
    """
    message = session[3]
    chat_node = session[4]
    if message[:3] == KEY_NEGOTIATION_HEAD:
        # will send kn: for the case where node failed to receive first one
        send_key_negotiation(chat_node, session[1])
        node_untrusted_secret[chat_node] = make_shared_secret(message[3:])
        re_queue_session((chat_node, session))
    print("untrusted: "+str(session))
# node -> address for every verified session
verified_node_ips = dict()

# One table per session state, keyed by ZRE node id.  Each table also carries
# its own name under "qname", which selects the handler in ``sessions_dict``.
verified_sessions = dict(qname="verified_sessions")
temporary_sessions = dict(qname="temporary_sessions")
untrusted_sessions = dict(qname="untrusted_sessions")

# table name -> handler for negotiation messages of sessions in that table
sessions_dict = {
    verified_sessions["qname"]: verified_session_handler,
    temporary_sessions["qname"]: temporary_session_handler,
    untrusted_sessions["qname"]: untrusted_session_handler,
}
def process_negotiations(inqueue):
    """Feed negotiation messages to the handler of their session state.

    Items are (cjdns address, pubkey, group name, msg(kn:key data or
    pe:encrypted pass), return to).  Sessions are kept per ZRE node: each
    item is recorded with ``create_session``, looked up again with
    ``get_from_sessions`` and passed to the handler registered in
    ``sessions_dict`` for the table it was found in.  Runs forever.
    """
    while True:
        data_in = inqueue.get()
        node_i_d = data_in[4]
        create_session((node_i_d, data_in))
        session, owning_table = get_from_sessions(node_i_d)
        handler = sessions_dict[owning_table["qname"]]
        handler(session)
def untrust_session(node_info):
    """Move a node's session to the untrusted table.

    ``node_info`` is (node id, session).  The node is removed from the
    temporary and verified tables and from ``verified_node_ips``; a verified
    password or secret recorded for THIS node is demoted to the matching
    untrusted table.

    Returns False (and changes nothing) if the node is already untrusted,
    True otherwise.
    """
    node_i_d = node_info[0]
    if node_i_d in untrusted_sessions:
        return False
    temporary_sessions.pop(node_i_d, None)
    verified_sessions.pop(node_i_d, None)
    untrusted_sessions[node_i_d] = node_info[1]
    verified_node_ips.pop(node_i_d, None)
    # Demote only what exists for this node; other nodes' entries are untouched.
    if node_i_d in node_verified_pass:
        node_untrusted_pass[node_i_d] = node_verified_pass.pop(node_i_d)
    if node_i_d in node_verified_secret:
        node_untrusted_secret[node_i_d] = node_verified_secret.pop(node_i_d)
    return True
def re_queue_session(node_info):
    """Move a node's session back to the temporary table.

    ``node_info`` is (node id, session).  The node is removed from the
    untrusted and verified tables; a verified password or secret recorded
    for THIS node is demoted to the matching untrusted table.

    Returns False (and changes nothing) if the node is already temporary,
    True otherwise.
    """
    node_i_d = node_info[0]
    if node_i_d in temporary_sessions:
        return False
    untrusted_sessions.pop(node_i_d, None)
    verified_sessions.pop(node_i_d, None)
    temporary_sessions[node_i_d] = node_info[1]
    # Demote only what exists for this node; other nodes' entries are untouched.
    if node_i_d in node_verified_pass:
        node_untrusted_pass[node_i_d] = node_verified_pass.pop(node_i_d)
    if node_i_d in node_verified_secret:
        node_untrusted_secret[node_i_d] = node_verified_secret.pop(node_i_d)
    return True
def verify_session(node_info):
    """Move a node's session to the verified table.

    ``node_info`` is (node id, session).  The node is removed from the
    temporary and untrusted tables, its address (entry 0 of the session) is
    recorded in ``verified_node_ips``, and its untrusted password and secret
    are promoted to the verified tables; both must exist, otherwise
    ``KeyError`` is raised.

    Returns False (and changes nothing) if the node is already verified,
    True otherwise.
    """
    node_i_d, session = node_info[0], node_info[1]
    if node_i_d in verified_sessions:
        return False
    for other_table in (temporary_sessions, untrusted_sessions):
        other_table.pop(node_i_d, None)
    verified_sessions[node_i_d] = session
    verified_node_ips[node_i_d] = session[0]
    node_verified_pass[node_i_d] = node_untrusted_pass.pop(node_i_d)
    node_verified_secret[node_i_d] = node_untrusted_secret.pop(node_i_d)
    return True
def create_session(node_info):
    """Record the latest message of a node in the temporary table.

    ``node_info`` is (node id, session).  Returns True if the node had no
    session in any table before the call, False otherwise.

    NOTE(review): the session is stored under temporary even when the node
    is already verified or untrusted, and ``get_from_sessions`` looks at the
    temporary table first, so it looks like only the temporary handler is
    ever selected afterwards. Confirm whether that is intended.
    """
    node_i_d = node_info[0]
    already_known = (
        node_i_d in verified_sessions
        or node_i_d in temporary_sessions
        or node_i_d in untrusted_sessions
    )
    temporary_sessions[node_i_d] = node_info[1]
    return not already_known
def get_from_sessions(node_i_d):
    """Look up a node's session.

    The temporary, untrusted and verified tables are searched in that order.
    Returns ``(session, table it was found in)`` for the first table holding
    a truthy entry for the node, or ``None`` if there is none.  Do not call a
    put after this.
    """
    for table in (temporary_sessions, untrusted_sessions, verified_sessions):
        session = table.get(node_i_d, None)
        if session:
            return session, table
    return None
def is_active_session(node_i_d):
    """Return True if the node has an entry in any of the three session tables."""
    return (
        node_i_d in temporary_sessions
        or node_i_d in untrusted_sessions
        or node_i_d in verified_sessions
    )
def process_all_message_queues():
    """Start one worker thread for every stage of the message pipeline."""
    workers = (
        (queue_to_action_queue, (message_queue, action_queue)),
        (process_action_queue, (action_queue,)),
        (process_chats, (chat_queue,)),
        (process_negotiations, (negotiation_queue,)),
        (process_discovery, (discovery_queue,)),
    )
    for worker, arguments in workers:
        _thread.start_new_thread(worker, arguments)
def consumer(pyre_node):
    """Copy every frame received by ``pyre_node`` onto ``message_queue``.

    Runs forever, waiting two seconds before each receive.  Any exception is
    reported with its traceback and the loop continues.
    """
    while True:
        try:
            sleep(2)
            current_message_buffer = pyre_node.recv()
            if current_message_buffer:
                for message in current_message_buffer:
                    message_queue.put_nowait(message)
        except Exception:
            # print_exc() writes the traceback itself and returns None.
            traceback.print_exc()
def caster(pyre_node):
    """Send every line typed on stdin to all known peers; runs forever.

    Peers for which a shared secret exists (verified or untrusted) receive
    the line encrypted behind the ``em:`` header; all other peers receive it
    as plain text.  Secrets are keyed by the hex form of the peer's UUID, so
    UUID peers are converted before the lookup.  Errors are reported with
    their traceback and the loop continues.
    """
    while True:
        try:
            sleep(1)
            sent = False
            currline = sys.stdin.readline().strip()
            for peer in pyre_node.peers():
                peer_id = peer.hex if isinstance(peer, uuid.UUID) else peer
                if peer_id in node_verified_secret or peer_id in node_untrusted_secret:
                    # encrypt() picks the verified secret first, then the untrusted one.
                    enc_line = encrypt(peer_id, bytes(currline, Permissive_Encoding))
                    pyre_node.whisper(peer, ENCRYPTED_MESSAGE_HEAD + enc_line)
                else:
                    pyre_node.whispers(peer, currline)
                sent = True
            if not sent:
                print("no peers to send to!")
        except Exception:
            traceback.print_exc()
            continue
def update_local_addresses():
    """Store this machine's addresses, without duplicates, in ``localdata``.

    For every interface in ``interfaces`` the first address entry of each
    address family reported by ``ni.ifaddresses`` is taken.  The result is
    written to ``localdata[ADDRESSES]`` as a list in no particular order.
    """
    unique_addresses = {
        family_entries[0]["addr"]
        for interface in interfaces
        for family_entries in ni.ifaddresses(interface).values()
    }
    localdata[ADDRESSES] = list(unique_addresses)
def comment_remover(text):
    """Return ``text`` with ``//`` line comments and ``/* */`` comments removed.

    Single- and double-quoted string literals are matched as well and copied
    through unchanged, so comment markers inside them are left alone.
    """
    def replacer(match):
        s = match.group(0)
        # Comments start with a slash and are dropped; string literals are kept.
        return "" if s.startswith('/') else s
    pattern = re.compile(
        r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
        re.DOTALL | re.MULTILINE
    )
    return re.sub(pattern, replacer, text)
# ---- start-up: load the cjdns configuration, connect, then join the ZRE mesh ----
context = zmq.Context()
unprocessed_zyre = Queue()
localdata = {}
try:
    # Try the known config locations (last one first); once they are
    # exhausted, keep asking on stdin until a file can be decoded.
    while True:
        try:
            cjdns_config_file = cjdns_config_files.pop()
        except IndexError:
            print("CJDNS config file location:")
            cjdns_config_files.append(sys.stdin.readline().strip())
            cjdns_config_file = cjdns_config_files.pop()
        try:
            with open(cjdns_config_file) as config_handle:
                raw_readlines = config_handle.readlines()
            readlines = []
            for line in raw_readlines:
                outline = line.strip()
                outline = comment_remover(outline)
                if outline:
                    readlines.append(outline+"\n")
            input_str = str.join("", readlines).strip()
            # Second pass catches comments that span several lines.
            input_str = comment_remover(input_str)
            cjdns_config_object = json.JSONDecoder().raw_decode(input_str)[0]
        except Exception as e:
            print(e)
        if cjdns_config_object:
            break
    admin_data = cjdns_config_object["admin"]
    # "bind" is "<address>:<port>"
    ipAddr = str.join("", admin_data["bind"].split(":")[:-1])
    port = int(admin_data["bind"].split(":")[-1])
    password = admin_data["password"]
    cjdadmin = cjdns.connect(ipAddr, port, password)
    cjdadmin.InterfaceController_resetPeering()
    localdata[PUBKEY] = cjdadmin.Core_nodeInfo()["myAddr"]
except ConnectionError:
    print("CJDNS is not running, or misconfigured.")
    # Nothing below can work without the admin connection and the public key.
    sys.exit(1)
interfaces = ni.interfaces()
update_local_addresses()
node = Pyre(name=localdata[PUBKEY])
node.set_header(PUBKEY, localdata[PUBKEY])
node.set_header(ADDRESS_LIST, json.dumps(localdata.get(ADDRESSES), ensure_ascii=False))
localdata_json = json.dumps(localdata, ensure_ascii=False)
node.join(DISCOVERY.decode(Permissive_Encoding))
node.join(WORLD.decode(Permissive_Encoding))
input_thread = _thread.start_new_thread(consumer, (node,))
output_thread = _thread.start_new_thread(caster, (node,))
process_all_message_queues()
node.start()
if debugging:
    # Second, mock node for local testing.
    node2 = Pyre(localdata[PUBKEY])
    node2.set_header(ADDRESS_LIST, json.dumps(localdata.get(ADDRESSES), ensure_ascii=False))
    mockNodes = [node2]
    for tempNode in mockNodes:
        tempNode.start()
        # Group names are passed as text, like for the main node above.
        tempNode.join(DISCOVERY.decode(Permissive_Encoding))
        tempNode.join(WORLD.decode(Permissive_Encoding))
        _thread.start_new_thread(consumer, (tempNode,))
# Keep the main thread alive for the workers without spinning the CPU;
# Ctrl-C closes the connections of all verified peers before exiting.
while True:
    try:
        sleep(1)
    except KeyboardInterrupt:
        for node_other in node_verified_pass:
            close_cjdns_connection(node_other)
        node.actor.close()
        raise
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement