Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import types
- import threading
- import socket
- import struct
- import os
- class DebugThread(threading.Thread):
- def __init__(self, host):
- threading.Thread.__init__(self)
- self.m_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.m_socket.setblocking(0)
- self.m_loop = True
- self.m_server_addrs = (host, 6540) #port = 6540, defined in DENNOp
- self.m_msg_type_int = 0
- self.m_msg_type_float = 1
- self.m_msg_type_double = 2
- self.m_msg_type_string = 3
- def run(self):
- #try to connect
- if not self.connection():
- return
- #read message
- while self.m_loop:
- #read
- value = self.read_messages()
- #bad case
- if value is False:
- break
- #close connection
- self.m_socket.shutdown()
- self.m_socket.close()
- def connection(self):
- while self.m_loop:
- err = self.m_socket.connect_ex(self.m_server_addrs)
- if err == 0:
- return True
- return False
- def read_messages(self):
- #read type
- data = self.m_socket.recv(4)
- #if type arrived
- if data != None:
- #data type
- type_of_data = int.from_bytes(bytearray(data), byteorder='little')
- #types
- if type_of_data == self.m_msg_type_int:
- int_data = struct.unpack('<i', bytearray(self.m_socket.recv(4)))
- return int_data
- elif type_of_data == self.m_msg_type_float:
- float_data = struct.unpack('<f', bytearray(self.m_socket.recv(4)))
- return float_data
- elif type_of_data == self.m_msg_type_double:
- double_data = struct.unpack('<d', bytearray(self.m_socket.recv(4)))
- return double_data
- elif type_of_data == self.m_msg_type_string:
- len_data = struct.unpack('<i', bytearray(self.m_socket.recv(4)))
- str_data = bytearray(self.m_socket.recv(len_data)).decode("utf-8")
- return str_data
- else:
- return False
- else:
- return True
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement