Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2019
277
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.12 KB | None | 0 0
  1. """
  2. Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi.
  3. """
  4.  
  5. import logging
  6. import time
  7. from collections import namedtuple
  8.  
  9. from RPi import GPIO
  10.  
  11. MAX_CHANGES = 67
  12.  
  13. _LOGGER = logging.getLogger(__name__)
  14.  
  15. Protocol = namedtuple('Protocol',
  16. ['pulselength',
  17. 'sync_high', 'sync_low',
  18. 'zero_high', 'zero_low',
  19. 'one_high', 'one_low'])
  20. PROTOCOLS = (None,
  21. Protocol(350, 1, 31, 1, 3, 3, 1),
  22. Protocol(650, 1, 10, 1, 2, 2, 1),
  23. Protocol(100, 30, 71, 4, 11, 9, 6),
  24. Protocol(380, 1, 6, 1, 3, 3, 1),
  25. Protocol(500, 6, 14, 1, 2, 2, 1),
  26. Protocol(200, 1, 10, 1, 5, 1, 1))
  27.  
  28.  
  29. class RFDevice:
  30. """Representation of a GPIO RF device."""
  31.  
  32. # pylint: disable=too-many-instance-attributes,too-many-arguments
  33. def __init__(self, gpio, tx_proto=1, tx_pulselength=None, tx_repeat=3, tx_length=24, rx_tolerance=80):
  34. """Initialize the RF device."""
  35. self.gpio = gpio
  36. self.tx_enabled = False
  37. self.tx_proto = tx_proto
  38. self.tx_pulselength = tx_pulselength if tx_pulselength else PROTOCOLS[tx_proto].pulselength
  39.  
  40. self.tx_repeat = tx_repeat
  41. self.tx_length = tx_length
  42. self.rx_enabled = False
  43. self.rx_tolerance = rx_tolerance
  44. # internal values
  45. self._rx_timings = [0] * (MAX_CHANGES + 1)
  46. self._rx_last_timestamp = 0
  47. self._rx_change_count = 0
  48. self._rx_repeat_count = 0
  49. # successful RX values
  50. self.rx_code = None
  51. self.rx_code_timestamp = None
  52. self.rx_proto = None
  53. self.rx_bitlength = None
  54. self.rx_pulselength = None
  55.  
  56. GPIO.setmode(GPIO.BOARD)
  57. # GPIO.setmode(GPIO.BCM)
  58.  
  59. _LOGGER.debug("Using GPIO " + str(gpio))
  60.  
  61. def cleanup(self):
  62. """Disable TX and RX and clean up GPIO."""
  63. if self.tx_enabled:
  64. self.disable_tx()
  65. if self.rx_enabled:
  66. self.disable_rx()
  67. _LOGGER.debug("Cleanup")
  68. GPIO.cleanup()
  69.  
  70. def enable_tx(self):
  71. """Enable TX, set up GPIO."""
  72. if self.rx_enabled:
  73. _LOGGER.error("RX is enabled, not enabling TX")
  74. return False
  75. if not self.tx_enabled:
  76. self.tx_enabled = True
  77. GPIO.setup(self.gpio, GPIO.OUT)
  78. _LOGGER.debug("TX enabled")
  79. return True
  80.  
  81. def disable_tx(self):
  82. """Disable TX, reset GPIO."""
  83. if self.tx_enabled:
  84. # set up GPIO pin as input for safety
  85. GPIO.setup(self.gpio, GPIO.IN)
  86. self.tx_enabled = False
  87. _LOGGER.debug("TX disabled")
  88. return True
  89.  
  90. def tx_code(self, code, tx_proto=None, tx_pulselength=None, tx_length=None):
  91. """
  92. Send a decimal code.
  93. Optionally set protocol, pulselength and code length.
  94. When none given reset to default protocol, default pulselength and set code length to 24 bits.
  95. """
  96. if tx_proto:
  97. self.tx_proto = tx_proto
  98. else:
  99. self.tx_proto = 1
  100. if tx_pulselength:
  101. self.tx_pulselength = tx_pulselength
  102. elif not self.tx_pulselength:
  103. self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength
  104. if tx_length:
  105. self.tx_length = tx_length
  106. elif self.tx_proto == 6: # May be I need this protocol or tx_length
  107. self.tx_length = 32
  108. elif (code > 16777216):
  109. self.tx_length = 32
  110. else:
  111. self.tx_length = 24
  112. # rawcode = format(code, '#0{}b'.format(self.tx_length + 2))[2:]
  113. rawcode = "{0:b}".format(code).zfill(self.tx_length)
  114. # Encryption of some kind..
  115. if self.tx_proto == 6:
  116. nexacode = ""
  117. for bit in rawcode:
  118. if bit == '0':
  119. nexacode = nexacode + "01"
  120. if bit == '1':
  121. nexacode = nexacode + "10"
  122. rawcode = nexacode
  123. self.tx_length = 64
  124. _LOGGER.debug("TX code: " + str(code))
  125. return self.tx_bin(rawcode)
  126.  
  127. def tx_bin(self, rawcode):
  128. """Send a binary code."""
  129. _LOGGER.debug("TX bin: " + str(rawcode))
  130. for _ in range(0, self.tx_repeat):
  131. if self.tx_proto == 6:
  132. if not self.tx_sync():
  133. return False
  134. for byte in range(0, self.tx_length):
  135. if rawcode[byte] == '0':
  136. if not self.tx_l0():
  137. return False
  138. else:
  139. if not self.tx_l1():
  140. return False
  141. if not self.tx_sync():
  142. return False
  143.  
  144. return True
  145.  
  146. def tx_l0(self):
  147. """Send a '0' bit."""
  148. if not 0 < self.tx_proto < len(PROTOCOLS):
  149. _LOGGER.error("Unknown TX protocol")
  150. return False
  151. return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high,
  152. PROTOCOLS[self.tx_proto].zero_low)
  153.  
  154. def tx_l1(self):
  155. """Send a '1' bit."""
  156. if not 0 < self.tx_proto < len(PROTOCOLS):
  157. _LOGGER.error("Unknown TX protocol")
  158. return False
  159. return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high,
  160. PROTOCOLS[self.tx_proto].one_low)
  161.  
  162. def tx_sync(self):
  163. """Send a sync."""
  164. if not 0 < self.tx_proto < len(PROTOCOLS):
  165. _LOGGER.error("Unknown TX protocol")
  166. return False
  167. return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high,
  168. PROTOCOLS[self.tx_proto].sync_low)
  169.  
  170. def tx_waveform(self, highpulses, lowpulses):
  171. """Send basic waveform."""
  172. if not self.tx_enabled:
  173. _LOGGER.error("TX is not enabled, not sending data")
  174. return False
  175. GPIO.output(self.gpio, GPIO.HIGH)
  176. self._sleep((highpulses * self.tx_pulselength) / 1000000)
  177. GPIO.output(self.gpio, GPIO.LOW)
  178. self._sleep((lowpulses * self.tx_pulselength) / 1000000)
  179. return True
  180.  
  181. # For Reception....
  182. def enable_rx(self):
  183. """Enable RX, set up GPIO and add event detection."""
  184. if self.tx_enabled:
  185. _LOGGER.error("TX is enabled, not enabling RX")
  186. return False
  187. if not self.rx_enabled:
  188. self.rx_enabled = True
  189. GPIO.setup(self.gpio, GPIO.IN)
  190. GPIO.add_event_detect(self.gpio, GPIO.BOTH)
  191. GPIO.add_event_callback(self.gpio, self.rx_callback)
  192. _LOGGER.debug("RX enabled")
  193. return True
  194.  
  195. def disable_rx(self):
  196. """Disable RX, remove GPIO event detection."""
  197. if self.rx_enabled:
  198. GPIO.remove_event_detect(self.gpio)
  199. self.rx_enabled = False
  200. _LOGGER.debug("RX disabled")
  201. return True
  202.  
  203. # pylint: disable=unused-argument
  204. def rx_callback(self, gpio):
  205. """RX callback for GPIO event detection. Handle basic signal detection."""
  206. timestamp = int(time.perf_counter() * 1000000)
  207. duration = timestamp - self._rx_last_timestamp
  208.  
  209. if duration > 5000:
  210. if abs(duration - self._rx_timings[0]) < 200:
  211. self._rx_repeat_count += 1
  212. self._rx_change_count -= 1
  213. if self._rx_repeat_count == 2:
  214. for pnum in range(1, len(PROTOCOLS)):
  215. if self._rx_waveform(pnum, self._rx_change_count, timestamp):
  216. _LOGGER.debug("RX code " + str(self.rx_code))
  217. break
  218. self._rx_repeat_count = 0
  219. self._rx_change_count = 0
  220.  
  221. if self._rx_change_count >= MAX_CHANGES:
  222. self._rx_change_count = 0
  223. self._rx_repeat_count = 0
  224. self._rx_timings[self._rx_change_count] = duration
  225. self._rx_change_count += 1
  226. self._rx_last_timestamp = timestamp
  227.  
  228. def _rx_waveform(self, pnum, change_count, timestamp):
  229. """Detect waveform and format code."""
  230. code = 0
  231. delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low)
  232. delay_tolerance = delay * self.rx_tolerance / 100
  233.  
  234. for i in range(1, change_count, 2):
  235. if (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high) < delay_tolerance and
  236. abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low) < delay_tolerance):
  237. code <<= 1
  238. elif (abs(self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high) < delay_tolerance and
  239. abs(self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low) < delay_tolerance):
  240. code <<= 1
  241. code |= 1
  242. else:
  243. return False
  244.  
  245. if self._rx_change_count > 6 and code != 0:
  246. self.rx_code = code
  247. self.rx_code_timestamp = timestamp
  248. self.rx_bitlength = int(change_count / 2)
  249. self.rx_pulselength = delay
  250. self.rx_proto = pnum
  251. return True
  252.  
  253. return False
  254.  
  255. def _sleep(self, delay):
  256. _delay = delay / 100
  257. end = time.time() + delay - _delay
  258. while time.time() < end:
  259. time.sleep(_delay)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement