Guest User

Untitled

a guest
Jan 22nd, 2025
54
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.62 KB | None | 0 0
  1. """NRF24L01 driver for MicroPython
  2. """
  3.  
  4. from micropython import const
  5. import utime
  6.  
  7. # nRF24L01+ registers
  8. CONFIG = const(0x00)
  9. EN_RXADDR = const(0x02)
  10. SETUP_AW = const(0x03)
  11. SETUP_RETR = const(0x04)
  12. RF_CH = const(0x05)
  13. RF_SETUP = const(0x06)
  14. STATUS = const(0x07)
  15. RX_ADDR_P0 = const(0x0A)
  16. TX_ADDR = const(0x10)
  17. RX_PW_P0 = const(0x11)
  18. FIFO_STATUS = const(0x17)
  19. DYNPD = const(0x1C)
  20.  
  21. # CONFIG register
  22. EN_CRC = const(0x08)  # enable CRC
  23. CRCO = const(0x04)  # CRC encoding scheme; 0=1 byte, 1=2 bytes
  24. PWR_UP = const(0x02)  # 1=power up, 0=power down
  25. PRIM_RX = const(0x01)  # RX/TX control; 0=PTX, 1=PRX
  26.  
  27. # RF_SETUP register
  28. POWER_0 = const(0x00)  # -18 dBm
  29. POWER_1 = const(0x02)  # -12 dBm
  30. POWER_2 = const(0x04)  # -6 dBm
  31. POWER_3 = const(0x06)  # 0 dBm
  32. SPEED_1M = const(0x00)
  33. SPEED_2M = const(0x08)
  34. SPEED_250K = const(0x20)
  35.  
  36. # STATUS register
  37. RX_DR = const(0x40)  # RX data ready; write 1 to clear
  38. TX_DS = const(0x20)  # TX data sent; write 1 to clear
  39. MAX_RT = const(0x10)  # max retransmits reached; write 1 to clear
  40.  
  41. # FIFO_STATUS register
  42. RX_EMPTY = const(0x01)  # 1 if RX FIFO is empty
  43.  
  44. # constants for instructions
  45. R_RX_PL_WID = const(0x60)  # read RX payload width
  46. R_RX_PAYLOAD = const(0x61)  # read RX payload
  47. W_TX_PAYLOAD = const(0xA0)  # write TX payload
  48. FLUSH_TX = const(0xE1)  # flush TX FIFO
  49. FLUSH_RX = const(0xE2)  # flush RX FIFO
  50. NOP = const(0xFF)  # use to read STATUS register
  51.  
  52.  
  53. class NRF24L01:
  54.     def __init__(self, spi, cs, ce, channel=46, payload_size=16):
  55.         assert payload_size <= 32
  56.  
  57.         self.buf = bytearray(1)
  58.  
  59.         # store the pins
  60.         self.spi = spi
  61.         self.cs = cs
  62.         self.ce = ce
  63.  
  64.         # init the SPI bus and pins
  65.         self.init_spi(4000000)
  66.  
  67.         # reset everything
  68.         ce.init(ce.OUT, value=0)
  69.         cs.init(cs.OUT, value=1)
  70.  
  71.         self.payload_size = payload_size
  72.         self.pipe0_read_addr = None
  73.         utime.sleep_ms(5)
  74.  
  75.         # set address width to 5 bytes and check for device present
  76.         self.reg_write(SETUP_AW, 0b11)
  77.         if self.reg_read(SETUP_AW) != 0b11:
  78.             raise OSError("nRF24L01+ Hardware not responding")
  79.  
  80.         # disable dynamic payloads
  81.         self.reg_write(DYNPD, 0)
  82.  
  83.         # auto retransmit delay: 1750us
  84.         # auto retransmit count: 8
  85.         self.reg_write(SETUP_RETR, (6 << 4) | 8)
  86.  
  87.         # set rf power and speed
  88.         self.set_power_speed(POWER_3, SPEED_250K)  # Best for point to point links
  89.  
  90.         # init CRC
  91.         self.set_crc(2)
  92.  
  93.         # clear status flags
  94.         self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
  95.  
  96.         # set channel
  97.         self.set_channel(channel)
  98.  
  99.         # flush buffers
  100.         self.flush_rx()
  101.         self.flush_tx()
  102.  
  103.     def init_spi(self, baudrate):
  104.         try:
  105.             master = self.spi.MASTER
  106.         except AttributeError:
  107.             self.spi.init(baudrate=baudrate, polarity=0, phase=0)
  108.         else:
  109.             self.spi.init(master, baudrate=baudrate, polarity=0, phase=0)
  110.  
  111.     def reg_read(self, reg):
  112.         self.cs(0)
  113.         self.spi.readinto(self.buf, reg)
  114.         self.spi.readinto(self.buf)
  115.         self.cs(1)
  116.         return self.buf[0]
  117.  
  118.     def reg_write_bytes(self, reg, buf):
  119.         self.cs(0)
  120.         self.spi.readinto(self.buf, 0x20 | reg)
  121.         self.spi.write(buf)
  122.         self.cs(1)
  123.         return self.buf[0]
  124.  
  125.     def reg_write(self, reg, value):
  126.         self.cs(0)
  127.         self.spi.readinto(self.buf, 0x20 | reg)
  128.         ret = self.buf[0]
  129.         self.spi.readinto(self.buf, value)
  130.         self.cs(1)
  131.         return ret
  132.  
  133.     def flush_rx(self):
  134.         self.cs(0)
  135.         self.spi.readinto(self.buf, FLUSH_RX)
  136.         self.cs(1)
  137.  
  138.     def flush_tx(self):
  139.         self.cs(0)
  140.         self.spi.readinto(self.buf, FLUSH_TX)
  141.         self.cs(1)
  142.  
  143.     # power is one of POWER_x defines; speed is one of SPEED_x defines
  144.     def set_power_speed(self, power, speed):
  145.         setup = self.reg_read(RF_SETUP) & 0b11010001
  146.         self.reg_write(RF_SETUP, setup | power | speed)
  147.  
  148.     # length in bytes: 0, 1 or 2
  149.     def set_crc(self, length):
  150.         config = self.reg_read(CONFIG) & ~(CRCO | EN_CRC)
  151.         if length == 0:
  152.             pass
  153.         elif length == 1:
  154.             config |= EN_CRC
  155.         else:
  156.             config |= EN_CRC | CRCO
  157.         self.reg_write(CONFIG, config)
  158.  
  159.     def set_channel(self, channel):
  160.         self.reg_write(RF_CH, min(channel, 125))
  161.  
  162.     # address should be a bytes object 5 bytes long
  163.     def open_tx_pipe(self, address):
  164.         assert len(address) == 5
  165.         self.reg_write_bytes(RX_ADDR_P0, address)
  166.         self.reg_write_bytes(TX_ADDR, address)
  167.         self.reg_write(RX_PW_P0, self.payload_size)
  168.  
  169.     # address should be a bytes object 5 bytes long
  170.     # pipe 0 and 1 have 5 byte address
  171.     # pipes 2-5 use same 4 most-significant bytes as pipe 1, plus 1 extra byte
  172.     def open_rx_pipe(self, pipe_id, address):
  173.         assert len(address) == 5
  174.         assert 0 <= pipe_id <= 5
  175.         if pipe_id == 0:
  176.             self.pipe0_read_addr = address
  177.         if pipe_id < 2:
  178.             self.reg_write_bytes(RX_ADDR_P0 + pipe_id, address)
  179.         else:
  180.             self.reg_write(RX_ADDR_P0 + pipe_id, address[0])
  181.         self.reg_write(RX_PW_P0 + pipe_id, self.payload_size)
  182.         self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 << pipe_id))
  183.  
  184.     def start_listening(self):
  185.         self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX)
  186.         self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
  187.  
  188.         if self.pipe0_read_addr is not None:
  189.             self.reg_write_bytes(RX_ADDR_P0, self.pipe0_read_addr)
  190.  
  191.         self.flush_rx()
  192.         self.flush_tx()
  193.         self.ce(1)
  194.         utime.sleep_us(130)
  195.  
  196.     def stop_listening(self):
  197.         self.ce(0)
  198.         self.flush_tx()
  199.         self.flush_rx()
  200.  
  201.     # returns True if any data available to recv
  202.     def any(self):
  203.         return not bool(self.reg_read(FIFO_STATUS) & RX_EMPTY)
  204.  
  205.     def recv(self):
  206.         # get the data
  207.         self.cs(0)
  208.         self.spi.readinto(self.buf, R_RX_PAYLOAD)
  209.         buf = self.spi.read(self.payload_size)
  210.         self.cs(1)
  211.         # clear RX ready flag
  212.         self.reg_write(STATUS, RX_DR)
  213.  
  214.         return buf
  215.  
  216.     # blocking wait for tx complete
  217.     def send(self, buf, timeout=500):
  218.         self.send_start(buf)
  219.         start = utime.ticks_ms()
  220.         result = None
  221.         while result is None and utime.ticks_diff(utime.ticks_ms(), start) < timeout:
  222.             result = self.send_done()  # 1 == success, 2 == fail
  223.         if result == 2:
  224.             raise OSError("send failed")
  225.  
  226.     # non-blocking tx
  227.     def send_start(self, buf):
  228.         # power up
  229.         self.reg_write(CONFIG, (self.reg_read(CONFIG) | PWR_UP) & ~PRIM_RX)
  230.         utime.sleep_us(150)
  231.         # send the data
  232.         self.cs(0)
  233.         self.spi.readinto(self.buf, W_TX_PAYLOAD)
  234.         self.spi.write(buf)
  235.         if len(buf) < self.payload_size:
  236.             self.spi.write(b"\x00" * (self.payload_size - len(buf)))  # pad out data
  237.         self.cs(1)
  238.  
  239.         # enable the chip so it can send the data
  240.         self.ce(1)
  241.         utime.sleep_us(15)  # needs to be >10us
  242.         self.ce(0)
  243.  
  244.     # returns None if send still in progress, 1 for success, 2 for fail
  245.     def send_done(self):
  246.         if not (self.reg_read(STATUS) & (TX_DS | MAX_RT)):
  247.             return None  # tx not finished
  248.  
  249.         # either finished or failed: get and clear status flags, power down
  250.         status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)
  251.         self.reg_write(CONFIG, self.reg_read(CONFIG) & ~PWR_UP)
  252.         return 1 if status & TX_DS else 2
Advertisement
Add Comment
Please, Sign In to add comment