Advertisement
Guest User

Untitled

a guest
Jan 6th, 2025
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 4.10 KB | Source Code | 0 0
  1. import subprocess
  2. import serial
  3. import time
  4. from threading import Thread
  5.  
  6. # Configuration
  7. SERIAL_PORT = '/dev/ttyUSB0'  # Update this to your Arduino's serial port
  8. BAUD_RATE = 9600
  9. POLL_INTERVAL = 0.5  # Polling interval for PipeWire in seconds
  10.  
  11. # Arduino commands
  12. COMMAND_NEXT = 'NEXT'  # Command to go to the next program
  13. COMMAND_PREV = 'PREV'  # Command to go to the previous program
  14. COMMAND_VOLUME = 'VOLUME'  # Command to adjust volume
  15.  
  16. class AudioManager:
  17.    def __init__(self):
  18.        self.programs = []
  19.        self.selected_index = 0
  20.  
  21.    def update_programs(self):
  22.        """Fetch the active audio streams from PipeWire."""
  23.        try:
  24.            output = subprocess.check_output(['pw-cli', 'list', 'Node'], text=True)
  25.            self.programs = self.parse_pipewire_nodes(output)
  26.        except subprocess.CalledProcessError as e:
  27.            print(f"Error fetching audio streams: {e}")
  28.  
  29.    def parse_pipewire_nodes(self, output):
  30.        """Parse the output of PipeWire's nodes."""
  31.        nodes = []
  32.        for line in output.splitlines():
  33.            if 'application.name' in line:
  34.                start = line.find('"') + 1
  35.                end = line.rfind('"')
  36.                nodes.append(line[start:end])
  37.        return nodes
  38.  
  39.    def get_selected_program(self):
  40.        if self.programs:
  41.            return self.programs[self.selected_index]
  42.        return None
  43.  
  44.    def next_program(self):
  45.        if self.programs:
  46.            self.selected_index = (self.selected_index + 1) % len(self.programs)
  47.  
  48.    def prev_program(self):
  49.        if self.programs:
  50.            self.selected_index = (self.selected_index - 1) % len(self.programs)
  51.  
  52. class ArduinoInterface:
  53.    def __init__(self, port, baud_rate):
  54.        self.serial = serial.Serial(port, baud_rate)
  55.  
  56.    def send_message(self, message):
  57.        """Send a message to Arduino."""
  58.        self.serial.write((message + '\n').encode())
  59.  
  60.    def read_message(self):
  61.        """Read a message from Arduino."""
  62.        if self.serial.in_waiting > 0:
  63.            return self.serial.readline().decode().strip()
  64.        return None
  65.  
  66. class DeejController:
  67.    def __init__(self, audio_manager, arduino_interface):
  68.        self.audio_manager = audio_manager
  69.        self.arduino_interface = arduino_interface
  70.        self.running = True
  71.  
  72.    def poll_audio_programs(self):
  73.        while self.running:
  74.            self.audio_manager.update_programs()
  75.            selected_program = self.audio_manager.get_selected_program()
  76.            if selected_program:
  77.                self.arduino_interface.send_message(f"DISPLAY:{selected_program}")
  78.            time.sleep(POLL_INTERVAL)
  79.  
  80.    def handle_arduino_input(self):
  81.        while self.running:
  82.            message = self.arduino_interface.read_message()
  83.            if message:
  84.                if message == COMMAND_NEXT:
  85.                    self.audio_manager.next_program()
  86.                elif message == COMMAND_PREV:
  87.                    self.audio_manager.prev_program()
  88.                elif message.startswith(COMMAND_VOLUME):
  89.                    try:
  90.                        volume = int(message.split(':')[1])
  91.                        self.set_program_volume(volume)
  92.                    except ValueError:
  93.                        print("Invalid volume command")
  94.  
  95.    def set_program_volume(self, volume):
  96.        selected_program = self.audio_manager.get_selected_program()
  97.        if selected_program:
  98.            print(f"Setting volume for {selected_program} to {volume}")
  99.            # Implement volume adjustment logic here
  100.  
  101.    def start(self):
  102.        Thread(target=self.poll_audio_programs, daemon=True).start()
  103.        Thread(target=self.handle_arduino_input, daemon=True).start()
  104.  
  105. if __name__ == "__main__":
  106.    audio_manager = AudioManager()
  107.    arduino_interface = ArduinoInterface(SERIAL_PORT, BAUD_RATE)
  108.    controller = DeejController(audio_manager, arduino_interface)
  109.  
  110.    try:
  111.        controller.start()
  112.        while True:
  113.            time.sleep(1)
  114.    except KeyboardInterrupt:
  115.        controller.running = False
  116.        print("Exiting...")
  117.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement