Advertisement
DeaD_EyE

GpioCsvWriter

Aug 25th, 2021
911
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.23 KB | None | 0 0
  1. import csv
  2. import time
  3. from datetime import date, datetime
  4. from pathlib import Path
  5.  
  6. HEADER = ("Date", "Time", "Sig_Name", "Pin_Status")
  7.  
  8.  
  9. class GpioCsvWriter:
  10.     def __init__(self, base_path, base_name, gpio_pins=None, header=HEADER):
  11.         self.base_name = base_name
  12.         self.base_path = base_path
  13.         self.gpio_pins = gpio_pins or []
  14.         self.header = header
  15.         self._fd = None
  16.         self._csv = None
  17.         self._setup_gpio()
  18.  
  19.     @property
  20.     def current_file(self):
  21.         return Path(self.base_path, f"{date.today():%Y-%m-%d}_{self.base_name}.csv")
  22.  
  23.     @property
  24.     def name(self):
  25.         if self._fd is None:
  26.             return ""
  27.  
  28.         return self._fd.name
  29.  
  30.     def __enter__(self):
  31.         return self
  32.  
  33.     def __exit__(self, exc_type, exc_obj, tb):
  34.         self.close()
  35.  
  36.     def close(self):
  37.         self._csv = None
  38.         if self._fd is not None:
  39.             self._fd.close()
  40.         self._fd = None
  41.  
  42.     def _init_csv(self):
  43.         if self._fd is not None:
  44.             self._fd.close()
  45.  
  46.         if not self.current_file.exists():
  47.             write_header = True
  48.         else:
  49.             write_header = False
  50.  
  51.         self._fd = self.current_file.open("at", encoding="utf8")
  52.         self._csv = csv.writer(self._fd)
  53.  
  54.         if write_header:
  55.             self._csv.writerow(self.header)
  56.  
  57.     def writerow(self, row):
  58.         if self._fd is None or self._fd.name != self.current_file:
  59.             self._init_csv()
  60.  
  61.         self._csv.writerow(row)
  62.         self._fd.flush()
  63.  
  64.     def _signal(self, pin):
  65.         now = datetime.now()
  66.         self.writerow(
  67.             [
  68.                 now.strptime("%Y-%m-%d"),
  69.                 now.strptime("%H:%M:%s"),
  70.                 str(gpio_pin),
  71.                 str(GPIO.input(gpio_pin)),
  72.             ]
  73.         )
  74.  
  75.     def _setup_gpio(self):
  76.         if not self.gpio_pins:
  77.             return
  78.  
  79.         GPIO.setmode(GPIO.BCM)
  80.         for pin in self.gpio_pins:
  81.             GPIO.setup(pin, GPIO.IN, GPIO.PUD_DOWN)
  82.             GPIO.add_event_detect(pin, GPIO.BOTH, callback=self._signal)
  83.  
  84.  
  85. if __name__ == "__main__":
  86.     writer = GpioCsvWriter("/tmp", "foo_bar")
  87.     # writer.writerow(["1970-01-01", "00:00", "1", "1"])
  88.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement