Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys # We need sys so that we can pass argv to QApplication
- import asyncio
- from uio.utils import ctypes
- from uio.ti.icss import Icss
- from uio.ti.pwmss import Pwmss
- from ctypes import c_uint32 as uint32, c_uint8 as uint8, c_uint16 as uint16,c_int16 as int16, c_uint64 as uint64, c_char as char, c_int32 as int32, c_bool as boolean, Structure
- from ctypes import sizeof as c_sizeof
- from pathlib import Path
- from time import sleep
- import numpy as np
- import csv
# ---------------------------------------------------------------------------
# Module-level state and hardware bring-up.
#
# Most names below are only placeholders at this point; the ones that matter
# are given their real values further down in the script.
# ---------------------------------------------------------------------------

# handles to the PRU subsystem and the core this script drives
pruss = Icss( "/dev/uio/pruss/module" )
core_1 = pruss.core1

# PWM timing parameters; period is derived from these after pruss.initialize()
frequency = 10000
divider = 1
period = top_motor_speed = None

# motion / load state, not known yet
position = weight = pruvars = None
current_motor_speed = None
maxloadposition = minloadposition = None
stepper_step = stepper_direction = None

# server / task placeholders
server1 = server2 = None
coro1 = coro2 = None
data_queue = active_run = shut_down = None

# shared-memory objects, still unmapped here
NUM_MSGS = None
ddr_layout = shmem = msgbuf = ddr_widx = None

# ring-buffer bookkeeping and timestamp unwrapping state
# NOTE(review): `rdix` / `wdix` look like typos of `ridx` / `widx`; they are
# kept because removing module-level names is outside a pure restyle.
rdix = 0
wdix = 0
lastid = 0
last_timestamp_cycles = 0
timestamp_cycles = 0

# encoder state
last_encoder_count = uint32(0)
encoder_count = uint32(0)
last_encoder_angle = 0
encoder_angle = 0

# calibration / streaming placeholders
tare_coeff = emi_coeff = val_stream = None

# number of samples loaded from the signal CSV so far
line_count = 0

# bring up the PRU subsystem, then derive the PWM period in timer ticks
# (100e6 is presumably the PWM module clock in Hz -- TODO confirm)
pruss.initialize()
period = top_motor_speed = round( 100e6 / divider / frequency )

pruss.uart.initialize( baudrate = 460800 )

# PWM output that drives the magnet
magnet = Pwmss( "/dev/uio/pwmss1" ).pwm
magnet.initialize( period, divider )
current_magnet_speed = 0
# used to communicate ddr layout to C program
class DDRLayout( Structure ):
    """Description of the DDR message ring buffer, as handed to the PRU program.

    The host fills this in after mapping the buffer: ``msgbuf`` receives the
    address of the first message slot, ``num_msgs`` the number of slots and
    ``msg_size`` the size of one slot in bytes.
    """

    _fields_ = [
        ( 'msgbuf',   uint32 ),  # address of the ring buffer
        ( 'num_msgs', uint16 ),  # number of message slots
        ( 'msg_size', uint16 ),  # size of one message slot, in bytes
    ]
# volatile variables in pruss shared memory
class SharedVars( Structure ):
    """Variables exchanged with the PRU program through PRU shared memory.

    The host reads ``abort_file`` / ``abort_line`` when the core has halted,
    publishes its ring-buffer read index in ``ridx``, and uploads the
    control signal table (``signal`` plus its used length
    ``signal_length``) together with ``cycle_limit`` before starting the
    core.
    """

    _fields_ = [
        ( 'abort_file',    uint32 ),       # address of a C string; 0 when no abort was recorded
        ( 'abort_line',    uint32 ),       # line number reported alongside abort_file
        ( 'time',          uint32 ),       # not used by this script -- meaning defined by the firmware
        ( 'ridx',          uint16 ),       # host's read index into the message ring buffer
        ( 'cycle_limit',   uint16 ),       # set by the host before the core is started
        ( 'time_limit',    uint16 ),       # not used by this script -- meaning defined by the firmware
        ( 'signal_length', uint16 ),       # number of valid entries in `signal`
        ( 'signal',        uint16 * 2000 ) # control signal samples
    ]
class Message( ctypes.Structure ):
    """One slot of the DDR ring buffer that the PRU fills with measurements.

    ``id`` is expected to increase by exactly one from message to message
    (the host uses it to detect dropped messages) and ``timestamp`` is a
    wrapping 32-bit cycle counter.  The remaining fields are copied to the
    CSV log as they are.
    """

    _fields_ = [
        ( 'id',             uint32 ),  # consecutive message number
        ( 'timestamp',      uint32 ),  # wrapping cycle counter
        ( 'control_signal', int32 ),   # logged as the "signal" column
        ( 'voltage',        int32 ),
        ( 'force',          int16 ),   # logged as the "force_g" column
        ( 'cycle_count',    uint16 ),
        ( 'counter',        uint16 ),
        ( 'signal_length',  uint16 )
    ]
class Gpio:
    """A GPIO line accessed through its ``/dev/gpio/<name>/value`` file."""

    def __init__( self, name ):
        """Remember the line's name and where its value file lives."""
        self.name = name
        self._value_path = Path( '/dev/gpio' ) / name / 'value'

    def get_value( self ):
        """Return the content of the value file, parsed as an integer."""
        raw = self._value_path.read_text()
        return int( raw )

    def set_value( self, value ):
        """Write ``str(value)`` to the value file."""
        text = str( value )
        self._value_path.write_text( text )
def StopEM100():
    """Send ``FFV`` over the PRU UART and throw away whatever comes back.

    The command interrupts the device's continuous transmission; after a
    short wait for its response to arrive, the receive data is discarded.
    """
    port = pruss.uart.io

    # interrupt continuous transmission
    port.write( b'FFV\r', discard=True, flush=True )

    # make sure that response has been received ...
    sleep( 0.1 )

    # ... and discard it
    port.discard()
def check_core():
    """Exit the script with a diagnostic message once PRU core 1 has halted.

    Returns immediately while the core is still running.  Otherwise a
    message describing why the core stopped is assembled (crash, plain
    halt, or an abort whose source file and line were stored in ``shmem``),
    the ring-buffer indices are appended, and the result is passed to
    ``exit()``.
    """
    if not core_1.halted:
        return

    if core_1.state.crashed:
        reason = f'core crashed at pc={core_1.pc}'
    elif shmem.abort_file == 0:
        reason = f'core halted at pc={core_1.pc}'
    else:
        # FIXME figure out better way to read C-string from PRU memory
        raw_name = core_1.read( char * 32, shmem.abort_file ).value
        source_name = raw_name.decode("ascii")
        reason = f'core aborted at pc={core_1.pc} ({source_name}:{shmem.abort_line})'

    # dump some potentially interesting information:
    report = ''.join( (
        reason,
        f'\n ridx = {ridx}',
        f'\n shmem.ridx = {shmem.ridx}',
        f'\n ddr_widx = {ddr_widx.value}',
    ) )
    exit( report )

    # NOTE(review): the two statements below follow exit() and are therefore
    # never executed.  `astid` looks like a typo of `lastid`, and the print
    # probably belongs after shmem.signal_length has been set -- confirm the
    # intent before moving or removing them.
    astid = 0
    print(f'placed signal limit value of {shmem.signal_length}')
def recv_messages():
    """Drain pending PRU messages from the DDR ring buffer into the CSV log.

    Reads the PRU's write index and, for every slot between the local read
    index and that write index:

    * checks that the message id follows the previous one (no drops),
    * unwraps the 32-bit cycle timestamp into ``timestamp_cycles``,
    * releases the slot by publishing the new read index in ``shmem.ridx``,
    * writes one row to the module-level CSV ``writer``.

    After the batch, a single-line status summary of the last message is
    printed (overwriting the previous one via a carriage return).

    Updates the module globals ``widx``, ``ridx``, ``lastid`` and
    ``timestamp_cycles``.

    Raises:
        RuntimeError: if the write index read from shared memory is out of
            range, or if the message ids show that messages were dropped.
    """
    global widx
    global ridx
    global lastid
    global timestamp_cycles

    # the conversion to wall time below assumes this many timestamp cycles
    # per millisecond
    cycles_per_ms = 200000

    # read updated write-pointer
    widx = ddr_widx.value
    # sanity-check; a real exception (rather than an assert, which -O strips)
    # because an out-of-range index would make the loop below spin forever
    if widx >= NUM_MSGS:
        raise RuntimeError( f'write index {widx} out of range (ring buffer has {NUM_MSGS} slots)' )
    if widx == ridx:
        return  # nothing to do

    # process batch of messages
    # note: it may be faster to copy a batch of messages from shared memory
    # instead of directly accessing individual messages and their fields.
    while ridx != widx:
        msg = msgbuf[ ridx ]

        # sanity-check that message id increments monotonically
        # (i.e. no messages from pru got dropped)
        dropped = ( msg.id - lastid - 1 ) & 0xffffffff
        if dropped != 0:
            raise RuntimeError( f'{dropped} messages dropped by PRU (0x{lastid:08x} -> 0x{msg.id:08x})' )
        lastid = msg.id

        # get 32-bit timestamp (in cycles) from message and unwrap it; the
        # mask must cover all 32 bits or gaps longer than 2**28 cycles would
        # be folded into a shorter interval
        timestamp_cycles += ( msg.timestamp - timestamp_cycles ) & 0xffffffff

        # copy the payload out of shared memory before releasing the slot
        force = int( msg.force )
        voltage = int( msg.voltage )
        signal = int( msg.control_signal )
        counter = int( msg.counter )
        cycle_count = int( msg.cycle_count )
        signal_length = int( msg.signal_length )

        # consume message and update read pointer
        del msg  # direct access to message forbidden beyond this point
        ridx += 1
        if ridx == NUM_MSGS:
            ridx = 0
        shmem.ridx = ridx

        # split the timestamp into hours / minutes / seconds for the label
        timestamp_ms = timestamp_cycles // cycles_per_ms
        timestamp_s = ( timestamp_ms % 60000 ) / 1000
        timestamp_m = timestamp_ms // 60000
        timestamp_h = timestamp_m // 60
        timestamp_m = timestamp_m % 60
        timestamp_label = f'{timestamp_h:02d}:{timestamp_m:02d}:{timestamp_s:06.3f}'

        # total elapsed time in seconds (an hour is 3600 seconds)
        elapsed_s = timestamp_s + timestamp_m * 60 + timestamp_h * 3600

        # log every message of the batch as its own row
        writer.writerow( [ elapsed_s, timestamp_label, force, voltage, signal,
                           cycle_count, counter, signal_length ] )

    # status line, refreshed once per batch with the last message processed
    print( f'\ridx=0x{ridx:04x} id=0x{lastid:08x} time={timestamp_label} force={force:08d} control signal={signal:08d} voltage={voltage:08d} counter={counter:08d}', end='', flush=True )
# magnet starts at speed 0 with its direction line driven to 0
current_magnet_speed = 0
magnet_direction = Gpio( 'magnet-dir' )
magnet_direction.set_value( 0 )

# flags polled by the main loop; the loop runs while both are False
stop_active_run = shut_down = False

data_queue = asyncio.Queue()
# load the PRU program and lay out the message ring buffer in the ddr region
ddr = pruss.ddr
core_1.load( 'direct_control.out' )

# if you don't want the ringbuffer at the start of the ddr region, specify offset here
MSGBUF_OFFSET = 0

# you can use a fixed ringbuffer size:
#NUM_MSGS = 1024
# or you can scale the ringbuffer to fit the size of the ddr region, leaving
# room for the u16 write index behind it and making sure the number of
# messages itself fits in a u16:
NUM_MSGS = min(
    ( ddr.size - MSGBUF_OFFSET - c_sizeof( uint16 ) ) // c_sizeof( Message ),
    65535,
)

# map shared memory variables
ddr_layout = core_1.map( DDRLayout, 0x10000 )
shmem = core_1.map( SharedVars, 0x10100 )
msgbuf = ddr.map( Message * NUM_MSGS, MSGBUF_OFFSET )
# the PRU's write index sits directly behind the last message slot
ddr_widx = ddr.map( uint16, MSGBUF_OFFSET + c_sizeof( msgbuf ) )

# inform pru about layout of shared ddr memory region
ddr_layout.msgbuf = ddr.address + MSGBUF_OFFSET
ddr_layout.num_msgs = NUM_MSGS
ddr_layout.msg_size = c_sizeof( Message )

# local copies of read-pointer and write-pointer, both starting at slot 0
ridx = widx = 0

# initialize pointers in shared memory
shmem.ridx = ridx
ddr_widx.value = widx
# loading wave profile
# The first column of every non-empty row of steps_direct.csv becomes one
# entry of the signal table in PRU shared memory.
line_count = 0
with open( './steps_direct.csv', 'r', newline='' ) as f:
    signal_reader = csv.reader( f )
    for line in signal_reader:
        if not line:
            continue  # csv.reader yields an empty list for a blank line
        if line_count >= len( shmem.signal ):
            # fail with a clear message instead of an index error from the table
            raise ValueError(
                f'steps_direct.csv has more than {len( shmem.signal )} samples; '
                'the signal table in PRU shared memory cannot hold them'
            )
        shmem.signal[line_count] = int( line[0] )
        line_count += 1
print(f' Loaded {line_count} lines in the file')

# tell the PRU program how much of the table is valid
shmem.cycle_limit = 750
shmem.signal_length = line_count
# open file for writing; the with-block guarantees that the log is flushed and
# closed on every way out of the run (normal stop, Ctrl-C, or exit() from
# check_core)
with open( 'run_data.csv', 'w', newline='' ) as data_file:
    writer = csv.writer( data_file )
    header = ['time','timestamp', 'force_g', 'voltage','signal','cycle_count','counter','signal_length']
    writer.writerow( header )

    # ready, set, go!
    print('lauching core 1')
    # NOTE(review): presumably sets up the eCAP counter the firmware uses for
    # its timestamps -- confirm against the PRU program
    pruss.ecap.pwm.initialize( 2**32 )
    core_1.run()
    sleep(.00001)

    try:
        # keep draining messages and watching the core until asked to stop
        while not ( stop_active_run or shut_down ):
            recv_messages()
            check_core()
            sleep(0)  # yield to other threads without adding delay
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to end a run
    finally:
        print( '', flush=True )  # finish the carriage-return status line
        StopEM100()
        core_1.halt()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement