Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- #
- # this module uses code from Michael Hudson's xmms-py modules
- # this code is available in its original form here:
- # http://www.python.net/crew/mwh/hacks/xmms-py.html
- # the original code had this notice on it:
- #
- # Released by Michael Hudson on 2000-07-01, and again on 2001-04-26
- # public domain; no warranty, no restrictions
- #
- # most of the support for xmms, beep, and audacious comes from
- # various pieces of Hudson's modules
- #
- # licensed under the GNU GPL v2
- # a copy of this license can be found:
- # http://www.gnu.org/copyleft/gpl.html
- #
- # Future Plan:
- # - allow for customizeable announce string
- # - more mp3 players (any requests?)
- #
- # Note: Around 11/28/07, I stopped using X-chat as my main IRC client. This
- # decision was entirely due to a usable command line IRC client with python
- # scripting becoming available (weechat). I continue to maintain this script
- # and add to it as I update the weechat version (the differences between the
- # two are so slight I could probably make them one script), but as a warning,
- # this module does not receive nearly as heavy testing.
- #
- # 11/26/07 - Fixed a bug w/ using juk/amarok w/o pydcop
- # 11/28/07 - Started weechat port
- # 07/13/08 - Changed Audacious support to Dbus, added BMPx (v 0.6)
- # 01/27/09 - Added Amarok2 support, Menu items. (v 0.7)
- # Many thanks to Nikolas La Belle for the patch for these features.
- # 10/12/11 - Added Guadayeque support (I might have changed nonfuncional Rhythmbox behaviour before) (by irvine)
- ENABLE_MENU = False
- class Xchat(object):
- """Fake xchat object. For testing via command line."""
- def __nonzero__(self):
- """>>> bool(Xchat()) == False"""
- return False
- def prnt(self, s):
- s = s.replace('\00302', '')
- s = s.replace('\00303', '')
- s = s.replace('\002', '')
- s = s.replace('\003', '')
- print s
- command=prnt
- def __getattr__(self, name):
- def lambda_(*x, **y):
- print '%s: %s, %s' % (name, x, y)
- return lambda_
- try: import xchat
- except ImportError: xchat = Xchat()
- import sys, struct
- import socket, os, pwd
- from subprocess import *
- pcop, pydcop, bus = False, False, False
- try:
- import pcop, pydcop
- except: pass
- try:
- import dbus
- bus = dbus.SessionBus()
- except:
- dbus = False
- __module_name__ = "pymp3"
- __module_version__ = "0.7"
- __module_description__ = "mp3 announce/utils"
- __debugging__ = False
- if __debugging__:
- import traceback
- # MENU ITEMS
- # Courtesy of Nikolas La Belle
- if ENABLE_MENU:
- xchat.command('MENU DEL pymp')
- xchat.command('MENU -p6 ADD pymp')
- xchat.command('MENU ADD \"pymp/Announce\" \"mp3\"')
- xchat.command('MENU ADD \"pymp/-\"')
- xchat.command('MENU ADD \"pymp/Play\" \"mp3 play\"')
- xchat.command('MENU ADD \"pymp/Stop\" \"mp3 stop\"')
- xchat.command('MENU ADD \"pymp/Pause\" \"mp3 pause\"')
- xchat.command('MENU ADD \"pymp/Next\" \"mp3 next\"')
- xchat.command('MENU ADD \"pymp/Prev\" \"mp3 prev\"')
- def prnt(s): print s
- def print_debug(string):
- global __debugging__
- if __debugging__:
- string = str(string)
- print "\00302" + string + "\003"
- def print_info(string):
- string = str(string)
- print "\00303" + string + "\003"
- if not xchat:
- prnt = xchat.prnt
- print_debug = xchat.prnt
- print_info = xchat.prnt
- # XXX: beep was superceded by BMPx, BMPx is in the process of being replaced
- # by MPX (from the same developers). Hopefully, MPRIS will at least bring
- # some stability to the IPC/RPC interface :)
- players = {
- 'audacious' : 'audacious',
- 'bmpx' : 'beep-media-player-2',
- 'beep' : 'beep-media-player',
- # 'xmms2' : 'xmms2d',
- 'xmms' : 'xmms',
- 'banshee' : 'Banshee.exe',
- 'banshee1' : 'banshee-1',
- 'juk' : 'juk',
- 'amarok' : 'amarokapp',
- 'amarok2' : 'amarok',
- 'rhythmbox' : 'rhythmbox',
- 'guayadeque' : 'guayadeque',
- }
- _player_order = ['audacious', 'amarok2', 'bmpx', 'beep', 'xmms',
- 'banshee1', 'banshee', 'juk', 'amarok', 'rhythmbox', 'guayadeque']
- # find out which player is running
- def which():
- ps = Popen(['ps', 'aux'], stdout=PIPE)
- output = ps.stdout.readlines()
- for line in output:
- for player in _player_order:
- findstr = players[player]
- if line.rfind(findstr) > -1:
- return player
- return
- #FIXME: This code isn't that great; it should probably not rely on 'split' since
- # quoted won't work properly. Think of a way to fix this (maybe resort to shell=True)
- def command(runstr):
- return Popen(runstr.split(), stdout=PIPE).communicate()[0]
- # these players use xmms style command socket
- SOCKET_PLAYERS = ['audacious', 'beep', 'xmms']
- class SocketCommand:
- CMD_PLAY = 2 #
- CMD_PAUSE = 3 #
- CMD_STOP = 4 #
- CMD_GET_PLAYLIST_POS = 7 #
- # TODO: make socket_next and socket_prev use this
- # instead of using next/prev repeatedly
- #CMD_SET_PLAYLIST_POS = 8 #
- CMD_GET_PLAYLIST_LENGTH = 9 #
- CMD_GET_OUTPUT_TIME = 11 #
- CMD_GET_PLAYLIST_FILE = 17 #
- CMD_GET_PLAYLIST_TITLE = 18 #
- CMD_GET_PLAYLIST_TIME = 19 #
- CMD_GET_INFO = 20 #
- CMD_EJECT = 28 #
- CMD_PLAYLIST_PREV = 29 #
- CMD_PLAYLIST_NEXT = 30 #
- CMD_TOGGLE_REPEAT = 33 #
- CMD_TOGGLE_SHUFFLE = 34 #
- """
- I've tried to make the following class a facsimily of a "persistent connection",
- but my attempts have led to the following error with xmms:
- ** WARNING **: ctrl_write_packet(): Failed to send data: Broken pipe
- Even manually closing, deleting, and then re-initializing the socket did not avoid
- this warning. It seems that only letting the garbage collector snag old Connection
- objects makes xmms happy.
- There is one aspect here missing from Hudson's original library: sending a custom
- send format with the 'args' option. I wasn't using this feature in any requests,
- as all of my provided formats were 'l' anyway.
- """
- class XmmsConnection:
- class ClientPacketHeader:
- def __init__(self):
- self.version,self.cmd,self.length = 0,0,0
- def __repr__(self):
- return "<< %s : version: %s cmd: %s length: %s >>"\
- %(self.__class__.__name__,self.version,self.cmd,self.length)
- def encode(self):
- return struct.pack("hhl",self.version,self.cmd,self.length)
- def __init__(self,session=0):
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.sock.connect("/tmp/xmms_%s.%d"%(pwd.getpwuid(os.geteuid())[0],session))
- def read_header(self):
- head = self.ClientPacketHeader()
- head.version, head.cmd, head.length = struct.unpack("hhl",self.sock.recv(8))
- return head
- def send(self, cmd, args=''):
- data = ""
- if isinstance(args, int):
- data = struct.pack('l', args)
- packet = struct.pack("hhl", 1, cmd, len(data)) + data
- self.sock.send(packet)
- def get_reply(self, format=''):
- header = self.read_header()
- if format: reply = struct.unpack(format, self.sock.recv(header.length))
- else: reply = self.sock.recv(header.length)
- return reply
- # general utilities ---
- def human_bitrate(bps):
- """Takes bits per second and returns a string w/ appropriate units."""
- units = ['bps', 'kbps', 'Mbps']
- # order of magnitude
- # if we get a weird number, assume kbps = kiloBYTESpersec
- # if we get a number ending in '00', assume it's 1000's of bits (correct)
- if str(bps).endswith("00"):
- reduce_factor = 1000
- else:
- reduce_factor = 1024.0
- oom = 0
- while bps /(reduce_factor**(oom+1)) >= 1:
- oom += 1
- return '%0.1f %s' % (bps/reduce_factor**oom, units[oom])
- def s_to_mmss(s):
- """Converts seconds to minutes:seconds: mm:ss."""
- s = int(s)
- sec = s % 60
- min = s / 60
- return '%2d:%02d' % (min, sec)
- def us_to_mmss(us):
- """Converts miliseconds to minutes:seconds: mm:ss."""
- us = int(us)
- return s_to_mmss(us/1000)
- class MediaPlayer:
- """A superclass that implements some book-keeping and some convenience functions
- for media player objects. These objects print out as an announce string, and
- get (and cache) info from the player in a clean, consistent "getitem" API."""
- def __init__(self, name):
- self.name = name
- def _nyi(self, name):
- print_debug("%s not yet implemented for `%s`." % (name, self.name))
- def _empty_dict(self):
- """Return an empty info dictionary with all of the keys set."""
- keys = ['player', 'playlist_position', 'playlist_length', 'file',
- 'display_title', 'elapsed', 'length', 'bitrate', 'frequency',
- 'title', 'artist', 'album', 'track']
- d = {}
- for key in keys: d[key] = ''
- d['player'] = self.name
- return d
- def play(self): self._nyi('Play')
- def stop(self): self._nyi('Stop')
- def pause(self): self._nyi('Pause')
- def next(self): self._nyi('Next')
- def prev(self): self._nyi('Prev')
- def eject(self): self._nyi('eject')
- def open(self): self._nyi('open')
- def shuffle(self): self._nyi('shuffle')
- def repeat(self): self._nyi('repeat')
- def next_n(self, n):
- """Skip "forward" `n` songs. Should obey the player's current
- repeat settings, and preferably shuffle. Overwrite if player has
- Playlist position setting."""
- for i in range(n):
- self.next()
- def prev_n(self, n):
- """Go "backward" `n` songs. Should obey the player's current
- repeat settings, and preferably shuffle. Overwrite if player has
- Playlist position setting."""
- for i in range(n):
- self.prev()
- def get_info(self): return self._empty_dict()
- def __str__(self):
- # FIXME: this should basically be done away with.
- """FIXME: This implements the old announce strings. It's probably easier
- to move this to the subclasses, but for now this is fine."""
- info = self.get_info()
- if self.name in SOCKET_PLAYERS:
- return '%s ~ [%s] of [%s] ~ %s ~ %sHz' % (info['display_title'], \
- info['elapsed'], info['length'], info['bitrate'], info['frequency'])
- elif self.name in ['juk', 'amarok']:
- return '%s - [%s] - %s ~ [%s] of [%s] ~ %s' % (info['artist'], \
- info['album'], info['title'], info['elapsed'], info['length'], \
- info['bitrate'])
- elif self.name in ['banshee', 'rhythmbox']:
- return '%s - [%s] - %s ~ [%s] of [%s]' % (info['artist'], info['album'], \
- info['title'], info['elapsed'], info['length'])
- def __repr__(self):
- return '<MediaPlayer %s ...>' % (self.name)
- class Xmms(MediaPlayer):
- def __init__(self, name='xmms'):
- MediaPlayer.__init__(self, name)
- self._ifcache = {}
- def _makeConnection(self):
- if self.name in ['beep', 'xmms']: return XmmsConnection()
- return False
- def _cmd(self, command, args='', reply_format=''):
- connection = self._makeConnection()
- connection.send(command, args=args)
- return connection.get_reply(format=reply_format)
- def play(self): self._cmd(SocketCommand.CMD_PLAY)
- def stop(self): self._cmd(SocketCommand.CMD_STOP)
- def pause(self): self._cmd(SocketCommand.CMD_PAUSE)
- def next(self): self._cmd(SocketCommand.CMD_PLAYLIST_NEXT)
- def prev(self): self._cmd(SocketCommand.CMD_PLAYLIST_PREV)
- def eject(self): self._cmd(SocketCommand.CMD_EJECT)
- def open(self): self._cmd(SocketCommand.CMD_EJECT)
- def shuffle(self): self._cmd(SocketCommand.CMD_TOGGLE_SHUFFLE)
- def repeat(self): self._cmd(SocketCommand.CMD_TOGGLE_REPEAT)
- def get_info(self):
- d = self._empty_dict()
- d['playlist_position'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_POS, reply_format='l')[0]
- position = d['playlist_position']
- d['playlist_length'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_LENGTH, reply_format='l')[0]
- d['file'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_FILE, args=position)[:-1]
- d['display_title'] = self._cmd(SocketCommand.CMD_GET_PLAYLIST_TITLE, args=position)[:-1]
- info = self._cmd(SocketCommand.CMD_GET_INFO, reply_format='lll')
- utime_elapsed = self._cmd(SocketCommand.CMD_GET_OUTPUT_TIME, reply_format='l')[0]
- utime_length = self._cmd(SocketCommand.CMD_GET_PLAYLIST_TIME, args=position, reply_format='l')[0]
- d['elapsed'] = us_to_mmss(utime_elapsed)
- d['length'] = us_to_mmss(utime_length)
- d['bitrate'] = human_bitrate(info[0])
- d['frequency'] = info[1]
- return d
- BEEP_FIRST_RUN = True
- BEEP_MESSAGE = """beep-media-player has a bug with its control socket and returns
- bogus information for bitrate, frequency, and number of channels. Consider the
- 'audacious' media player, or BMPx, as beep-media-player is no longer in
- development.""".replace("\n", ' ')
- class Beep(Xmms):
- def __init__(self):
- global BEEP_FIRST_RUN, BEEP_MESSAGE
- if BEEP_FIRST_RUN:
- print_info(BEEP_MESSAGE)
- BEEP_FIRST_RUN = False
- Xmms.__init__(self, 'beep')
- BMPX_FIRST_RUN = True
- BMPX_4013_WARNING = """You are running bmpx version "%s", which has known \
- bugs in the dbus interface. "BMP 0.40.14" fixes some of these, but pause \
- support is still known to be broken in this release."""
- BMPX_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(length)s] \
- ~ %(kbps)s ~ %(freq)sHz"""
- class Bmpx(MediaPlayer):
- def __init__(self, name="bmpx"):
- global BMPX_FIRST_RUN
- if not bus:
- return
- MediaPlayer.__init__(self, name)
- self.Root = bus.get_object('org.mpris.bmp', '/')
- self.Player = bus.get_object('org.mpris.bmp', '/Player')
- self.TrackList = bus.get_object('org.mpris.bmp', '/TrackList')
- if BMPX_FIRST_RUN:
- BMPX_FIRST_RUN = False
- self.version = str(self.Root.Identity())
- if self.version < 'BMP 0.40.14':
- print BMPX_4013_WARNING % self.version
- def play(self):
- if self.version < 'BMP 0.40.14':
- print_info("playing does not work with version \"%s\" of BMPx" % self.version)
- return
- self.Player.Play()
- def stop(self):
- if self.version < 'BMP 0.40.14':
- print_info("stop disabled for this version of BMPx, since playing does not work.")
- return
- self.Player.Stop()
- def pause(self):
- if self.version < 'BMP 0.40.15':
- print_info("pausing does not work with version \"%s\" of BMPx" % self.version)
- return
- self.Player.Pause()
- def next(self): self.Player.Next()
- def prev(self): self.Player.Prev()
- # are these necessary? maybe they should be removed
- def eject(self): pass
- def open(self): pass
- def get_info(self):
- info = self.Player.GetMetadata()
- decode = lambda x: unicode(x).encode('utf-8')
- return {
- 'artist' : decode(info['artist']),
- 'album' : decode(info['album']),
- 'title' : decode(info['title']),
- 'length' : s_to_mmss(int(info['time'])),
- 'kbps' : human_bitrate(int(info['bitrate'])),
- 'freq' : decode(info['samplerate']),
- }
- def __str__(self):
- info = self.get_info()
- return BMPX_FORMAT % info
- AUDACIOUS_FIRST_RUN = True
- AUDACIOUS_NODBUS = """Audacious deprecated the control socket interface many \
- releases ago, and as of the release included with Ubuntu 8.04, it's officially \
- gone. For now, the python dbus bindings are required for Audacious usage until \
- a suitable interface using 'dbus-send' can be developed."""
- AUDACIOUS_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(elapsed)s] \
- of [%(length)s] ~ %(kbps)s ~ %(freq)sHz"""
- class Audacious(MediaPlayer):
- format = AUDACIOUS_FORMAT
- def __init__(self, name="audacious"):
- MediaPlayer.__init__(self, name)
- global AUDACIOUS_FIRST_RUN
- if not bus and AUDACIOUS_FIRST_RUN:
- print_info(AUDACIOUS_NODBUS)
- AUDACIOUS_FIRST_RUN = False
- return
- AUDACIOUS_FIRST_RUN = False
- self.bus = bus
- # set up the mpris interfaces
- self.Root = bus.get_object('org.mpris.audacious', '/')
- self.Player = bus.get_object('org.mpris.audacious', '/Player')
- self.TrackList = bus.get_object('org.mpris.audacious', '/TrackList')
- # XXX: this interface is going away in Audacious 2.0 as per nenolod
- self.Atheme = bus.get_object('org.atheme.audacious', '/org/atheme/audacious')
- def play(self): self.Player.Play()
- def stop(self): self.Player.Stop()
- def pause(self): self.Player.Pause()
- def next(self): self.Player.Next()
- def prev(self): self.Player.Prev()
- # are these necessary? maybe they should be removed
- def eject(self): self.Atheme.Eject()
- def open(self): self.Atheme.Eject()
- def __str__(self):
- info = self.get_info()
- return self.format % info
- def get_info(self):
- kbps, freq, ch = map(int, self.Atheme.GetInfo())
- info_dict = self.Player.GetMetadata()
- return {
- 'kbps' : human_bitrate(kbps),
- 'freq' : freq,
- 'channels' : ch,
- 'artist' : unicode(info_dict['artist']).encode('utf-8'),
- 'album' : unicode(info_dict['album']).encode('utf-8'),
- 'title' : unicode(info_dict['title']).encode('utf-8'),
- 'elapsed' : us_to_mmss(self.Player.PositionGet()),
- 'length' : us_to_mmss(info_dict['length']),
- }
- BANSHEE_FIRST_RUN = True
- BANSHEE_MESSAGE = """Although banshee is supported without them, it is recommended
- that you install the python-dbus bindings for increased speed.""".replace("\n", " ")
- class Banshee(MediaPlayer):
- def __init__(self):
- global BANSHEE_FIRST_RUN, BANSHEE_MESSAGE
- if BANSHEE_FIRST_RUN and not bus:
- print_info(BANSHEE_MESSAGE)
- BANSHEE_FIRST_RUN = False
- MediaPlayer.__init__(self, 'banshee')
- self._ifcache = {}
- interface = ['play', 'stop', 'pause', 'next', 'prev', 'eject', 'open', 'get_info']
- if bus:
- self.d_obj = bus.get_object("org.bansheeproject.Banshee", "/org/bansheeproject/Banshee/PlayerEngine")
- self.banshee = dbus.Interface(self.d_obj, "org.bansheeproject.Banshee.PlayerEngine")
- for func in interface:
- setattr(self, func, getattr(self, '%s_dbus' % func))
- else:
- for func in interface:
- setattr(self, func, getattr(self, '%s_nodbus' % func))
- def play_dbus(self): self.banshee.Play()
- def stop_dbus(self): self.banshee.Pause()
- def pause_dbus(self): self.banshee.TogglePlaying()
- def next_dbus(self): self.banshee.Next()
- def prev_dbus(self): self.banshee.Previous()
- def eject_dbus(self): self.banshee.ShowWindow()
- def open_dbus(self): self.banshee.ShowWindow()
- def get_info_dbus(self):
- d = self._empty_dict()
- currentTrack = self.banshee.GetCurrentTrack()
- d['length'] = us_to_mmss(self.banshee.GetLength())
- d['elapsed'] = us_to_mmss(self.banshee.GetPosition())
- d['artist'] = str(currentTrack[u'album-artist'])
- d['title'] = str(currentTrack[u'name'])
- d['album'] = str(currentTrack[u'album'])
- return d
- def play_nodbus(self): command('banshee --play')
- def stop_nodbus(self): command('banshee --pause')
- def pause_nodbus(self): command('banshee --toggle-playing')
- def next_nodbus(self): command('banshee --next')
- def prev_nodbus(self): command('banshee --previous')
- def eject_nodbus(self): command('banshee --show')
- def open_nodbus(self): command('banshee --show')
- # shuffle & repeat not yet implemented
- def get_info_nodbus(self):
- d = self._empty_dict()
- info = command(' '.join(['banshee', '--hide-field', '--query-title',
- '--query-artist', '--query-position', '--query-album',
- '--query-duration'])).strip()
- # duration, artist, album, title, position
- # banshee reports things in seconds
- info = info.split('\n')
- d['length'] = us_to_mmss(info[0])
- d['artist'] = info[1]
- d['album'] = info[2]
- d['title'] = info[3]
- d['elapsed'] = us_to_mmss(info[4])
- return d
- class Rhythmbox(MediaPlayer):
- """MediaPlayer class for Rhythmbox, a Gtk/Gnome media player."""
- def __init__(self):
- if not bus:
- raise Exception('Rhythmbox is not supported w/o python-dbus bindings.')
- MediaPlayer.__init__(self, 'rhythmbox')
- player_obj = bus.get_object("org.gnome.Rhythmbox", "/org/gnome/Rhythmbox/Player")
- shell_obj = bus.get_object("org.gnome.Rhythmbox", "/org/gnome/Rhythmbox/Shell")
- self.player = dbus.Interface(player_obj, "org.gnome.Rhythmbox.Player")
- self.shell = dbus.Interface(shell_obj, "org.gnome.Rhythmbox.Shell")
- def play(self):
- if not bool(self.player.getPlaying()): self.player.playPause()
- def stop(self):
- if bool(self.player.getPlaying()): self.player.playPause()
- def pause(self): self.player.playPause()
- def next(self): self.player.next()
- def prev(self): self.player.previous()
- def eject(self): print_info("There isn't an easy way to do this in rhythmbox right now.")
- def open(self): print_info("There isn't an easy way to do this in rhythmbox right now.")
- def get_info(self):
- d = self._empty_dict()
- uri = unicode(self.player.getPlayingUri())
- properties = dict([(unicode(key), val) for key,val in dict(self.shell.getSongProperties(uri)).items()])
- d['length'] = s_to_mmss(int(properties.get('duration', 0)))
- d['elapsed'] = s_to_mmss(int(self.player.getElapsed()))
- d['artist'] = unicode(properties.get('artist', '')).encode('UTF-8')
- d['album'] = unicode(properties.get('album', '')).encode('UTF-8')
- d['title'] = unicode(properties.get('title', '')).encode('UTF-8')
- # Rhythmbox reports a 'bitrate', but as far as i can tell it's always 0
- return d
- JUK_FIRST_RUN = True
- DCOP_MESSAGE = """Although juk is supported without them, it is recommended that
- you install the python-dcop bindings for increased speed.""".replace("\n", ' ')
- class Juk(MediaPlayer):
- """MediaPlayer class for Juk, a Qt/KDE media player. This implementation is
- a bit messy because it resolves whether or not to use DCOP statically; after
- importing, the comparissons are made and the appropriate functions are used."""
- def __init__(self):
- global JUK_FIRST_RUN, DCOP_MESSAGE, pydcop
- if JUK_FIRST_RUN and not pydcop:
- print_info(DCOP_MESSAGE)
- JUK_FIRST_RUN = False
- MediaPlayer.__init__(self, 'juk')
- self._ifcache = {}
- # these functions are to be selected from _%s_dcop and #s_nodcop
- self._functions = ['eject', 'open']
- # these functions are created below; the keys are function names, the values
- # are juk PLayer dcop values
- self._func_map = {'play':'play', 'stop':'stop', 'pause':'playPause', 'next':'forward', 'prev':'back'}
- if pydcop:
- # if we have pydcop, create 'juk' and set some functions
- self.juk = pydcop.anyAppCalled("juk")
- self.get_property = (lambda x: self.juk.Player.trackProperty(x))
- self.get_juk = (lambda func: getattr(self.juk.Player, func)())
- for func in self._functions:
- setattr(self, func, getattr(self, '_%s_dcop' % func))
- else:
- # with no dcop, set equivalent functions to above using 'command' interface
- self.get_property = (lambda x: command('dcop juk Player trackProperty %s' % (x)).strip())
- self.get_juk = (lambda func: command('dcop juk Player %s' % func))
- for func in self._functions:
- setattr(self, func, getattr(self, '_%s_nodcop' % func))
- # this forloop sets all of the keys in 'func_map' to lambdas that call
- # whatever 'get_juk' was created by the conditional above
- for funcname, juk_property in self._func_map.items():
- setattr(self, funcname, (lambda prop=juk_property: self.get_juk(prop)))
- def _eject_dcop(self):
- pcop.dcop_call("juk", "juk-mainwindow#1", "restore", ())
- pcop.dcop_call("juk", "juk-mainwindow#1", "raise", ())
- def _open_dcop(self): self._eject_dcop()
- def _eject_nodcop(self):
- command('dcop juk juk-mainwindow#1 restore')
- command('dcop juk juk-mainwindow#1 raise')
- def _open_nodcop(self): self._eject_nodcop()
- def get_info(self):
- d = self._empty_dict()
- elapsed = self.get_juk('currentTime')
- d['elapsed'] = s_to_mmss(elapsed)
- d['title'] = self.get_property('Title')
- d['artist'] = self.get_property('Artist')
- d['album'] = self.get_property('Album')
- d['length'] = s_to_mmss(self.get_property('Seconds'))
- d['bitrate'] = '%s Kbps' % self.get_property('Bitrate')
- return d
- AMAROK2_FIRST_RUN = True
- AMAROK2_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(elapsed)s] \
- of [%(length)s] ~ %(kbps)skbps ~ %(freq)sHz"""
- AMAROK2_NODBUS = """Amarok 2 uses dbus hooks, python dbus bindings are \
- required for Amarok 2 usage until a suitable interface using 'dbus-send' \
- can be developed."""
- class Amarok2(MediaPlayer):
- format = AMAROK2_FORMAT
- def __init__(self, name="amarok2"):
- MediaPlayer.__init__(self, name)
- global AMAROK2_FIRST_RUN
- if not bus and AMAROK2_FIRST_RUN:
- print_info(AMAROK2_NODBUS)
- AMAROK2_FIRST_RUN = False
- return
- AMAROK2_FIRST_RUN = False
- self.bus = bus
- # set up the mpris interfaces
- self.Root = bus.get_object("org.kde.amarok", "/")
- self.Player = bus.get_object("org.kde.amarok", "/Player")
- self.TrackList = bus.get_object("org.kde.amarok", "/TrackList")
- #DBUS COMMANDS
- def play(self): self.Player.Play()
- def stop(self): self.Player.Stop()
- def pause(self): self.Player.Pause()
- def next(self): self.Player.Next()
- def prev(self): self.Player.Prev()
- def __str__(self):
- info = self.get_info()
- return self.format % info
- def get_info(self):
- info_dict = self.Player.GetMetadata()
- return {
- 'kbps' : unicode(info_dict['audio-bitrate']).encode('utf-8'),
- 'freq' : unicode(info_dict['audio-samplerate']).encode('utf-8'),
- 'artist' : unicode(info_dict['artist']).encode('utf-8'),
- 'album' : unicode(info_dict['album']).encode('utf-8'),
- 'title' : unicode(info_dict['title']).encode('utf-8'),
- 'elapsed' : us_to_mmss(self.Player.PositionGet()),
- 'length' : us_to_mmss(info_dict['mtime']),
- }
- AMAROK_FIRST_RUN = True
- AMAROK_DCOP_MESSAGE = """Although amarok is supported without them, it is recommended that
- you install the python-dcop bindings for increased speed.""".replace("\n", ' ')
- class Amarok(MediaPlayer):
- """MediaPlayer class for Amarok, a Qt/KDE media player. This implementation is
- a bit messy because it resolves whether or not to use DCOP statically; after
- importing, the comparissons are made and the appropriate functions are used."""
- def __init__(self):
- global AMAROK_FIRST_RUN, AMAROK_DCOP_MESSAGE, pydcop
- if AMAROK_FIRST_RUN and not pydcop:
- print_info(AMAROK_DCOP_MESSAGE)
- AMAROK_FIRST_RUN = False
- MediaPlayer.__init__(self, 'amarok')
- self._ifcache = {}
- """If the pydcop is available, then we create a 'self.get_property' function
- that uses pydcop; if it isn't available, we create a function that works the same
- but using our 'command' interface. Then, using the 'self.get_property', we bind
- 'self.play', 'self.stop', etc. to the object's namespace."""
- self._functions = ['play', 'stop', 'pause']
- if pydcop:
- self.amarok = pydcop.anyAppCalled("amarok")
- self.get_property = (lambda x: getattr(self.amarok.player, x)())
- self.get_playlist = (lambda x: getattr(self.amarok.playlist, x)())
- self.set_playlist = (lambda x: self.amarok.playlist.playByIndex(x))
- else:
- self.get_property = (lambda x: command('dcop amarok player %s' % x).strip())
- self.get_playlist = (lambda x: command('dcop amarok playlist %s' % x).strip())
- self.set_playlist = (lambda x: command('dcop amarok playlist playByIndex %s' % x))
- for func in self._functions:
- setattr(self, func, (lambda func=func: self.get_property(func)))
- def open(self): print_info("There isn't an easy way to do this with amarok right now.")
- def eject(self): print_info("There isn't an easy way to do this with amarok right now.")
- def prev_n(self, n):
- """Go backwards 'n' times in the playlist"""
- position = self.get_playlist('getActiveIndex')
- new_position = position - n
- if new_position < 0: new_position = 0
- self.set_playlist(new_position)
- def next_n(self, n):
- """Go forwards 'n' times in the playlist"""
- position = self.get_playlist('getActiveIndex')
- playlist_length = self.get_playlist('getTotalTrackCount')
- new_position = position + n
- if new_position >= playlist_length:
- new_position = playlist_length - 1
- self.set_playlist(new_position)
- def get_info(self):
- d = self._empty_dict()
- # this comes back in 'm:ss'
- d['elapsed'] = self.get_property('currentTime')
- d['title'] = self.get_property('title')
- d['artist'] = self.get_property('artist')
- d['album'] = self.get_property('album')
- d['length'] = self.get_property('totalTime')
- d['bitrate'] = '%s Kbps' % self.get_property('bitrate')
- return d
- GDQ_FORMAT = """%(artist)s - [%(album)s] - %(title)s [%(elapsed)s of%(length)s]"""
- class Guayadeque(MediaPlayer):
- """MediaPlayer class for Guayadeque."""
- def __init__(self):
- if not bus:
- raise Exception('guayadeque is not supported w/o python-dbus bindings.')
- MediaPlayer.__init__(self, 'guayadeque')
- # bus = dbus.SessionBus()
- player_obj = bus.get_object("org.mpris.guayadeque", "/Player")
- shell_obj = bus.get_object("org.mpris.guayadeque", "/org/mpris/MediaPlayer2")
- self.player = dbus.Interface(player_obj, "org.freedesktop.MediaPlayer")
- self.shell = dbus.Interface(shell_obj, "org.freedesktop.MediaPlayer.Shell")
- def play(self): self.player.Play()
- def stop(self): self.player.Stop()
- def pause(self): self.player.Pause()
- def next(self): self.player.Next()
- def prev(self): self.player.Prev()
- def eject(self): print_info("There isn't an easy way to do this in guayadeque right now.")
- def open(self): print_info("There isn't an easy way to do this in guayadeque right now.")
- def get_info(self):
- info = self.player.GetMetadata()
- elapsed = self.player.PositionGet()
- decode = lambda x: unicode(x).encode('utf-8')
- return {
- 'artist' : decode(info['artist']),
- 'album' : decode(info['album']),
- 'title' : decode(info['title']),
- 'length' : s_to_mmss(int(info['time'])),
- 'elapsed' : us_to_mmss(int(elapsed)),
- # 'kbps' : human_bitrate(int(info['bitrate'])),
- # 'freq' : decode(info['samplerate']),
- }
- def __str__(self):
- info = self.get_info()
- return GDQ_FORMAT % info
- def current_player():
- player = which()
- print_debug("detected %s is running" % player)
- if player is 'banshee1':
- player = 'banshee'
- if not player:
- raise Exception("Currently not running a supported media player.")
- player_obj = eval("%s()" % player.capitalize())
- return player_obj
- def help(args):
- prnt("Commands:")
- prnt(" \002/mp3\002 : announce the currently playing mp3")
- prnt(" \002/mp3\002 \00303stop\003 : stop playing")
- prnt(" \002/mp3\002 \00303play\003 : start playing")
- prnt(" \002/mp3\002 \00303pause\003 : pause playback")
- prnt(" \002/mp3\002 \00303next [#]\003 : skip to next (# of) track(s)")
- prnt(" \002/mp3\002 \00303prev [#]\003 : skip to prev (# of) track(s)")
- prnt(" \002/mp3\002 \00303open\003 : open files")
- prnt("")
- def usage():
- prnt("Usage: \002/mp3\002 [cmd]")
- prnt("\t\002/mp3\002 \037help\037 for commands.")
- def announce():
- xchat.command('me is listening to: %s' % (current_player()))
- def stop(*args):
- current_player().stop()
- def play(*args):
- current_player().play()
- def pause(*args):
- current_player().pause()
- def open(*args):
- current_player().open()
- def eject(*args):
- current_player().eject()
- def _make_num(numstr):
- try: return int(numstr)
- except:
- print_error('"%s" must be a number.' % numstr)
- return None
- def next(argv):
- num = None
- if len(argv) == 3:
- num = _make_num(argv[2])
- if num is None: return
- if num is None:
- current_player().next()
- else:
- current_player().next_n(num)
- def prev(argv):
- num = None
- if len(argv) == 3:
- num = _make_num(argv[2])
- if num is None: return
- if num is None:
- current_player().prev()
- else:
- current_player().prev_n(num)
- def dispatch(argv, arg_to_eol, c):
- if len(argv) == 1:
- try: announce()
- except Exception, ex:
- if __debugging__: print_debug(traceback.format_exc())
- if len(getattr(ex, 'args', [])): print_info(ex.args[0])
- else: usage()
- return xchat.EAT_XCHAT
- try:
- {
- "help" : help,
- "stop" : stop,
- "play" : play,
- "pause" : pause,
- "next" : next,
- "prev" : prev,
- "eject" : eject,
- "open" : open,
- }[argv[1]](argv)
- except Exception, ex:
- if __debugging__: print_debug(traceback.format_exc())
- if len(getattr(ex, 'args', [])): print_info(ex.args[0])
- else: usage()
- return xchat.EAT_XCHAT
- __unhook__ = xchat.hook_command("mp3", dispatch, help="/mp3 help for commands.")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement