Advertisement
Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- from pprint import pprint
- from threading import Thread
- import queue
- import struct
- import crc8
- import json
- import math
- from matplotlib import pyplot
# Marker byte: Parser.run treats a queued byte equal to this value as the
# first byte of a packet.
START = 0x01

# Battery voltages appended by TemperaturePack.parse; Parser.run plots them.
vbats = list()

# X axis used when plotting ``vbats``: the indices 0..999.
x = range(0, 1000)
class ChecksumException(Exception):
    """Raised when a packet's stored checksum does not match its contents.

    Args:
        message: Human-readable description of the failure. It is optional
            so that a bare ``raise ChecksumException`` (no arguments) creates
            the exception instead of failing with ``TypeError``.
    """

    def __init__(self, message="Checksum mismatch"):
        super().__init__(message)
        # Kept as an attribute for callers that read ``exc.message``.
        self.message = message
class Parser:
    """Scan a binary file for framed packets and plot the battery voltage.

    A packet is recognised when the first queued byte equals ``START`` and
    the second one is a key of ``type_dict``; that entry supplies the packet
    constructor and the total packet length in bytes (header included).

    Args:
        in_file: Path of the binary file to read.
        out_file: Stored on the instance; ``run`` does not use it.
    """

    def __init__(self, in_file, out_file):
        self.in_file = in_file
        self.out_file = out_file
        # Sliding window of not-yet-consumed input bytes (stored as ints).
        self.data_queue = queue.Queue()

    def _fill(self, f, count):
        """Top up ``data_queue`` from ``f`` until it holds ``count`` bytes.

        Returns:
            True if the queue now holds at least ``count`` bytes, False if
            the file ended first (whatever was read is still queued).
        """
        missing = count - self.data_queue.qsize()
        if missing <= 0:
            return True
        chunk = f.read(missing)
        for byte in chunk:
            self.data_queue.put(byte)
        # A short read means end of file: report it instead of indexing an
        # empty bytes object, which would raise IndexError.
        return len(chunk) == missing

    def run(self):
        """Parse packets until 1000 battery samples exist or the file ends.

        Packet constructors are called for their side effects (the
        temperature packet appends to the module-level ``vbats`` list).
        When a constructor raises ``ChecksumException``,
        ``ZeroDivisionError`` or ``ValueError`` the candidate packet is
        treated as corrupt: only its first byte is dropped, so scanning
        resumes at the very next byte. Finally the collected voltages are
        plotted against their sample index.
        """
        # NOTE(review): the indentation of the original was lost; the nesting
        # below is reconstructed so that every byte is examined as a possible
        # packet start exactly once.
        sample_target = 1000
        with open(self.in_file, "rb") as f:
            while len(vbats) < sample_target:
                # Two bytes (start marker + type id) are needed to decide.
                if not self._fill(f, 2):
                    break
                window = self.data_queue.queue
                if window[0] == START and window[1] in type_dict:
                    constructor, length = type_dict[window[1]]
                    if not self._fill(f, length):
                        break  # truncated packet at end of file
                    try:
                        constructor(list(self.data_queue.queue)[:length])
                    except (ChecksumException, ZeroDivisionError, ValueError):
                        # Corrupt or false-positive frame: resynchronise by
                        # dropping just the first byte (done below).
                        pass
                    else:
                        # Good packet: drop all but its last byte here; the
                        # unconditional get() below removes the final one.
                        for _ in range(length - 1):
                            self.data_queue.get()
                self.data_queue.get()
        # Plot only as many x values as samples were actually collected, so a
        # short input file still produces a plot instead of a shape error.
        pyplot.plot(range(len(vbats)), vbats)
        pyplot.show()
class Pack:
    """Base class for decoded packets.

    Holds the fields every packet shares and offers helpers to print the
    decoded values and to verify the CRC-8 checksum.
    """

    def __init__(self):
        self.type = None
        self.timestamp = None
        self.checksum = None

    def show(self):
        """Pretty-print all instance attributes."""
        pprint(vars(self))

    def validate(self, data):
        """Check ``self.checksum`` against the CRC-8 of ``data``.

        Args:
            data: The raw packet as an iterable of byte values.

        Raises:
            ChecksumException: If the computed CRC-8 differs from
                ``self.checksum``.
        """
        # NOTE(review): the last two bytes are excluded from the CRC, as in
        # the original code, although the checksum itself occupies only the
        # final byte of every packet format - confirm against the sender.
        payload = bytes(bytearray(data)[:-2])
        # digest() yields a single byte; compare its integer value, because a
        # hash object never compares equal to the unpacked int checksum.
        computed = crc8.crc8(payload).digest()[0]
        if computed != self.checksum:
            raise ChecksumException(
                "CRC-8 mismatch: computed 0x%02x, packet carries %r"
                % (computed, self.checksum))
class TemperaturePack(Pack):
    """Packet with temperatures, pressure, battery voltage and humidity.

    Args:
        data: The complete 20-byte packet as an iterable of byte values,
            including the two leading header bytes.

    Raises:
        ZeroDivisionError: If a raw NTC reading is 0.
        ValueError: If a raw NTC reading yields a non-positive resistance
            (``math.log`` domain error).
    """

    def __init__(self, data):
        super().__init__()
        self.type = "TEMP"
        self.temp1 = None
        self.temp2 = None
        self.temp3 = None
        self.temp4 = None
        self.pressure = None
        self.vbat = None
        self.humid = None
        self.checksum = None
        # Keyword arguments forwarded to convertNTC.
        self.calibrationNTC = {'R_0': 33000, 'T_0': 298.15,
                               'R_1': 33000, 'B': 5000, 'resolution': 12}
        # Keyword arguments forwarded to convert_vbat.
        self.calibrationVBAT = {'multiplier': 0.5, 'reference': 2.50}
        self.parse(data)

    def parse(self, data):
        """Unpack ``data`` and convert the raw readings.

        The two leading bytes (start marker and type id) are skipped by the
        ``xx`` in the format. On success the battery voltage is also
        appended to the module-level ``vbats`` list.
        """
        (raw_temp1, self.timestamp, raw_temp2, self.temp3, self.temp4,
         raw_pressure, raw_vbat, self.humid, self.checksum) = struct.unpack(
            'xxHIHhhhHBB', bytearray(data))
        self.pressure = self.convert_pressure(raw_pressure)
        self.temp1 = self.convertNTC(raw_temp1, **self.calibrationNTC)
        self.temp2 = self.convertNTC(raw_temp2, **self.calibrationNTC)
        self.vbat = self.convert_vbat(raw_vbat, **self.calibrationVBAT)
        vbats.append(self.vbat)

    def convert_pressure(self, pressureRaw):
        """Return ``pressureRaw / 100 + 750`` (unit not stated in the code)."""
        return (pressureRaw / 100.0) + 750.0

    def convertNTC(self, temperature_raw, R_1, resolution, T_0, B, R_0):
        """Convert a raw ADC reading of an NTC thermistor to a temperature.

        Computes ``R = R_1 * (2**resolution / temperature_raw - 1)`` and
        then ``T = 1 / (1/T_0 + ln(R/R_0) / B)``.

        Returns:
            ``T - 273.15``, i.e. degrees Celsius when ``T_0`` is in kelvin
            (the default 298.15 corresponds to 25 degrees Celsius).

        Raises:
            ZeroDivisionError: If ``temperature_raw`` is 0.
            ValueError: If ``R / R_0`` is not positive.
        """
        R = R_1 * ((2**resolution) / temperature_raw - 1)
        T = 1/(1/T_0 + 1/B * math.log(R/R_0))
        # 273.15 is the kelvin offset that matches T_0 = 298.15; the previous
        # 273.18 shifted every reading by 0.03 degrees.
        return T - 273.15

    def convert_vbat(self, vbat_raw, multiplier, reference):
        """Convert a raw 12-bit ADC reading to the battery voltage.

        Args:
            vbat_raw: Raw ADC value (full scale is 4096).
            multiplier: Fraction of the battery voltage that reaches the
                ADC - presumably a divider ratio; confirm against hardware.
            reference: ADC reference voltage.

        With the default calibration (0.5, 2.50) this equals the former
        hard-coded ``vbat_raw / 4096 * 2.5 * 2``.
        """
        return vbat_raw / 4096 * reference / multiplier
class GPSPack(Pack):
    """Packet with a GPS fix: HDOP, time of day, position and height.

    Args:
        data: The complete 25-byte packet as an iterable of byte values,
            including the two leading header bytes.
    """

    def __init__(self, data):
        super().__init__()
        self.type = "GPS"
        self.hdop = None
        self.hour = None
        self.minute = None
        self.second = None
        self.lat = None
        self.lon = None
        self.height = None
        self.parse(data)

    def parse(self, data):
        """Unpack ``data`` and convert the HDOP and time fields.

        The ``xx`` at the start of the format skips the start marker and
        type id; the single ``x`` after the HDOP byte is a padding byte.
        """
        (rawHDOP, self.timestamp, rawGPStime, self.lat, self.lon,
         self.height, self.checksum) = struct.unpack(
            "xxBxIIfffB", bytearray(data))
        self.hdop = self.convertHDOP(rawHDOP)
        self.hour, self.minute, self.second = self.convertGPStime(rawGPStime)

    def convertHDOP(self, rawHDOP):
        """Return the raw HDOP value divided by 10."""
        return rawHDOP / 10.0

    def convertGPStime(self, rawGPStime):
        """Split a numeric time of day into ``[hour, minute, second]``.

        The value is read as decimal digits ``hhmmss`` (assumed from the
        2-2-2 slicing; confirm against the sender). It is zero-padded to six
        digits first, because the integer form drops leading zeros and, for
        example, 93015 must decode as 09:30:15 rather than 93:01:5.

        Returns:
            ``[hour, minute, second]`` as ints, or ``[0, 0, 0]`` if the
            digits cannot be parsed.
        """
        try:
            text = str(rawGPStime).zfill(6)
            hour = int(text[:2])
            minute = int(text[2:4])
            second = int(text[4:6])
            return [hour, minute, second]
        except ValueError:
            return [0, 0, 0]
class AirPack(Pack):
    """Packet of type "AIR" carrying deviation, millis and range values.

    Args:
        data: The complete 15-byte packet as an iterable of byte values,
            including the two leading header bytes.
    """

    def __init__(self, data):
        super().__init__()
        self.type = "AIR"
        for field in ("millis", "deviation", "range"):
            setattr(self, field, None)
        self.parse(data)

    def parse(self, data):
        """Unpack ``data`` into the instance attributes.

        The first two bytes are skipped by the ``xx`` in the format.
        """
        values = struct.unpack("xxHIIHB", bytearray(data))
        names = ("deviation", "timestamp", "millis", "range", "checksum")
        for name, value in zip(names, values):
            setattr(self, name, value)
class AccPack(Pack):
    """Packet of type "ACC" with three acc* and three gyr* readings.

    Args:
        data: The complete 19-byte packet as an iterable of byte values,
            including the two leading header bytes.
    """

    def __init__(self, data):
        super().__init__()
        self.type = "ACC"
        for field in ("accx", "accy", "accz", "gyrx", "gyry", "gyrz"):
            setattr(self, field, None)
        self.parse(data)

    def parse(self, data):
        """Unpack ``data`` into the instance attributes.

        The first two bytes are skipped by the ``xx`` in the format; the
        timestamp sits between the first reading and the remaining ones.
        """
        values = struct.unpack("xxhIhhhhhB", bytearray(data))
        names = ("accx", "timestamp", "accy", "accz",
                 "gyrx", "gyry", "gyrz", "checksum")
        for name, value in zip(names, values):
            setattr(self, name, value)
class RSSIPack(Pack):
    """Packet of type "RSSI" carrying a single signed rssi value.

    Args:
        data: The complete 9-byte packet as an iterable of byte values,
            including the two leading header bytes.
    """

    def __init__(self, data):
        super().__init__()
        self.type = "RSSI"
        self.rssi = None
        self.parse(data)

    def parse(self, data):
        """Unpack ``data`` into ``rssi``, ``timestamp`` and ``checksum``.

        The first two bytes are skipped by the ``xx`` in the format.
        """
        values = struct.unpack("xxhIB", bytearray(data))
        self.rssi = values[0]
        self.timestamp = values[1]
        self.checksum = values[2]
# Maps the packet type id (second byte of a packet) to a pair of
# (packet class, total packet length in bytes including the header).
type_dict = {
    type_id: (constructor, length)
    for type_id, constructor, length in (
        (2, TemperaturePack, 20),
        (3, GPSPack, 25),
        (4, AirPack, 15),
        (5, AccPack, 19),
        (6, RSSIPack, 9),
    )
}
# Script entry point: parse the file "data3" and show the voltage plot.
if __name__ == "__main__":
    p = Parser(in_file="data3", out_file=None)
    p.run()
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement