Advertisement
Guest User

Untitled

a guest
Feb 20th, 2017
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.61 KB | None | 0 0
  1. class RFDevice:
  2. """Representation of a GPIO RF device."""
  3.  
  4. # pylint: disable=too-many-instance-attributes,too-many-arguments
  5. def __init__(self, gpio,
  6. tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=32, rx_tolerance=80):
  7. """Initialize the RF device."""
  8. self.gpio = gpio
  9. self.tx_enabled = False
  10. self.tx_proto = tx_proto
  11. if tx_pulselength:
  12. self.tx_pulselength = tx_pulselength
  13. else:
  14. self.tx_pulselength = PROTOCOLS[tx_proto].pulselength
  15. self.tx_repeat = tx_repeat
  16. self.tx_length = tx_length
  17. self.rx_enabled = False
  18. self.rx_tolerance = rx_tolerance
  19. # internal values
  20. self._rx_timings = [0] * (MAX_CHANGES + 1)
  21. self._rx_last_timestamp = 0
  22. self._rx_change_count = 0
  23. self._rx_repeat_count = 0
  24. # successful RX values
  25. self.rx_code = None
  26. self.rx_code_timestamp = None
  27. self.rx_proto = None
  28. self.rx_bitlength = None
  29. self.rx_pulselength = None
  30.  
  31. GPIO.setmode(GPIO.BCM)
  32. _LOGGER.debug("Using GPIO " + str(gpio))
  33.  
  34. def cleanup(self):
  35. """Disable TX and RX and clean up GPIO."""
  36. if self.tx_enabled:
  37. self.disable_tx()
  38. if self.rx_enabled:
  39. self.disable_rx()
  40. _LOGGER.debug("Cleanup")
  41. GPIO.cleanup()
  42.  
  43. def enable_tx(self):
  44. """Enable TX, set up GPIO."""
  45. if self.rx_enabled:
  46. _LOGGER.error("RX is enabled, not enabling TX")
  47. return False
  48. if not self.tx_enabled:
  49. self.tx_enabled = True
  50. GPIO.setup(self.gpio, GPIO.OUT)
  51. _LOGGER.debug("TX enabled")
  52. return True
  53.  
  54. def disable_tx(self):
  55. """Disable TX, reset GPIO."""
  56. if self.tx_enabled:
  57. # set up GPIO pin as input for safety
  58. GPIO.setup(self.gpio, GPIO.IN)
  59. self.tx_enabled = False
  60. _LOGGER.debug("TX disabled")
  61. return True
  62.  
  63. def tx_code(self, code, tx_proto=None, tx_pulselength=None):
  64. """
  65. Send a decimal code.
  66. Optionally set protocol and pulselength.
  67. When none given reset to default protocol and pulselength.
  68. """
  69. if tx_proto:
  70. self.tx_proto = tx_proto
  71. else:
  72. self.tx_proto = 1
  73. if tx_pulselength:
  74. self.tx_pulselength = tx_pulselength
  75. else:
  76. self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength
  77. rawcode = format(code, '#0{}b'.format(self.tx_length + 2))[2:]
  78. _LOGGER.debug("TX code: " + str(code))
  79. return self.tx_bin(rawcode)
  80.  
  81. def tx_bin(self, rawcode):
  82. """Send a binary code."""
  83. _LOGGER.debug("TX bin: " + str(rawcode))
  84. for _ in range(0, self.tx_repeat):
  85. for byte in range(0, self.tx_length):
  86. if rawcode[byte] == '0':
  87. if not self.tx_l0():
  88. return False
  89. else:
  90. if not self.tx_l1():
  91. return False
  92. if not self.tx_sync():
  93. return False
  94.  
  95. return True
  96.  
  97. def tx_l0(self):
  98. """Send a '0' bit."""
  99. if not 0 < self.tx_proto < len(PROTOCOLS):
  100. _LOGGER.error("Unknown TX protocol")
  101. return False
  102. return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high,
  103. PROTOCOLS[self.tx_proto].zero_low)
  104.  
  105. def tx_l1(self):
  106. """Send a '1' bit."""
  107. if not 0 < self.tx_proto < len(PROTOCOLS):
  108. _LOGGER.error("Unknown TX protocol")
  109. return False
  110. return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high,
  111. PROTOCOLS[self.tx_proto].one_low)
  112.  
  113. def tx_sync(self):
  114. """Send a sync."""
  115. if not 0 < self.tx_proto < len(PROTOCOLS):
  116. _LOGGER.error("Unknown TX protocol")
  117. return False
  118. return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high,
  119. PROTOCOLS[self.tx_proto].sync_low)
  120.  
  121. def tx_waveform(self, highpulses, lowpulses):
  122. """Send basic waveform."""
  123. if not self.tx_enabled:
  124. _LOGGER.error("TX is not enabled, not sending data")
  125. return False
  126. GPIO.output(self.gpio, GPIO.HIGH)
  127. time.sleep((highpulses * self.tx_pulselength) / 1000000)
  128. GPIO.output(self.gpio, GPIO.LOW)
  129. time.sleep((lowpulses * self.tx_pulselength) / 1000000)
  130. return True
  131.  
  132. def enable_rx(self):
  133. """Enable RX, set up GPIO and add event detection."""
  134. if self.tx_enabled:
  135. _LOGGER.error("TX is enabled, not enabling RX")
  136. return False
  137. if not self.rx_enabled:
  138. self.rx_enabled = True
  139. GPIO.setup(self.gpio, GPIO.IN)
  140. GPIO.add_event_detect(self.gpio, GPIO.BOTH)
  141. GPIO.add_event_callback(self.gpio, self.rx_callback)
  142. _LOGGER.debug("RX enabled")
  143. return True
  144.  
  145. def disable_rx(self):
  146. """Disable RX, remove GPIO event detection."""
  147. if self.rx_enabled:
  148. GPIO.remove_event_detect(self.gpio)
  149. self.rx_enabled = False
  150. _LOGGER.debug("RX disabled")
  151. return True
  152.  
  153. # pylint: disable=unused-argument
  154. def rx_callback(self, gpio):
  155. """RX callback for GPIO event detection. Handle basic signal detection."""
  156. timestamp = int(time.perf_counter() * 1000000)
  157. duration = timestamp - self._rx_last_timestamp
  158.  
  159. if duration > 5000:
  160. if duration - self._rx_timings[0] < 200:
  161. self._rx_repeat_count += 1
  162. self._rx_change_count -= 1
  163. if self._rx_repeat_count == 2:
  164. for pnum in range(1, len(PROTOCOLS)):
  165. if self._rx_waveform(pnum, self._rx_change_count, timestamp):
  166. _LOGGER.debug("RX code " + str(self.rx_code))
  167. break
  168. self._rx_repeat_count = 0
  169. self._rx_change_count = 0
  170.  
  171. if self._rx_change_count >= MAX_CHANGES:
  172. self._rx_change_count = 0
  173. self._rx_repeat_count = 0
  174. self._rx_timings[self._rx_change_count] = duration
  175. self._rx_change_count += 1
  176. self._rx_last_timestamp = timestamp
  177.  
  178. def _rx_waveform(self, pnum, change_count, timestamp):
  179. """Detect waveform and format code."""
  180. code = 0
  181. delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low)
  182. delay_tolerance = delay * self.rx_tolerance / 100
  183.  
  184. for i in range(1, change_count, 2):
  185. if (self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high < delay_tolerance and
  186. self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low < delay_tolerance):
  187. code <<= 1
  188. elif (self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high < delay_tolerance and
  189. self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low < delay_tolerance):
  190. code <<= 1
  191. code |= 1
  192. else:
  193. return False
  194.  
  195. if self._rx_change_count > 6 and code != 0:
  196. self.rx_code = code
  197. self.rx_code_timestamp = timestamp
  198. self.rx_bitlength = int(change_count / 2)
  199. self.rx_pulselength = delay
  200. self.rx_proto = pnum
  201. return True
  202.  
  203. return False
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement