Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import serial
- from PyQt5.QtCore import QThread, pyqtSignal
- import time
- class serial_port_mgr(QThread):
- """Manages connections to multiple serial ports in separate threads."""
- data_received = pyqtSignal(str, str) # Signal for incoming data (port, data)
- connection_status = pyqtSignal(str, bool) # Signal for connection changes (port, status)
- def __init__(self, port_settings, parent=None):
- super().__init__(parent)
- self.port_settings = port_settings # Dict of {port: {settings}}
- self.serial_ports = {} # Dict to hold Serial objects {port: Serial}
- self._is_running = False
- def run(self):
- """Starts the threads for each port and monitoring."""
- self._is_running = True
- for port, settings in self.port_settings.items():
- try:
- self.connect_serial_port(port, settings)
- self.connection_status.emit(port, True)
- except serial.SerialException as e:
- print(f"Error connecting to port {port}: {e}")
- self.connection_status.emit(port, False)
- while self._is_running:
- for port, ser in self.serial_ports.items():
- if ser.isOpen():
- try:
- if ser.in_waiting > 0:
- data = ser.readline().decode('utf-8', errors='ignore').strip()
- self.data_received.emit(port, data)
- except serial.SerialException as e:
- print(f"Error reading from port {port}: {e}")
- self.disconnect_serial_port(port)
- self.connection_status.emit(port, False)
- time.sleep(0.01)
- def connect_serial_port(self, port, settings):
- """Connects to a single serial port."""
- if port not in self.serial_ports or not self.serial_ports[port].isOpen():
- try:
- ser = serial.Serial(port=port, **settings) # Use settings dict
- self.serial_ports[port] = ser
- print(f"Connected to port {port}")
- except serial.SerialException as e:
- print(f"Failed to connect to {port}: {e}")
- raise
- def disconnect_serial_port(self, port):
- """Disconnects from a single serial port."""
- if port in self.serial_ports and self.serial_ports[port].isOpen():
- try:
- self.serial_ports[port].close()
- print(f"Disconnected from port {port}")
- except serial.SerialException as e:
- print(f"Error disconnecting from port {port}: {e}")
- finally:
- del self.serial_ports[port]
- def stop(self):
- """Stops the threads and disconnects from all ports."""
- self._is_running = False
- for port in list(self.serial_ports.keys()): # Iterate over a copy
- self.disconnect_serial_port(port)
- print("Serial port manager stopped.")
- self.quit()
- self.wait()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement