# Advertisement
# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
# NOTE: the original import order is kept on purpose: both star imports
# export a name called ``Event`` and the later one wins.
import tkinter as tk
from tkinter import *
from tkinter import ttk
import socket
import threading
from threading import *
import json  # Import json module
import logging  # Import logging module
import random
import time
# Log everything from DEBUG upwards and prefix each record with the name of
# the thread that emitted it (left-aligned, padded to 10 characters).
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(threadName)-10s] %(message)s',
)
class MyLabel(tk.Label):
    """Label whose text is periodically re-read from a callable.

    Args:
        root: Parent widget.
        func: Zero-argument callable; ``str(func())`` becomes the label text.
        **kwargs: Forwarded unchanged to ``tk.Label``.
    """

    # Milliseconds between two consecutive text refreshes.
    REFRESH_MS = 500

    def __init__(self, root, func, **kwargs):
        tk.Label.__init__(self, root, **kwargs)
        self.func = func
        # Show the first value right away and start the periodic refresh.
        self.update()

    def update(self):
        """Set the text to ``str(self.func())`` and schedule the next refresh.

        NOTE: this intentionally keeps the original method name, which
        overrides the ``update()`` method inherited from tkinter widgets.
        """
        self.configure(text=str(self.func()))
        self.after(self.REFRESH_MS, self.update)
# To be implemented as an interface
class MyFrame(ttk.Frame):
    """Common base class for the notebook tabs.

    ``read``, ``write`` and ``update`` are no-op hooks; subclasses override
    the ones they need.

    Args:
        root: Parent widget.
    """

    def __init__(self, root):
        super().__init__(root)

    def read(self):
        """Hook for pulling data into the frame; does nothing here."""

    def write(self):
        """Hook for pushing the frame's state out; does nothing here."""

    def update(self):
        """Hook for the periodic refresh; does nothing here.

        NOTE: this overrides the ``update()`` method inherited from tkinter
        widgets.
        """
class MyInfoFrame(MyFrame):
    """Read-only tab showing every sensor value and handler status.

    Args:
        root: Parent widget.
        func: Zero-argument callable returning the latest inbound message.
            The message is read as a dict with a ``'sensor'`` list (items
            with ``'name'``, ``'value'`` and ``'unit'``) and a ``'handler'``
            list (items with ``'name'`` and ``'status'``).
    """

    # Milliseconds between two consecutive refreshes of the shown values.
    REFRESH_MS = 500

    def __init__(self, root, func):
        super().__init__(root)
        self.func = func
        # Maps a sensor/handler name to the label that displays its value.
        self.values = {}

        message = func()
        logging.debug('MyInfoFrame initial message: %s', message)
        try:
            self._buildRows(message)
        except (KeyError, TypeError):
            # Best effort: keep whatever rows were built before the bad item.
            logging.exception('MyInfoFrame.__init__ reading error...')
        self.update()

    def _buildRows(self, message):
        """Create one grid row per sensor, then one per handler."""
        row = 0
        for sensor in message['sensor']:
            name = sensor['name']
            tk.Label(self, text=name).grid(row=row, column=0)
            self.values[name] = tk.Label(
                self, text=sensor['value'], relief=tk.SUNKEN, width=5)
            self.values[name].grid(row=row, column=1)
            tk.Label(self, text=sensor['unit']).grid(row=row, column=2)
            row += 1
        for handler in message['handler']:
            name = handler['name']
            tk.Label(self, text=name).grid(row=row, column=0)
            self.values[name] = tk.Label(
                self, text=handler['status'], relief=tk.SUNKEN, width=5)
            self.values[name].grid(row=row, column=1)
            row += 1

    def _setValue(self, name, value):
        """Show *value* in the label registered for *name*, if there is one."""
        label = self.values.get(name)
        if label is None:
            # The item was not present when the rows were built; skip it
            # instead of aborting the rest of the refresh.
            logging.debug('MyInfoFrame: no row for %r', name)
            return
        label.configure(text=value)

    def update(self):
        """Refresh every displayed value and schedule the next refresh."""
        message = self.func()
        try:
            for sensor in message['sensor']:
                self._setValue(sensor['name'], sensor['value'])
            for handler in message['handler']:
                self._setValue(handler['name'], handler['status'])
        except (KeyError, TypeError):
            logging.error('MyInfoFrame.update reading error...')
        # Rescheduling happens even after a failed refresh.
        self.after(self.REFRESH_MS, self.update)
class MySettingsFrame(MyFrame):
    """Tab with the system ON/OFF switch and the temperature set point.

    The current choices are copied into the shared outbound message every
    ``REFRESH_MS`` milliseconds.

    Args:
        root: Parent widget.
        **kwargs: Must contain ``func_2``, a zero-argument callable returning
            the outbound message dict, which this frame mutates in place.

    Raises:
        KeyError: If ``func_2`` is not supplied.
    """

    # Milliseconds between two consecutive writes to the outbound message.
    REFRESH_MS = 500
    # Range of the set-point slider; the slider starts at the lower bound.
    MIN_SETPOINT = 15.0
    MAX_SETPOINT = 35.0

    def __init__(self, root, **kwargs):
        super().__init__(root)
        self.systemOn = False
        self.getOutboundMessage = kwargs['func_2']
        self.outboundMessage = self.getOutboundMessage()

        # Explicit variable so the two radiobuttons form their own group
        # instead of relying on Tk's default shared variable.
        self.systemVar = tk.StringVar(self, value='OFF')
        tk.Label(self, text='Encendido').grid(row=0, column=0)
        ttk.Radiobutton(
            self, text='ON', value='ON', variable=self.systemVar,
            command=self.setSystemOn).grid(row=0, column=1)
        offButton = ttk.Radiobutton(
            self, text='OFF', value='OFF', variable=self.systemVar,
            command=self.setSystemOff)
        offButton.grid(row=0, column=2)
        # Select OFF at start-up (this also runs setSystemOff).
        offButton.invoke()

        self.scaleValue = self.MIN_SETPOINT
        self.scale = ttk.Scale(
            self, orient=tk.HORIZONTAL, length=200,
            from_=self.MIN_SETPOINT, to=self.MAX_SETPOINT,
            command=self.setScaleValue)
        self.scale.set(self.scaleValue)
        self.scale.grid(row=2, column=1)

        tk.Label(self, text='Temp. Consigna').grid(row=1, column=0)
        MyLabel(self, func=self.getScaleValue, relief=tk.SUNKEN,
                width=5).grid(row=1, column=1)
        tk.Label(self, text='ºC').grid(row=1, column=2)

        self.update()

    def setSystemOn(self):
        """Record that the system is requested ON."""
        self.systemOn = True

    def setSystemOff(self):
        """Record that the system is requested OFF."""
        self.systemOn = False

    def setScaleValue(self, *args):
        """Slider callback: store the slider position rounded to 0.1."""
        self.scaleValue = round(self.scale.get(), 1)

    def getScaleValue(self):
        """Return the last stored set point."""
        return self.scaleValue

    def write(self):
        """Copy the current settings into the outbound message."""
        # Re-fetch on every write so a dict replaced through the provider is
        # picked up instead of writing into a stale reference.
        message = self.outboundMessage = self.getOutboundMessage()
        message['timestamp'] = time.strftime(
            "%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
        message['origin'] = 'front-end'
        message['action'] = 'set'
        message['setup'] = self.scaleValue
        message['system'] = 'ON' if self.systemOn else 'OFF'

    def update(self):
        """Write the settings and schedule the next write."""
        self.write()
        self.after(self.REFRESH_MS, self.update)
class MyAdvancedFrame(MyFrame):
    """Tab for forcing the bridge mode and simulating the two temperatures.

    Args:
        root: Parent widget.
        **kwargs: Must contain ``func_1`` (zero-argument callable returning
            the inbound message) and ``func_2`` (zero-argument callable
            returning the outbound message dict, mutated in place under its
            ``'forced'`` key).

    Raises:
        KeyError: If ``func_1`` or ``func_2`` is not supplied.
    """

    # Milliseconds between two consecutive sensor reads.
    REFRESH_MS = 500
    # Positions of the two temperatures inside inboundMessage['sensor'].
    INT_SENSOR = 0
    EXT_SENSOR = 1

    def __init__(self, root, **kwargs):
        super().__init__(root)
        self.tempIntSimulated = False
        self.tempExtSimulated = False
        self.getInboundMessage = kwargs['func_1']
        self.getOutboundMessage = kwargs['func_2']

        # Latest sensor readings shown by the labels. They stay '' until the
        # inbound message actually carries the corresponding sensor.
        self.tempInt = ''
        self.tempExt = ''
        self.readTempInt()
        self.readTempExt()

        # Row 0: bridge mode selector.
        self.bridgeVar = tk.StringVar(self)
        tk.Label(self, text='Marcha/Paro').grid(row=0, column=0)
        self.bridge_button_on = self._bridgeButton('ON', column=1)
        self.bridge_button_off = self._bridgeButton('OFF', column=2)
        self.bridge_button_auto = self._bridgeButton('AUTO', column=3)
        # Start in AUTO (this also runs setBridge).
        self.bridge_button_auto.invoke()

        # Row 1: interior temperature. The entry has its own variable so it
        # never collides with the reading stored in self.tempInt.
        tk.Label(self, text='Temp. Interior').grid(row=1, column=0)
        self.t_int_label = MyLabel(
            self, func=self.getTempInt, relief=tk.SUNKEN, width=5)
        self.t_int_label.grid(row=1, column=1)
        self.tempIntVar = tk.StringVar(self)
        self.temp_int_entry = ttk.Entry(
            self, textvariable=self.tempIntVar, width=5)
        self.temp_int_entry.grid(row=1, column=2)
        self.t_int_checkbutton = tk.Checkbutton(
            self, text='Simulado', command=self.setTempIntSimulated)
        self.t_int_checkbutton.grid(row=1, column=3)

        # Row 2: exterior temperature, same layout as the interior row.
        tk.Label(self, text='Temp. Exterior').grid(row=2, column=0)
        MyLabel(self, func=self.getTempExt, relief=tk.SUNKEN,
                width=5).grid(row=2, column=1)
        self.tempExtVar = tk.StringVar(self)
        self.temp_ext_entry = ttk.Entry(
            self, textvariable=self.tempExtVar, width=5)
        self.temp_ext_entry.grid(row=2, column=2)
        self.t_ext_checkbutton = tk.Checkbutton(
            self, text='Simulado', command=self.setTempExtSimulated)
        self.t_ext_checkbutton.grid(row=2, column=3)

        self.update()

    def _bridgeButton(self, label, column):
        """Create and grid one bridge-mode radiobutton on row 0."""
        button = ttk.Radiobutton(
            self, text=label, value=label, variable=self.bridgeVar,
            command=self.setBridge)
        button.grid(row=0, column=column)
        return button

    def _applyForced(self, key, simulated, variable, entry):
        """Publish or clear one forced temperature and lock/unlock its entry."""
        forced = self.getOutboundMessage()['forced']
        if simulated:
            forced[key] = variable.get()
            # Lock the entry so the shown text matches the published value.
            entry.config(state='disabled')
        else:
            forced[key] = ''
            entry.config(state='normal')

    def _sensorValue(self, index, fallback):
        """Return the value of sensor *index*, or *fallback* if unavailable."""
        try:
            return self.getInboundMessage()['sensor'][index]['value']
        except (KeyError, IndexError, TypeError):
            return fallback

    def setBridge(self):
        """Copy the selected bridge mode into the outbound message."""
        outboundMessage = self.getOutboundMessage()
        outboundMessage['forced']['bridge'] = self.bridgeVar.get()

    def setTempIntSimulated(self):
        """Toggle simulation of the interior temperature."""
        self.tempIntSimulated = not self.tempIntSimulated
        self._applyForced('int', self.tempIntSimulated,
                          self.tempIntVar, self.temp_int_entry)

    def setTempExtSimulated(self):
        """Toggle simulation of the exterior temperature."""
        self.tempExtSimulated = not self.tempExtSimulated
        self._applyForced('ext', self.tempExtSimulated,
                          self.tempExtVar, self.temp_ext_entry)

    def update(self):
        """Read both temperatures and schedule the next read."""
        self.readTempInt()
        self.readTempExt()
        self.after(self.REFRESH_MS, self.update)

    def readTempInt(self):
        """Store the latest interior reading; keep the old one on failure."""
        self.tempInt = self._sensorValue(self.INT_SENSOR, self.tempInt)

    def readTempExt(self):
        """Store the latest exterior reading; keep the old one on failure."""
        self.tempExt = self._sensorValue(self.EXT_SENSOR, self.tempExt)

    def getTempInt(self):
        """Return the last stored interior reading."""
        return self.tempInt

    def getTempExt(self):
        """Return the last stored exterior reading."""
        return self.tempExt
class Application(ttk.Frame):
    """Main control panel: a three-tab notebook fed by a polling TCP client.

    A background thread repeatedly connects to ``targetServer``, sends the
    outbound message as UTF-8 JSON and stores the decoded reply as the
    inbound message. The tabs read and mutate those two messages through
    ``getInboundMessage`` / ``getOutboundMessage``.

    Args:
        main_window: The Tk root window; its title is set here.
        targetServer: ``(host, port)`` pair passed to ``socket.connect``.
        bufferSizeMsg: Maximum number of bytes read for one reply.
    """

    # Seconds between two exchanges with the server.
    pollInterval = 0.5
    # Extra seconds to wait after a failed exchange before retrying.
    errorBackoff = 2.0
    # Seconds a single connect/send/recv may block before it is abandoned.
    commTimeout = 5.0

    def __init__(self, main_window, targetServer, bufferSizeMsg):
        ttk.Frame.__init__(self, main_window)  # Initialise as a Frame
        self.socket = None
        # Parameters for talking to the server.
        self.targetServer = targetServer
        self.bufferSizeMsg = bufferSizeMsg
        main_window.title("GRUPO A3I - Panel de control")
        # Message received from the back end.
        self.inboundMessage = {}
        # Message sent to the back end: set points and forced values.
        self.outboundMessage = {
            'origin': 'front-end',
            'action': 'none',
            'forced': {'bridge': 'AUTO', 'int': '', 'ext': ''},
        }

        # Set by the worker once a non-empty message has been received.
        self._firstMessage = threading.Event()
        self.createCommThread()
        # The tabs are built from the first message, so block until it
        # arrives. Waiting on the event sleeps instead of spinning the CPU.
        while not self._firstMessage.wait(timeout=self.commTimeout):
            logging.debug('Still waiting for the first message from %s',
                          self.targetServer)

        # Create the tab panel.
        self.notebook = ttk.Notebook(self)
        # Create the content of each tab.
        self.frame_info = MyInfoFrame(self, func=self.getInboundMessage)
        self.frame_settings = MySettingsFrame(
            self, func_2=self.getOutboundMessage)
        self.frame_advanced = MyAdvancedFrame(
            self, func_1=self.getInboundMessage,
            func_2=self.getOutboundMessage)
        # Add them to the panel with their titles.
        self.notebook.add(self.frame_info, text="Info", padding=20)
        self.notebook.add(self.frame_settings, text="Settings", padding=20)
        self.notebook.add(self.frame_advanced, text="Avanzado", padding=20)
        # Margins.
        self.notebook.pack(padx=10, pady=10)
        self.pack()

    def __del__(self):
        # getattr: the attribute may be missing if __init__ never ran.
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()

    def getInboundMessage(self):
        """Return the most recently received message."""
        return self.inboundMessage

    def getOutboundMessage(self):
        """Return the message that is sent to the server."""
        return self.outboundMessage

    def setOutboundMessage(self, outboundMessage):
        """Replace the message that is sent to the server."""
        self.outboundMessage = outboundMessage

    def createCommThread(self):
        """Start the background polling thread unless it is already running."""
        current = getattr(self, 'thread', None)
        if current is not None and current.is_alive():
            return
        # Daemon: the endless polling loop must not keep the process alive
        # after the window has been closed.
        self.thread = threading.Thread(
            target=self._commLoop, name='comm', daemon=True)
        self.thread.start()

    def _commLoop(self):
        """Worker body: exchange messages forever, pausing between rounds.

        No Tk call is made from this thread; pacing uses ``time.sleep``.
        """
        while True:
            try:
                succeeded = self.communicate()
            except Exception:
                # Thread boundary: log and keep polling rather than die.
                logging.exception('Unexpected error in communication loop')
                succeeded = False
            delay = self.pollInterval
            if not succeeded:
                delay += self.errorBackoff
            time.sleep(delay)

    def communicate(self):
        """Perform one request/reply exchange with the server.

        Returns:
            True if a reply was received and decoded, False otherwise.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                self.socket = sock
                sock.settimeout(self.commTimeout)
                logging.debug('Connecting to %s', self.targetServer)
                sock.connect(self.targetServer)
                logging.debug('Sending message %s', self.outboundMessage)
                sock.sendall(json.dumps(self.outboundMessage).encode('utf-8'))
                # NOTE(review): a single recv is assumed to deliver the whole
                # reply; a reply split across packets is rejected below.
                inboundMessage = sock.recv(self.bufferSizeMsg)
                logging.debug('Inbound Message %s', inboundMessage)
            self.inboundMessage = json.loads(inboundMessage)
        except OSError as errorMsg:
            # Covers socket.timeout, socket.gaierror and socket.error.
            logging.error('Communication error: %s', errorMsg)
            return False
        except ValueError as errorMsg:
            # json.JSONDecodeError is a ValueError (empty or partial reply).
            logging.error('Discarding malformed message: %s', errorMsg)
            return False
        finally:
            # The with-block has already closed the socket.
            self.socket = None
        if self.inboundMessage != {}:
            self._firstMessage.set()
        return True
# Script entry: build the root window, connect the panel to the back end and
# hand control to the Tk event loop.
main_window = tk.Tk()
targetServer = ('raspberrypi.local', 5005)  # (host, port) of the back end
bufferSizeMsg = 2048  # maximum bytes read per reply
app = Application(main_window, targetServer, bufferSizeMsg)
app.mainloop()
# Advertisement
# Add Comment
# Please, Sign In to add comment
# Advertisement