Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Copyright (c) 2016, Markus Barenhoff and Copyright (c) 2017, Jáchym Hurtík
- #
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- #
- # * Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- #
- # * Redistributions in binary form must reproduce the above
- # copyright notice, this list of conditions and the following
- # disclaimer in the documentation and/or other materials provided
- # with the distribution.
- #
- # * Neither the name of Markus Barenhoff nor the names of other
- # contributors may be used to endorse or promote products derived
- # from this software without specific prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- import os
- import time
- import ctypes
- import signal
- import sys
- import configparser
- import math
- import serial
- from threading import Thread
- class Konfigurace:
- """
- Objekt Konfigurace.\n
- Ma objekt config - objekt config parseru.\n
- Parametry Fce: Konfigurace.config[<nazev kategorie>][<nazev hodnoty>] vraci hodnotu uvedenou v ini
- """
- def __init__(self):
- """
- Funkce spustena pri vytvoreni objektu Konfigurace.
- Vytvori objekt Konfigurace.config.
- Otevre soubor konfigurace.ini pro cteni konfigurace.
- """
- self.config = configparser.ConfigParser()
- self.config.read(os.path.abspath("konfigurace.ini"))
- '''print(str(self.config))
- for key in self.config:
- print(key)
- print(str(self.config["HLAVNI"]))
- for key in self.config["HLAVNI"]:
- print(key)
- print(str(self.config["POSILEJ"]))
- for key in self.config["POSILEJ"]:
- print(key)
- '''
- class Raildriver:
- """Hlavni objekt aplikace
- """
- def nastavVypis(self,level):
- self.level = level
- def vypis(self,zprava,level,konec=False):
- if level <= self.level:
- levelSlovnik = {1:"INFO: ",2:"DEBUG: "}
- if not konec:
- print(time.strftime("%H:%M:%S ")+levelSlovnik[level]+str(zprava))
- else:
- input(time.strftime("%H:%M:%S ")+levelSlovnik[level]+str(zprava))
- def __init__(self):
- self.init = False
- self.nastavVypis(1)
- self.vypis("Hledám konfigurační soubor v: "+str(os.path.abspath("konfigurace.ini")), 2)
- try:
- self.konf = Konfigurace()
- except Exception:
- input("Konfigurační soubor nebyl nalezen!!!")
- else:
- self.init = True
- self.vypis("Konfigurační soubor nalezen, čtu",2)
- self.start(
- dllPathVar = self.konf.config["HLAVNI"]['CestaDLL'],
- sleepTimeVar = 1/int(self.konf.config["HLAVNI"]['UpdateFrekvence']),
- comPort = self.konf.config["HLAVNI"]['COMport'],
- comBaud = int(self.konf.config["HLAVNI"]['Baudrate']),
- comParita = str.lower(self.konf.config["HLAVNI"]['Parita']),
- debugVar = self.konf.config["HLAVNI"]["Debug"],
- zmeneVar = self.konf.config["HLAVNI"]["PosilejJenZmeny"]
- )
- def str_to_bool(self, s):
- """
- Converts boolean formated as string to bol type;
- Parameters:\n
- string\n
- Returns:\n
- bol; or raise Exception if string is not bol
- """
- if s == 'True':
- return True
- elif s == 'False':
- return False
- else:
- raise ValueError
- def start(self, dllPathVar, sleepTimeVar, comPort, comBaud, comParita, debugVar, zmeneVar):
- """
- Inicializace aplikace. Nastaveni vsech potrebnych veci pred odpovedi hry.
- """
- self.sleepTime = sleepTimeVar
- self.dllPath = dllPathVar
- self.poleSignalek = []
- self.polePrijimanychSignalek = []
- self.pocetSignalek = 0
- self.pocetPrijimanychSignalek = 0
- self.debug = self.str_to_bool(debugVar)
- self.zmene = self.str_to_bool(zmeneVar)
- if self.debug == True:
- self.nastavVypis(2)
- self.vypis("START APLIKACE...",1)
- self.vypis("Načítám posílané názvy kontrolek z konfiguračního souboru.",2)
- #jmena signalek
- for JmenoSignalky in self.konf.config['POSILEJ']:
- self.pocetSignalek += 1
- self.poleSignalek.append(JmenoSignalky)
- #prijate
- for JmenoPrijimaneSignalky in self.konf.config['PRIJIMEJ']:
- self.pocetPrijimanychSignalek += 1
- self.polePrijimanychSignalek.append(JmenoPrijimaneSignalky)
- self.vypis("Načteno %d signálek z konfiguračního souboru."%(self.pocetSignalek),2)
- self.vypis("Načteno %d přijímaných hodnot z konfiguračního souboru."%(self.pocetPrijimanychSignalek),2)
- self.vypis("Přiřazuji hodnoty násobitelů.",2)
- #nasobitele
- self.poleNasobSignalek = []
- self.poleRawNasobitele = []
- for NasobitelSignalky in self.konf.config['NASOB']:
- self.poleNasobSignalek.append(NasobitelSignalky)
- self.poleRawNasobitele.append(self.konf.config['NASOB'][NasobitelSignalky])
- self.poleNasobHodnot = []
- z = 0
- while z < self.pocetSignalek:
- try:
- self.poleNasobHodnot.append(int(self.poleRawNasobitele[self.poleNasobSignalek.index(self.poleSignalek[z])]))
- except:
- self.poleNasobHodnot.append(1)
- self.vypis("Pro hodnotu: %s nebyl uveden násobitel, používám 1!"%(self.poleSignalek[z]),1)
- else:
- self.vypis("Pro hodnotu: %s byl uveden násobitel, používám %d!"%(self.poleSignalek[z],int(self.poleRawNasobitele[self.poleNasobSignalek.index(self.poleSignalek[z])])),2)
- z = z + 1
- #prijate
- self.poleNasobPrijatychSignalek = []
- self.poleRawNasobitelePrijate = []
- for NasobitelPrijateSignalky in self.konf.config['NASOB_PRIJATE']:
- self.poleNasobPrijatychSignalek.append(NasobitelPrijateSignalky)
- self.poleRawNasobitelePrijate.append(self.konf.config['NASOB_PRIJATE'][NasobitelPrijateSignalky])
- self.poleNasobHodnotPrijate = []
- z = 0
- while z < self.pocetPrijimanychSignalek:
- try:
- self.poleNasobHodnotPrijate.append(int(self.poleRawNasobitelePrijate[self.poleNasobPrijatychSignalek.index(self.polePrijimanychSignalek[z])]))
- except:
- self.poleNasobHodnotPrijate.append(1)
- self.vypis("Pro přijímanou hodnotu: %s nebyl uveden násobitel, používám 1!"%(self.polePrijimanychSignalek[z]),1)
- else:
- self.vypis("Pro hodnotu: %s byl uveden násobitel, používám %d!"%(self.polePrijimanychSignalek[z],int(self.poleRawNasobitelePrijate[self.poleNasobPrijatychSignalek.index(self.polePrijimanychSignalek[z])])),2)
- z = z + 1
- self.vypis("Přiřazuji těmto kontrolkám jména ControlValues.",2)
- #prirazovani jmen
- i = 0
- self.NazvyCVSignalek = []
- while i < self.pocetSignalek:
- self.NazvyCVSignalek.append(self.konf.config['POSILEJ'][self.poleSignalek[i]])
- i = i + 1
- self.vypis("Přiřazeno %d názvů CV signálkám."%i,2)
- if self.pocetSignalek == i:
- self.vypis("Počet pojmenovaných signálek je stejný jako přiřazených CV.",2)
- else:
- self.vypis("Počet pojmenovaných signálek není stejný jako přiřazených CV!!!\nKritická chyba při čtení souboru! Zkontroluj soubor!",2)
- i = 0
- self.vypis("Posílané názvy signálek a názvy CV jsou:",2)
- while i < self.pocetSignalek:
- self.vypis(str(self.poleSignalek[i])+"||"+str(self.NazvyCVSignalek[i]),2)
- i = i + 1
- #prijimane
- i = 0
- self.NazvyCVPrijimanychSignalek = []
- while i < self.pocetPrijimanychSignalek:
- self.NazvyCVPrijimanychSignalek.append(self.konf.config['PRIJIMEJ'][self.polePrijimanychSignalek[i]])
- i = i + 1
- self.vypis("Přiřazeno %d názvů CV přijímaným signálkám."%i,2)
- if self.pocetPrijimanychSignalek == i:
- self.vypis("Počet pojmenovaných přijímaných signálek je stejný jako přiřazených CV.",2)
- else:
- self.vypis("Počet pojmenovaných přijímaných signálek není stejný jako přiřazených CV!!!\nKritická chyba při čtení souboru! Zkontroluj soubor!",2)
- i = 0
- self.vypis("Přijímané názvy signálek a názvy CV jsou:",2)
- while i < self.pocetPrijimanychSignalek:
- self.vypis(str(self.polePrijimanychSignalek[i])+"||"+str(self.NazvyCVPrijimanychSignalek[i]),2)
- i = i + 1
- paritaDict = {'suda':'E', 'licha':'O', 'zadna':'N'}
- self.ser = serial.Serial()
- self.ser.port = comPort
- self.ser.baudrate = comBaud
- #self.ser.parity = paritaDict[comParita]
- self.ser.rtscts = False
- self.comOK = False
- try:
- self.ser.open()
- except Exception as e:
- if str(e).find("FileNotFoundError") != -1:
- self.vypis("Nepovedlo se otevřít port %s, protože takový port nebyl v počítači nalezen! Nelze pokračovat!"%(comPort),1,True)
- elif str(e).find("PermissionError") != -1:
- self.vypis("Nepovedlo se otevřít port %s, přístup zamítnut! Nelze pokračovat!"%(comPort),1,True)
- else:
- self.vypis("Nepovedlo se otevřít port %s, z neznámého důvodu! Nelze pokračovat!\nChyba: %s"%(comPort,e),1,True)
- else:
- self.vypis("Port %s úspěšně otevřen."%(self.ser.name),1)
- self.comOK = True
- self.vypis("Soubory načtené v pořádku, čekám na odpověď hry!",1)
- signal.signal(signal.SIGINT, self.signal_handler)
- def signal_handler(self, signal, frame):
- sys.exit()
- self.ser.close()
- def zaokrouhli(self,cislo):
- """
- Parametry:\n
- cislo k zaokrouhleni\n
- Return:\n
- zaokrouhlene cislo"""
- if cislo % 1 >= 0.5:
- return(math.floor(cislo)+1)
- else:
- return(math.floor(cislo))
- def runRaildriver(self):
- """Hlavni vlakno aplikace - procedura; nic nevraci, nema zadne arg"""
- if self.init:
- self.knihovnaOK = False
- try:
- self.api = RaildriverAPI(self.dllPath)
- except OSError:
- self.vypis("Nebyla nalezena potřebná knihovna!!!",1,True)
- else:
- self.knihovnaOK = True
- self.valueSignalek = []
- self.valueSignalekStare = []
- a = 0
- while a < self.pocetSignalek:
- a = a + 1
- self.valueSignalek.append(0)
- if self.zmene:
- self.valueSignalekStare.append(0)
- if (self.knihovnaOK):
- self.api.SetRailSimConnected(True)
- self.api.SetRailDriverConnected(True)
- self.vypis("Načtená knihovna: %s" % self.api.rdDll,2)
- self.idSignalek = []
- i = 0
- while self.api.GetLocoName() == None:
- self.vypis("Čekám na nějakou mašinku",2)
- time.sleep(1)
- self.poleZeHry = self.api.GetControllerList()
- z = 0
- self.CVok = True
- self.vypis("Přiřazuji ID k názvům CV.",2)
- #prirazeni ID hry kontrolkam
- while z < self.pocetSignalek:
- try:
- self.idSignalek.append(self.poleZeHry.index(self.NazvyCVSignalek[z]))
- except Exception:
- self.vypis("Pro signálku s názvem %s nebylo nalezeno odpovídající ID, program se ukončí!"%(self.NazvyCVSignalek[z]),1,True)
- self.CVok = False
- break
- z = z + 1
- #prijimane
- self.idPrijimanychSignalek = []
- z = 0
- while z < self.pocetPrijimanychSignalek:
- try:
- self.idPrijimanychSignalek.append(self.poleZeHry.index(self.NazvyCVPrijimanychSignalek[z]))
- except Exception:
- self.vypis("Pro přijímanou signálku s názvem %s nebylo nalezeno odpovídající ID, program se ukončí!"%(self.NazvyCVPrijimanychSignalek[z]),1,True)
- self.CVok = False
- break
- z = z + 1
- if (self.comOK and self.knihovnaOK and self.CVok):
- if self.debug:
- self.vypis(str(self.api.GetLocoName()),2)
- self.vypis(str(self.api.GetControllerList()),2)
- i = 0
- self.vypis("Posílané ID a názvy CV jsou:",2)
- while i < self.pocetSignalek:
- self.vypis(str(self.idSignalek[i])+"||"+str(self.NazvyCVSignalek[i]),2)
- i = i + 1
- self.vypis("Přijímané ID a názvy CV jsou:",2)
- while i < self.pocetPrijimanychSignalek:
- self.vypis(str(self.idPrijimanychSignalek[i])+"||"+str(self.NazvyCVPrijimanychSignalek[i]),2)
- i = i + 1
- self.vypis("Odpověď v pořádku, aplikace běží!",1)
- v = VlaknoPrijmi(self)
- while True:
- casZacatekCyklu = time.time()
- a = 0
- while a < self.pocetSignalek:
- self.valueSignalek[a] = self.api.GetControllerValue(int(self.idSignalek[a]),0)
- if self.zmene:
- if self.valueSignalek[a] != self.valueSignalekStare[a]:
- vystup = str(self.poleSignalek[a])+str(self.zaokrouhli(int(self.valueSignalek[a]*self.poleNasobHodnot[a])))
- self.vypis(vystup,2)
- self.ser.write(vystup.encode())
- self.valueSignalekStare = self.valueSignalek[:]
- else:
- vystup = str(self.poleSignalek[a])+str(self.zaokrouhli(int(self.valueSignalek[a]*self.poleNasobHodnot[a])))
- self.vypis(vystup,2)
- self.ser.write(vystup.encode())
- a = a + 1
- ubehlyCas = time.time() - casZacatekCyklu
- spi = self.sleepTime-ubehlyCas
- if spi > 0:
- time.sleep(spi)
- else:
- self.vypis("Cyklus je příliš dlouhý! Nestíhám dodržovat obnovovací frekvenci!",1)
- class VlaknoPrijmi:
- def __init__(self,Raildriver):
- self.r = Thread(target=self.ctiCOM(Raildriver))
- self.r.start()
- def ctiCOM(self,Raildriver):
- self.init = False
- while True:
- if self.init == False:
- buff = ""
- lastLomeno = False
- while True:
- buff = buff+Raildriver.ser.read().decode(sys.getfilesystemencoding())
- if buff.find("\n") != -1:
- break
- self.init = True
- else:
- serialPrichozi = Raildriver.ser.readline().decode(sys.getfilesystemencoding())
- Raildriver.vypis("RAW vstup serialu: %s"%(serialPrichozi),2)
- prijateCV = "".join(filter(str.isalpha, serialPrichozi))
- prijataHodnota = int(serialPrichozi[len(prijateCV):])
- Raildriver.vypis("Přijaté CV: %s"%(prijateCV),2)
- Raildriver.vypis("Přijatá hodnota: %d"%(prijataHodnota),2)
- try:
- idPrijateHodnoty = Raildriver.idPrijimanychSignalek[Raildriver.polePrijimanychSignalek.index(prijateCV)]
- except Exception as e:
- Raildriver.vypis("Přijatá hodnota: %s není definovaná! Chyba: %s"%(prijataHodnota,e),1)
- else:
- nasobitelPrijateHodnoty = Raildriver.poleNasobHodnotPrijate[Raildriver.polePrijimanychSignalek.index(prijateCV)]
- Raildriver.api.SetControllerValue(idPrijateHodnoty,prijataHodnota/nasobitelPrijateHodnoty)
- Raildriver.vypis("ID CV přijaté hodnoty: %d"%(idPrijateHodnoty),2)
- class RaildriverAPI:
- """
- API aplikace pro praci s knihovnou RailDriver.dll\n
- Pri prvni definici musi byt zadana cesta k DLL jako parametr.\n
- Funkce:\n
- SetRailSimConnected(connected) - connect or disconnect to api;
- SetRailDriverConnected(connected) - connect or disconnect to Raildriver;
- GetRailSimLocoChanged()- returns True if loco has changed from last call;
- GetLocoName() - returns tuple [Producer, Product,Loco Name];
- GetControllerList() - returns tuple of all control values;
- GetControllerValue(valueID, t) - t=0 (value), t=1 (min), t=2 (max); returns value as int;
- """
- def __init__(self, libpath):
- self.rdDll = ctypes.CDLL(libpath)
- self.SetRailSimConnected_c = self.rdDll.SetRailSimConnected
- self.SetRailSimConnected_c.restype = None
- self.SetRailSimConnected_c.argtypes = [ctypes.c_bool]
- self.SetRailDriverConnected_c = self.rdDll.SetRailDriverConnected
- self.SetRailDriverConnected_c.restype = None
- self.SetRailDriverConnected_c.argtypes = [ctypes.c_bool]
- self.GetRailSimLocoChanged_c = self.rdDll.GetRailSimLocoChanged
- self.GetRailSimLocoChanged_c.restype = ctypes.c_bool
- self.GetRailSimLocoChanged_c.argtypes = None
- self.GetLocoName_c = self.rdDll.GetLocoName
- self.GetLocoName_c.restype = ctypes.c_char_p
- self.GetLocoName_c.argtypes = None
- self.GetControllerList_c = self.rdDll.GetControllerList
- self.GetControllerList_c.restype = ctypes.c_char_p
- self.GetControllerList_c.argtypes = None
- self.GetControllerValue_c = self.rdDll.GetControllerValue
- self.GetControllerValue_c.restype = ctypes.c_float
- self.GetControllerValue_c.argtypes = [ctypes.c_int, ctypes.c_int]
- self.SetControllerValue_c = self.rdDll.SetControllerValue
- self.SetControllerValue_c.restype = ctypes.c_byte
- self.SetControllerValue_c.argtypes = [ctypes.c_int, ctypes.c_float]
- def SetRailSimConnected(self, connected):
- """connect or disconnect to the API. Must be called first with True"""
- self.SetRailSimConnected_c(connected)
- def SetRailDriverConnected(self, connected):
- """connect or disconnect to Raildriver."""
- self.SetRailDriverConnected_c(connected)
- def GetRailSimLocoChanged(self):
- """returns if Loco has changed since last call"""
- return self.GetRailSimLocoChanged_c()
- def GetLocoName(self):
- """returns a tuple with 3 elements of the loko name [Producer, Product,Loco Name]"""
- resp = self.GetLocoName_c().decode("utf-8")
- if resp: return tuple(resp.split('.:.'))
- else: return None
- def GetControllerList(self):
- """returns a list of all controllers of the current loco"""
- ctls = self.GetControllerList_c().decode("utf-8")
- if ctls: return ctls.split("::")
- else: return []
- def GetControllerValue(self, vid, t):
- """get the next new/changed value, where
- t=0 (value), t=1 (min), t=2 (max)
- """
- return self.GetControllerValue_c(vid, t)
- def SetControllerValue(self, vid, value):
- """set the new value"""
- self.SetControllerValue_c(vid, value)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement