Guest User

P2p Example

a guest
Nov 28th, 2022
55
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 29.35 KB | None | 0 0
  1. import socket
  2. import time
  3. import threading
  4. import random
  5. import hashlib
  6.  
  7. from network.nodeconnection import NodeConnection
  8.  
  9. class Node(threading.Thread):
  10.  
  11.     def __init__(self, host, port, id=None, callback=None, max_connections=0):
  12.         """Create instance of a Node. If you want to implement the Node functionality with a callback, you should
  13.           provide a callback method. It is preferred to implement a new node by extending this Node class.
  14.            host: The host name or ip address that is used to bind the TCP/IP server to.
  15.            port: The port number that is used to bind the TCP/IP server to.
  16.            id: (optional) This id will be associated with the node. When not given a unique ID will be created.
  17.            callback: (optional) The callback that is invokes when events happen inside the network.
  18.            max_connections: (optional) limiting the maximum nodes that are able to connect to this node."""
  19.         super(Node, self).__init__()
  20.  
  21.         # When this flag is set, the node will stop and close
  22.         self.terminate_flag = threading.Event()
  23.  
  24.         # Server details, host (or ip) to bind to and the port
  25.         self.host = host
  26.         self.port = port
  27.  
  28.         # Events are send back to the given callback
  29.         self.callback = callback
  30.  
  31.         # Nodes that have established a connection with this node
  32.         self.nodes_inbound = []  # Nodes that are connect with us N->(US)
  33.  
  34.         # Nodes that this nodes is connected to
  35.         self.nodes_outbound = []  # Nodes that we are connected to (US)->N
  36.  
  37.         # A list of nodes that should be reconnected to whenever the connection was lost
  38.         self.reconnect_to_nodes = []
  39.  
  40.         # Create a unique ID for each node if the ID is not given.
  41.         if id == None:
  42.             self.id = self.generate_id()
  43.  
  44.         else:
  45.             self.id = str(id) # Make sure the ID is a string!
  46.  
  47.         # Start the TCP/IP server
  48.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  49.         self.init_server()
  50.  
  51.         # Message counters to make sure everyone is able to track the total messages
  52.         self.message_count_send = 0
  53.         self.message_count_recv = 0
  54.         self.message_count_rerr = 0
  55.        
  56.         # Connection limit of inbound nodes (nodes that connect to us)
  57.         self.max_connections = max_connections
  58.  
  59.         # Debugging on or off!
  60.         self.debug = False
  61.  
  62.     @property
  63.     def all_nodes(self):
  64.         """Return a list of all the nodes, inbound and outbound, that are connected with this node."""
  65.         return self.nodes_inbound + self.nodes_outbound
  66.    
  67.     def outboundNodes(self):
  68.         return self.nodes_outbound
  69.    
  70.     def inboundNodes(self):
  71.         return self.nodes_inbound
  72.  
  73.     def debug_print(self, message):
  74.         """When the debug flag is set to True, all debug messages are printed in the console."""
  75.         if self.debug:
  76.             print("DEBUG (" + self.id + "): " + message)
  77.  
  78.     def generate_id(self):
  79.         """Generates a unique ID for each node."""
  80.         id = hashlib.sha512()
  81.         t = self.host + str(self.port) + str(random.randint(1, 99999999))
  82.         id.update(t.encode('ascii'))
  83.         return id.hexdigest()
  84.  
  85.     def init_server(self):
  86.         """Initialization of the TCP/IP server to receive connections. It binds to the given host and port."""
  87.         print("Initialisation of the Node on port: " + str(self.port) + " on node (" + self.id + ")")
  88.         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  89.         self.sock.bind((self.host, self.port))
  90.         self.sock.settimeout(10.0)
  91.         self.sock.listen(1)
  92.  
  93.     def print_connections(self):
  94.         """Prints the connection overview of the node. How many inbound and outbound connections have been made."""
  95.         print("Node connection overview:")
  96.         print("- Total nodes connected with us: %d" % len(self.nodes_inbound))
  97.         print("- Total nodes connected to     : %d" % len(self.nodes_outbound))
  98.  
  99.     def send_to_nodes(self, data, exclude=[], compression='none'):
  100.         """ Send a message to all the nodes that are connected with this node. data is a python variable which is
  101.            converted to JSON that is send over to the other node. exclude list gives all the nodes to which this
  102.            data should not be sent.
  103.            TODO: When sending was not successfull, the user is not notified."""
  104.         self.message_count_send += 1
  105.         # for n in self.nodes_inbound:
  106.         #     if n in exclude:
  107.         #         self.debug_print("Node send_to_nodes: Excluding node in sending the message")
  108.         #     else:
  109.         #         self.send_to_node(n, data, compression)
  110.  
  111.         for n in self.nodes_outbound:
  112.             if n in exclude:
  113.                 self.debug_print("Node send_to_nodes: Excluding node in sending the message")
  114.             else:
  115.                 self.send_to_node(n, data, compression)
  116.  
  117.     def send_to_node(self, n, data, compression='none'):
  118.         """ Send the data to the node n if it exists."""
  119.         self.message_count_send = self.message_count_send + 1
  120.         if n in self.nodes_inbound or n in self.nodes_outbound:
  121.             n.send(data, compression=compression)
  122.  
  123.         else:
  124.             self.debug_print("Node send_to_node: Could not send the data, node is not found!")
  125.  
  126.     def connect_with_node(self, host, port, reconnect=False):
  127.         """ Make a connection with another node that is running on host with port. When the connection is made,
  128.            an event is triggered outbound_node_connected. When the connection is made with the node, it exchanges
  129.            the id's of the node. First we send our id and then we receive the id of the node we are connected to.
  130.            When the connection is made the method outbound_node_connected is invoked. If reconnect is True, the
  131.            node will try to reconnect to the code whenever the node connection was closed. The method returns
  132.            True when the node is connected with the specific host."""
  133.  
  134.         if host == self.host and port == self.port:
  135.             print("connect_with_node: Cannot connect with yourself!!")
  136.             return False
  137.         # Check if node is already connected with this node!
  138.         for node in self.nodes_outbound:
  139.             if node.host == host and node.port == port:
  140.                 print("connect_with_node: Already connected with this node (" + node.id + ").")
  141.                 return True
  142.  
  143.         try:
  144.             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  145.             self.debug_print("connecting to %s port %s" % (host, port))
  146.             sock.connect((host, port))
  147.             conn_msg = (self.id + ":" + str(self.port)).encode('utf-8')
  148.             msg_type = 'connection'
  149.             # Basic information exchange (not secure) of the id's of the nodes!
  150.             sock.send((self.id + ":" + str(self.port)).encode('utf-8')) # Send my id and port to the connected node!
  151.             connected_node_id = sock.recv(4096).decode('utf-8') # When a node is connected, it sends its id!
  152.             print()
  153.             print(connected_node_id)
  154.             # Cannot connect with yourself
  155.             if self.id == connected_node_id:
  156.                 print("connect_with_node: You cannot connect with yourself?!")
  157.                 sock.send("CLOSING: Already having a connection together".encode('utf-8'))
  158.                 sock.close()
  159.                 return True
  160.  
  161.             # Fix bug: Cannot connect with nodes that are already connected with us!
  162.             #          Send message and close the socket.
  163.             for node in self.nodes_inbound:
  164.                 if node.host == host and node.id == connected_node_id:
  165.                     print("connect_with_node: This node (" + node.id + ") is already connected with us.")
  166.                     sock.send("CLOSING: Already having a connection together".encode('utf-8'))
  167.                     sock.close()
  168.                     return True
  169.  
  170.             thread_client = self.create_new_connection(sock, connected_node_id, host, port)
  171.             thread_client.start()
  172.  
  173.             self.nodes_outbound.append(thread_client)
  174.             self.outbound_node_connected(thread_client)
  175.  
  176.             # If reconnection to this host is required, it will be added to the list!
  177.             if reconnect:
  178.                 self.debug_print("connect_with_node: Reconnection check is enabled on node " + host + ":" + str(port))
  179.                 self.reconnect_to_nodes.append({
  180.                     "host": host, "port": port, "tries": 0
  181.                 })
  182.  
  183.             return True
  184.  
  185.         except Exception as e:
  186.             self.debug_print("TcpServer.connect_with_node: Could not connect with node. (" + str(e) + ")")
  187.             return False
  188.  
  189.     def disconnect_with_node(self, node):
  190.         """Disconnect the TCP/IP connection with the specified node. It stops the node and joins the thread.
  191.           The node will be deleted from the nodes_outbound list. Before closing, the method
  192.           node_disconnect_with_outbound_node is invoked."""
  193.         if node in self.nodes_outbound:
  194.             self.node_disconnect_with_outbound_node(node)
  195.             node.stop()
  196.  
  197.         else:
  198.             self.debug_print("Node disconnect_with_node: cannot disconnect with a node with which we are not connected.")
  199.  
  200.     def stop(self):
  201.         """Stop this node and terminate all the connected nodes."""
  202.         self.node_request_to_stop()
  203.         self.terminate_flag.set()
  204.  
  205.     # This method can be overrided when a different nodeconnection is required!
  206.     def create_new_connection(self, connection, id, host, port):
  207.         """When a new connection is made, with a node or a node is connecting with us, this method is used
  208.           to create the actual new connection. The reason for this method is to be able to override the
  209.           connection class if required. In this case a NodeConnection will be instantiated to represent
  210.           the node connection."""
  211.         return NodeConnection(self, connection, id, host, port)
  212.  
  213.     def reconnect_nodes(self):
  214.         """This method checks whether nodes that have the reconnection status are still connected. If not
  215.           connected these nodes are started again."""
  216.         for node_to_check in self.reconnect_to_nodes:
  217.             found_node = False
  218.             self.debug_print("reconnect_nodes: Checking node " + node_to_check["host"] + ":" + str(node_to_check["port"]))
  219.  
  220.             for node in self.nodes_outbound:
  221.                 if node.host == node_to_check["host"] and node.port == node_to_check["port"]:
  222.                     found_node = True
  223.                     node_to_check["trials"] = 0 # Reset the trials
  224.                     self.debug_print("reconnect_nodes: Node " + node_to_check["host"] + ":" + str(node_to_check["port"]) + " still running!")
  225.  
  226.             if not found_node: # Reconnect with node
  227.                 node_to_check["trials"] += 1
  228.                 if self.node_reconnection_error(node_to_check["host"], node_to_check["port"], node_to_check["trials"]):
  229.                     self.connect_with_node(node_to_check["host"], node_to_check["port"]) # Perform the actual connection
  230.  
  231.                 else:
  232.                     self.debug_print("reconnect_nodes: Removing node (" + node_to_check["host"] + ":" + str(node_to_check["port"]) + ") from the reconnection list!")
  233.                     self.reconnect_to_nodes.remove(node_to_check)
  234.  
  235.     def run(self):
  236.         """The main loop of the thread that deals with connections from other nodes on the network. When a
  237.           node is connected it will exchange the node id's. First we receive the id of the connected node
  238.           and secondly we will send our node id to the connected node. When connected the method
  239.           inbound_node_connected is invoked."""
  240.         while not self.terminate_flag.is_set():  # Check whether the thread needs to be closed
  241.             try:
  242.                 self.debug_print("Node: Wait for incoming connection")
  243.                 connection, client_address = self.sock.accept()
  244.  
  245.                 self.debug_print("Total inbound connections:" + str(len(self.nodes_inbound)))
  246.                 # When the maximum connections is reached, it disconnects the connection
  247.                 if self.max_connections == 0 or len(self.nodes_inbound) < self.max_connections:
  248.                    
  249.                     # Basic information exchange (not secure) of the id's of the nodes!
  250.                     connected_node_port = client_address[1] # backward compatibilty
  251.                     connected_node_id   = connection.recv(4096).decode('utf-8')
  252.                     if ":" in connected_node_id:
  253.                         (connected_node_id, connected_node_port) = connected_node_id.split(':') # When a node is connected, it sends it id!
  254.                     connection.send(self.id.encode('utf-8')) # Send my id to the connected node!
  255.  
  256.                     thread_client = self.create_new_connection(connection, connected_node_id, client_address[0], connected_node_port)
  257.                     thread_client.start()
  258.  
  259.                     self.nodes_inbound.append(thread_client)
  260.                     self.inbound_node_connected(thread_client)
  261.  
  262.                 else:
  263.                     self.debug_print("New connection is closed. You have reached the maximum connection limit!")
  264.                     connection.close()
  265.            
  266.             except socket.timeout:
  267.                 self.debug_print('Node: Connection timeout!')
  268.  
  269.             except Exception as e:
  270.                 raise e
  271.  
  272.             self.reconnect_nodes()
  273.  
  274.             time.sleep(0.01)
  275.  
  276.         print("Node stopping...")
  277.         for t in self.nodes_inbound:
  278.             t.stop()
  279.  
  280.         for t in self.nodes_outbound:
  281.             t.stop()
  282.  
  283.         time.sleep(1)
  284.  
  285.         for t in self.nodes_inbound:
  286.             t.join()
  287.  
  288.         for t in self.nodes_outbound:
  289.             t.join()
  290.  
  291.         self.sock.settimeout(None)  
  292.         self.sock.close()
  293.         print("Node stopped")
  294.  
  295.     def outbound_node_connected(self, node):
  296.         """This method is invoked when a connection with a outbound node was successfull. The node made
  297.           the connection itself."""
  298.         self.debug_print("outbound_node_connected: " + node.id)
  299.         if self.callback is not None:
  300.             self.callback("outbound_node_connected", self, node, {})
  301.  
  302.     def inbound_node_connected(self, node):
  303.         """This method is invoked when a node successfully connected with us."""
  304.         self.debug_print("inbound_node_connected: " + node.id)
  305.         if self.callback is not None:
  306.             self.callback("inbound_node_connected", self, node, {})
  307.  
  308.     def node_disconnected(self, node):
  309.         """While the same nodeconnection class is used, the class itself is not able to
  310.           determine if it is a inbound or outbound connection. This function is making
  311.           sure the correct method is used."""
  312.         self.debug_print("node_disconnected: " + node.id)
  313.  
  314.         if node in self.nodes_inbound:
  315.             del self.nodes_inbound[self.nodes_inbound.index(node)]
  316.             self.inbound_node_disconnected(node)
  317.  
  318.         if node in self.nodes_outbound:
  319.             del self.nodes_outbound[self.nodes_outbound.index(node)]
  320.             self.outbound_node_disconnected(node)
  321.  
  322.     def inbound_node_disconnected(self, node):
  323.         """This method is invoked when a node, that was previously connected with us, is in a disconnected
  324.           state."""
  325.         self.debug_print("inbound_node_disconnected: " + node.id)
  326.         if self.callback is not None:
  327.             self.callback("inbound_node_disconnected", self, node, {})
  328.  
  329.     def outbound_node_disconnected(self, node):
  330.         """This method is invoked when a node, that we have connected to, is in a disconnected state."""
  331.         self.debug_print("outbound_node_disconnected: " + node.id)
  332.         if self.callback is not None:
  333.             self.callback("outbound_node_disconnected", self, node, {})
  334.  
  335.     def node_message(self, node, data):
  336.         """This method is invoked when a node send us a message."""
  337.         self.debug_print("node_message: " + node.id + ": " + str(data))
  338.         if self.callback is not None:
  339.             self.callback("node_message", self, node, data)
  340.  
  341.     def node_disconnect_with_outbound_node(self, node):
  342.         """This method is invoked just before the connection is closed with the outbound node. From the node
  343.           this request is created."""
  344.         self.debug_print("node wants to disconnect with oher outbound node: " + node.id)
  345.         if self.callback is not None:
  346.             self.callback("node_disconnect_with_outbound_node", self, node, {})
  347.  
  348.     def node_request_to_stop(self):
  349.         """This method is invoked just before we will stop. A request has been given to stop the node and close
  350.           all the node connections. It could be used to say goodbey to everyone."""
  351.         self.debug_print("node is requested to stop!")
  352.         if self.callback is not None:
  353.             self.callback("node_request_to_stop", self, {}, {})
  354.  
  355.     def node_reconnection_error(self, host, port, trials):
  356.         """This method is invoked when a reconnection error occurred. The node connection is disconnected and the
  357.           flag for reconnection is set to True for this node. This function can be overidden to implement your
  358.           specific logic to take action when a lot of trials have been done. If the method returns True, the
  359.           node will try to perform the reconnection. If the method returns False, the node will stop reconnecting
  360.           to this node. The node will forever tries to perform the reconnection."""
  361.         self.debug_print("node_reconnection_error: Reconnecting to node " + host + ":" + str(port) + " (trials: " + str(trials) + ")")
  362.         return True
  363.  
  364.     def __str__(self):
  365.         return 'Node: {}:{}'.format(self.host, self.port)
  366.  
  367.     def __repr__(self):
  368.         return '<Node {}:{} id: {}>'.format(self.host, self.port, self.id)
  369.    
  370.  
  371.  
  372. import socket
  373. import time
  374. import threading
  375. import json
  376. import zlib, bz2, lzma, base64
  377.  
  378. """
  379. Author : Maurice Snoeren <macsnoeren(at)gmail.com>
  380. Version: 0.3 beta (use at your own risk)
  381. Date: 7-5-2020
  382.  
  383. Python package p2pnet for implementing decentralized peer-to-peer network applications
  384. """
  385. class NodeConnection(threading.Thread):
  386.     """The class NodeConnection is used by the class Node and represent the TCP/IP socket connection with another node.
  387.       Both inbound (nodes that connect with the server) and outbound (nodes that are connected to) are represented by
  388.       this class. The class contains the client socket and hold the id information of the connecting node. Communication
  389.       is done by this class. When a connecting node sends a message, the message is relayed to the main node (that created
  390.       this NodeConnection in the first place).
  391.      
  392.       Instantiates a new NodeConnection. Do not forget to start the thread. All TCP/IP communication is handled by this
  393.       connection.
  394.        main_node: The Node class that received a connection.
  395.        sock: The socket that is assiociated with the client connection.
  396.        id: The id of the connected node (at the other side of the TCP/IP connection).
  397.        host: The host/ip of the main node.
  398.        port: The port of the server of the main node."""
  399.  
  400.     def __init__(self, main_node, sock, id, host, port):
  401.         """Instantiates a new NodeConnection. Do not forget to start the thread. All TCP/IP communication is handled by this connection.
  402.            main_node: The Node class that received a connection.
  403.            sock: The socket that is assiociated with the client connection.
  404.            id: The id of the connected node (at the other side of the TCP/IP connection).
  405.            host: The host/ip of the main node.
  406.            port: The port of the server of the main node."""
  407.  
  408.         super(NodeConnection, self).__init__()
  409.  
  410.         self.host = host
  411.         self.port = port
  412.         self.main_node = main_node
  413.         self.sock = sock
  414.         self.terminate_flag = threading.Event()
  415.  
  416.         # The id of the connected node
  417.         self.id = str(id) # Make sure the ID is a string
  418.  
  419.         # End of transmission character for the network streaming messages.
  420.         self.EOT_CHAR = 0x04.to_bytes(1, 'big')
  421.  
  422.         # Indication that the message has been compressed
  423.         self.COMPR_CHAR = 0x02.to_bytes(1, 'big')
  424.  
  425.         # Datastore to store additional information concerning the node.
  426.         self.info = {}
  427.  
  428.         # Use socket timeout to determine problems with the connection
  429.         self.sock.settimeout(10.0)
  430.  
  431.         self.main_node.debug_print("NodeConnection: Started with client (" + self.id + ") '" + self.host + ":" + str(self.port) + "'")
  432.  
  433.     def compress(self, data, compression):
  434.         """Compresses the data given the type. It is used to provide compression to lower the network traffic in case of
  435.           large data chunks. It stores the compression type inside the data, so it can be easily retrieved."""
  436.  
  437.         self.main_node.debug_print(self.id + ":compress:" + compression)
  438.         self.main_node.debug_print(self.id + ":compress:input: " + str(data))
  439.  
  440.         compressed = data
  441.         try:
  442.             if compression == 'zlib':
  443.                 compressed = base64.b64encode( zlib.compress(data, 6) + b'zlib' )
  444.            
  445.             elif compression == 'bzip2':
  446.                 compressed = base64.b64encode( bz2.compress(data) + b'bzip2' )
  447.            
  448.             elif compression == 'lzma':
  449.                 compressed = base64.b64encode( lzma.compress(data) + b'lzma' )
  450.  
  451.             else:
  452.                 self.main_node.debug_print(self.id + ":compress:Unknown compression")
  453.                 return None
  454.  
  455.         except Exception as e:
  456.             self.main_node.debug_print("compress: exception: " + str(e))
  457.  
  458.         self.main_node.debug_print(self.id + ":compress:b64encode:" + str(compressed))
  459.         self.main_node.debug_print(self.id + ":compress:compression:" + str(int(10000*len(compressed)/len(data))/100) + "%")
  460.  
  461.         return compressed
  462.  
  463.     def decompress(self, compressed):
  464.         """Decompresses the data given the type. It is used to provide compression to lower the network traffic in case of
  465.           large data chunks."""
  466.         self.main_node.debug_print(self.id + ":decompress:input: " + str(compressed))
  467.         compressed = base64.b64decode(compressed)
  468.         self.main_node.debug_print(self.id + ":decompress:b64decode: " + str(compressed))
  469.  
  470.         try:
  471.             if compressed[-4:] == b'zlib':
  472.                 compressed = zlib.decompress(compressed[0:len(compressed)-4])
  473.            
  474.             elif compressed[-5:] == b'bzip2':
  475.                 compressed = bz2.decompress(compressed[0:len(compressed)-5])
  476.            
  477.             elif compressed[-4:] == b'lzma':
  478.                 compressed = lzma.decompress(compressed[0:len(compressed)-4])
  479.         except Exception as e:
  480.             print("Exception: " + str(e))
  481.  
  482.         self.main_node.debug_print(self.id + ":decompress:result: " + str(compressed))
  483.  
  484.         return compressed
  485.  
  486.     def send(self, data, encoding_type='utf-8', compression='none'):
  487.         """Send the data to the connected node. The data can be pure text (str), dict object (send as json) and bytes object.
  488.           When sending bytes object, it will be using standard socket communication. A end of transmission character 0x04
  489.           utf-8/ascii will be used to decode the packets ate the other node. When the socket is corrupted the node connection
  490.           is closed. Compression can be enabled by using zlib, bzip2 or lzma. When enabled the data is compressed and send to
  491.           the client. This could reduce the network bandwith when sending large data chunks.
  492.           """
  493.         print(type(data))
  494.         print(threading.current_thread().name)
  495.         if isinstance(data, str):
  496.             try:
  497.                 if compression == 'none':
  498.                     self.sock.sendall( data.encode(encoding_type) + self.EOT_CHAR )
  499.                 else:
  500.                     data = self.compress(data.encode(encoding_type), compression)
  501.                     if data != None:
  502.                         self.sock.sendall(data + self.COMPR_CHAR + self.EOT_CHAR)
  503.  
  504.             except Exception as e: # Fixed issue #19: When sending is corrupted, close the connection
  505.                 self.main_node.debug_print("nodeconnection send: Error sending data to node: " + str(e))
  506.                 self.stop() # Stopping node due to failure
  507.  
  508.         elif isinstance(data, dict):
  509.             try:
  510.                 if compression == 'none':
  511.                     self.sock.sendall(json.dumps(data).encode(encoding_type) + self.EOT_CHAR)
  512.                 else:
  513.                     data = self.compress(json.dumps(data).encode(encoding_type), compression)
  514.                     if data != None:
  515.                         self.sock.sendall(data + self.COMPR_CHAR + self.EOT_CHAR)
  516.  
  517.             except TypeError as type_error:
  518.                 self.main_node.debug_print('This dict is invalid')
  519.                 self.main_node.debug_print(type_error)
  520.  
  521.             except Exception as e: # Fixed issue #19: When sending is corrupted, close the connection
  522.                 self.main_node.debug_print("nodeconnection send: Error sending data to node: " + str(e))
  523.                 self.stop() # Stopping node due to failure
  524.  
  525.         elif isinstance(data, bytes):
  526.             print(555)
  527.             print(data)
  528.             try:
  529.                 if compression == 'none':
  530.                     self.sock.sendall(data + self.EOT_CHAR)
  531.                 else:
  532.                     data = self.compress(data, compression)
  533.                     if data != None:
  534.                         self.sock.sendall(data + self.COMPR_CHAR + self.EOT_CHAR)
  535.  
  536.             except Exception as e: # Fixed issue #19: When sending is corrupted, close the connection
  537.                 self.main_node.debug_print("nodeconnection send: Error sending data to node: " + str(e))
  538.                 self.stop() # Stopping node due to failure
  539.  
  540.         else:
  541.             self.main_node.debug_print('datatype used is not valid plese use str, dict (will be send as json) or bytes')
  542.  
  543.     def stop(self):
  544.         """Terminates the connection and the thread is stopped. Stop the node client. Please make sure you join the thread."""
  545.         self.terminate_flag.set()
  546.  
  547.     def parse_packet(self, packet):
  548.         """Parse the packet and determines wheter it has been send in str, json or byte format. It returns
  549.           the according data."""
  550.        
  551.         if packet.find(self.COMPR_CHAR) == len(packet)-1: # Check if packet was compressed
  552.             packet = self.decompress(packet[0:-1])
  553.  
  554.         try:
  555.             packet_decoded = packet.decode('utf-8')
  556.  
  557.             try:
  558.                 return json.loads(packet_decoded)
  559.  
  560.             except json.decoder.JSONDecodeError:
  561.                 return packet_decoded
  562.  
  563.         except UnicodeDecodeError:
  564.             return packet
  565.  
  566.     # Required to implement the Thread. This is the main loop of the node client.
  567.     def run(self):
  568.         """The main loop of the thread to handle the connection with the node. Within the
  569.           main loop the thread waits to receive data from the node. If data is received
  570.           the method node_message will be invoked of the main node to be processed."""          
  571.         buffer = b'' # Hold the stream that comes in!
  572.  
  573.         while not self.terminate_flag.is_set():
  574.             chunk = b''
  575.  
  576.             try:
  577.                 chunk = self.sock.recv(4096)
  578.  
  579.             except socket.timeout:
  580.                 self.main_node.debug_print("NodeConnection: timeout")
  581.  
  582.             except Exception as e:
  583.                 self.terminate_flag.set() # Exception occurred terminating the connection
  584.                 self.main_node.debug_print('Unexpected error')
  585.                 self.main_node.debug_print(e)
  586.  
  587.             # BUG: possible buffer overflow when no EOT_CHAR is found => Fix by max buffer count or so?
  588.             if chunk != b'':
  589.                 buffer += chunk
  590.                 eot_pos = buffer.find(self.EOT_CHAR)
  591.  
  592.                 while eot_pos > 0:
  593.                     packet = buffer[:eot_pos]
  594.                     buffer = buffer[eot_pos + 1:]
  595.  
  596.                     self.main_node.message_count_recv += 1
  597.                     self.main_node.node_message( self, self.parse_packet(packet) )
  598.  
  599.                     eot_pos = buffer.find(self.EOT_CHAR)
  600.  
  601.             time.sleep(0.01)
  602.  
  603.         # IDEA: Invoke (event) a method in main_node so the user is able to send a bye message to the node before it is closed?
  604.         self.sock.settimeout(None)
  605.         self.sock.close()
  606.         self.main_node.node_disconnected( self ) # Fixed issue #19: Send to main_node when a node is disconnected. We do not know whether it is inbounc or outbound.
  607.         self.main_node.debug_print("NodeConnection: Stopped")
  608.  
  609.     def set_info(self, key, value):
  610.         self.info[key] = value
  611.  
  612.     def get_info(self, key):
  613.         return self.info[key]
  614.  
  615.     def __str__(self):
  616.         return 'NodeConnection: {}:{} <-> {}:{} ({})'.format(self.main_node.host, self.main_node.port, self.host, self.port, self.id)
  617.  
  618.     def __repr__(self):
  619.         return '<NodeConnection: Node {}:{} <-> Connection {}:{}>'.format(self.main_node.host, self.main_node.port, self.host, self.port)
  620.  
  621. if __name__ == "__main__":
  622.     node = Node()
  623.     node.start(<host>, <port>)
  624.     node.send_to_node(<n>, <data>)
Add Comment
Please, Sign In to add comment