Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- async def produce_output(queue, commands):
- for command in commands:
- #execute the adb command
- if 'keypress' in command:
- #command contains 'input keypress ENTER'
- adb.Shell(command)
- #mark the task done because there's nothing to process
- queue.task_done()
- else:
- #command contains 'dumpsys audio'
- output = adb.Shell(command)
- #put result in queue
- await queue.put(output)
- async def process_adb(queue):
- while True:
- output = await queue.get()
- #return output (somehow?)
- queue.task_done()
- async def update():
- adb_queue = asyncio.Queue()
- asyncio.create_task(produce_output(adb_queue,
- [self._screen_on,
- self.current_app,
- self._wake_lock,
- self._dump('audio')]))
- #Not sure how to proceed
- if not self._adb:
- self.state = STATE_UNKNOWN
- self.app_id = None
- # Check if device is off.
- # Fetching result of first item in the queue - self._screen_on
- elif not await adb_queue.get():
- self.state = STATE_OFF
- self.app_id = None
- else:
- # Fetching result of second item in the queue - self.current_app
- self.app_id = await adb_queue.get()
- # Fetching result of third item in the queue - self._wake_lock
- if await adb_queue.get():
- self.state = STATE_PLAYING
- elif self.app_id not in (self.package_launcher, self.package_settings):
- # Check if state was playing on last update
- if self.state == STATE_PLAYING:
- self.state = STATE_PAUSED
- elif self.state != STATE_PAUSED:
- self.state = STATE_IDLE
- else:
- # We're on either the launcher or in settings
- self.state = STATE_ON
- # Get information from the audio status.
- # Fetching result of fourth item in the queue - self._dump('audio')
- audio_output = await adb_queue.get()
- stream_block = re.findall(BLOCK_REGEX, audio_output,
- re.DOTALL | re.MULTILINE)[0]
- self.muted = re.findall(MUTED_REGEX, stream_block,
- re.DOTALL | re.MULTILINE)[0] == 'true'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement