Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import os
- import platform
- import serial
- import ble_communication as ble
- import math
- import time
- import datetime
- import gi
- gi.require_version('Gtk', '3.0')
- from gi.repository import Gtk
- from gi.repository import GObject
- continuousModeFlag = 0
- class TempyUI:
- def __init__(self):
- self.gladefile = "tempy.glade"
- self.builder = Gtk.Builder()
- self.builder.add_from_file(self.gladefile)
- self.builder.connect_signals(self)
- self.selected_port = ""
- self.connected = False
- self.progress = 0.0
- self.counter = 300000
- self.current_mode = 0
- self.window = self.builder.get_object("window1")
- self.window.show()
- self.port_selection = self.builder.get_object("port_select")
- self.connect_btn = self.builder.get_object("connect_button")
- self.single_mode_btn = self.builder.get_object("single_mode_btn")
- self.continuous_mode_btn = self.builder.get_object("continuous_mode_btn")
- self.connection_status = self.builder.get_object("connection_status")
- self.mac_address = self.builder.get_object("mac_address")
- self.temperature_1 = self.builder.get_object("temperature_1")
- self.temperature_2 = self.builder.get_object("temperature_2")
- self.battery_level = self.builder.get_object("battery_level")
- self.operation_mode = self.builder.get_object("op_mode")
- self.calibration_data = self.builder.get_object("calib_data")
- self.port_list = self.builder.get_object("port_selection")
- self.read_temperature_btn = self.builder.get_object("read_temperature_btn")
- self.read_calibration_btn = self.builder.get_object("read_calibration_btn")
- self.read_opmode_btn = self.builder.get_object("read_opmode_btn")
- self.progress_bar = self.builder.get_object("progressbar")
- self.status_bar = self.builder.get_object("statusbar")
- self.label_modo = self.builder.get_object("label_modo")
- self.list_serial_ports()
- self.disable_all_buttons()
- self.status_bar.push(0, "Disconnected")
- self.ble_manager = ble.BLEManager(self.ble_callback)
- def check_ble_activity(self):
- GObject.timeout_add(10, self.check_ble_activity)
- self.ble_manager.check_activity()
- def on_window1_destroy(self, object, data=None):
- if self.ble_manager.is_connected():
- self.ble_manager.disconnect()
- Gtk.main_quit()
- def on_connect_button_toggled(self, data=None):
- if self.connect_btn.get_active():
- self.connect_btn.set_label("Disconnect")
- self.status_bar.push(0, "Open serial port...")
- self.connect_serial_port(self.selected_port)
- self.status_bar.push(0, "Serial port ok!")
- self.status_bar.push(0, "Scanning ble devices...")
- self.ble_manager.scan_and_connect()
- GObject.timeout_add(10, self.check_ble_activity)
- else:
- if self.ble_manager.is_connected():
- self.ble_manager.set_operation_mode([0])
- self.ble_manager.disconnect()
- self.single_mode_btn.set_active(True)
- self.connect_btn.set_label("Connect")
- self.status_bar.push(0, "Disconnected")
- self.clear_values()
- self.disable_all_buttons()
- def on_port_select_changed(self, widget, data=None):
- index = widget.get_active()
- model = widget.get_model()
- item = model[index][1]
- self.selected_port = item
- def on_single_mode_btn_toggled(self, data=None):
- global continuousModeFlag
- if self.single_mode_btn.get_active():
- self.ble_manager.set_operation_mode([0])
- continuousModeFlag = 0
- def on_continuous_mode_btn_toggled(self, data=None):
- global continuousModeFlag
- if self.continuous_mode_btn.get_active():
- self.ble_manager.set_operation_mode([1])
- self.get_periodic_measurements()
- continuousModeFlag = 1
- def on_read_temperature_btn_clicked(self, data=None):
- self.ble_manager.request_tempy_data()
- def on_read_calibration_btn_clicked(self, data=None):
- self.ble_manager.request_calibration_data()
- def on_read_opmode_btn_clicked(self, data=None):
- self.ble_manager.request_operation_mode()
- def disable_all_buttons(self):
- self.read_temperature_btn.set_sensitive(False)
- self.read_calibration_btn.set_sensitive(False)
- self.read_opmode_btn.set_sensitive(False)
- self.single_mode_btn.set_sensitive(False)
- self.continuous_mode_btn.set_sensitive(False)
- def enable_all_buttons(self):
- self.read_temperature_btn.set_sensitive(True)
- self.read_calibration_btn.set_sensitive(True)
- self.read_opmode_btn.set_sensitive(True)
- self.single_mode_btn.set_sensitive(True)
- self.continuous_mode_btn.set_sensitive(True)
- def list_serial_ports(self):
- self.port_list = Gtk.ListStore(int, str)
- if os.name == 'nt': # windows
- ports = ['COM%s' % (i + 1) for i in range(256)]
- result = []
- index = 0
- for port in ports:
- try:
- s = serial.Serial(port)
- s.close()
- result.append(port)
- self.port_list.append([index, result[index]])
- index += 1
- except (OSError, serial.SerialException):
- pass
- elif platform.system() == "Darwin": # MAC
- self.port_list.append([0, "/dev/tty.usbmodem1"])
- else: # Linux
- self.port_list.append([0, "/dev/ttyACM0"])
- self.port_selection.set_model(self.port_list)
- cell = Gtk.CellRendererText()
- self.port_selection.pack_start(cell, True)
- self.port_selection.add_attribute(cell, 'text', 1)
- self.port_selection.set_active(0)
- def ble_callback(self, event, data):
- global continuousModeFlag
- if event == ble.EVENT_SCAN_RESPONSE:
- self.status_bar.push(0, "Found BLE device")
- elif event == ble.EVENT_CONNECTION_STATUS:
- self.enable_all_buttons()
- self.status_bar.push(0, "Connected to %s" % ':'.join(['%02X' % b for b in data[::-1]]))
- self.mac_address.set_text("%s" % ':'.join(['%02X' % b for b in data[::-1]]))
- file = open("temperatures.txt", "w")
- file.write("Temperature 1: ")
- file.write("Temperature 2: ")
- file.write("Horario: ")
- file.write("\n")
- file.close()
- elif event == ble.EVENT_GROUP_FOUND:
- self.status_bar.push(0, "Group found")
- elif event == ble.EVENT_CHARACTERISTIC_FOUND:
- self.status_bar.push(0, "Characteristic found %s" % data)
- elif event == ble.EVENT_SERVICE_FOUND:
- self.status_bar.push(0, "Service found %s" % data)
- elif event == ble.EVENT_TEMPY_DATA:
- temperature_1 = data[0] | (data[1] << 8)
- if temperature_1 == 0:
- temperature_1 = 1
- temperature_2 = data[2] | (data[3] << 8)
- if temperature_2 == 0:
- temperature_1 = 1
- print (data)
- print (data[6] | (data[7] << 8))
- if ( data[6] | (data[7] << 8) == 0):
- self.label_modo.set_text("Normal")
- if ( data[6] | (data[7] << 8) == 1):
- self.label_modo.set_text("Prediction Mode")
- if ( data[6] | (data[7] << 8) == 2):
- self.label_modo.set_text("Chute Mode")
- print (temperature_1)
- print (temperature_2)
- now = datetime.datetime.now()
- datetime.time(now.hour, now.minute, now.second)
- if continuousModeFlag == 1:
- file = open("temperxpgreatures.txt", "a")
- file.write(str("%.1f" % self.calculate_temperature(temperature_1)))
- file.write(" ")
- file.write(str("%.1f" % self.calculate_temperature(temperature_2)))
- file.write(" ")
- file.write(str(datetime.time(now.hour, now.minute, now.second)))
- file.write("\n")
- file.close()
- battery = data[4] | (data[5] << 8)
- self.temperature_1.set_text("%.1f" % self.calculate_temperature(temperature_1))
- self.temperature_2.set_text("%.1f" % self.calculate_temperature(temperature_2))
- self.battery_level.set_text("%d%%" % battery)
- self.ble_manager.request_ack()
- elif event == ble.EVENT_CALIBRATION_DATA:
- self.status_bar.push(0, "Tempy calibration received")
- self.calibration_data.set_text("%s" % data)
- elif event == ble.EVENT_OPMODE_DATA:
- self.status_bar.push(0, "Tempy operation mode received")
- if data[0] == 0:
- self.operation_mode.set_text("Single mode")
- self.single_mode_btn.set_active(True)
- elif data[0]==1:
- self.operation_mode.set_text("Cont. mode")
- self.continuous_mode_btn.set_active(True)
- self.get_periodic_measurements()
- self.current_mode = data[0]
- elif event == ble.EVENT_LOST_CONNECTION:
- if self.ble_manager.operation_mode == [1]: # continuous mode
- self.try_to_reconnect()
- else:
- self.connect_btn.set_active(False)
- elif event == ble.EVENT_TIMED_OUT:
- self.status_bar.push(0, "connection timed out")
- def connect_serial_port(self, port_selected):
- try:
- ser = serial.Serial(port=port_selected, baudrate=115200, timeout=1, writeTimeout=1)
- except serial.SerialException as e:
- self.status_bar.push(0, "Serial port error")
- return
- ser.flushInput()
- ser.flushOutput()
- self.ble_manager.initialize(ser)
- def try_to_reconnect(self):
- if not self.ble_manager.is_connected():
- self.status_bar.push(0, "Trying to reconnect...")
- GObject.timeout_add(1000, self.try_to_reconnect)
- self.ble_manager.reconnect()
- def get_periodic_measurements(self):
- if self.ble_manager.is_connected():
- if self.ble_manager.operation_mode == [1]: # continuous mode
- if self.counter < float(30):
- self.counter += 1
- self.progress += 1/(float(30))
- else:
- self.counter = 0
- self.progress = 0.0
- self.progress_bar.set_fraction(self.progress)
- GObject.timeout_add(1000, self.get_periodic_measurements)
- def calculate_temperature(self, adc_value):
- adc_reference = 1.24
- adc_resolution = 2048
- r_25c = 10000.0
- beta = 3950.0
- circuit_reference = 2.048
- adc_voltage = (adc_reference / adc_resolution) * adc_value
- rth = (adc_voltage * 10000) / (circuit_reference - adc_voltage)
- rinf = r_25c * math.exp((-beta / (25 + 273.15)))
- temperature = (beta / math.log(abs((rth / rinf)))) - 273.15
- return temperature
- def clear_values(self):
- self.operation_mode.set_text("- - -")
- self.temperature_1.set_text("- - -")
- self.temperature_2.set_text("- - -")
- self.battery_level.set_text("- - -")
- self.calibration_data.set_text("- - -")
- self.mac_address.set_text("- - : - - : - - : - - : - - : - - ")
- if __name__ == "__main__":
- mainUI = TempyUI()
- Gtk.main()
Advertisement
Add Comment
Please, Sign In to add comment