Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import os
import queue
import sys
import threading
from datetime import datetime
from time import sleep

import mysql.connector
import serial
from PyQt5 import QtCore
from PyQt5 import QtWidgets
from PyQt5 import uic
from PyQt5.QtWidgets import QMessageBox
# Module-level configuration and shared connections.
#
# Every hard-coded setting can be overridden through an environment variable;
# the defaults are the values this script has always used, so running it
# without any of the variables set behaves exactly as before.
port = os.environ.get('TTY_PORT', '/dev/pts/5')  # COM port name (to be added in UI)
baud = int(os.environ.get('TTY_BAUD', '9600'))  # baud rate
connected = False  # global flag, set to True by Ui.read_from_com when it starts
hb_retry = 10  # heartbeat counter: empty-queue polls counted by Ui.read_q_data

# Serial link shared with the reader thread; timeout=1 is the read timeout
# passed to pyserial.
Serial_Com = serial.Serial(port, baud, timeout=1)

# Hand-off queue between the serial reader thread and the UI updater thread.
# It is bounded, so put() blocks once 100 lines are waiting.
WorkQ = queue.Queue(maxsize=100)

# NOTE(review): the fallback password is kept only for backward compatibility;
# set TTY_DB_PASSWORD and remove the literal from source control.
mydb = mysql.connector.connect(
    host=os.environ.get('TTY_DB_HOST', 'localhost'),
    user=os.environ.get('TTY_DB_USER', 'root'),
    passwd=os.environ.get('TTY_DB_PASSWORD', 'hellroot007'),
)
print(mydb)
class Ui(QtWidgets.QMainWindow):
    """Main window that displays readings arriving over the serial port.

    Two daemon threads are started by the constructor:

    * ``read_from_com`` reads lines from the serial port and puts every
      non-empty line on ``WorkQ``.
    * ``read_q_data`` polls ``WorkQ`` once per second, parses each payload and
      maintains the heartbeat indicator.

    Qt widgets may only be touched from the GUI thread, so the worker threads
    never call widget methods directly. They emit the signals declared below,
    and the connected slots apply the changes on the GUI thread.
    """

    # Signals used by the worker threads to request a widget update.
    status_changed = QtCore.pyqtSignal(str, str)       # (style sheet, label text)
    log_message = QtCore.pyqtSignal(str)               # line to append to the log
    adc_values_received = QtCore.pyqtSignal(str, str)  # (first reading, second reading)

    # Number of consecutive empty polls after which the heartbeat is reported lost.
    _HB_MAX_MISSES = 10

    _STYLE_WAIT = "QLabel {background-color:yellow;color:black;border-radius: 10px}"
    _STYLE_NOK = "QLabel {background-color:red;color:black;border-radius: 10px}"
    _STYLE_OK = "QLabel {background-color:green;color:white;border-radius: 10px;}"

    def __init__(self):
        """Load the UI file, wire up signals and buttons, and start the workers."""
        super().__init__()
        uic.loadUi('tty_content.ui', self)  # Load UI file
        self.statusBar().showMessage('Message in statusbar.')
        self.show()

        # Connect everything before the threads start so no emission is lost.
        self.status_changed.connect(self._apply_status)
        self.log_message.connect(self._append_log)
        self.adc_values_received.connect(self._apply_adc_values)
        self.show_dialog.clicked.connect(self.clickDialog)
        self.show_values.clicked.connect(self.clickValues)

        # Thread that reads the payload from the serial connection.
        thread_read_from_com = threading.Thread(
            target=self.read_from_com, args=(Serial_Com,))
        thread_read_from_com.daemon = True
        thread_read_from_com.start()

        # Thread that takes the data off the queue and presents it in the UI.
        thread_read_q_data = threading.Thread(target=self.read_q_data, args=())
        thread_read_q_data.daemon = True
        thread_read_q_data.start()

    def clickDialog(self):
        """Show a simple "about" message box."""
        QMessageBox.about(self, "Title", "Message")

    def clickValues(self):
        """Show the current values of both ADC bars in a message box."""
        QMessageBox.critical(
            self, "Testing Values",
            "Current values: " + str(self.adc_bar_1.value())
            + " & " + str(self.adc_bar_2.value()))

    def handle_rcvd_data(self, data):
        """Queue a received line for the UI updater; empty lines are ignored.

        ``WorkQ.put`` blocks while the queue is full.
        """
        if data == '':
            return
        print(data)  # print received data for debug purpose
        WorkQ.put(data)

    def read_from_com(self, ser):
        """Read lines from *ser* forever and forward them to ``handle_rcvd_data``.

        Runs on its own thread. Receiving the line ``exit`` terminates the
        whole process immediately via ``os._exit(0)``.
        """
        global connected
        connected = True
        while True:
            read_line = ser.readline().strip()
            # Line noise must not kill the reader thread: undecodable bytes are
            # replaced instead of raising UnicodeDecodeError.
            value = read_line.decode('utf-8', errors='replace')
            if value == 'exit':
                os._exit(0)
            self.handle_rcvd_data(value)

    def read_q_data(self):
        """Poll ``WorkQ`` once per second and publish the result to the UI.

        Runs on its own thread. An empty poll counts as a missed heartbeat;
        any payload resets the counter and sets the status to "HB OK".
        """
        global hb_retry
        while True:
            try:
                payload = WorkQ.get_nowait()
            except queue.Empty:
                self._note_missed_heartbeat()
            else:
                self._publish_payload(payload)
                self.status_changed.emit(self._STYLE_OK, "HB OK")
                print(hb_retry)
                hb_retry = 0
            sleep(1)

    def _note_missed_heartbeat(self):
        """Count one empty poll and update the status label accordingly."""
        global hb_retry
        hb_retry += 1
        if hb_retry < self._HB_MAX_MISSES:
            self.status_changed.emit(self._STYLE_WAIT, "WAIT")
            print(hb_retry)
            return
        # ">=" rather than "==": the counter starts at 10 and is incremented
        # before this check, so an equality test would be skipped and the
        # failure would never be reported until data had arrived at least once.
        self.status_changed.emit(self._STYLE_NOK, "HB NOK")
        timestampStr = datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)")
        self.log_message.emit("Error reading value @ " + timestampStr)
        print(timestampStr)
        print(hb_retry)
        hb_retry = 0

    def _publish_payload(self, payload):
        """Parse one ``:``-separated payload and emit the matching signal.

        ``V:<a>:<b>`` carries two integer readings; anything else has its
        first field appended to the log.
        """
        fields = payload.split(':')
        if fields[0] != 'V':
            self.log_message.emit(fields[0])
            return
        try:
            first, second = fields[1], fields[2]
            # Validate here, on the worker thread, so the GUI slot cannot raise.
            int(first)
            int(second)
        except (IndexError, ValueError):
            self.log_message.emit("Malformed reading: " + payload)
            return
        self.adc_values_received.emit(first, second)

    @QtCore.pyqtSlot(str, str)
    def _apply_status(self, style_sheet, text):
        """Slot: restyle and relabel the heartbeat status label."""
        self.status_label.setStyleSheet(style_sheet)
        self.status_label.setText(text)

    @QtCore.pyqtSlot(str)
    def _append_log(self, line):
        """Slot: append one line to the log text box."""
        self.textEdit.append(line)

    @QtCore.pyqtSlot(str, str)
    def _apply_adc_values(self, first, second):
        """Slot: show both readings on the displays and the bars."""
        self.adc_1.display(first)
        self.adc_2.display(second)
        self.adc_bar_1.setValue(int(first))
        self.adc_bar_2.setValue(int(second))

    @staticmethod
    def Cleanup_App():
        """Log that the application is about to quit.

        The worker threads are daemon threads, so they are not joined here.
        """
        print("About to quit the app")
if __name__ == '__main__':
    # Script entry point: create the Qt application and the main window,
    # register the quit hook, then run the event loop until the app exits.
    app = QtWidgets.QApplication(sys.argv)
    window = Ui()  # keep a reference so the window lives as long as the app
    app.aboutToQuit.connect(Ui.Cleanup_App)
    exit_code = app.exec_()
    sys.exit(exit_code)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement