DeaD_EyE

ESP32-S3-ETH-8DI-8RO-C

Aug 13th, 2025 (edited)
847
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.73 KB | None | 0 0
  1. """
  2. Test Code for ESP32-S3-ETH-8DI-8RO-C
  3.  
  4. https://www.waveshare.com/wiki/ESP32-S3-ETH-8DI-8RO-C#Inteface_Description
  5.  
  6. This works:
  7. - WLAN
  8. - LAN
  9. - BT LE
  10. - WS2812B LED
  11. - Inputs
  12. - Outputs
  13. - Buzzer
  14. - CAN
  15.  
  16. Not tested yet:
  17. - RTC
  18.  
  19. """
  20. import asyncio
  21. import time
  22. import os
  23. import vfs
  24.  
  25. from aioespnow import AIOESPNow
  26. from machine import Pin, PWM, Signal, I2C, SPI, SDCard, Encoder
  27. from network import LAN, PHY_W5500
  28. from neopixel import NeoPixel
  29.  
  30. # https://github.com/lewisxhe/PCF8563_PythonLibrary/blob/master/pcf8563.py
  31. from pcf8563 import PCF8563 as PCF
  32.  
  33. from bt import ble_service
  34. from umodbus.tcp import ModbusTCP
  35.  
  36. # https://github.com/straga/micropython-esp32-twai
  37. import CAN
  38.  
# I2C GPIO expander that controls the relay outputs.
# NOTE(review): the original comment spells the part "TCA99554PWR"; this is
# presumably the TCA9554PWR - confirm against the board documentation.
TCA_ADDR = 32  # I2C address of the expander (0x20)
TCA_CONFIG = 0x03  # register written in init_outputs() to make all pins outputs
TCA_OUTPUT = 0x01  # register holding the state of the eight outputs

# One-byte CAN payloads used by the encoder pin IRQ handlers:
# "on" is sent when the pin reads high, "off" when it reads low.
on = [1]
off = [0]
  47.  
  48. def irq_a1(pin):
  49.     can.send(on if pin() else off, 0x10)
  50.  
  51. def irq_b1(pin):
  52.     can.send(on if pin() else off, 0x11)
  53.  
  54. def irq_a2(pin):
  55.     can.send(on if pin() else off, 0x12)
  56.  
  57. def irq_b2(pin):
  58.     can.send(on if pin() else off, 0x13)
  59.  
  60. def irq_a3(pin):
  61.     can.send(on if pin() else off, 0x14)
  62.  
  63. def irq_b3(pin):
  64.     can.send(on if pin() else off, 0x15)
  65.  
  66. def setup_encoder() -> tuple[Encoder, Encoder, Encoder]:
  67.     a1 = Pin(4, pull=Pin.PULL_UP)
  68.     b1 = Pin(5, pull=Pin.PULL_UP)
  69.     a2 = Pin(6, pull=Pin.PULL_UP)
  70.     b2 = Pin(7, pull=Pin.PULL_UP)
  71.     a3 = Pin(8, pull=Pin.PULL_UP)
  72.     b3 = Pin(9, pull=Pin.PULL_UP)
  73.    
  74.     enc_x = Encoder(0, a1, b1, phases=4)
  75.     enc_y = Encoder(1, a2, b2, phases=4)
  76.     enc_z = Encoder(2, a3, b3, phases=4)
  77.    
  78.     a1.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_a1)
  79.     b1.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_b1)
  80.     a2.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_a2)
  81.     b2.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_b2)
  82.     a3.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_a3)
  83.     b3.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=irq_b3)
  84.    
  85.     return enc_x, enc_y, enc_z
  86.  
  87.  
  88. def led():
  89.     # LED at USB-C Port
  90.     np = NeoPixel(Pin(38), 1)
  91.     for c in [(255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 0)]:
  92.         np[0] = c
  93.         np.write()
  94.         time.sleep_ms(150)
  95.  
  96.  
  97. def init_i2c():
  98.     # TCA: 32, RTC: 81
  99.     return I2C(scl=Pin(41), sda=Pin(42), freq=400_000)
  100.  
  101. def init_outputs():
  102.     # first set all outputs to 0
  103.     i2c.writeto_mem(TCA_ADDR, TCA_OUTPUT, b'\x00')
  104.     #then configure all gpios as outputs
  105.     i2c.writeto_mem(TCA_ADDR, TCA_CONFIG, b'\x00')
  106.  
  107. def init_inputs():
  108.     # GPIO-Inputs are from ESP32
  109.     return [Signal(Pin(4 + n, mode=Pin.IN, pull=Pin.PULL_UP), invert=True) for n in range(8)]
  110.  
  111. def read_port(port) -> bool:
  112.     """
  113.    Read Port 1 - 8
  114.    """
  115.     return inputs[port - 1].value()
  116.  
  117. def set_output(port: int, value: bool):
  118.     """
  119.    Write to Port 1 - 8
  120.    
  121.    Chip: TCA99554PWR (I2C)
  122.    """
  123.     buffer = bytearray(1)
  124.     i2c.readfrom_mem_into(TCA_ADDR, TCA_OUTPUT, buffer)
  125.    
  126.     if value:
  127.         buffer[0] |= 1 << (port - 1)
  128.     else:
  129.         buffer[0] &= ~(1 << (port - 1))
  130.  
  131.     i2c.writeto_mem(TCA_ADDR, TCA_OUTPUT, buffer)
  132.  
  133. def init_lan():
  134.     lan_spi = SPI(2, sck=Pin(15), mosi=Pin(13), miso=Pin(14), baudrate=20_000_000)
  135.     lan_int = Pin(12, mode=Pin.IN)
  136.     lan_sel = Pin(16, mode=Pin.OUT, value=1)
  137.     lan_rst = Pin(39, mode=Pin.OUT)
  138.     eth0 = LAN(spi=lan_spi, cs=lan_sel, int=lan_int, phy_type=PHY_W5500, phy_addr=0, reset=lan_rst)
  139.     eth0.active(True)
  140.     return eth0
  141.  
  142.  
  143. def init_sdcard():
  144.     return SDCard(
  145.         slot=0,
  146.         width=1,
  147.         cmd=Pin(47),
  148.         data=(Pin(45),),
  149.         sck=Pin(48),
  150.     )
  151.  
  152.  
  153. def mount_sd(sd):
  154.     os.mount(sd, "/sd")
  155.  
  156.  
  157. def test1():
  158.     # test 1
  159.     for color in [(255,0,0), (0,255,0), (0,0,255)]:
  160.         np[0] = color
  161.         np.write()
  162.         time.sleep(1)
  163.     np[0] = (0,0,0)
  164.     np.write()
  165.  
  166.  
  167. def test2():
  168.     for port in range(1, 9):
  169.         if port > 1:
  170.             set_output(port - 1, False)
  171.         set_output(port, True)
  172.         time.sleep(0.2)
  173.     set_output(8, False)
  174.  
  175.  
  176. async def recv_can():
  177.     while True:
  178.         if can.any():
  179.             msg = can.recv()
  180.             print("CAN:", msg)
  181.             # CAN: (80, False, False, b'\x03')
  182.             if msg[0] == 0x50 and len(msg[-1]) == 1:
  183.                 value = msg[-1][0]
  184.                 if value & 0x01:
  185.                     print("Reset X Encoder")
  186.                     enc_x.value(0)
  187.                 if value & 0x02:
  188.                     print("Reset Y Encoder")
  189.                     enc_y.value(0)
  190.                 if value & 0x04:
  191.                     print("Reset Z Encoder")
  192.                     enc_z.value(0)
  193.             elif msg[0] == 0x51 and len(msg[-1]) == 1:
  194.                 value = msg[-1][0]
  195.                 for shift in range(8):
  196.                     set_output(shift + 1, bool(value & (1 << shift)))
  197.                    
  198.         await asyncio.sleep(0.1)
  199.  
  200.  
  201. async def send_can():
  202.     while True:
  203.         if can.info()["msgs_to_tx"] == 0:
  204.             try:
  205.                 can.send([1,2,3], 0x82, timeout=1)
  206.             except OSError:
  207.                 pass
  208.  
  209.         await asyncio.sleep(1)
  210.  
  211.  
  212. def modbus_coils_set(address, val, reg_type):
  213.     for addr, value in enumerate(val, address):
  214.         print(f"Set coil {addr} to {value}")
  215.         set_output(addr, value)
  216.  
  217.  
  218. def modbus_coils_get(address, val, reg_type):
  219.     print(address, val, reg_type)
  220.  
  221.  
  222. async def espnow_client(espnow: AIOESPNow):
  223.     print("Waiting for ESPNow messages")
  224.     async for client, msg in espnow:
  225.         if msg == b"ON":
  226.             set_output(1, True)
  227.         elif msg == b"OFF":
  228.             set_output(1, False)
  229.  
  230.  
  231. async def read_enc():
  232.     last = None
  233.    
  234.     x_factor = 0.2989010989010989
  235.     y_factor = 0.2974560673675727
  236.     z_factor = 0.29498525073746323
  237.    
  238.     while True:
  239.         current = (enc_x.value(), enc_y.value(), enc_z.value())
  240.        
  241.         if current != last:
  242.             last = current
  243.             print(f"X: {current[0] * x_factor:5.1f} mm | Y: {current[1] * y_factor:5.1f} mm | Z: {-current[2] * z_factor:5.1f} mm")
  244.             if can.info()["msgs_to_tx"] == 0:
  245.                 try:
  246.                     can.send([
  247.                         current[0] & 0xFF, (current[0] >> 8) & 0xFF,
  248.                         current[1] & 0xFF, (current[2] >> 8) & 0xFF,
  249.                         current[2] & 0xFF, (current[2] >> 8) & 0xFF,
  250.                         ],
  251.                         0x20,
  252.                         timeout=1,
  253.                     )
  254.                 except OSError:
  255.                     pass
  256.                
  257.         await asyncio.sleep_ms(100)
  258.  
  259.  
  260. async def modbus_worker():
  261.     modbus = ModbusTCP()
  262.     modbus.setup_registers()
  263.     modbus.bind("")
  264.    
  265.     for addr in range(1, 9):
  266.         modbus.add_coil(
  267.             addr,
  268.             False,
  269.             on_set_cb=modbus_coils_set,
  270.             on_get_cb=modbus_coils_get,
  271.         )
  272.    
  273.     while True:
  274.         modbus.process()      
  275.         await asyncio.sleep_ms(10)
  276.  
  277.  
  278. async def main():
  279.     print("Starting CAN receive task")
  280.     asyncio.create_task(recv_can())
  281.     print("Starting CAN send task")
  282.     asyncio.create_task(send_can())
  283.    
  284.     print("Starting encoder read task")
  285.     asyncio.create_task(read_enc())
  286.    
  287.     print("Starting BLE task")
  288.     asyncio.create_task(ble_service(set_output))
  289.     print("Starting ESPNow task")
  290.     asyncio.create_task(espnow_client(espnow))
  291.    
  292.     print("Starting Modbus task")
  293.     asyncio.create_task(modbus_worker())
  294.    
  295.     while True:
  296.         await asyncio.sleep(5)
  297.  
  298.  
  299.  
# Module-level hardware setup.  The functions above use the globals
# created here (can, enc_x/enc_y/enc_z, i2c, inputs, espnow).

# Run the LED colour sequence once at start-up.
led()

# CAN blocks if msgs_to_tx == 2
can = CAN(0, tx=17, rx=18, mode=CAN.NORMAL, bitrate=2_000_000, extframe=0)
# CAN.NACK << test if this could help with blocking
# can = CAN(0, tx=17, rx=18, mode=CAN.NACK, bitrate=2_000_000, extframe=0)
# The encoder pin IRQ handlers send via ``can``, which already exists here.
enc_x, enc_y, enc_z = setup_encoder()

i2c = init_i2c()
# NOTE(review): init_inputs() configures GPIO 4 - 11, which includes the
# GPIO 4 - 9 already used by setup_encoder() - confirm this is intended.
inputs = init_inputs()
sd_card = init_sdcard()
mount_sd(sd_card)
# Needs ``i2c``: clears the outputs and configures the expander pins.
init_outputs()
# RTC on the same I2C bus (listed as "not tested yet" in the module docstring).
rtc = PCF(i2c)
lan = init_lan()
# Buzzer PWM on GPIO 46, created with duty 0.
buzzer = PWM(Pin(46), freq=880, duty=0)
espnow = AIOESPNow()
espnow.active(True)
# Peer address ff:ff:ff:ff:ff:ff (presumably the broadcast address).
espnow.add_peer(bytes([255]*6))
  319.  
  320.  
# Modbus register map: one entry per register group (COILS, HREGS, ISTS,
# IREGS), each with start register, length and initial value.
# NOTE(review): nothing in this listing references this dict - presumably
# it is meant to be passed to the Modbus server setup; confirm.
register_definitions = {
    "COILS": {
        "COIL": {
            "register": 0,
            "len": 8,
            "val": 0
        }
    },
    "HREGS": {
        "EXAMPLE_HREG": {
            "register": 93,
            "len": 1,
            "val": 19
        }
    },
    "ISTS": {
        "EXAMPLE_ISTS": {
            "register": 67,
            "len": 1,
            "val": 0
        }
    },
    "IREGS": {
        "DIN": {
            "register": 0,
            "len": 1,
            "val": 0
        }
    }
}
  351.  
  352.  
  353.  
# Start the asyncio event loop; main() loops forever, so this call only
# ends through an exception.
print("Running mainloop")
asyncio.run(main())
  356.  
  357.  
Advertisement