Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import subprocess
import time
from threading import Thread

import serial
# Configuration
SERIAL_PORT = '/dev/ttyUSB0'  # Update this to your Arduino's serial port
BAUD_RATE = 9600  # presumably must match the rate set in the Arduino sketch -- confirm
POLL_INTERVAL = 0.5  # Polling interval for PipeWire in seconds

# Arduino commands (text lines received over the serial link)
COMMAND_NEXT = 'NEXT'  # Command to go to the next program
COMMAND_PREV = 'PREV'  # Command to go to the previous program
# Command to adjust volume; the value follows a colon, e.g. "VOLUME:42"
COMMAND_VOLUME = 'VOLUME'
class AudioManager:
    """Track the audio programs reported by PipeWire and the current selection.

    ``programs`` holds the application names found by the last successful
    refresh; ``selected_index`` is the position of the selected entry in
    that list.
    """

    def __init__(self):
        self.programs = []
        self.selected_index = 0

    def update_programs(self):
        """Fetch the active audio streams from PipeWire.

        Runs ``pw-cli list Node`` and replaces ``programs`` with the parsed
        application names. If the command fails or cannot be started, the
        error is printed and the previous list is kept.
        """
        # NOTE(review): the pw-cli manual documents `list-objects` / `ls`;
        # confirm that `list` is accepted by the installed version.
        try:
            output = subprocess.check_output(['pw-cli', 'list', 'Node'], text=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable pw-cli binary, which
            # would otherwise propagate out of the caller's polling loop.
            print(f"Error fetching audio streams: {e}")
            return
        self.programs = self.parse_pipewire_nodes(output)
        self._clamp_selection()

    def parse_pipewire_nodes(self, output):
        """Parse the output of PipeWire's nodes.

        Args:
            output: Text to scan, one property per line.

        Returns:
            A list with the text between the first and last double quote of
            every line that contains ``application.name``. Lines without a
            quoted value are skipped.
        """
        nodes = []
        for line in output.splitlines():
            if 'application.name' not in line:
                continue
            start = line.find('"')
            end = line.rfind('"')
            if start == -1 or end <= start:
                # Fewer than two quotes: there is no value to extract.
                continue
            nodes.append(line[start + 1:end])
        return nodes

    def get_selected_program(self):
        """Return the selected program name, or ``None`` if the list is empty."""
        # Work on one reference so the length check and the lookup below
        # always refer to the same list object.
        programs = self.programs
        if not programs:
            return None
        # The list may have shrunk since the index was last set.
        index = min(self.selected_index, len(programs) - 1)
        self.selected_index = index
        return programs[index]

    def next_program(self):
        """Move the selection one entry forward, wrapping at the end."""
        if self.programs:
            self.selected_index = (self.selected_index + 1) % len(self.programs)

    def prev_program(self):
        """Move the selection one entry back, wrapping at the start."""
        if self.programs:
            self.selected_index = (self.selected_index - 1) % len(self.programs)

    def _clamp_selection(self):
        """Pull ``selected_index`` back inside the current ``programs`` list."""
        if self.programs:
            self.selected_index = min(self.selected_index, len(self.programs) - 1)
class ArduinoInterface:
    """Newline-delimited text link to the Arduino over a serial port."""

    def __init__(self, port, baud_rate, timeout=1.0):
        """Open the serial port.

        Args:
            port: Device path of the serial port.
            baud_rate: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout in seconds passed to ``serial.Serial``, so
                a line that never receives its newline cannot block
                ``readline()`` forever.
        """
        self.serial = serial.Serial(port, baud_rate, timeout=timeout)

    def send_message(self, message):
        """Send a message to Arduino, terminated by a newline."""
        self.serial.write((message + '\n').encode())

    def read_message(self):
        """Read a message from Arduino.

        Returns:
            The next line with surrounding whitespace stripped, or ``None``
            when no bytes are waiting. Bytes that are not valid UTF-8 are
            replaced with U+FFFD instead of raising ``UnicodeDecodeError``.
        """
        if self.serial.in_waiting > 0:
            return self.serial.readline().decode(errors='replace').strip()
        return None

    def close(self):
        """Close the underlying serial port."""
        self.serial.close()
class DeejController:
    """Connect an audio manager to an Arduino interface.

    One loop pushes the selected program name to the Arduino; another loop
    reads commands from the Arduino and applies them to the audio manager.
    Both loops run until ``running`` is set to ``False``.
    """

    # Pause between serial checks when nothing is waiting; without it the
    # input loop would spin continuously while the Arduino is silent.
    IDLE_SLEEP = 0.01

    def __init__(self, audio_manager, arduino_interface):
        self.audio_manager = audio_manager
        self.arduino_interface = arduino_interface
        self.running = True

    def poll_audio_programs(self):
        """Refresh the program list and send the selection every POLL_INTERVAL."""
        while self.running:
            self.audio_manager.update_programs()
            selected_program = self.audio_manager.get_selected_program()
            if selected_program:
                self.arduino_interface.send_message(f"DISPLAY:{selected_program}")
            time.sleep(POLL_INTERVAL)

    def handle_arduino_input(self):
        """Read messages from the Arduino and dispatch them until stopped."""
        while self.running:
            message = self.arduino_interface.read_message()
            if not message:
                time.sleep(self.IDLE_SLEEP)
                continue
            self._dispatch(message)

    def _dispatch(self, message):
        """Apply a single non-empty message received from the Arduino."""
        if message == COMMAND_NEXT:
            self.audio_manager.next_program()
        elif message == COMMAND_PREV:
            self.audio_manager.prev_program()
        elif message.startswith(COMMAND_VOLUME):
            volume = self._parse_volume(message)
            if volume is None:
                print("Invalid volume command")
            else:
                self.set_program_volume(volume)

    @staticmethod
    def _parse_volume(message):
        """Return the integer after the first colon, or ``None`` if malformed.

        A message with no colon, or with a non-integer second field, yields
        ``None`` rather than raising.
        """
        parts = message.split(':')
        if len(parts) < 2:
            return None
        try:
            return int(parts[1])
        except ValueError:
            return None

    def set_program_volume(self, volume):
        """Report the requested volume for the selected program, if any."""
        selected_program = self.audio_manager.get_selected_program()
        if selected_program:
            print(f"Setting volume for {selected_program} to {volume}")
            # Implement volume adjustment logic here

    def start(self):
        """Start the polling and input loops, each in its own daemon thread."""
        Thread(target=self.poll_audio_programs, daemon=True).start()
        Thread(target=self.handle_arduino_input, daemon=True).start()
# Script entry point: wire the components together and keep the main thread
# alive while the controller's daemon threads do the work.
if __name__ == "__main__":
    audio_manager = AudioManager()
    arduino_interface = ArduinoInterface(SERIAL_PORT, BAUD_RATE)
    controller = DeejController(audio_manager, arduino_interface)
    try:
        controller.start()
        # The worker threads are daemons, so the main thread must stay
        # alive for them to keep running.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C: ask both worker loops to finish their current iteration.
        controller.running = False
        print("Exiting...")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement