Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
import io
import logging
import os
import sys
import time
import tkinter
import traceback
import urllib.error
import urllib.request
from configparser import ConfigParser
from queue import Queue
from threading import Thread
from tkinter import StringVar
from tkinter import ttk

import vlc
from PIL import ImageTk, Image

import data
from pandora import *
class Mikro:
    """Main window and controller of the Mikro Pandora player.

    Creates the Tk root window, instantiates the helper objects
    (Preferences, Worker, AlbumArt, Station, Playlist, Song, Player),
    builds the widgets and starts the Pandora login.
    """

    # Seconds added to the current time to schedule the next reconnect.
    RECONNECT_INTERVAL = 1800
    # Milliseconds between two refreshes of the song information label.
    REFRESH_INTERVAL_MS = 1000

    def __init__(self):
        self.root = tkinter.Tk(className='Mikro')
        self.root.protocol('WM_DELETE_WINDOW', self.on_quit)
        self.root.title('Mikro')
        self.root.resizable(width=False, height=False)

        # Construction order matters: every helper copies attributes that
        # the previously created helpers have already stored on `self`.
        self.pandora = Pandora()
        self.prefs = Preferences(self)
        self.worker = Worker(self).send
        self.album_art = AlbumArt(self)
        self.station = Station(self)
        self.playlist = Playlist(self)
        self.song = Song(self)

        self._build_widgets()

        # The player needs `self.song` and the on_* callbacks, and the
        # volume slider callback needs `self.player`, hence this order.
        self.player = Player(self)
        self.player.mute = False
        self.vol_slider.set(self.prefs.volume)

        self.time_loop = None
        self.reconnect_time = None
        self.pending_prefs_save = False

        if not self.prefs.username or not self.prefs.password:
            self.show_settings()
        else:
            self.connect(False)

    def _build_widgets(self):
        """Create and pack every widget of the main window."""
        frame = ttk.Frame(self.root)
        frame.pack(padx=8, pady=8, expand=True, fill='both')

        self.info_label_text = StringVar(value='')
        self.play_pause_button_text = StringVar(value='Pause')
        self.mute_button_text = StringVar(value='Mute')
        self.love_button_text = StringVar(value='Love')

        self.stations_combo = ttk.Combobox(frame, justify='center',
                                           values='', state='readonly')
        self.stations_combo.bind('<<ComboboxSelected>>', self.station.change)
        self.stations_combo.pack(fill='x', expand=True)

        top_frame = ttk.Frame(frame)
        top_frame.pack(pady=8, expand=True, fill='x')
        play_pause_button = ttk.Button(
            top_frame, textvariable=self.play_pause_button_text,
            command=self.play_pause_toggle)
        play_pause_button.pack(side='left', expand=True, fill='x')
        skip_button = ttk.Button(top_frame, text='Skip',
                                 command=self.user_skip)
        skip_button.pack(side='left', expand=True, fill='x')
        mute_button = ttk.Button(
            top_frame, textvariable=self.mute_button_text,
            command=self.mute_toggle)
        mute_button.pack(side='left', expand=True, fill='x')
        self.vol_slider = ttk.Scale(top_frame, from_=0, to=100,
                                    orient='horizontal', value=100,
                                    command=self.change_vol)
        self.vol_slider.pack(padx=4, pady=0, side='left',
                             expand=True, fill='x')
        # This button used to call mute_toggle (copy-paste slip).
        settings_button = ttk.Button(top_frame, text='Settings',
                                     command=self.show_settings)
        settings_button.pack(side='left', expand=True, fill='x')

        self.album_art_label = ttk.Label(
            frame, textvariable=self.info_label_text, image='',
            compound='left')
        self.album_art_label.pack(expand=True, fill='x', pady=0, padx=0)

        spacer_frame = ttk.Frame(frame)
        spacer_frame.pack(expand=True, fill='x', pady=4)

        button_frame = ttk.Frame(frame)
        love_button = ttk.Button(
            button_frame, text='Love',
            textvariable=self.love_button_text, command=self.love_toggle)
        love_button.pack(side='left', expand=True, fill='x')
        tired_button = ttk.Button(button_frame, text='Tired',
                                  command=self.song.tired)
        tired_button.pack(side='left', expand=True, fill='x')
        ban_button = ttk.Button(button_frame, text='Ban',
                                command=self.song.ban)
        ban_button.pack(side='left', expand=True, fill='x')
        button_frame.pack(expand=True, fill='x')

    def show_settings(self):
        """Placeholder for the settings dialog, which does not exist yet."""
        print('show prefs')

    def connect(self, reconnecting):
        """Log in to Pandora through the worker.

        reconnecting -- when false, the station list is processed once the
        login has succeeded; when true only the reconnect deadline is
        refreshed.
        """
        if self.prefs.pandora_one:
            client = data.client_keys['pandora-one']
        else:
            client = data.client_keys['android-generic']
        args = (client, self.prefs.username, self.prefs.password)

        def reconnect(*ignore):
            self.reconnect_time = time.time() + self.RECONNECT_INTERVAL
            if not reconnecting:
                self.station.process()

        self.worker(self.pandora.connect, args, reconnect)

    def start_time_loop(self):
        """Start refreshing the info label unless a refresh is scheduled."""
        def time_loop():
            self.info_label_text.set(self.song.song_text)
            self.time_loop = self.root.after(self.REFRESH_INTERVAL_MS,
                                             time_loop)

        if self.time_loop is None:
            time_loop()

    def stop_time_loop(self):
        """Cancel the pending info label refresh, if there is one."""
        if self.time_loop is not None:
            self.root.after_cancel(self.time_loop)
            self.time_loop = None

    def love_toggle(self):
        """Love the current song when it is unrated, otherwise unrate it."""
        if self.song.rating is None:
            self.song.love()
        else:
            self.song.unrate()

    def user_skip(self):
        """Skip to the next song; advertisements cannot be skipped."""
        if self.song.isAd:
            return
        self.song.next()

    def play_pause_toggle(self):
        """Flip between playing and paused; ignored in any other state."""
        player_state = self.player.state
        if player_state is not None:
            self.player.state = not player_state

    def on_play_pause(self, state):
        """Update the UI after playback started (true) or paused (false)."""
        if state:
            self.play_pause_button_text.set('Pause')
            self.start_time_loop()
        else:
            self.play_pause_button_text.set('Play')
            self.stop_time_loop()

    def mute_toggle(self):
        """Flip the mute state; ignored while the state is unknown."""
        mute_state = self.player.mute
        if mute_state is not None:
            self.player.mute = not mute_state

    def on_mute_unmute(self, muted):
        """Update the mute button caption after the mute state changed."""
        if muted:
            self.mute_button_text.set('Unmute')
        else:
            self.mute_button_text.set('Mute')

    def change_vol(self, *ignore):
        """Apply the slider position to the player and the preferences."""
        slider_value = round(self.vol_slider.get())
        player_volume = self.player.volume
        if slider_value != player_volume:
            self.player.volume = slider_value
            self.prefs.volume = slider_value

    def on_quit(self):
        """Save the preferences and close the window.

        The window is destroyed even when saving fails, so a write error
        can no longer leave the application impossible to close.
        """
        try:
            self.prefs.write()
        finally:
            self.root.destroy()
class Station:
    """Keeps track of the selected station and fills the station combobox."""

    def __init__(self, parent):
        self._mikro = parent
        self._pandora = self._mikro.pandora
        # Index into self._pandora.stations; None until process() has run.
        self._index = None

    def process(self, *ignore):
        """Populate the combobox and start playing the last used station.

        The station whose id equals the `last_station` preference is
        selected; the first station is used when none matches. Nothing
        happens when the account has no stations, because there is
        nothing to select or to play.
        """
        stations = self._pandora.stations
        if not stations:
            return
        last_station = self._mikro.prefs.last_station
        stations_combo = self._mikro.stations_combo
        station_names = [station.name for station in stations]
        self._index = next(
            (position for position, station in enumerate(stations)
             if station.id == last_station),
            0)
        stations_combo['values'] = station_names
        # Make the combobox wide enough for the longest station name.
        stations_combo['width'] = max(len(name) for name in station_names)
        stations_combo.current(self._index)
        self._mikro.playlist.get(start=True)

    def change(self, *ignore):
        """React to a combobox selection by switching stations.

        Selecting the station that is already active does not restart
        playback.
        """
        selected_station_index = self._mikro.stations_combo.current()
        if self._index != selected_station_index:
            self._index = selected_station_index
            self._mikro.prefs.last_station = self.current.id
            self._mikro.playlist.get(start=True)
        # NOTE(review): the source lost its indentation; clearing the
        # selection on every event (not only on a change) is assumed.
        self._mikro.stations_combo.selection_clear()

    @property
    def current(self):
        """The station object at the selected index."""
        return self._pandora.stations[self._index]
class Playlist:
    """Holds the songs that are queued for the current station."""

    def __init__(self, parent):
        self._mikro = parent
        self._worker = parent.worker
        self._station = parent.station
        self._prefs = parent.prefs
        # True until the first playlist of a (new) station has arrived.
        self._waiting = True
        self._playlist = []

    @property
    def songs_remaining(self):
        """Number of songs still queued."""
        return len(self._playlist)

    @property
    def waiting(self):
        """Whether a fresh playlist has been requested but not received."""
        return self._waiting

    def pop_zero(self):
        """Remove and return the first queued song."""
        return self._playlist.pop(0)

    def get(self, start=False):
        """Request a playlist for the current station through the worker.

        start -- when true the received songs replace the queue and the
        next song is started; otherwise they are appended to the queue.
        """
        if start:
            self._waiting = True

        def on_playlist(songs):
            if not start:
                self._playlist += songs
                return
            self._waiting = False
            self._playlist = songs
            self._mikro.song.next()

        self._worker(self._station.current.get_playlist, (), on_playlist)
class Song:
    """The song that is currently loaded, plus the actions on it.

    All rating actions and the `isAd` / `rating` properties are safe to
    use before the first song has been loaded; they then behave as if an
    unrated, non-advertisement song were present and do nothing.
    """

    def __init__(self, parent):
        self._mikro = parent
        self._root = self._mikro.root
        self._worker = self._mikro.worker
        self._pandora = self._mikro.pandora
        self._album_art = self._mikro.album_art
        self._playlist = self._mikro.playlist
        self._station = self._mikro.station
        self._prefs = self._mikro.prefs
        self.__current = None
        self.__duration = None
        # Initialised so song_text works before the first description is set.
        self._song_info = ''
        self._bitrate_codec_str = ''

    @property
    def _current(self):
        return self.__current

    @_current.setter
    def _current(self, new_song):
        self.__current = new_song

    @property
    def _time_to_reconnect(self):
        """Whether the reconnect deadline stored on the main object passed."""
        deadline = self._mikro.reconnect_time
        return deadline is not None and time.time() > deadline

    def _can_rate(self):
        """Return True when a song is loaded and it is not an advertisement."""
        return self._current is not None and not self._current.isAd

    def _register_ad(self):
        self._worker(self._pandora.register_ad,
                     (self._current.adTokens,
                      self._station.current.id), )

    def _queue(self):
        """Load the next queued song, update the UI and start playback."""
        self._get_new_song()
        self._album_art.get(self._current.artUrl)
        self._set_description()
        if self.isAd:
            self._register_ad()
            self._root.title('Commercial Advertisement - Mikro')
        else:
            self._root.title('%s by %s - Mikro'
                             % (self._current.title, self._current.artist))
        # NOTE(review): the source lost its indentation; updating the love
        # button for advertisements as well is assumed.
        if self.rating is None:
            self._mikro.love_button_text.set('Love')
        else:
            self._mikro.love_button_text.set('Unlove')
        self._mikro.player.start_new_song(self._url)
        if self._time_to_reconnect:
            self._mikro.connect(True)
        # Fetch more songs as soon as the last queued one starts playing.
        if self._playlist.songs_remaining == 0:
            self._playlist.get()

    def next(self):
        """Advance to the next queued song.

        Does nothing while a fresh playlist is awaited, and also when the
        queue is empty (for example when the user skips faster than the
        next batch of songs arrives) instead of failing on an empty queue.
        """
        if self._playlist.waiting:
            return
        if self._playlist.songs_remaining == 0:
            return
        self._mikro.stop_time_loop()
        self._queue()

    def _get_new_song(self):
        self._current = self._playlist.pop_zero()

    def love(self):
        """Rate the current song as loved and relabel the love button."""
        if not self._can_rate():
            return

        def set_button_text(*ignore):
            self._mikro.love_button_text.set('Unlove')

        self._worker(self._current.rate,
                     (RATE_LOVE, self._station.current), set_button_text)

    def ban(self):
        """Ban the current song and move on to the next one."""
        if not self._can_rate():
            return
        self._worker(self._current.rate,
                     (RATE_BAN, self._station.current), )
        self.next()

    def unrate(self):
        """Remove the rating of the current song and relabel the button."""
        if not self._can_rate():
            return

        def set_button_text(*ignore):
            self._mikro.love_button_text.set('Love')

        self._worker(self._current.rate,
                     (RATE_NONE, self._station.current), set_button_text)

    def tired(self):
        """Mark the current song as tired and move on to the next one."""
        if not self._can_rate():
            return
        self._worker(self._current.set_tired, (), )
        self.next()

    def _set_description(self):
        if not self.isAd:
            self._song_info = ' %s\n by %s\n from %s' % (
                self._current.title, self._current.artist,
                self._current.album)
        else:
            self._song_info = ' Commercial Advertisement\n from Pandora'
        self._bitrate_codec_str = ' %skbit/s %s' % (self._birate,
                                                    self._codec)

    @property
    def rating(self):
        """Rating of the current song; None when no song is loaded."""
        if self._current is None:
            return None
        return self._current.rating

    @property
    def isAd(self):
        """Whether the current song is an ad; False when none is loaded."""
        if self._current is None:
            return False
        return self._current.isAd

    @property
    def song_text(self):
        """Text for the info label: description, codec, position, length."""
        return '%s\n\n%s - %s / %s' % (self._song_info,
                                       self._bitrate_codec_str,
                                       self._position, self._duration)

    @property
    def _url(self):
        return self._current.audioUrlMap[self._prefs.audio_quality]['audioUrl']

    @property
    def _birate(self):
        return self._current.audioUrlMap[self._prefs.audio_quality]['bitrate']

    @property
    def _codec(self):
        return self._current.audioUrlMap[self._prefs.audio_quality]['encoding']

    @property
    def _position(self):
        position = self._mikro.player.position
        if position is None:
            return '--:--'
        return self._format_time(position)

    @property
    def _duration(self):
        duration = self.__duration
        if duration is None:
            return '--:--'
        return duration

    def set_duration(self, dur):
        """Store the formatted length of the current song.

        dur -- length in milliseconds, or None to clear the stored value.
        """
        if dur is None:
            self.__duration = None
            return
        if self._current is None:
            return
        # NOTE(review): the source lost its indentation; it is assumed that
        # only the first length reported for a song is kept.
        if not self._current.got_duration:
            self._current.got_duration = True
            self.__duration = self._format_time(dur)

    def _format_time(self, milliseconds):
        """Format milliseconds as 'MM:SS', or 'H:MM:SS' from one hour up."""
        total_seconds = int(milliseconds // 1000)
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return '%i:%02i:%02i' % (hours, minutes, seconds)
        return '%02i:%02i' % (minutes, seconds)
class AlbumArt:
    """Downloads album covers and shows them in the main window label."""

    # Edge lengths, in pixels, of the displayed cover.
    ART_SIZE = (125, 125)

    def __init__(self, parent):
        # A dedicated worker, separate from the one on the main object.
        self._worker = Worker(parent).send
        self._mikro = parent
        self.__default_art = None
        # Tk only shows an image while Python still references it, so the
        # displayed image is stored here.
        self._current_art = None

    def get(self, artUrl):
        """Fetch the cover at `artUrl` and display it.

        The download and resize are sent to the worker. The Tk image
        object is created in the callback the worker schedules on the Tk
        root, not inside the worker job itself. The default cover is shown
        when `artUrl` is empty, or the download or decoding fails.
        """
        def fetch_art():
            if not artUrl:
                return None
            try:
                with urllib.request.urlopen(artUrl) as response:
                    raw = io.BytesIO(response.read())
                with Image.open(raw) as picture:
                    # LANCZOS replaces ANTIALIAS, which newer Pillow
                    # releases no longer provide.
                    return picture.resize(self.ART_SIZE, Image.LANCZOS)
            except (OSError, ValueError):
                # OSError covers HTTP/URL errors and unreadable image data;
                # ValueError is raised for malformed URLs.
                return None

        def show_art(picture):
            if picture is None:
                photo = self._default_art
            else:
                photo = ImageTk.PhotoImage(picture)
            self._current_art = photo
            label = self._mikro.album_art_label
            label.config(image='')
            label.config(image=photo)

        self._worker(fetch_art, (), show_art)

    @property
    def _default_art(self):
        """The fallback cover, loaded from 'default.jpg' on first use."""
        if self.__default_art is not None:
            return self.__default_art
        with Image.open('default.jpg') as defaultArt:
            self.__default_art = ImageTk.PhotoImage(defaultArt)
        return self.__default_art
class Player:
    """Thin wrapper around a VLC media player.

    VLC events are forwarded to the callbacks of the parent object by
    scheduling them with the Tk root's `after_idle`.
    """

    def __init__(self, parent):
        self._after_idle = parent.root.after_idle
        self._set_duration = parent.song.set_duration
        self._next_song = parent.song.next
        self._on_play_pause = parent.on_play_pause
        self._on_mute_unmute = parent.on_mute_unmute
        self._vlc_instance = vlc.Instance()
        self._player = self._vlc_instance.media_player_new()
        self._eventManager = self._player.event_manager()
        handlers = (
            (vlc.EventType.MediaPlayerLengthChanged, self._on_LengthChanged),
            (vlc.EventType.MediaPlayerEndReached, self._on_EndReached),
            (vlc.EventType.MediaPlayerPlaying, self._on_Playing),
            (vlc.EventType.MediaPlayerPaused, self._on_Paused),
            (vlc.EventType.MediaPlayerMuted, self._on_Muted),
            (vlc.EventType.MediaPlayerUnmuted, self._on_Unmuted),
        )
        for event_type, handler in handlers:
            self._eventManager.event_attach(event_type, handler)

    # --- VLC event handlers: each only schedules a parent callback ---

    def _on_LengthChanged(self, event):
        self._after_idle(self._set_duration, self.duration)

    def _on_EndReached(self, event):
        self._after_idle(self._next_song)

    def _on_Playing(self, event):
        self._after_idle(self._on_play_pause, True)

    def _on_Paused(self, event):
        self._after_idle(self._on_play_pause, False)

    def _on_Muted(self, event):
        self._after_idle(self._on_mute_unmute, True)

    def _on_Unmuted(self, event):
        self._after_idle(self._on_mute_unmute, False)

    @property
    def position(self):
        """Value of the player's get_time(), or None when it is negative."""
        elapsed = self._player.get_time()
        return None if elapsed < 0 else elapsed

    @property
    def duration(self):
        """Value of the player's get_length(), or None when it is negative."""
        length = self._player.get_length()
        return None if length < 0 else length

    @property
    def state(self):
        """True while playing, False while paused, None in any other state."""
        current = self._player.get_state()
        if current == vlc.State.Playing:
            return True
        if current == vlc.State.Paused:
            return False
        return None

    @state.setter
    def state(self, state):
        action = self._player.play if state else self._player.pause
        action()

    @property
    def volume(self):
        """The volume as reported by the player's audio_get_volume()."""
        return self._player.audio_get_volume()

    @volume.setter
    def volume(self, new_vol):
        self._player.audio_set_volume(new_vol)

    @property
    def _mute(self):
        # 1 -> True, 0 -> False, anything else (including -1) -> None.
        return {0: False, 1: True}.get(self._player.audio_get_mute())

    @property
    def mute(self):
        """True when muted, False when not, None when the state is unknown."""
        return self._mute

    @mute.setter
    def mute(self, mute):
        if self._mute is None:
            return
        self._player.audio_set_mute(mute)

    def start_new_song(self, audioUrl):
        """Load the media at `audioUrl` into the player and start playing."""
        media = self._vlc_instance.media_new_location(audioUrl)
        self._player.set_media(media)
        media.release()
        self._player.play()
class Preferences:
    """User settings, persisted in ~/.config/mikro/config.ini.

    Every setting is exposed as a property. A getter falls back to the
    built-in default while no value has been stored; a setter silently
    ignores values of the wrong type.
    """

    def __init__(self, parent):
        self._mikro = parent
        self._root = self._mikro.root
        # Interpolation is disabled so values containing '%' (passwords,
        # for instance) can be written and read back unchanged.
        self._config_parser = ConfigParser(interpolation=None)
        self._prefs_file = None
        self._username = None
        self._password = None
        self._pandora_one = None
        self._last_station = None
        self._audio_quality = None
        self._volume = None
        self._quality_combo_values = None
        # NOTE(review): hard-coded account credentials are used as defaults
        # and end up in the config file; kept for compatibility, but they
        # should be removed from the source.
        self._default_username = 'jasonlevigray3@gmail.com'
        self._default_password = 'notachance'
        self._default_pandora_one = True
        self._default_last_station = ''
        self._default_audio_quality = 'highQuality'
        self._default_volume = 100
        self._default_quality_combo_values = ['128kbit/s mp3',
                                              '64kbit/s aacplus',
                                              '32kbit/s aacplus']
        self.valid_audio_qualities = ['highQuality',
                                      'mediumQuality',
                                      'lowQuality']
        self._prepare_prefs()

    @property
    def username(self):
        return self._username or self._default_username

    @username.setter
    def username(self, username):
        if isinstance(username, str):
            self._username = username

    @property
    def password(self):
        return self._password or self._default_password

    @password.setter
    def password(self, password):
        if isinstance(password, str):
            self._password = password

    @property
    def pandora_one(self):
        # `or` would turn a stored False back into the default True.
        if self._pandora_one is None:
            return self._default_pandora_one
        return self._pandora_one

    @pandora_one.setter
    def pandora_one(self, pandora_one):
        if isinstance(pandora_one, bool):
            self._pandora_one = pandora_one
            self.quality_combo_values = pandora_one

    @property
    def audio_quality(self):
        return self._audio_quality or self._default_audio_quality

    @audio_quality.setter
    def audio_quality(self, audio_quality):
        if isinstance(audio_quality, str):
            if audio_quality in self.valid_audio_qualities:
                self._audio_quality = audio_quality

    @property
    def last_station(self):
        return self._last_station or self._default_last_station

    @last_station.setter
    def last_station(self, last_station):
        if isinstance(last_station, str):
            self._last_station = last_station

    @property
    def volume(self):
        # `or` would turn a stored volume of 0 back into the default.
        if self._volume is None:
            return self._default_volume
        return self._volume

    @volume.setter
    def volume(self, volume):
        if isinstance(volume, int):
            self._volume = volume

    @property
    def quality_combo_values(self):
        return self._quality_combo_values or self._default_quality_combo_values

    @quality_combo_values.setter
    def quality_combo_values(self, pandora_one):
        """Rebuild the quality labels for the given account type (a bool)."""
        if isinstance(pandora_one, bool):
            if pandora_one:
                high = '192kbit/s mp3'
            else:
                high = '128kbit/s mp3'
            self._quality_combo_values = [high,
                                          '64kbit/s aacplus',
                                          '32kbit/s aacplus']

    def _prepare_prefs(self):
        """Locate the config file; read it, or create it with defaults."""
        prefs_dir = os.path.join(os.path.expanduser('~'), '.config', 'mikro')
        os.makedirs(prefs_dir, exist_ok=True)
        self._prefs_file = os.path.join(prefs_dir, 'config.ini')
        if os.path.isfile(self._prefs_file):
            self.read()
        else:
            self.write()

    def read(self):
        """Load the settings from the config file.

        A missing SETTINGS section or an unparsable boolean/integer leaves
        the affected settings at their current values.
        """
        with open(self._prefs_file, 'r') as prefs_file:
            self._config_parser.read_file(prefs_file)
        if not self._config_parser.has_section('SETTINGS'):
            return
        setting = self._config_parser['SETTINGS']
        self.username = setting.get('username', '')
        self.password = setting.get('password', '')
        try:
            self.pandora_one = setting.getboolean('pandora_one',
                                                  fallback=False)
        except ValueError:
            pass
        self.last_station = setting.get('last_station', '')
        self.audio_quality = setting.get('audio_quality', 'highQuality')
        try:
            self.volume = setting.getint('volume', fallback=100)
        except ValueError:
            pass

    def write(self):
        """Store the current settings in the config file."""
        self._config_parser['SETTINGS'] = {
            'username': self.username,
            'password': self.password,
            'pandora_one': str(self.pandora_one),
            'last_station': self.last_station,
            'audio_quality': self.audio_quality,
            'volume': str(self.volume),
        }
        with open(self._prefs_file, 'w') as prefs_file:
            self._config_parser.write(prefs_file)
class Worker:
    """Runs queued jobs one after another on a daemon thread.

    Results and errors are handed back by scheduling the matching callback
    with the Tk root's `after_idle`.
    """

    def __init__(self, parent):
        self._root = parent.root
        self._queue = Queue()
        self._thread = Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def _run(self):
        """Thread body: take jobs from the queue forever and execute them."""
        while True:
            job, job_args, on_success, on_error = self._queue.get()
            try:
                outcome = job(*job_args)
                if on_success:
                    self._root.after_idle(on_success, outcome)
            except Exception as exc:
                # Attach the formatted traceback for the error callback.
                exc.traceback = traceback.format_exc()
                if on_error:
                    self._root.after_idle(on_error, exc)

    def send(self, command, args=(), callback=None, errorback=None):
        """Queue `command(*args)` for execution on the worker thread.

        callback  -- scheduled with the result when the command succeeds.
        errorback -- scheduled with the exception when it fails; a handler
                     that prints the traceback is used when omitted.
        """
        handler = self._default_errorback if errorback is None else errorback
        self._queue.put((command, args, callback, handler))

    def _default_errorback(self, error):
        print('Unhandled exception in worker thread:\n{}'.format(error.traceback))
def main():
    """Create the application, run the Tk main loop, then exit with 0."""
    app = Mikro()
    app.root.mainloop()
    sys.exit(0)


# Start the application only when this file is run as a script.
if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement