Advertisement
rr1024

Qt5 Serial Port Manager

Jun 1st, 2025
27
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.05 KB | Software | 0 0
  1. import serial
  2. from PyQt5.QtCore import QThread, pyqtSignal
  3. import time
  4.  
  5.  
  6.  
  7. class serial_port_mgr(QThread):
  8.     """Manages connections to multiple serial ports in separate threads."""
  9.  
  10.     data_received = pyqtSignal(str, str) # Signal for incoming data (port, data)
  11.     connection_status = pyqtSignal(str, bool) # Signal for connection changes (port, status)
  12.  
  13.     def __init__(self, port_settings, parent=None):
  14.         super().__init__(parent)
  15.         self.port_settings = port_settings  # Dict of {port: {settings}}
  16.         self.serial_ports = {} # Dict to hold Serial objects {port: Serial}
  17.         self._is_running = False
  18.  
  19.     def run(self):
  20.         """Starts the threads for each port and monitoring."""
  21.         self._is_running = True
  22.         for port, settings in self.port_settings.items():
  23.             try:
  24.                 self.connect_serial_port(port, settings)
  25.                 self.connection_status.emit(port, True)
  26.             except serial.SerialException as e:
  27.                 print(f"Error connecting to port {port}: {e}")
  28.                 self.connection_status.emit(port, False)
  29.  
  30.         while self._is_running:
  31.             for port, ser in self.serial_ports.items():
  32.                 if ser.isOpen():
  33.                     try:
  34.                         if ser.in_waiting > 0:
  35.                           data = ser.readline().decode('utf-8', errors='ignore').strip()
  36.                           self.data_received.emit(port, data)
  37.                     except serial.SerialException as e:
  38.                         print(f"Error reading from port {port}: {e}")
  39.                         self.disconnect_serial_port(port)
  40.                         self.connection_status.emit(port, False)
  41.                        
  42.             time.sleep(0.01)
  43.  
  44.     def connect_serial_port(self, port, settings):
  45.          """Connects to a single serial port."""
  46.          if port not in self.serial_ports or not self.serial_ports[port].isOpen():
  47.             try:
  48.                 ser = serial.Serial(port=port, **settings) # Use settings dict
  49.                 self.serial_ports[port] = ser
  50.                 print(f"Connected to port {port}")
  51.             except serial.SerialException as e:
  52.                 print(f"Failed to connect to {port}: {e}")
  53.                 raise
  54.  
  55.  
  56.     def disconnect_serial_port(self, port):
  57.          """Disconnects from a single serial port."""
  58.          if port in self.serial_ports and self.serial_ports[port].isOpen():
  59.             try:
  60.                 self.serial_ports[port].close()
  61.                 print(f"Disconnected from port {port}")
  62.             except serial.SerialException as e:
  63.                 print(f"Error disconnecting from port {port}: {e}")
  64.             finally:
  65.                 del self.serial_ports[port]
  66.  
  67.  
  68.     def stop(self):
  69.         """Stops the threads and disconnects from all ports."""
  70.         self._is_running = False
  71.         for port in list(self.serial_ports.keys()): # Iterate over a copy
  72.             self.disconnect_serial_port(port)
  73.         print("Serial port manager stopped.")
  74.         self.quit()
  75.         self.wait()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement