Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Test Code for ESP32-S3-ETH-8DI-8RO-C
- https://www.waveshare.com/wiki/ESP32-S3-ETH-8DI-8RO-C#Inteface_Description
- This works:
- - WLAN
- - LAN
- - BT LE
- - WS2812B LED
- - Inputs
- - Outputs
- - Buzzer
- - CAN
- Not tested yet:
- - RTC
- """
- import asyncio
- import time
- import os
- import vfs
- from aioespnow import AIOESPNow
- from machine import Pin, PWM, Signal, I2C, SPI, SDCard, Encoder
- from network import LAN, PHY_W5500
- from neopixel import NeoPixel
- # https://github.com/lewisxhe/PCF8563_PythonLibrary/blob/master/pcf8563.py
- from pcf8563 import PCF8563 as PCF
- from bt import ble_service
- from umodbus.tcp import ModbusTCP
- # https://github.com/straga/micropython-esp32-twai
- import CAN
# The relay outputs are driven through a TCA9554PWR I2C port expander.
TCA_ADDR = 0x20    # I2C address of the expander (decimal 32)
TCA_CONFIG = 0x03  # configuration register; init_outputs() writes 0x00 to it
TCA_OUTPUT = 0x01  # output register; one bit per relay port

# Shared one-byte CAN payloads sent by the encoder pin IRQ handlers.
on, off = [1], [0]
def irq_a1(pin):
    """Pin IRQ callback for input A1: report the pin level on CAN ID 0x10.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x10)
def irq_b1(pin):
    """Pin IRQ callback for input B1: report the pin level on CAN ID 0x11.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x11)
def irq_a2(pin):
    """Pin IRQ callback for input A2: report the pin level on CAN ID 0x12.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x12)
def irq_b2(pin):
    """Pin IRQ callback for input B2: report the pin level on CAN ID 0x13.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x13)
def irq_a3(pin):
    """Pin IRQ callback for input A3: report the pin level on CAN ID 0x14.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x14)
def irq_b3(pin):
    """Pin IRQ callback for input B3: report the pin level on CAN ID 0x15.

    Sends the shared ``on`` payload when ``pin()`` is truthy, else ``off``.
    """
    payload = on if pin() else off
    can.send(payload, 0x15)
def setup_encoder() -> tuple[Encoder, Encoder, Encoder]:
    """Create the three encoders on GPIO 4-9 and hook up the pin IRQs.

    GPIO pairs 4/5, 6/7 and 8/9 (all with pull-ups) become the A/B inputs of
    Encoder units 0, 1 and 2, each configured with ``phases=4``.  Every pin
    additionally gets an IRQ on both edges, bound to its ``irq_*`` handler.

    Returns:
        The encoders as ``(enc_x, enc_y, enc_z)``.
    """
    # Pins in A1, B1, A2, B2, A3, B3 order (GPIO 4..9).
    pins = [Pin(gpio, pull=Pin.PULL_UP) for gpio in range(4, 10)]

    encoders = tuple(
        Encoder(unit, pins[2 * unit], pins[2 * unit + 1], phases=4)
        for unit in range(3)
    )

    handlers = (irq_a1, irq_b1, irq_a2, irq_b2, irq_a3, irq_b3)
    both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
    for pin, handler in zip(pins, handlers):
        pin.irq(trigger=both_edges, handler=handler)

    return encoders
def led():
    """Flash the WS2812 LED at the USB-C port: red, green, blue, then off.

    Each step is held for 150 ms using a blocking sleep.
    """
    pixel = NeoPixel(Pin(38), 1)
    sequence = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 0))
    for rgb in sequence:
        pixel[0] = rgb
        pixel.write()
        time.sleep_ms(150)
def init_i2c():
    """Return the I2C bus on SCL=GPIO41 / SDA=GPIO42 running at 400 kHz.

    Devices noted for this bus: TCA expander at address 32, RTC at 81.
    """
    clock_pin = Pin(41)
    data_pin = Pin(42)
    return I2C(scl=clock_pin, sda=data_pin, freq=400_000)
def init_outputs():
    """Clear all relay outputs and switch every expander pin to output mode.

    The output register is zeroed first and the configuration register
    second, so the output bits are already 0 when the pins become outputs.
    """
    all_zero = b'\x00'
    for register in (TCA_OUTPUT, TCA_CONFIG):
        i2c.writeto_mem(TCA_ADDR, register, all_zero)
def init_inputs():
    """Return eight inverted input Signals for ESP32 GPIO 4 through 11.

    Each pin is configured as an input with pull-up; ``invert=True`` makes a
    low pin level read as an active (1) signal.
    """
    signals = []
    for gpio in range(4, 12):
        pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
        signals.append(Signal(pin, invert=True))
    return signals
def read_port(port) -> bool:
    """Return the current value of digital input ``port`` (numbered 1 - 8).

    The port number is 1-based and maps to ``inputs[port - 1]``.
    """
    index = port - 1
    signal = inputs[index]
    return signal.value()
def set_output(port: int, value: bool):
    """Switch relay output ``port`` (numbered 1 - 8) on or off.

    Performs a read-modify-write on the output register of the TCA9554PWR
    I2C expander so that only the bit belonging to ``port`` changes.

    Args:
        port: 1-based output number; bit ``port - 1`` of the register.
        value: truthy sets the bit, falsy clears it.
    """
    state = bytearray(1)
    i2c.readfrom_mem_into(TCA_ADDR, TCA_OUTPUT, state)
    mask = 1 << (port - 1)
    state[0] = (state[0] | mask) if value else (state[0] & ~mask)
    i2c.writeto_mem(TCA_ADDR, TCA_OUTPUT, state)
def init_lan():
    """Bring up the W5500 Ethernet interface on SPI bus 2 and return it.

    Wiring used: SCK=15, MOSI=13, MISO=14, INT=12, CS=16, RESET=39.
    The interface is activated before it is returned.
    """
    spi_bus = SPI(2, sck=Pin(15), mosi=Pin(13), miso=Pin(14), baudrate=20_000_000)
    irq_pin = Pin(12, mode=Pin.IN)
    cs_pin = Pin(16, mode=Pin.OUT, value=1)
    reset_pin = Pin(39, mode=Pin.OUT)
    nic = LAN(
        spi=spi_bus,
        cs=cs_pin,
        int=irq_pin,
        phy_type=PHY_W5500,
        phy_addr=0,
        reset=reset_pin,
    )
    nic.active(True)
    return nic
def init_sdcard():
    """Return an SDCard object for slot 0 in 1-bit mode.

    Wiring used: CMD=GPIO47, DATA0=GPIO45, SCK=GPIO48.
    """
    cmd_pin = Pin(47)
    data_pins = (Pin(45),)
    clock_pin = Pin(48)
    return SDCard(slot=0, width=1, cmd=cmd_pin, data=data_pins, sck=clock_pin)
def mount_sd(sd):
    """Mount the block device ``sd`` at ``/sd``.

    Uses ``vfs.mount`` (the module is already imported by this file) instead
    of the deprecated ``os.mount`` alias.

    Args:
        sd: block device to mount, e.g. the object returned by init_sdcard().
    """
    vfs.mount(sd, "/sd")
def test1():
    """Show red, green and blue on the WS2812 LED for one second each, then turn it off.

    All waits are blocking ``time.sleep`` calls.
    """
    # There is no module-level NeoPixel object, so create the driver here
    # (same pin and pixel count as led()) instead of reading an undefined
    # global `np`, which raised NameError.
    np = NeoPixel(Pin(38), 1)
    for color in [(255, 0, 0), (0, 255, 0), (0, 0, 255)]:
        np[0] = color
        np.write()
        time.sleep(1)
    # Leave the LED dark when the test is finished.
    np[0] = (0, 0, 0)
    np.write()
def test2():
    """Run a single 'walking light' over relay outputs 1 to 8.

    Each output is switched on for 0.2 s (blocking); the previous one is
    switched off just before the next is switched on, and output 8 is
    switched off at the end.
    """
    previous = None
    for port in range(1, 9):
        if previous is not None:
            set_output(previous, False)
        set_output(port, True)
        time.sleep(0.2)
        previous = port
    set_output(8, False)
async def recv_can():
    """Poll the CAN bus every 0.1 s and act on command frames.

    Handled frames (only when the data field is exactly one byte):
      * ID 0x50: bits 0/1/2 reset the X/Y/Z encoder count to 0.
      * ID 0x51: the byte is copied bit by bit onto relay outputs 1 - 8.
    Every received frame is printed.
    """
    while True:
        if can.any():
            frame = can.recv()
            print("CAN:", frame)
            # Example frame: (80, False, False, b'\x03') -> ID first, data last
            can_id = frame[0]
            if can_id == 0x50 and len(frame[-1]) == 1:
                bits = frame[-1][0]
                resets = (
                    (0x01, "Reset X Encoder", enc_x),
                    (0x02, "Reset Y Encoder", enc_y),
                    (0x04, "Reset Z Encoder", enc_z),
                )
                for mask, message, encoder in resets:
                    if bits & mask:
                        print(message)
                        encoder.value(0)
            elif can_id == 0x51 and len(frame[-1]) == 1:
                bits = frame[-1][0]
                for shift in range(8):
                    set_output(shift + 1, bool(bits & (1 << shift)))
        await asyncio.sleep(0.1)
async def send_can():
    """Send the test frame [1, 2, 3] with ID 0x82 once per second.

    A frame is only queued when the controller reports no pending
    transmissions; an OSError from the send is ignored.
    """
    while True:
        pending = can.info()["msgs_to_tx"]
        if pending == 0:
            try:
                can.send([1, 2, 3], 0x82, timeout=1)
            except OSError:
                pass
        await asyncio.sleep(1)
def modbus_coils_set(address, val, reg_type):
    """Modbus coil write callback: apply the written values to the relays.

    ``val`` is iterated; its first item goes to output ``address``, the next
    to ``address + 1`` and so on.  Each assignment is printed.
    ``reg_type`` is accepted but not used.
    """
    coil = address
    for state in val:
        print(f"Set coil {coil} to {state}")
        set_output(coil, state)
        coil += 1
def modbus_coils_get(address, val, reg_type):
    """Modbus coil read callback: print the three arguments, nothing else."""
    request = (address, val, reg_type)
    print(*request)
async def espnow_client(espnow: AIOESPNow):
    """Switch relay output 1 from ESP-NOW messages.

    Iterates over incoming messages forever: a payload equal to ``b"ON"``
    switches output 1 on, ``b"OFF"`` switches it off, anything else is
    ignored.  The sender address is not checked.
    """
    print("Waiting for ESPNow messages")
    async for _sender, payload in espnow:
        if payload == b"ON":
            relay_state = True
        elif payload == b"OFF":
            relay_state = False
        else:
            continue
        set_output(1, relay_state)
async def read_enc():
    """Poll the three encoders every 100 ms and publish changes.

    When any count differs from the previously seen triple, the positions
    are printed in millimetres (Z is shown negated) and, if the CAN
    controller has no pending transmissions, the raw counts are sent as CAN
    ID 0x20.  The payload is six bytes: the low 16 bits of X, Y and Z, each
    as low byte followed by high byte.
    """
    last = None
    # Scale factors applied to the raw counts for the millimetre printout.
    x_factor = 0.2989010989010989
    y_factor = 0.2974560673675727
    z_factor = 0.29498525073746323
    while True:
        current = (enc_x.value(), enc_y.value(), enc_z.value())
        if current != last:
            last = current
            print(f"X: {current[0] * x_factor:5.1f} mm | Y: {current[1] * y_factor:5.1f} mm | Z: {-current[2] * z_factor:5.1f} mm")
            # NOTE(review): the pasted source lost its indentation; the CAN
            # send is assumed to belong to this on-change branch - confirm.
            if can.info()["msgs_to_tx"] == 0:
                # Low byte then high byte for each axis, taken from that
                # axis' own count (the Y high byte used to come from Z).
                payload = []
                for count in current:
                    payload.append(count & 0xFF)
                    payload.append((count >> 8) & 0xFF)
                try:
                    can.send(payload, 0x20, timeout=1)
                except OSError:
                    # Best effort: a frame that cannot be sent is dropped.
                    pass
        await asyncio.sleep_ms(100)
async def modbus_worker():
    """Run a Modbus TCP server that exposes the relays as coils 1 - 8.

    Each coil starts as False and uses modbus_coils_set / modbus_coils_get
    as its callbacks.  After setup the server is serviced every 10 ms,
    forever.
    """
    server = ModbusTCP()
    server.setup_registers()
    server.bind("")

    coil_addr = 1
    while coil_addr <= 8:
        server.add_coil(
            coil_addr,
            False,
            on_set_cb=modbus_coils_set,
            on_get_cb=modbus_coils_get,
        )
        coil_addr += 1

    while True:
        server.process()
        await asyncio.sleep_ms(10)
async def main():
    """Start all background tasks, then flash the LED every 5 seconds forever.

    Tasks are created in this order: CAN receive, CAN send, encoder read,
    BLE service, ESP-NOW client, Modbus worker.  A line is printed before
    each one is started.
    """
    startup = (
        ("Starting CAN receive task", recv_can),
        ("Starting CAN send task", send_can),
        ("Starting encoder read task", read_enc),
        ("Starting BLE task", lambda: ble_service(set_output)),
        ("Starting ESPNow task", lambda: espnow_client(espnow)),
        ("Starting Modbus task", modbus_worker),
    )
    for banner, make_coro in startup:
        print(banner)
        asyncio.create_task(make_coro())

    while True:
        await asyncio.sleep(5)
        led()
# --- Hardware bring-up; the order of these statements is kept as written ---

# CAN blocks if msgs_to_tx == 2
# NOTE(review): 2_000_000 is above the 1 Mbit/s limit of classic CAN -
# confirm what the driver actually does with this bitrate.
can = CAN(
    0,
    tx=17,
    rx=18,
    mode=CAN.NORMAL,
    bitrate=2_000_000,
    extframe=0,
)
# CAN.NACK << test if this could help with blocking
# can = CAN(0, tx=17, rx=18, mode=CAN.NACK, bitrate=2_000_000, extframe=0)

enc_x, enc_y, enc_z = setup_encoder()
i2c = init_i2c()
inputs = init_inputs()

sd_card = init_sdcard()
mount_sd(sd_card)

init_outputs()
rtc = PCF(i2c)
lan = init_lan()

# Buzzer PWM on GPIO46 at 880 Hz, created with duty 0.
buzzer = PWM(Pin(46), freq=880, duty=0)

# ESP-NOW: activate and register the all-0xFF (broadcast) peer address.
espnow = AIOESPNow()
espnow.active(True)
espnow.add_peer(b"\xff" * 6)
# Modbus register map: one group per register type, each entry giving the
# start register, its length and its initial value.
register_definitions = {
    "COILS": {
        "COIL": {"register": 0, "len": 8, "val": 0},
    },
    "HREGS": {
        "EXAMPLE_HREG": {"register": 93, "len": 1, "val": 19},
    },
    "ISTS": {
        "EXAMPLE_ISTS": {"register": 67, "len": 1, "val": 0},
    },
    "IREGS": {
        "DIN": {"register": 0, "len": 1, "val": 0},
    },
}
# Script entry: announce start-up, then run main() under the asyncio scheduler.
print("Running mainloop")
entry_point = main()
asyncio.run(entry_point)
Advertisement