Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import threading
- import socket
- import pyk8055_nv
- import time
- import hashlib
# ----------------------------------------------------------------------
# Settings to configure
# ----------------------------------------------------------------------
# Board address handed to pyk8055_nv.device() (see the jumper on the board).
Adresse_cart = 3
# UDP port shared by both ends of the link.
UDP_PORT = 26010
# Password shared by both ends of the link.
PW = "Password"
# Delay, in seconds, given to enter_k8055_result between two input polls.
timer_in = 0
# Tolerance given to presque_egal when comparing analog readings.
marge_ana_in = 1
# ----------------------------------------------------------------------
# Allowed functions
# ----------------------------------------------------------------------
# Names of the ``dev`` methods a remote command is allowed to call;
# out_k8055_result checks every received function name against this list.
autorise_func = [
    # analog outputs
    "analog_clear", "analog_all_clear",
    "analog_out", "analog_all_out",
    # digital outputs
    "digital_write",
    "digital_off", "digital_all_off",
    "digital_on", "digital_all_on",
    # counters
    "counter_reset", "counter_read", "counter_set_debounce",
]
- ############################
- #####Def. des fonctions#####
- ############################
def presque_egal(l1, l2, eps):
    """Return True when *l1* and *l2* match element-wise within *eps*.

    Args:
        l1: First sequence of numbers.
        l2: Second sequence of numbers.
        eps: Largest absolute difference still regarded as equal.

    Returns:
        False when the sequences differ in length or when any pair of
        corresponding elements differs by more than ``eps``; True otherwise
        (two empty sequences are almost equal).
    """
    # A length mismatch used to raise IndexError (l2 shorter) or to ignore
    # the surplus items silently (l2 longer); neither is "almost equal".
    if len(l1) != len(l2):
        return False
    return not any(abs(x - y) > eps for x, y in zip(l1, l2))
def dat_heure():
    """Return the current local time formatted as ``DD/MM/YY HH:MM:SS``."""
    now = time.localtime()
    stamp = time.strftime('%d/%m/%y %H:%M:%S', now)
    return stamp
def enter_k8055_result(timer):
    """Poll the board inputs and push every change to the registered client.

    Thread target for the global thread ``a``; the loop runs for as long as
    ``a`` reports itself alive (``end()`` is what makes it stop).

    On every cycle the five digital inputs (1-5) and the two analog inputs
    (1-2) are read from ``dev``.  When a client address is registered in the
    global ``add_eme`` and the readings differ from the last ones sent
    (analog values by more than ``marge_ana_in``, digital values at all), the
    datagram ``<sha256 of PW>|-|L|-|<digital list>|-|<analog list>`` is sent
    through ``sock_renvoie`` to that address on ``UDP_PORT``.

    Args:
        timer: Seconds to sleep at the start of every polling cycle.
    """
    def en_octets(texte):
        # Byte strings (every ``str`` on Python 2) pass through untouched;
        # text strings are encoded so hashlib and sendto accept them.
        if isinstance(texte, bytes):
            return texte
        return texte.encode("utf-8")

    # The password never changes while running: hash it once, not per send.
    empreinte = hashlib.sha256(en_octets(PW)).hexdigest()

    back_digi_result = []
    back_ana_result = [0, 0]
    while a.is_alive():
        time.sleep(timer)
        digi_result = [dev.digital_in(canal) for canal in range(1, 6)]
        ana_result = [dev.analog_in(1), dev.analog_in(2)]

        # Read the shared global once: the receiving thread may replace it
        # at any moment, and the check, the send and the log must agree.
        destinataire = add_eme
        if destinataire == "None":
            continue  # nobody has registered yet
        if (presque_egal(ana_result, back_ana_result, marge_ana_in)
                and back_digi_result == digi_result):
            continue  # nothing changed since the last datagram

        trame = (empreinte + "|-|L|-|" + str(digi_result)
                 + "|-|" + str(ana_result))
        sock_renvoie.sendto(en_octets(trame), (destinataire, UDP_PORT))
        print(dat_heure() + " (" + destinataire + ") Données envoyés: "
              + str(digi_result) + str(ana_result))
        # NOTE(review): the original indentation was lost; the reference
        # values are assumed to be refreshed only after a send - confirm.
        back_digi_result = digi_result
        back_ana_result = ana_result
def out_k8055_result():
    """Receive UDP commands, check their password and run them on the board.

    Thread target for the global thread ``b``; the loop runs for as long as
    ``b`` reports itself alive.  A datagram is a ``|-|``-separated string::

        <sha256 hex digest of PW>|-|<name>[|-|<arg1>[|-|<arg2>]]

    * wrong digest: a "Mot de passe incorrect" reply is sent back;
    * ``end``: ``end()`` is called and this function returns;
    * a name listed in ``autorise_func``: ``dev.<name>(<args>)`` is called,
      a success or failure reply is sent and, on success, the sender's IP is
      stored in the global ``add_eme``;
    * any other name (or a missing one) is logged and otherwise ignored.

    Replies go through ``sock_renvoie`` to the sender's IP on ``UDP_PORT``.
    """
    import ast  # function-scope import: only this function needs it

    global add_eme

    def en_octets(texte):
        # Byte strings (every ``str`` on Python 2) pass through untouched;
        # text strings are encoded so hashlib and sendto accept them.
        if isinstance(texte, bytes):
            return texte
        return texte.encode("utf-8")

    def en_texte(brut):
        # Counterpart of en_octets: hand back the interpreter's native str.
        if isinstance(brut, str):
            return brut
        return brut.decode("utf-8", "replace")

    def repondre(ip, trame, resume):
        # Send one reply datagram and log a one-line summary of it.
        sock_renvoie.sendto(en_octets(trame), (ip, UDP_PORT))
        print(dat_heure() + " (" + ip + ") Données envoyés: " + resume)

    empreinte = hashlib.sha256(en_octets(PW)).hexdigest()

    while b.is_alive():
        data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
        ip = addr[0]
        champs = en_texte(data).split('|-|')
        PW_Server = champs[0]

        if PW_Server != empreinte:
            repondre(ip, PW_Server + "|-| Mot de passe incorrect",
                     "Mot de passe incorrect")
            continue

        # A datagram holding only the digest used to raise IndexError and
        # kill this thread; it is now handled like an unknown command.
        nom = champs[1] if len(champs) > 1 else ""
        if nom == "end":
            end()
            return  # the receiving socket is closed, nothing left to do

        # Fields 2 and 3 are the arguments (the four-field form used to pass
        # field 2 twice).
        arguments = ",".join(champs[2:])
        # Log the actual command (the literal text "data_result" was printed).
        print(dat_heure() + " (" + ip + ") Données recu: dev." + nom
              + "(" + arguments + ")")

        if nom not in autorise_func:
            continue  # not an allowed function: no reply, as before

        try:
            if len(champs) > 4:
                # More than two arguments was never supported; it used to
                # re-run whatever command had been built for the previous
                # datagram.
                raise ValueError("too many arguments")
            # SECURITY: this replaces exec() on a string built from network
            # data, which let the argument fields carry arbitrary code past
            # the name whitelist.  The arguments are now parsed as Python
            # literals only and the method is looked up by name.
            if arguments.strip():
                valeurs = ast.literal_eval("(" + arguments + ",)")
            else:
                valeurs = ()
            getattr(dev, nom)(*valeurs)
        except Exception:
            repondre(ip, empreinte + "|-|Commande échoué !",
                     "Commande échoué !")
        else:
            repondre(ip, empreinte + "|-|Commande exécuté",
                     "Commande exécuté")
            # NOTE(review): the original indentation was lost; the sender is
            # assumed to be registered only after a successful command.
            add_eme = ip
def end():
    """Shut the service down, logging each step as it completes.

    In order: disconnects the board (``dev``), closes the receiving socket
    (``sock``), then stops thread ``a`` and thread ``b``.
    """
    def annoncer(message):
        # One timestamped log line per completed step.
        print(dat_heure() + " " + message)

    dev.disconnect()
    annoncer("Déconnexion carte")

    sock.close()
    annoncer("Déconnexion du socket")

    # NOTE(review): _Thread__stop is a name-mangled private method of
    # Python 2's threading.Thread, not a public API - confirm before porting.
    a._Thread__stop()
    annoncer("Arrêt du thread d'envoie des informations d'entrées")

    b._Thread__stop()
    annoncer("Arrêt du thread de réception")
# ----------------------------------------------------------------------
# Start-up: open the board and the sockets, then launch both threads
# ----------------------------------------------------------------------
# Address of the client that receives input updates; the string "None"
# means that no client has registered yet.
add_eme = "None"

dev = pyk8055_nv.device(Adresse_cart)

# ``sock`` receives the commands, ``sock_renvoie`` sends replies and updates.
UDP_IP = socket.gethostbyname(socket.gethostname())
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_renvoie = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

# ``a`` pushes input changes to the client, ``b`` executes received commands.
a = threading.Thread(target=enter_k8055_result, args=(timer_in,))
b = threading.Thread(target=out_k8055_result)

print(dat_heure() + " " + "Lancement du thread d'envoie des entrées")
a.start()
print(dat_heure() + " " + "Lancement du thread de reception des commandes")
b.start()
print(dat_heure() + " " + "Démmarage effectué")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement