Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- #
- # Copyright 2017 Dr. M. Luetzelberger <webmaster@raspberryblog.de>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- # MA 02110-1301, USA.
- #
- #
- from __future__ import print_function
- import serial, struct, time
- import psycopg2
- import signal
- from datetime import datetime, tzinfo
# Serial link to the sensor. Supplying the port to the constructor opens the
# device immediately, so no separate open() call is needed.
ser = serial.Serial(port="COM21", baudrate=9600)
ser.flushInput()  # drop anything already sitting in the receive buffer

# PostgreSQL connection; the readings are stored in the existing "vrstats"
# database.
# NOTE(review): the credentials are hard-coded in the connection string below;
# consider loading them from the environment or a config file.
conn_string = "host='localhost' dbname='vrstats' user='kevin' port=5432 password='postgrespassword'"
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()

# Parameterised statement used by insert(); values are bound by psycopg2.
insert_stmt = "INSERT INTO aqi (pm25, pm10, ts) VALUES (%s, %s, %s);"
def exit_handler(signum, frame):
    """Signal handler: close the database cursor and connection, then exit.

    ``signum`` and ``frame`` are the standard signal-handler arguments; neither
    is used. The process terminates with exit status 0.
    """
    # Close the cursor first, then the connection it belongs to.
    for resource in (cursor, conn):
        resource.close()
    # Raising SystemExit(0) is what sys.exit(0) does.
    raise SystemExit(0)
# Route every termination-style signal to exit_handler so the database
# resources are released before the process ends.
# SIGBREAK is defined only on Windows; looking the names up with getattr keeps
# the module importable on platforms where a signal does not exist.
for _signal_name in ("SIGINT", "SIGILL", "SIGABRT", "SIGBREAK", "SIGTERM"):
    _signal_number = getattr(signal, _signal_name, None)
    if _signal_number is not None:
        signal.signal(_signal_number, exit_handler)
def insert(pm25, pm10, ts):
    """Store one reading in the ``aqi`` table and commit it.

    Args:
        pm25: PM2.5 value to store.
        pm10: PM10 value to store.
        ts: Timestamp to store with the reading.

    Raises:
        Exception: whatever the database driver raises; the transaction is
            rolled back first so the shared connection stays usable for the
            next reading.
    """
    try:
        # Let the driver bind the parameters directly; rendering the statement
        # with mogrify() first and executing the result is redundant.
        cursor.execute(insert_stmt, (pm25, pm10, ts))
        conn.commit()
    except Exception:
        # A failed statement leaves the connection in an aborted transaction;
        # without a rollback every subsequent insert would fail as well.
        conn.rollback()
        raise
def process_frame(d):
    """Decode one sensor data frame, store it when valid, and print it.

    Args:
        d: Raw frame as a byte string, starting at the header byte. Layout by
            index: 0 header, 1 command, 2-3 PM2.5 (little-endian, tenths),
            4-5 PM10 (little-endian, tenths), 6-7 skipped, 8 checksum,
            9 tail. Any bytes after index 9 are ignored.

    Raises:
        struct.error: if ``d`` is shorter than 10 bytes.

    The reading is inserted only when the checksum over bytes 2-7 matches
    byte 8 and the tail byte is 0xAB; the summary line is printed either way.
    """
    # unpack_from reads only the 8 bytes it needs from offset 2, so both a
    # 10-byte frame and a frame with a trailing extra byte are accepted.
    raw_pm25, raw_pm10, frame_checksum, tail = struct.unpack_from('<HHxxBB', d, 2)
    pm25 = raw_pm25 / 10.0
    pm10 = raw_pm10 / 10.0
    # bytearray yields integers on both Python 2 and Python 3, unlike
    # iterating a byte string and calling ord() on each element.
    checksum = sum(bytearray(d[2:8])) % 256
    valid = (checksum == frame_checksum and tail == 0xab)
    if valid:
        insert(pm25, pm10, datetime.utcnow())
    print("PM 2.5: {} ug/m^3 PM 10: {} ug/m^3 CRC={}".format(pm25, pm10, "OK" if valid else "NOK"))
def sensor_read():
    """Wait for a frame header on the serial port and decode what follows.

    Bytes are consumed one at a time until the 0xAA header is seen. The next
    10 bytes are then read, and if the first of them is the 0xC0 command byte
    the header plus those bytes are handed to process_frame(). Frames with any
    other command byte are dropped silently.
    """
    header = b"\xaa"
    # Compare against bytes literals: on Python 3 the port returns bytes,
    # which never compare equal to a str, so a str literal would loop forever.
    while ser.read(size=1) != header:
        pass
    # 10 bytes (one more than the rest of the frame) are read because
    # process_frame() has historically been given an 11-byte buffer.
    rest = ser.read(size=10)
    # Slice instead of indexing: works on Python 2 and 3, and does not raise
    # on an empty read.
    if rest[:1] == b"\xc0":
        process_frame(header + rest)
# Wake frame on the wire:
# 0xAA, 0xB4, 0x06, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x06, 0xAB
def sensor_wake():
    """Send the 19-byte "set mode: work" command to the sensor.

    Frame layout: head 0xAA, command 0xB4, 15 data bytes, checksum, tail 0xAB.
    The checksum is the low byte of the sum of the 15 data bytes, which is
    0x06 for this command.
    """
    payload = bytearray(
        [0x06,           # data byte 1
         0x01,           # data byte 2 (set mode)
         0x01]           # data byte 3 (work)
        + [0x00] * 10    # data bytes 4-13
        + [0xff, 0xff]   # data bytes 14-15 (device id)
    )
    # Compute the checksum rather than hard-coding it: the previously
    # hard-coded 0x05 belongs to the sleep command, not to this one.
    checksum = sum(payload) % 256
    frame = bytearray([0xaa, 0xb4]) + payload + bytearray([checksum, 0xab])
    # One write of real bytes; works on both Python 2 and Python 3.
    ser.write(bytes(frame))
# Sleep frame on the wire:
# 0xAA, 0xB4, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x05, 0xAB
def sensor_sleep():
    """Send the 19-byte "set mode: sleep" command to the sensor.

    Frame layout: head 0xAA, command 0xB4, 15 data bytes, checksum, tail 0xAB.
    The checksum is the low byte of the sum of the 15 data bytes, which is
    0x05 for this command.
    """
    payload = bytearray(
        [0x06,           # data byte 1
         0x01,           # data byte 2 (set mode)
         0x00]           # data byte 3 (sleep)
        + [0x00] * 10    # data bytes 4-13
        + [0xff, 0xff]   # data bytes 14-15 (device id)
    )
    checksum = sum(payload) % 256
    frame = bytearray([0xaa, 0xb4]) + payload + bytearray([checksum, 0xab])
    # One write of real bytes; works on both Python 2 and Python 3 and avoids
    # shadowing the builtin name "bytes" with a local list.
    ser.write(bytes(frame))
def main():
    """Run the wake / read / sleep cycle forever.

    Each cycle wakes the sensor, waits, discards buffered input, takes one
    reading, waits again, puts the sensor to sleep and pauses before the next
    cycle. The loop never returns on its own.
    """
    after_wake_seconds = 15
    after_read_seconds = 5
    asleep_seconds = 10

    while True:
        print("Sensor waking.")
        sensor_wake()
        time.sleep(after_wake_seconds)

        # Throw away whatever arrived during the wait so the next read starts
        # from fresh data.
        ser.flushInput()
        print("Reading data from sensor.")
        sensor_read()
        time.sleep(after_read_seconds)

        print("Entering sleep mode.")
        sensor_sleep()
        time.sleep(asleep_seconds)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement