Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import os
- sys.path.append(os.path.abspath(os.path.dirname(__file__)))
- import subprocess
- import keyboard
- import asyncio
- import webbrowser
- from dotenv import load_dotenv
- from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
- from livekit.agents.voice_assistant import VoiceAssistant
- from livekit.plugins import openai, silero
- from flask import Flask
- from livekit import api
- load_dotenv()
- app = Flask(__name__)
- u/app.route('/getToken')
- def getToken():
- token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
- .with_identity("identity") \
- .with_name("my name") \
- .with_grants(api.VideoGrants(
- room_join=True,
- room="my-room",
- ))
- return token.to_jwt()
- async def entrypoint(ctx: JobContext):
- initial_ctx = llm.ChatContext().append(
- role="system",
- text=(
- ""
- ),
- )
- await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
- assistant = VoiceAssistant(
- vad=silero.VAD.load(),
- stt=openai.STT(),
- llm=openai.llm(),
- tts=openai.TTS(),
- chat_ctx=initial_ctx,
- )
- assistant.start(ctx.room)
- def stop_program():
- raise StopIteration
- async def main():
- keyboard.add_hotkey('`', start_program_and_open_playground)
- keyboard.add_hotkey('~', stop_program)
- def start_program_and_open_playground():
- webbrowser.open("https://agents-playground.livekit.io")
- while True:
- if keyboard.read_key() == '`':
- start_program_and_open_playground()
- break
- if keyboard.read_key() == '~':
- stop_program()
- break
- if __name__ == "__main__":
- if len(sys.argv) > 1 and sys.argv[1] == "start":
- try:
- asyncio.run(main())
- except StopIteration:
- print("Program stopped.")
- else:
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement