Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python
- import pyaudio
- import numpy
- import sys
- import struct
- import hal
- import time
- #Functions
- #HAL Stuff
- h = hal.component("tddetect")
- h.newpin("enable", hal.HAL_BIT, hal.HAL_IO)
- h.newpin("is_mdi_mode", hal.HAL_BIT, hal.HAL_IN)
- h.newpin("spindle_speed", hal.HAL_FLOAT, hal.HAL_IN)
- h.newpin("touch", hal.HAL_BIT, hal.HAL_OUT)
- h.newparam("mov_avg", hal.HAL_FLOAT, hal.HAL_RO)
- h.newparam("pct_change", hal.HAL_FLOAT, hal.HAL_RO)
- h.newparam("pbc", hal.HAL_U32, hal.HAL_RO)
- h.newparam("frange", hal.HAL_U32, hal.HAL_RW)
- h.newparam("mov_avg_window", hal.HAL_U32, hal.HAL_RW)
- h.newparam("td_thresh", hal.HAL_FLOAT, hal.HAL_RW)
- h.ready()
- h['mov_avg_window'] = 1
- float_buffer = numpy.zeros(1024)
- mov_avg_buffer = numpy.zeros(1024)
- mov_avg_index = 0
- power_base_acc = 0.0
- power_base = 0.0
- power_base_cnt = 0
- initialize = True
- chk_enable = False
- chunk = 1024
- FORMAT = pyaudio.paFloat32
- CHANNELS = 1
- RATE = 44100
- F_STEP = 44100.0 / chunk
- p = pyaudio.PyAudio()
- stream = p.open(format = FORMAT,
- channels = CHANNELS,
- rate = RATE,
- input = True,
- frames_per_buffer = chunk)
- debug = True
- try:
- while 1:
- if (h['enable'] and h['is_mdi_mode']):
- lfreq = (h['spindle_speed'] / 60.0) - (h['frange'] / 2)
- lfreq_bin = round(lfreq / F_STEP)
- hfreq = lfreq + (2 * h['frange'])
- hfreq_bin = round(hfreq / F_STEP)
- # if debug:
- # print "lfreq_bin:", lfreq_bin, " hfreq_bin", hfreq_bin
- data = stream.read(chunk)
- float_buffer = struct.unpack('@' + 'f'*1024, data)
- b = numpy.array(float_buffer)
- c = numpy.fft.rfft(b)
- d = numpy.square(abs(c))
- tot_pow = numpy.sum(d[lfreq_bin:hfreq_bin+1])
- mov_avg_buffer[mov_avg_index] = tot_pow
- # if debug:
- # print "mov_avg_buf: ", mov_avg_buffer[0:h['mov_avg_window']]
- h['pbc'] = power_base_cnt
- if power_base_cnt < h['mov_avg_window']:
- power_base_acc += tot_pow
- power_base = power_base_acc / h['mov_avg_window']
- power_base_cnt += 1
- # print power_base_acc, ' ', power_base
- chk_enable = False
- else:
- chk_enable = True
- mov_avg = numpy.sum(mov_avg_buffer[0:h['mov_avg_window']]) / h['mov_avg_window']
- # if debug or not chk_enable :
- # print 'mov_avg: ', mov_avg
- mov_avg_index += 1
- if mov_avg_index == h['mov_avg_window']:
- mov_avg_index = 0
- h['mov_avg'] = float(mov_avg)
- pct_change = abs((mov_avg - power_base)/power_base) * 100
- h['pct_change'] = float(pct_change)
- initialize = True
- if ((pct_change > h['td_thresh']) and chk_enable) :
- # print "pct_change: ", pct_change, ' chk_enable: ', chk_enable
- h['touch'] = True
- time.sleep(0.1)
- h['enable'] = False
- # print "Detected Power Change"
- debug = False
- else:
- if initialize :
- h['touch'] = False
- # if debug:
- # print "Initializing"
- float_buffer = numpy.zeros(1024)
- mov_avg_buffer = numpy.zeros(1024)
- mov_avg_index = 0
- power_base_acc = 0.0
- power_base = 0.0
- power_base_cnt = 0
- initialize = False
- chk_enable = False
- debug = True
- except KeyboardInterrupt:
- raise SystemExit
- finally:
- stream.close()
- p.terminate()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement