Advertisement
Guest User

Untitled

a guest
Dec 28th, 2020
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.63 KB | None | 0 0
  1. import socket
  2. from threading import Thread
  3. from time import sleep
  4. from control_packets import *
  5. # from decode import decode_int
  6. from datetime import datetime
  7. from random import randint
  8.  
  9. IP_ = '127.0.0.1'
  10. PORT = 1883
  11. SIGNAL = 'received_packet'
  12. g_generated_client_id = 0
  13.  
  14.  
  15. def decode_packet_type(raw_packet):
  16.     packet_type = raw_packet >> 4
  17.     switch = {
  18.         0x02: "CONNACK",
  19.         0x03: "PUBLISH",
  20.         0x04: "PUBACK",
  21.         0x05: "PUBREC",
  22.         0x06: "PUBREL",
  23.         0x07: "PUBCOMP",
  24.         0x09: "SUBACK",
  25.         0x0b: "UNSUBACK",
  26.         0x0d: "PINGRESP",
  27.         0x0e: "DISCONNECT",
  28.         0x0f: "AUTH"
  29.     }
  30.  
  31.     return switch[packet_type]
  32.  
  33.  
  34. class MQTTClient:
  35.     def __init__(self, IP, port, client_name, keep_alive):
  36.         # create a IPv4, TCP socket
  37.         self.IP = IP
  38.         self.port = port
  39.         self.client_name = client_name
  40.         self.keep_alive = keep_alive
  41.         self.MQTT_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  42.         self.last_received_packet = ""
  43.         self.running = True
  44.         self.sending_thread = Thread(target=self.run)
  45.         self.receiving_thread = Thread(target=self.receive)
  46.         self.keep_alive_thread = Thread(target=self.client_keep_alive)
  47.         self.connect_packet = ConnectPacket(self.keep_alive, self.client_name)
  48.         self.connect()
  49.         self.send(self.connect_packet.pack())
  50.  
  51.     def connect(self):
  52.         self.MQTT_socket.connect((self.IP, self.port))
  53.  
  54.     def send(self, data):
  55.         print(datetime.now().strftime("%H:%M:%S") + "SENT " + str(data))
  56.         self.MQTT_socket.sendall(data)
  57.  
  58.     def receive(self):
  59.         while self.running is True:
  60.             received_data = self.MQTT_socket.recv(4096)
  61.             if received_data:
  62.                 self.last_received_packet = decode_packet_type(received_data[0])
  63.                 print("\n<<" + self.client_name + ">>")
  64.                 print(datetime.now().strftime("%H:%M:%S") + ": RECEIVED " + decode_packet_type(
  65.                     received_data[0]) + " PACKET.")
  66.                 # if server sends us disconnect, we stop the threads
  67.                 if decode_packet_type(received_data[0]) == "DISCONNECT":
  68.                     self.disconnect()
  69.                 print(datetime.now().strftime("%H:%M:%S") + ": RAW DATA: " + str(received_data))
  70.  
  71.     def disconnect(self):
  72.         print("DISCONNECTED.")
  73.         # send disconnect package
  74.         self.send(DisconnectPacket().pack())
  75.         self.receiving_thread.join()
  76.         self.sending_thread.join()
  77.         self.keep_alive_thread.join()
  78.         self.running = False
  79.  
  80.     def run(self):
  81.         pass
  82.  
  83.     # keep alive functionality
  84.     def client_keep_alive(self):
  85.         while self.running:
  86.             sleep(self.keep_alive)
  87.             self.send(PingreqPacket().pack())
  88.  
  89.  
  90. class MQTTPublisher(MQTTClient):
  91.     def __init__(self, IP, port, client_name, keep_alive, qos):
  92.         global g_generated_client_id
  93.         super().__init__(IP, port, client_name, keep_alive)
  94.         self.qos = qos
  95.         self.generated_client_id = randint(1, 1 << 16 - 1)
  96.         g_generated_client_id = self.generated_client_id
  97.         # to be replaced in the future
  98.         self.publish_packet = PublishPacket("/OS", "CPU", self.generated_client_id, self.qos)
  99.         self.sending_thread.start()
  100.         self.receiving_thread.start()
  101.         self.keep_alive_thread.start()
  102.  
  103.     def run(self):
  104.         if self.qos == 1 or self.qos == 0:
  105.             while self.running:
  106.                 self.send(self.publish_packet.pack())
  107.                 sleep(2)
  108.         if self.qos == 2:
  109.             while self.running:
  110.                 if self.last_received_packet == "CONNACK" or self.last_received_packet == "PUBCOMP":
  111.                     self.send(self.publish_packet.pack())
  112.                 elif self.last_received_packet == "PUBREC":
  113.                     self.send(PubrelPacket(self.generated_client_id).pack())
  114.                 sleep(2)
  115.  
  116.  
  117. class MQTTSubscriber(MQTTClient):
  118.     def __init__(self, IP, port, client_name, topics, keep_alive, qos_level):
  119.         super().__init__(IP, port, client_name, keep_alive)
  120.         QoS = []
  121.         self.qos_level = qos_level
  122.  
  123.         # to be modified
  124.         for topic in topics:
  125.             QoS.append(qos_level)
  126.  
  127.         # must introduce a way to monitor how many clients are active
  128.         # in order to give the client a pertinent ID.
  129.         # env variable??
  130.         self.subscribe_packet = SubscribePacket(topics, QoS, randint(1, 1 << 16 - 1))
  131.         self.send(self.subscribe_packet.pack())
  132.         self.sending_thread.start()
  133.         self.receiving_thread.start()
  134.         self.keep_alive_thread.start()
  135.  
  136.     def run(self):
  137.         if self.qos_level == 1:
  138.             while self.running:
  139.                 if self.last_received_packet == "PUBLISH":
  140.                     self.send(PubackPacket(randint(1, 1 << 16 - 1)).pack())
  141.                     self.last_received_packet = ""
  142.         elif self.qos_level == 2:
  143.             while self.running:
  144.                 if self.last_received_packet == "PUBLISH":
  145.                     self.send(PubrecPacket(g_generated_client_id).pack())
  146.                     self.last_received_packet = ""
  147.                 elif self.last_received_packet == "PUBREL":
  148.                     self.send(PubcompPacket(g_generated_client_id).pack())
  149.                     self.last_received_packet = ""
  150.  
  151.  
  152. # right now the output is mixed on the terminal, will be resolved when we get to the GUI phase
  153. publisher = MQTTPublisher(IP_, PORT, "publisher", 5, 2)
  154. subscriber = MQTTSubscriber(IP_, PORT, "subscriber", ["/OS"], 5, 2)
  155.  
  156. # cb's -> PyDispatcher
  157. # test wildcards ( $, +)
  158.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement