Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from pathlib import Path
- from uio.utils import fix_ctypes_struct
- from uio.ti.icss import Icss
- from uio.ti.pwmss import Pwmss
- import ctypes
- from ctypes import c_uint32 as uint32, c_uint16 as uint16, c_int32 as int32, c_int16 as int16, c_int8 as int8
- from time import sleep
- from sys import exit
- import numpy as np
- import csv
- class Gpio:
- def __init__( self, name ):
- self.name = name
- self._value_path = Path( '/dev/gpio', name, 'value' )
- def get_value( self ):
- return int( self._value_path.read_text() )
- def set_value( self, value ):
- self._value_path.write_text( str( value ) )
- class Pru0_Poll_Vars( ctypes.Structure ):
- _fields_ = [
- ("position", uint32),
- ]
- class Message( ctypes.Structure ):
- _fields_ = [
- ( 'id', uint32 ),
- ('timestamp', uint32),
- ('position', uint32),
- ('control_signal', int32),
- ('counter', uint32),
- ('force', int16),
- ('motor_effort', int16)
- ]
- # used to communicate ddr layout to C program
- class DDRLayout( ctypes.Structure ):
- _fields_ = [
- ( 'msgbuf', uint32 ),
- ( 'num_msgs', uint16 ),
- ( 'msg_size', uint16 ),
- ]
- # volatile variables in pruss shared memory
- class SharedVars( ctypes.Structure ):
- _fields_ = [
- ( 'abort_file', uint32 ),
- ( 'abort_line', uint32 ),
- ( 'ridx', uint16 ),
- ( 'signal', uint16*1000),
- ]
- data_file = open('control_data.csv', 'w', newline='')
- writer = csv.writer(data_file)
- header = ['timestamp', 'motor_input','position', 'motor_effort']
- writer.writerow(header)
- pruss = Icss( "/dev/uio/pruss/module" )
- pruss.initialize()
- ddr = pruss.ddr
- pruss.uart.initialize(baudrate = 460800)
- frequency= 10000
- divider = 1
- period = round( 100e6 / divider / frequency )
- motor = Pwmss( "/dev/uio/pwmss1" ).pwm
- motor.initialize(period,divider)
- motor_direction = Gpio('motor_dir')
- core_1 = pruss.core1
- core_1.load( 'fw-c/pid_control.out' )
- core_0 = pruss.core0
- core_0.load( 'fw/decoder.bin' )
- pru0_vars = pruss.dram0.map( Pru0_Poll_Vars )
- # if you don't want the ringbuffer at the start of the ddr region, specify offset here
- MSGBUF_OFFSET = 0
- # you can use a fixed ringbuffer size:
- #NUM_MSGS = 1024
- # or you can scale the ringbuffer to fit the size of the ddr region:
- NUM_MSGS = ( ddr.size - MSGBUF_OFFSET - ctypes.sizeof( uint16 ) ) // ctypes.sizeof( Message )
- NUM_MSGS = min( NUM_MSGS, 65535 ) # make sure number of messages fits in u16
- # map shared memory variables
- ddr_layout = core_1.map( DDRLayout, 0x10000 )
- shmem = core_1.map( SharedVars, 0x10100 )
- msgbuf = ddr.map( Message * NUM_MSGS, MSGBUF_OFFSET )
- ddr_widx = ddr.map( uint16, MSGBUF_OFFSET + ctypes.sizeof( msgbuf ) )
- # inform pru about layout of shared ddr memory region
- ddr_layout.msgbuf = ddr.address + MSGBUF_OFFSET
- ddr_layout.num_msgs = NUM_MSGS
- ddr_layout.msg_size = ctypes.sizeof( Message )
- # local copies of read-pointer and write-pointer
- ridx = 0
- widx = 0
- id_value = 0
- timestamp_cycles = 0
- # initialize pointers in shared memory
- shmem.ridx = ridx
- ddr_widx.value = widx
- line_count = 0
- with open('./double_hump_profile.csv','r') as f:
- signal_reader = csv.reader(f)
- for line in signal_reader:
- shmem.signal[line_count] = int(line[0])
- line_count += 1
- print(f' Loaded {line_count} lines in the file')
- # ready, set, go!
- pruss.ecap.pwm.initialize( 2**32 )
- core_0.run()
- pru0_vars.position = 0
- core_1.run()
- def check_core():
- if not core_1.halted:
- return
- if core_1.state.crashed:
- msg = f'core crashed at pc={core_1.pc}'
- elif shmem.abort_file == 0:
- msg = f'core halted at pc={core_1.pc}'
- else:
- # FIXME figure out better way to read C-string from PRU memory
- abort_file = core_1.read( ctypes.c_char * 32, shmem.abort_file ).value
- abort_file = abort_file.decode("ascii")
- msg = f'core aborted at pc={core_1.pc} ({abort_file}:{shmem.abort_line})'
- # dump some potentially interesting information:
- msg += f'\n ridx = {ridx}'
- msg += f'\n shmem.ridx = {shmem.ridx}'
- msg += f'\n ddr_widx = {ddr_widx.value}'
- exit( msg )
- lastid = 0
- def recv_messages():
- global ridx, widx, lastid, timestamp_cycles
- # read updated write-pointer
- widx = ddr_widx.value
- assert widx < NUM_MSGS # sanity-check
- if ridx == widx:
- return # no messages received
- # process messages
- while ridx != widx:
- # note: it may be faster to copy a batch of messages from shared memory
- # instead of directly accessing individual messages and their fields.
- msg = msgbuf[ ridx ]
- # sanity-check that message id increments monotonically
- lastid = ( lastid + 1 ) & 0xffffffff
- assert msg.id == lastid
- # get 32-bit timestamp (in cycles) from message and unwrap it:
- timestamp_cycles += ( msg.timestamp - timestamp_cycles ) & 0xffffffff
- # process rest of message
- position = msg.position
- force = msg.force
- motor_input = msg.control_signal
- motor_effort = msg.motor_effort
- counter = msg.counter
- if (motor_effort > 0):
- motor_direction.set_value( 0 )
- else:
- motor_direction.set_value( 1 )
- # consume message and update read pointer
- del msg # direct access to message forbidden beyond this point
- ridx += 1
- if ridx == NUM_MSGS:
- ridx = 0
- shmem.ridx = ridx
- # update user interface
- ts_ms = timestamp_cycles // 200000
- ts_s = ( ts_ms % 60000 ) / 1000
- ts_m = ts_ms // 60000
- ts_h = ts_m // 60
- ts_m = ts_m % 60
- test = timestamp_cycles *-1
- test2 = timestamp_cycles // 200000
- data = [f'{ts_m:02d}:{ts_s:06.3f}', f'{motor_input:+09d}',f'{position:+09d}', f'{motor_effort:+09d}', f'{counter:+05d}']
- writer.writerow(data)
- print(f'\rtime={ts_h:02d}:{ts_m:02d}:{ts_s:06.3f} position={position:08d} force={force:08d} signal = {motor_input:+09d} effort = {motor_effort:+09d}', end='', flush = True )
- try:
- while True:
- recv_messages()
- check_core()
- sleep( 0.01 )
- except KeyboardInterrupt:
- pass
- finally:
- print( '', flush=True )
- core_0.halt()
- core_1.halt()
- pruss.uart.io.write( b'FFV\r', discard=True, flush=True ) # interrupt continuous transmission
- sleep( 0.01 ) # make sure that response has been received
- pruss.uart.io.discard() # discard response
- control_signal += A[0] * error[0] + A[1] * error[1] + A[2] * error[2];
- if( control_signal < 0 ) {
- if( control_signal < -(4000 << 16) )
- control_signal = -(4000 << 16);
- EPWM1A = (-control_signal) >> 16;
- gpio_set_one_low( &GPIO0, 31 );
- } else {
- if( control_signal > (4000 << 16) )
- control_signal = (4000 << 16);
- EPWM1A = control_signal >> 16;
- gpio_set_one_high( &GPIO0, 31 );
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement