Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import collections
- import select
- import threading
- from Logger import Logger
# Immutable record describing one GPIO line:
#   io       -- direction tag; this file uses the strings "in" and "out"
#   number   -- GPIO number used to build the value-file path
#   name     -- key under which the line is registered and reported
#   inverted -- whether the logical level is the inverse of the file content
#   filename -- path of the file that is read or written for this line
GPIOType = collections.namedtuple(
    "GPIOType",
    ("io", "number", "name", "inverted", "filename"),
)
# Module-level logger, created with the tag "IOAccess"; used by the pulse code
# in this file via _log.log(...).
# NOTE(review): Logger comes from the project-local Logger module; its API is
# not visible here -- confirm what the constructor argument means.
- _log = Logger("IOAccess")
class InputPoller(threading.Thread):
    """Thread that watches GPIO value files and reports input states.

    Each input's value file is opened once and registered with a
    ``select.poll`` object for ``POLLPRI`` events.  Whenever a poll reports
    at least one event, the current state of *all* inputs is read and handed
    to the callback as a ``{name: bool}`` dict.  The callback runs in this
    poller thread, not in the thread that created the poller.

    NOTE(review): presumably the GPIOs must already be exported and have an
    edge trigger configured for POLLPRI to fire -- confirm against the
    kernel's sysfs GPIO documentation.

    Args:
        inputs: iterable of objects with ``name``, ``filename`` and
            ``inverted`` attributes (e.g. ``GPIOType`` records).
        callback: callable invoked with the ``{name: bool}`` state dict.

    Raises:
        OSError: if one of the value files cannot be opened.  Files that were
            already opened are closed again before the error propagates.
    """

    # Poll timeout in milliseconds; also bounds how long stop() takes to act.
    _POLL_TIMEOUT_MS = 1000

    def __init__(self, inputs, callback):
        threading.Thread.__init__(self)
        self._inputs = inputs
        self._callback = callback
        self._stop_requested = threading.Event()
        self._files = {}
        self._pollobj = select.poll()
        try:
            for gpio in self._inputs:
                f = open(gpio.filename, "r")
                # Track the file before registering so that a failure in
                # register() still gets it closed below.
                self._files[gpio.name] = f
                self._pollobj.register(f.fileno(), select.POLLPRI)
        except Exception:
            self._close_files()
            raise

    def _close_files(self):
        """Close every value file that is currently open and forget it."""
        for f in self._files.values():
            f.close()
        self._files = {}

    def _getall(self):
        """Read all inputs and return their logical states as {name: bool}.

        A file whose content (ignoring surrounding whitespace) is "1" counts
        as active; the result is flipped for inputs marked as inverted.
        """
        values = {}
        for gpio in self._inputs:
            f = self._files[gpio.name]
            f.seek(0)
            # strip() instead of dropping the last character: works whether
            # or not the value is followed by a newline.
            raw = f.read().strip()
            values[gpio.name] = (raw == "1") != bool(gpio.inverted)
        return values

    def stop(self):
        """Ask the thread to finish; it exits after the current poll returns."""
        self._stop_requested.set()

    def run(self):
        """Poll until stop() is called, invoking the callback on every event."""
        try:
            while not self._stop_requested.is_set():
                if self._pollobj.poll(self._POLL_TIMEOUT_MS):
                    self._callback(self._getall())
        finally:
            # Release the file handles even when the callback raised.
            self._close_files()
class IOAccess():
    """Registry of named sysfs GPIO lines with helpers to watch and drive them.

    GPIOs are registered by number under a symbolic name, either as input or
    as output.  Inputs can be watched through a background ``InputPoller``
    thread; outputs can be set or pulsed.  The value file of GPIO ``N`` is
    ``/sys/class/gpio/gpioN/value``.
    """

    def __init__(self):
        self._gpios = { }
        self._lastvalues = { }

    def _addgpio(self, io, gpiono, name, inverted):
        """Store a GPIOType record for the GPIO under ``name``."""
        self._gpios[name] = GPIOType(
            io = io,
            number = gpiono,
            name = name,
            inverted = inverted,
            filename = self._gpiofilename(gpiono),
        )

    def addinput(self, gpiono, name, inverted = False):
        """Register GPIO ``gpiono`` as an input called ``name``.

        A later registration under the same name replaces the earlier one.
        """
        self._addgpio("in", gpiono, name, inverted)

    def addoutput(self, gpiono, name, inverted = False):
        """Register GPIO ``gpiono`` as an output called ``name``.

        A later registration under the same name replaces the earlier one.
        """
        self._addgpio("out", gpiono, name, inverted)

    @staticmethod
    def _gpiofilename(gpiono):
        """Return the sysfs value-file path for the integer GPIO number."""
        return "/sys/class/gpio/gpio%d/value" % (gpiono)

    def listen(self, listener):
        """Start a poller thread that reports input states to ``listener``.

        Args:
            listener: callable receiving a ``{name: bool}`` dict; it is
                called from the poller thread.

        Returns:
            The started poller thread, so the caller can keep a handle on it.
        """
        inputs = { gpio for gpio in self._gpios.values() if gpio.io == "in" }
        thread = InputPoller(inputs, listener)
        thread.start()
        return thread

    def setoutput(self, gpio, value):
        """Write "1" (truthy ``value``) or "0" plus newline to the GPIO's file.

        ``gpio`` is a record with a ``filename`` attribute, not a name.  The
        raw level is written as given; inversion is not applied here.
        """
        # The context manager closes the file even if the write fails.
        with open(gpio.filename, "w") as f:
            f.write("1\n" if value else "0\n")

    def pulse(self, gpioname, length = 0.01):
        """Drive the named output active for ``length`` seconds, then inactive.

        "Active" is the logical level, i.e. the opposite of ``inverted``.
        Both the requested and the measured pulse length are logged.

        Raises:
            KeyError: if ``gpioname`` was never registered.
            AssertionError: if the GPIO is not registered as an output.
        """
        gpio = self._gpios[gpioname]
        # Kept as an assert so callers see the same exception type as before.
        assert gpio.io == "out", "GPIO %s is not an output" % (gpioname)
        _log.log("Pulse %s len %.3f sec" % (str(gpio), length))
        # A monotonic clock keeps the measurement immune to wall-clock jumps.
        t0 = time.monotonic()
        self.setoutput(gpio, not gpio.inverted)
        try:
            time.sleep(length)
        finally:
            # Always return to the inactive level, even if the sleep is
            # interrupted, so the output is never left switched on.
            self.setoutput(gpio, gpio.inverted)
        t1 = time.monotonic()
        _log.log("Actual pulse len %.3f sec (%+.3f sec)" % (t1 - t0, (t1 - t0) - length))

    def multipulse(self, gpioname, ontime, offtime, count):
        """Emit ``count`` pulses of ``ontime`` seconds, sleeping ``offtime`` after each."""
        for _ in range(count):
            self.pulse(gpioname, ontime)
            time.sleep(offtime)
if __name__ == "__main__":
    import sys

    def listener(states):
        """Print the state mapping delivered by the input poller."""
        print(states)

    ioa = IOAccess()

    # Four buttons, all registered as inverted inputs.
    for gpiono, name in ((21, "BtnUp"), (4, "BtnDown"), (17, "BtnRight"), (22, "BtnLeft")):
        ioa.addinput(gpiono, name, True)

    # Two audio control lines, registered as non-inverted outputs.
    for gpiono, name in ((23, "AudioSilent"), (24, "AudioLouder")):
        ioa.addoutput(gpiono, name, False)

    # ioa.pulse(sys.argv[1], float(sys.argv[2]))

    ioa.listen(listener)

    # Keep the main thread idle; reports arrive via the listener callback.
    while True:
        time.sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement