Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- from twisted.internet import protocol, defer, reactor
- from twisted.internet.serialport import SerialPort
- from twisted.internet.threads import deferToThread
- from twisted.web import server, resource
- from twisted.python.runtime import platform
- from twisted.python import log
- from twisted.enterprise import adbapi
- from twisted.web.static import File,DirectoryLister
- from twisted.web.util import redirectTo
- import json
- import sys
- import os
- import datetime
- import itertools
- from buffer import LineBuffer, UntillBuffer, Listener, BytesBuffer
- from subprocess import Popen, PIPE
- import collections
- import serial
- import xml.etree.ElementTree as etree
- import time
- try:
- import ujson as json
- except:
- import json
- log.startLogging(sys.stdout)
# Temperature log: asynchronous access to the sqlite "temp_log" table.
class TempLog:
    """Store and query temperature readings in ``log.db``.

    All database access goes through a Twisted ``adbapi.ConnectionPool``,
    so every method that touches the database returns a Deferred.
    """

    def __init__(self):
        self.db = adbapi.ConnectionPool("sqlite3", "log.db", check_same_thread=False)

    def init_db(self):
        """Create the ``temp_log`` table if it does not exist yet.

        Returns the Deferred produced by the pool.
        """
        return self.db.runQuery("CREATE TABLE IF NOT EXISTS temp_log (sensor_id TEXT, temp TEXT, date_log DATETIME DEFAULT CURRENT_TIMESTAMP)")

    def update(self, id, t):
        """Insert one reading; ``date_log`` is filled in by the database.

        @id: sensor identifier
        @t : temperature value as received

        Returns the Deferred of the insert so callers may attach error
        handling (existing callers that ignore the result keep working).
        """
        return self.db.runOperation("INSERT INTO temp_log (sensor_id, temp) VALUES(?,?)", (id, t))

    @defer.inlineCallbacks
    def get_history(self, date="now", modify="'%Y-%m-%d'", condition=""):
        '''
        Fetch the readings whose timestamp falls into the same period as @date.

        @date     : time value the readings are compared against (bound as a
                    query parameter); "now" means the current date
        @modify   : quoted strftime format that defines the period; rows match
                    when strftime(modify, date_log) == strftime(modify, date)
        @condition: optional extra SQL predicate, appended with AND

        Fires with a dict: sensor_id -> list of (date_log, temp) tuples,
        sorted ascending.
        '''
        # NOTE(security): modify and condition are spliced into the SQL text
        # verbatim. Only pass trusted, program-defined values here, never
        # anything taken from a request.
        if condition:
            condition = "AND " + condition
        # The column is named date_log in the CREATE TABLE statement; the
        # query previously referred to a non-existent "log_date" column.
        query = "SELECT sensor_id, temp, date_log FROM temp_log WHERE strftime({modify},date_log) = strftime({modify},?) {condition}".format(modify=modify, condition=condition)
        rows = yield self.db.runQuery(query, (date,))
        result = collections.defaultdict(list)
        for sens, temp, _date in rows:
            result[sens].append((_date, temp))
        for readings in result.values():
            readings.sort()
        defer.returnValue(result)

    def get_average_by_sensor(self, sensor, modify):
        '''
        Placeholder: intended to return averaged data per day, week, month
        or year for comparison. Not implemented yet.
        '''
        pass
def sleep(seconds):
    """Return a Deferred that the reactor fires after ``seconds`` seconds.

    The Deferred's result is the ``seconds`` value itself.
    """
    wakeup = defer.Deferred()
    reactor.callLater(seconds, wakeup.callback, seconds)
    return wakeup
class CronLog:
    """Rate limiter in front of a log object: persist at most once per INTERVAL.

    Intended use per polling cycle: call write_log() for every reading, then
    commit_update() once. Readings are forwarded to the wrapped log only when
    no cycle has been committed yet or INTERVAL seconds have passed since the
    last committed cycle.
    """

    INTERVAL = 15*60  # seconds between persisted cycles

    def __init__(self, log):
        self._log = log          # object providing update(id, t)
        self.last_update = None  # time.time() of the last committed write cycle
        self._logged = 0         # readings written since the last commit

    def write_log(self, id, t):
        """Forward the reading to the wrapped log if the interval has elapsed."""
        if (self.last_update is None) or (time.time() - self.last_update > self.INTERVAL):
            self._log.update(id, t)
            self._logged += 1

    def commit_update(self):
        """Close the current cycle.

        The timestamp is refreshed only if something was written during the
        cycle. Refreshing it unconditionally (as before) on every polling
        cycle kept pushing the deadline forward, so after the very first
        write the interval could never elapse again.
        """
        if self._logged:
            self.last_update = time.time()
            self._logged = 0
# Shared module-level state.
# Latest rounded temperature per sensor id (served to the web UI).
history_temperature = {}
# Per sensor id: rule key -> threshold whose action was triggered last.
history_action = collections.defaultdict(dict)
# Persistent log and its rate-limiting front end.
db_log = TempLog()
cron_log = CronLog(db_log)
class BaseHttp(resource.Resource):
    """Common base for the HTTP resources of this application.

    Resources are leaves by default; subclasses override ``isLeaf`` when
    they need children.
    """

    isLeaf = True
    route = None

    def getChild(self, name, request):
        """Return this resource for an empty path segment, else defer to Resource."""
        print(name)
        if name != "":
            return resource.Resource.getChild(self, name, request)
        return self
class Static(DirectoryLister):
    """Directory listing that also serves each regular file found in ``dir``.

    The children are registered once, at construction time, from the files
    present in the directory at that moment.
    """

    def __init__(self, dir):
        DirectoryLister.__init__(self, dir)
        for entry in os.listdir(dir):
            full_path = os.path.join(dir, entry)
            if not os.path.isfile(full_path):
                continue
            self.putChild(entry, File(full_path))
class Main(BaseHttp):
    """Root resource: forwards GET requests to the static start page."""

    isLeaf = False

    def render_GET(self, request):
        start_page = "/web/page.html"
        return redirectTo(start_page, request)
class GET_TEMP(BaseHttp):
    """Resource that answers GET with the latest temperatures as JSON."""

    isLeaf = False

    def render_GET(self, request):
        # history_temperature is the module-level sensor -> temperature dict.
        payload = json.dumps(history_temperature)
        return payload
class GetHistoryTemp(BaseHttp):
    """Resource that answers GET with the logged history as JSON.

    The optional ``date`` query argument selects the period; it defaults
    to "now". The response is written asynchronously once the query has
    completed.
    """

    def __init__(self, tlog):
        # Initialise the Resource base class (it sets up the children map).
        resource.Resource.__init__(self)
        self.log = tlog

    @defer.inlineCallbacks
    def _process(self, request, date):
        """Run the history query and finish the request with its result.

        A failing query is logged and answered with HTTP 500 instead of
        leaving the request open forever.
        """
        try:
            history = yield self.log.get_history(date)
            body = json.dumps(history)
        except Exception:
            log.err()
            request.setResponseCode(500)
            body = ""
        request.write(body)
        request.finish()

    def render_GET(self, request):
        # request.args maps each name to a list of values; use the first one.
        values = request.args.get("date") or [""]
        date = values[0] if isinstance(values, list) else values
        # An empty value ("?date=") falls back to the default as well.
        date = date or "now"
        # Log anything that goes wrong while writing the response, e.g. the
        # client having disconnected in the meantime.
        self._process(request, date).addErrback(log.err)
        return server.NOT_DONE_YET
class WrapperWin32Serial(serial.Serial):
    """``serial.Serial`` that exposes its ``_port_handle`` as ``hComPort``.

    NOTE(review): presumably needed because Twisted's win32 serial support
    reads ``hComPort`` while the installed pyserial stores the handle as
    ``_port_handle`` -- confirm against the library versions in use.
    """

    hComPort = property(
        lambda self: self._port_handle,
        doc="Read-only alias for the underlying port handle.",
    )
class SerialArduino(protocol.Protocol):
    """Serial-port protocol that feeds incoming bytes to registered buffers.

    Readers register a buffer through assyncRead() and get a Deferred back.
    Every received byte is written to each pending buffer; a truthy return
    from ``buffer.write(byte)`` is treated as "complete", and the Deferred
    then fires with ``buffer.readall()``. The connection is retried every
    5 seconds after a failed connect or a lost connection.
    """

    def __init__(self, port, speed):
        self.port = port
        self.speed = speed
        self.callbacks = []   # pending (buffer, Deferred) pairs
        self.bootstraps = []  # callables invoked with self on every connect
        self.lock = defer.DeferredLock()
        self.retry = None     # DelayedCall of the scheduled reconnect, if any

    def connect(self):
        """Open the serial port; schedule another attempt in 5 s on failure."""
        try:
            SerialPort(self, self.port, reactor, baudrate=self.speed)
        except Exception as exc:
            # Keep retrying, but leave a trace of why the attempt failed.
            log.msg("serial connect failed: %s" % (exc,))
            print("reconnect")
            self.retry = reactor.callLater(5, self.connect)

    def connectionMade(self):
        print("%s %s" % ("Connect OK", self.bootstraps))
        for handler in self.bootstraps:
            handler(self)

    def addBootstrap(self, handle):
        """Register a callable to be run with this protocol on each connect."""
        self.bootstraps.append(handle)

    def connectionLost(self, reason=0):
        """Fail every pending read with ``reason`` and schedule a reconnect."""
        print(reason)
        # Detach the pending list first so that an errback which registers a
        # new read cannot interfere with the iteration.
        pending, self.callbacks = self.callbacks, []
        for _buf, d in pending:
            d.errback(reason)
        self.retry = reactor.callLater(5, self.connect)

    def dataReceived(self, data):
        for byte in data:
            self._byteReceive(byte)

    def _byteReceive(self, byte):
        """Offer one byte to every pending buffer and fire the completed ones.

        Listeners registered from inside a fired callback land in the fresh
        self.callbacks list and therefore do not see the current byte.
        """
        waiting, self.callbacks = self.callbacks, []
        unfinished = []
        for buf, d in waiting:
            if buf.write(byte):
                d.callback(buf.readall())
            else:
                # Collected separately: deleting from the list that is being
                # enumerated (as before) skipped listeners and could delete
                # the wrong entry.
                unfinished.append((buf, d))
        self.callbacks += unfinished

    def assyncRead(self, buffer):
        """Register ``buffer`` and return a Deferred firing with its content.

        Returns None when ``buffer`` is not a Listener instance.
        """
        if isinstance(buffer, Listener):
            d = defer.Deferred()
            self.callbacks.append((buffer, d))
            return d
def startProcess(list_process):
    """Launch every command in ``list_process`` through the shell.

    Each entry is passed through os.path.abspath and started with the
    current working directory; the processes are not waited for. A failure
    to start one command is logged and does not stop the others.
    """
    for proc in list_process:
        try:
            command = os.path.abspath(proc)
            Popen(command, shell=True, cwd=os.getcwd())
        except Exception:
            print("%s %s" % ("Problem with ", proc))
            log.err()
            continue
        log.msg("Success exec process ", proc)
def updateTempToFile():
    """Dump the latest temperatures to ``temp.log`` as a small XML document.

    The root element is <temp>; each sensor becomes a <sensor> child with
    its id, its configured alias ("default" when none is configured) as
    ``name``, and the temperature as text.
    """
    root = etree.Element("temp")
    for sensor, temp in history_temperature.iteritems():
        alias = CONFIG.get("sensors", {}).get(sensor, {}).get("alias", "default")
        node = etree.SubElement(root, "sensor", {"id": sensor, "name": alias})
        node.text = str(temp)
    with open("temp.log", "w") as out:
        out.write(etree.tostring(root))
@defer.inlineCallbacks
def uartProcess(arduino):
    """Poll the board once per second and act on the reported temperatures.

    After an initial 2-byte read, each cycle writes "a" to the transport and
    reads the reply up to "%". The reply is split into lines of the form
    "<sensor> <temp>". For every valid line the reading is handed to
    cron_log, the rounded value is stored in history_temperature and, when
    that sensor's value changed, its configured rules are evaluated.

    Rules come from CONFIG["sensors"][sensor]["rules"]: each key is
    "<low>:<high>" and each value a dict with command lists under "l"
    (run when the temperature drops below <low>) and "h" (run when it rises
    above <high>). history_action remembers the threshold that fired last,
    so a command list is not restarted while the temperature stays on the
    same side.

    The loop ends when writing to or reading from the port fails; any other
    error is logged and the next cycle starts.
    """
    try:
        data = yield arduino.assyncRead(BytesBuffer(2))
    except Exception:
        # The connection went away before the first two bytes arrived.
        log.err()
        return
    print(data)
    # Last processed value per sensor. A single shared value (as before)
    # compared readings of different sensors with each other.
    last_temps = {}
    while True:
        try:
            yield sleep(1)
            try:
                arduino.transport.write("a")
            except Exception:
                break
            try:
                data = (yield arduino.assyncRead(UntillBuffer("%"))).strip()
            except Exception:
                break
            sensors_cfg = CONFIG.get("sensors", {})
            for sensor_data in data.split("\n"):
                try:
                    sensor, temp = sensor_data.strip().split(" ")
                    cron_log.write_log(sensor, temp)
                    round_temp = int(round(float(temp)))
                except Exception:
                    # Malformed line: report it and skip it, instead of
                    # carrying on with stale values from a previous line.
                    log.err()
                    continue
                if sensor in sensors_cfg:
                    print(sensor_data)
                else:
                    print("NEW %s" % (sensor_data,))
                # Latest temperature reading, kept for the web UI.
                history_temperature[sensor] = round_temp
                # Only evaluate the rules when this sensor's value changed.
                if last_temps.get(sensor) == round_temp:
                    continue
                last_temps[sensor] = round_temp
                rules = sensors_cfg.get(sensor, {}).get("rules", {})
                for condition, actions in rules.iteritems():
                    if not actions:
                        continue
                    c1, c2 = condition.split(":")
                    c1, c2 = int(c1), int(c2)
                    last_act = history_action.get(sensor, {}).get(condition)
                    if round_temp < c1 and last_act != c1:
                        history_action[sensor][condition] = c1
                        yield deferToThread(startProcess, actions.get("l", []))
                    elif round_temp > c2 and last_act != c2:
                        history_action[sensor][condition] = c2
                        yield deferToThread(startProcess, actions.get("h", []))
            # End of the polling cycle.
            cron_log.commit_update()
            updateTempToFile()
        except Exception:
            log.err()
@reactor.callWhenRunning
@defer.inlineCallbacks
def main():
    """Startup hook, run once the reactor is running.

    Waits for the log table to be created, then connects the serial
    protocol with uartProcess registered as its bootstrap handler. Port
    and baud rate come from CONFIG["config"] ("port", "b_rate"), with
    "/dev/ttyUSB0" and 9600 as defaults.
    """
    yield db_log.init_db()
    settings = CONFIG.get("config", {})
    port_name = settings.get("port", "/dev/ttyUSB0")
    baud_rate = settings.get("b_rate", 9600)
    arduino = SerialArduino(port_name, baud_rate)
    arduino.addBootstrap(uartProcess)
    arduino.connect()
if __name__ == "__main__":
    # Script entry point: load the configuration, build the resource tree
    # and run the reactor.
    if platform.isWindows():
        SerialPort._serialFactory = WrapperWin32Serial
    with open("config.json") as config_file:
        CONFIG = json.load(config_file)
    print(CONFIG)

    root = Main()
    root.putChild("web", Static("./web"))
    root.putChild("history", GetHistoryTemp(db_log))

    web_port = CONFIG.get("config", {}).get("web_port", 8081)
    reactor.listenTCP(web_port, server.Site(root))
    reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement