Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- import time
- import test
- import serial
- import audio
- import os
- import pvporcupine
- from pvrecorder import PvRecorder
- from threading import Thread
- sensorReading = 10000
- sensorTriggerEnabled = True
- sensorTriggered = False
- hotwordListened = False
- def sensorReader(serial_port, baud_rate):
- """--------------- Sensor Check ----------------------"""
- global sensorReading, sensorTriggerEnabled, sensorTriggered
- with serial.Serial(serial_port, baud_rate) as ser:
- skippied_readings_count = 0
- # Continuously read data from the serial port
- while True:
- # """--------------- Sensor Check ----------------------"""
- # Read a line of data from the serial port
- line = ser.readline()
- # Decode the line of data from bytes to string
- line_str = line.decode("utf-8")
- # print(line_str)
- # search for the serial float number in the string
- match = re.search(r"distance: (\d+)", line_str)
- # audio.play_audio("/home/alexa/project/audio/will_the _code.mp3")
- if match:
- sensorReading = float(match.group(1))
- # skip large numbers or zero (error)
- if sensorReading == 0 or sensorReading > float(os.environ.get("sensor_max_considered_reading")):
- continue
- # print(f"-------------------{sensorReading}------------------")
- if sensorReading < float(os.environ.get("sensor_trigger_distance")):
- if sensorTriggerEnabled and not sensorTriggered:
- # print("--------------- Sensor Triggered!--------------- ")
- sensorTriggered = True
- # dis-allow trigger by sensor
- sensorTriggerEnabled = False
- skippied_readings_count = 0
- else:
- skippied_readings_count += 1
- if skippied_readings_count >= 6:
- # print("--------------- Sensor Reset!--------------- ")
- # allow trigger by sensor
- sensorTriggerEnabled = True
- sensorTriggered = False
- # time.sleep(1)
- def hotwordListener():
- """--------------- Hotword Check ----------------------"""
- global hotwordListened
- access_key = os.environ.get("access_key")
- alexa_path = '/home/alexa/project/models/alexa.ppn'
- mariam_path = '/home/alexa/project/models/mariam.ppn'
- porcupine = pvporcupine.create(access_key=access_key, keyword_paths =[alexa_path,mariam_path],sensitivities=[0.8,0.75])
- recorder = PvRecorder(
- device_index=-1,
- frame_length=porcupine.frame_length,
- log_overflow=True,
- buffer_size_msec=1000,
- )
- recorder.start()
- # startingTime = time.time()
- while True:
- # --------------- Hotword Check ----------------------
- keyword_index = porcupine.process(recorder.read())
- if keyword_index >= 0:
- hotwordListened = True
- def init():
- """Initialize the threads"""
- global hotwordListened, sensorReading
- sensorReaderThread = Thread(target=sensorReader, args=("/dev/ttyUSB0", 115200))
- hotwordListenerThread = Thread(target=hotwordListener, args=())
- sensorReaderThread.start()
- hotwordListenerThread.start()
- def wait_for_triggers():
- """Wait for triggers to be activated"""
- global hotwordListened, sensorTriggered
- startingTime = time.time()
- while True:
- # print("---------------Waiting for trigger---------------")
- # print(time.time() - startingTime)
- # time.sleep(1)
- if hotwordListened:
- hotwordListened = False
- diff = time.time() - startingTime
- return diff
- if sensorTriggered:
- sensorTriggered = False
- # return large value to reset the session
- return 60.0
- init()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement