Created
May 25, 2017 05:12
-
-
Save jmoiron/7429343c3cbd57c0c8dc1315b69bf586 to your computer and use it in GitHub Desktop.
old weechat plugin using python interface for mp3 announcing from a bunch of diff linux mp3 players
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# this module uses code from Michael Hudson's xmms-py modules | |
# this code is available in its original form here: | |
# http://www.python.net/crew/mwh/hacks/xmms-py.html | |
# the original code had this notice on it: | |
# | |
# Released by Michael Hudson on 2000-07-01, and again on 2001-04-26 | |
# public domain; no warranty, no restrictions | |
# | |
# most of the support for xmms, beep, and audacious comes from | |
# various pieces of Hudson's modules | |
# | |
# licensed under the GNU GPL v2 | |
# a copy of this license can be found: | |
# http://www.gnu.org/copyleft/gpl.html | |
# | |
# | |
# 11/26/07 - Fixed a bug w/ using juk/amarok w/o pydcop | |
# 11/28/07 - Started weechat port | |
# 07/13/08 - Changed Audacious support to Dbus, added BMPx (v 0.6) | |
class WeeChat(object):
    """Fake 'weechat' object. Can proxy in for printing, etc.

    Instances are falsy so that ``if weechat:`` guards can tell this
    stand-in apart from the real module.
    """

    # Control sequences stripped before printing to a plain terminal.
    # The two color codes come first so their digits are removed together
    # with the leading \003.
    _CONTROL_CODES = ('\00302', '\00303', '\002', '\003')

    def __nonzero__(self):
        return False

    # Python 3 consults __bool__ instead of __nonzero__.
    __bool__ = __nonzero__

    def prnt(self, string):
        """Print `string` to stdout with the control codes removed."""
        for code in self._CONTROL_CODES:
            string = string.replace(code, '')
        # Single-argument call form prints the same on Python 2 and 3.
        print(string)

    # Commands sent to the fake object are simply echoed.
    command = prnt
try: | |
import weechat | |
except: | |
weechat = WeeChat() | |
import sys, struct | |
import socket, os, pwd | |
from subprocess import * | |
pcop, pydcop, bus = False, False, False | |
try: | |
import pcop, pydcop | |
except: pass | |
try: | |
import dbus | |
bus = dbus.SessionBus() | |
except: | |
dbus = False | |
# Script metadata; passed to weechat.register() at the bottom of the file.
__module_name__, __module_version__ = "pymp3", "0.6"
__module_description__ = "mp3 announce/utils"

# When True, print_debug() emits output and tracebacks are shown on errors.
__debugging__ = False
if __debugging__:
    import traceback
def print_debug(string):
    """Print `string` (color code 02) through weechat, only when debugging."""
    if not __debugging__:
        return
    weechat.prnt("".join(("\00302", str(string), "\003")))
def print_info(string):
    """Print `string` (color code 03) through weechat."""
    weechat.prnt("".join(("\00303", str(string), "\003")))
# XXX: beep was superseded by BMPx, BMPx is in the process of being replaced
# by MPX (from the same developers). Hopefully, MPRIS will at least bring
# some stability to the IPC/RPC interface :)

# Maps a player key to the substring looked for in the process list.
players = dict(
    audacious='audacious',
    bmpx='beep-media-player-2',
    beep='beep-media-player',
    # xmms2='xmms2d',
    xmms='xmms',
    banshee='banshee.exe',
    banshee1='banshee-1',
    juk='juk',
    amarok='amarokapp',
    rhythmbox='rhythmbox',
)

# Order in which player keys are tried against each process-list line.
_player_order = 'audacious bmpx beep xmms banshee juk amarok rhythmbox'.split()
# find out which player is running
def which():
    """Return the key of the first supported player found in `ps aux`.

    Lines of the process list are scanned in order; within a line the
    candidates are tried in `_player_order`. Returns None when no line
    mentions any known player.
    """
    # communicate() drains the pipe, closes it, and waits for `ps`, so no
    # zombie process or open file descriptor is left behind.
    output = Popen(['ps', 'aux'], stdout=PIPE).communicate()[0]
    if not isinstance(output, str):
        # Python 3 hands back bytes; decode so substring tests work.
        output = output.decode('utf-8', 'replace')
    for line in output.splitlines():
        for player in _player_order:
            if players[player] in line:
                return player
    return None
def command(runstr):
    """Run `runstr` as a command (no shell) and return its standard output.

    The string is tokenized with shell-like rules, so quoted arguments
    containing spaces are passed to the program as a single argument.
    """
    import shlex  # local import: only this function needs it
    return Popen(shlex.split(runstr), stdout=PIPE).communicate()[0]
# these players use xmms style command socket
SOCKET_PLAYERS = ['audacious', 'beep', 'xmms']


class SocketCommand:
    """Numeric command codes sent over the xmms-style control socket."""

    # transport control
    CMD_PLAY, CMD_PAUSE, CMD_STOP = 2, 3, 4

    # playlist position / size
    CMD_GET_PLAYLIST_POS = 7
    # TODO: make socket_next and socket_prev use this
    # instead of using next/prev repeatedly
    #CMD_SET_PLAYLIST_POS = 8
    CMD_GET_PLAYLIST_LENGTH = 9

    # information queries
    CMD_GET_OUTPUT_TIME = 11
    CMD_GET_PLAYLIST_FILE, CMD_GET_PLAYLIST_TITLE, CMD_GET_PLAYLIST_TIME = 17, 18, 19
    CMD_GET_INFO = 20

    # window / playlist navigation and toggles
    CMD_EJECT = 28
    CMD_PLAYLIST_PREV, CMD_PLAYLIST_NEXT = 29, 30
    CMD_TOGGLE_REPEAT, CMD_TOGGLE_SHUFFLE = 33, 34
""" | |
I've tried to make the following class a facsimile of a "persistent connection",
but my attempts have led to the following error with xmms: | |
** WARNING **: ctrl_write_packet(): Failed to send data: Broken pipe | |
Even manually closing, deleting, and then re-initializing the socket did not avoid | |
this warning. It seems that only letting the garbage collector snag old Connection | |
objects makes xmms happy. | |
There is one aspect here missing from Hudson's original library: sending a custom | |
send format with the 'args' option. I wasn't using this feature in any requests, | |
as all of my provided formats were 'l' anyway. | |
""" | |
class XmmsConnection:
    """Client side of one xmms-style control-socket connection.

    Each packet starts with an 8-byte header: two 16-bit fields followed by
    a 32-bit payload length. All struct formats use the '=' prefix (native
    byte order, standard sizes, no padding). Without it, "hhl" is 16 bytes
    on platforms with a 64-bit C long and no longer matches the 8 bytes
    read for a header.
    """

    HEADER_FORMAT = "=hhl"
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 8 bytes

    class ClientPacketHeader:
        """Plain record for the three header fields."""

        def __init__(self):
            self.version, self.cmd, self.length = 0, 0, 0

        def __repr__(self):
            return "<< %s : version: %s cmd: %s length: %s >>"\
                % (self.__class__.__name__, self.version, self.cmd, self.length)

        def encode(self):
            """Return the header packed into its 8-byte wire form."""
            return struct.pack("=hhl", self.version, self.cmd, self.length)

    def __init__(self, session=0):
        """Connect to /tmp/xmms_<user>.<session> for the effective user."""
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.connect("/tmp/xmms_%s.%d" % (pwd.getpwuid(os.geteuid())[0], session))

    def _recv_exact(self, count):
        """Read exactly `count` bytes; raise socket.error on early EOF.

        A single recv() on a stream socket may return fewer bytes than
        requested, so keep reading until the full amount has arrived.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise socket.error(
                    "connection closed with %d byte(s) still expected" % remaining)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def read_header(self):
        """Read and decode one packet header from the socket."""
        head = self.ClientPacketHeader()
        head.version, head.cmd, head.length = struct.unpack(
            self.HEADER_FORMAT, self._recv_exact(self.HEADER_SIZE))
        return head

    def send(self, cmd, args=''):
        """Send command `cmd`; an int `args` is attached as a 32-bit payload.

        Any non-int `args` (including the default '') sends no payload.
        """
        data = b""
        if isinstance(args, int):
            data = struct.pack("=l", args)
        packet = struct.pack(self.HEADER_FORMAT, 1, cmd, len(data)) + data
        # sendall() retries until the whole packet has been written.
        self.sock.sendall(packet)

    def get_reply(self, format=''):
        """Read one reply packet.

        Returns the raw payload when `format` is empty, otherwise the tuple
        produced by unpacking the payload with `format`. A format without
        an explicit byte-order prefix is unpacked with '=' so 'l' means a
        32-bit integer on every platform.
        """
        header = self.read_header()
        payload = self._recv_exact(header.length)
        if not format:
            return payload
        if format[0] not in '@=<>!':
            format = '=' + format
        return struct.unpack(format, payload)
# general utilities --- | |
def human_bitrate(bps):
    """Takes bits per second and returns a string w/ appropriate units.

    The value is scaled to the largest of bps/kbps/Mbps that keeps it at
    or above 1 and formatted with one decimal place. Values too large for
    Mbps are still reported in Mbps.
    """
    units = ['bps', 'kbps', 'Mbps']
    # if we get a weird number, assume kbps = kiloBYTESpersec
    # if we get a number ending in '00', assume it's 1000's of bits (correct)
    # Both factors are floats so the divisions below never truncate.
    if str(bps).endswith("00"):
        reduce_factor = 1000.0
    else:
        reduce_factor = 1024.0
    # order of magnitude, capped at the largest unit we can name
    oom = 0
    while oom < len(units) - 1 and bps / reduce_factor ** (oom + 1) >= 1:
        oom += 1
    return '%0.1f %s' % (bps / reduce_factor ** oom, units[oom])
def s_to_mmss(s):
    """Format a number of seconds (int or numeric string) as 'mm:ss'.

    Minutes are right-aligned in two columns; seconds are zero-padded.
    """
    minutes, seconds = divmod(int(s), 60)
    return '%2d:%02d' % (minutes, seconds)
def us_to_mmss(us):
    """Converts milliseconds to minutes:seconds: mm:ss.

    Despite the 'us' in the name, the value is divided by 1000 to obtain
    seconds, i.e. it is treated as milliseconds.
    """
    whole_seconds = int(us) // 1000
    return s_to_mmss(whole_seconds)
class MediaPlayer:
    """A superclass that implements some book-keeping and some convenience functions
    for media player objects. These objects print out as an announce string, and
    get (and cache) info from the player in a clean, consistent "getitem" API."""

    def __init__(self, name):
        self.name = name

    def _nyi(self, name):
        """Report (debug only) that action `name` is not implemented here."""
        # Both values must be in one tuple: '%' binds tighter than ','.
        print_debug("%s not yet implemented for `%s`." % (name, self.name))

    def _empty_dict(self):
        """Return an empty info dictionary with all of the keys set."""
        keys = ['player', 'playlist_position', 'playlist_length', 'file',
                'display_title', 'elapsed', 'length', 'bitrate', 'frequency',
                'title', 'artist', 'album', 'track']
        d = dict.fromkeys(keys, '')
        d['player'] = self.name
        return d

    # Default actions: subclasses override whatever the player supports.
    def play(self): self._nyi('Play')
    def stop(self): self._nyi('Stop')
    def pause(self): self._nyi('Pause')
    def next(self): self._nyi('Next')
    def prev(self): self._nyi('Prev')
    def eject(self): self._nyi('eject')
    def open(self): self._nyi('open')
    def shuffle(self): self._nyi('shuffle')
    def repeat(self): self._nyi('repeat')

    def next_n(self, n):
        """Skip "forward" `n` songs. Should obey the player's current
        repeat settings, and preferably shuffle. Overwrite if player has
        Playlist position setting."""
        for _ in range(n):
            self.next()

    def prev_n(self, n):
        """Go "backward" `n` songs. Should obey the player's current
        repeat settings, and preferably shuffle. Overwrite if player has
        Playlist position setting."""
        for _ in range(n):
            self.prev()

    def get_info(self):
        """Return the info dictionary; the base class has nothing to fill in."""
        return self._empty_dict()

    def __str__(self):
        """FIXME: This implements the old announce strings. It's probably easier
        to move this to the subclasses, but for now this is fine."""
        info = self.get_info()
        if self.name in SOCKET_PLAYERS:
            return '%s ~ [%s] of [%s] ~ %s ~ %sHz' % (
                info['display_title'], info['elapsed'], info['length'],
                info['bitrate'], info['frequency'])
        if self.name in ['juk', 'amarok']:
            return '%s - [%s] - %s ~ [%s] of [%s] ~ %s' % (
                info['artist'], info['album'], info['title'],
                info['elapsed'], info['length'], info['bitrate'])
        # banshee, rhythmbox, and any other name: __str__ must always return
        # a string, so the shortest announce format doubles as the fallback.
        return '%s - [%s] - %s ~ [%s] of [%s]' % (
            info['artist'], info['album'], info['title'],
            info['elapsed'], info['length'])

    def __repr__(self):
        return '<MediaPlayer %s ...>' % (self.name)
class Xmms(MediaPlayer):
    """Media player controlled through the xmms-style command socket."""

    def __init__(self, name='xmms'):
        MediaPlayer.__init__(self, name)
        self._ifcache = {}

    def _makeConnection(self):
        """Return a new XmmsConnection for 'beep'/'xmms', False otherwise."""
        if self.name not in ['beep', 'xmms']:
            return False
        return XmmsConnection()

    def _cmd(self, command, args='', reply_format=''):
        """Send one command over a fresh connection and return its reply."""
        link = self._makeConnection()
        link.send(command, args=args)
        return link.get_reply(format=reply_format)

    def play(self):
        self._cmd(SocketCommand.CMD_PLAY)

    def stop(self):
        self._cmd(SocketCommand.CMD_STOP)

    def pause(self):
        self._cmd(SocketCommand.CMD_PAUSE)

    def next(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_NEXT)

    def prev(self):
        self._cmd(SocketCommand.CMD_PLAYLIST_PREV)

    def eject(self):
        self._cmd(SocketCommand.CMD_EJECT)

    def open(self):
        # "open" sends the same eject command.
        self._cmd(SocketCommand.CMD_EJECT)

    def shuffle(self):
        self._cmd(SocketCommand.CMD_TOGGLE_SHUFFLE)

    def repeat(self):
        self._cmd(SocketCommand.CMD_TOGGLE_REPEAT)

    def get_info(self):
        """Query the player and return a filled-in info dictionary."""
        ask = self._cmd
        d = self._empty_dict()
        position = ask(SocketCommand.CMD_GET_PLAYLIST_POS, reply_format='l')[0]
        d['playlist_position'] = position
        d['playlist_length'] = ask(SocketCommand.CMD_GET_PLAYLIST_LENGTH, reply_format='l')[0]
        # raw replies have their final byte dropped
        d['file'] = ask(SocketCommand.CMD_GET_PLAYLIST_FILE, args=position)[:-1]
        d['display_title'] = ask(SocketCommand.CMD_GET_PLAYLIST_TITLE, args=position)[:-1]
        info = ask(SocketCommand.CMD_GET_INFO, reply_format='lll')
        utime_elapsed = ask(SocketCommand.CMD_GET_OUTPUT_TIME, reply_format='l')[0]
        utime_length = ask(SocketCommand.CMD_GET_PLAYLIST_TIME, args=position, reply_format='l')[0]
        d['elapsed'] = us_to_mmss(utime_elapsed)
        d['length'] = us_to_mmss(utime_length)
        d['bitrate'] = human_bitrate(info[0])
        d['frequency'] = info[1]
        return d
# True until the beep warning below has been shown once.
BEEP_FIRST_RUN = True
BEEP_MESSAGE = """beep-media-player has a bug with its control socket and returns
bogus information for bitrate, frequency, and number of channels. Consider the
'audacious' media player, or BMPx, as beep-media-player is no longer in
development.""".replace("\n", ' ')


class Beep(Xmms):
    """Xmms-socket player registered under the name 'beep'."""

    def __init__(self):
        global BEEP_FIRST_RUN
        # Show the known-bug notice only the first time a Beep is created.
        if BEEP_FIRST_RUN:
            print_info(BEEP_MESSAGE)
            BEEP_FIRST_RUN = False
        Xmms.__init__(self, 'beep')
# True until the BMPx version warning has been considered once.
BMPX_FIRST_RUN = True
BMPX_4013_WARNING = """You are running bmpx version "%s", which has known \
bugs in the dbus interface. "BMP 0.40.14" fixes some of these, but pause \
support is still known to be broken in this release."""
BMPX_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(length)s] \
~ %(kbps)s ~ %(freq)sHz"""


class Bmpx(MediaPlayer):
    """MediaPlayer class for BMPx, controlled through its MPRIS dbus objects."""

    def __init__(self, name="bmpx"):
        """Bind the dbus objects and record the player's identity string.

        Raises Exception when no dbus session bus is available, instead of
        returning an object that has no usable attributes.
        """
        global BMPX_FIRST_RUN
        MediaPlayer.__init__(self, name)
        if not bus:
            raise Exception('BMPx is not supported w/o python-dbus bindings.')
        self.Root = bus.get_object('org.mpris.bmp', '/')
        self.Player = bus.get_object('org.mpris.bmp', '/Player')
        self.TrackList = bus.get_object('org.mpris.bmp', '/TrackList')
        # play/stop/pause all read self.version, so it is set on every
        # instance; only the warning is limited to the first one.
        self.version = str(self.Root.Identity())
        if BMPX_FIRST_RUN:
            BMPX_FIRST_RUN = False
            # NOTE(review): plain string comparison, so e.g. "0.40.9" sorts
            # after "0.40.14" -- confirm the version range BMPx shipped.
            if self.version < 'BMP 0.40.14':
                print_info(BMPX_4013_WARNING % self.version)

    def play(self):
        if self.version < 'BMP 0.40.14':
            print_info("playing does not work with version \"%s\" of BMPx" % self.version)
            return
        self.Player.Play()

    def stop(self):
        if self.version < 'BMP 0.40.14':
            print_info("stop disabled for this version of BMPx, since playing does not work.")
            return
        self.Player.Stop()

    def pause(self):
        if self.version < 'BMP 0.40.15':
            print_info("pausing does not work with version \"%s\" of BMPx" % self.version)
            return
        self.Player.Pause()

    def next(self): self.Player.Next()
    def prev(self): self.Player.Prev()

    # are these necessary? maybe they should be removed
    def eject(self): pass
    def open(self): pass

    def get_info(self):
        """Return the BMPX_FORMAT fields built from the player's metadata."""
        info = self.Player.GetMetadata()
        decode = lambda x: unicode(x).encode('utf-8')
        return {
            'artist': decode(info['artist']),
            'album': decode(info['album']),
            'title': decode(info['title']),
            'length': s_to_mmss(int(info['time'])),
            'kbps': human_bitrate(int(info['bitrate'])),
            'freq': decode(info['samplerate']),
        }

    def __str__(self):
        info = self.get_info()
        return BMPX_FORMAT % info
# True until the "dbus required" notice has been shown (or dbus was found).
AUDACIOUS_FIRST_RUN = True
AUDACIOUS_NODBUS = """Audacious deprecated the control socket interface many \
releases ago, and as of the release included with Ubuntu 8.04, it's officially \
gone. For now, the python dbus bindings are required for Audacious usage until \
a suitable interface using 'dbus-send' can be developed."""
AUDACIOUS_FORMAT = """%(artist)s - [%(album)s] - %(title)s ~ [%(elapsed)s] \
of [%(length)s] ~ %(kbps)s ~ %(freq)sHz"""


class Audacious(MediaPlayer):
    """MediaPlayer class for Audacious, controlled through dbus."""

    format = AUDACIOUS_FORMAT

    def __init__(self, name="audacious"):
        """Bind the dbus objects used to control Audacious.

        Without a dbus session bus the explanatory notice is printed once,
        and every construction raises Exception. Previously only the first
        attempt bailed out; later ones went on to use the missing bus.
        """
        MediaPlayer.__init__(self, name)
        global AUDACIOUS_FIRST_RUN
        first_run, AUDACIOUS_FIRST_RUN = AUDACIOUS_FIRST_RUN, False
        if not bus:
            if first_run:
                print_info(AUDACIOUS_NODBUS)
            raise Exception('Audacious is not supported w/o python-dbus bindings.')
        self.bus = bus
        # set up the mpris interfaces
        self.Root = bus.get_object('org.mpris.audacious', '/')
        self.Player = bus.get_object('org.mpris.audacious', '/Player')
        self.TrackList = bus.get_object('org.mpris.audacious', '/TrackList')
        # XXX: this interface is going away in Audacious 2.0 as per nenolod
        self.Atheme = bus.get_object('org.atheme.audacious', '/org/atheme/audacious')

    def play(self): self.Player.Play()
    def stop(self): self.Player.Stop()
    def pause(self): self.Player.Pause()
    def next(self): self.Player.Next()
    def prev(self): self.Player.Prev()

    # are these necessary? maybe they should be removed
    def eject(self): self.Atheme.Eject()
    def open(self): self.Atheme.Eject()

    def __str__(self):
        info = self.get_info()
        return self.format % info

    def get_info(self):
        """Return the AUDACIOUS_FORMAT fields (plus 'channels')."""
        kbps, freq, ch = map(int, self.Atheme.GetInfo())
        info_dict = self.Player.GetMetadata()
        return {
            'kbps': human_bitrate(kbps),
            'freq': freq,
            'channels': ch,
            'artist': unicode(info_dict['artist']).encode('utf-8'),
            'album': unicode(info_dict['album']).encode('utf-8'),
            'title': unicode(info_dict['title']).encode('utf-8'),
            'elapsed': us_to_mmss(self.Player.PositionGet()),
            'length': us_to_mmss(info_dict['length']),
        }
# True until the "install python-dbus" hint has been shown.
BANSHEE_FIRST_RUN = True
BANSHEE_MESSAGE = """Although banshee is supported without them, it is recommended
that you install the python-dbus bindings for increased speed.""".replace("\n", " ")


class Banshee(MediaPlayer):
    """MediaPlayer class for Banshee.

    Every public action exists in a `_dbus` and a `_nodbus` flavour; the
    constructor binds the flavour matching whether a session bus exists.
    """

    def __init__(self):
        global BANSHEE_FIRST_RUN
        if BANSHEE_FIRST_RUN and not bus:
            print_info(BANSHEE_MESSAGE)
            BANSHEE_FIRST_RUN = False
        MediaPlayer.__init__(self, 'banshee')
        self._ifcache = {}
        if bus:
            self.d_obj = bus.get_object("org.gnome.Banshee", "/org/gnome/Banshee/Player")
            self.banshee = dbus.Interface(self.d_obj, "org.gnome.Banshee.Core")
            flavour = 'dbus'
        else:
            flavour = 'nodbus'
        # e.g. self.play -> self.play_dbus or self.play_nodbus
        for action in ('play', 'stop', 'pause', 'next', 'prev', 'eject', 'open', 'get_info'):
            setattr(self, action, getattr(self, '%s_%s' % (action, flavour)))

    # --- dbus flavour ---
    def play_dbus(self):
        self.banshee.Play()

    def stop_dbus(self):
        self.banshee.Pause()

    def pause_dbus(self):
        self.banshee.TogglePlaying()

    def next_dbus(self):
        self.banshee.Next()

    def prev_dbus(self):
        self.banshee.Previous()

    def eject_dbus(self):
        self.banshee.ShowWindow()

    def open_dbus(self):
        self.banshee.ShowWindow()

    def get_info_dbus(self):
        """Fill the info dictionary from the dbus interface."""
        core = self.banshee
        d = self._empty_dict()
        d['length'] = s_to_mmss(core.GetPlayingDuration())
        d['elapsed'] = s_to_mmss(core.GetPlayingPosition())
        d['artist'] = unicode(core.GetPlayingArtist()).encode('UTF-8')
        d['title'] = unicode(core.GetPlayingTitle()).encode('UTF-8')
        d['album'] = unicode(core.GetPlayingAlbum()).encode('UTF-8')
        return d

    # --- command-line flavour ---
    def play_nodbus(self):
        command('banshee --play')

    def stop_nodbus(self):
        command('banshee --pause')

    def pause_nodbus(self):
        command('banshee --toggle-playing')

    def next_nodbus(self):
        command('banshee --next')

    def prev_nodbus(self):
        command('banshee --previous')

    def eject_nodbus(self):
        command('banshee --show')

    def open_nodbus(self):
        command('banshee --show')

    # shuffle & repeat not yet implemented

    def get_info_nodbus(self):
        """Fill the info dictionary from the `banshee --query-*` output."""
        d = self._empty_dict()
        query = ('banshee --hide-field --query-title --query-artist '
                 '--query-position --query-album --query-duration')
        # one value per line: duration, artist, album, title, position
        # banshee reports things in seconds
        fields = command(query).strip().split('\n')
        d['length'] = s_to_mmss(fields[0])
        d['artist'] = fields[1]
        d['album'] = fields[2]
        d['title'] = fields[3]
        d['elapsed'] = s_to_mmss(fields[4])
        return d
class Rhythmbox(MediaPlayer):
    """MediaPlayer class for Rhythmbox, a Gtk/Gnome media player."""

    def __init__(self):
        """Bind the Player and Shell dbus interfaces; raises without dbus."""
        if not bus:
            raise Exception('Rhythmbox is not supported w/o python-dbus bindings.')
        MediaPlayer.__init__(self, 'rhythmbox')
        service = "org.gnome.Rhythmbox"
        player_obj = bus.get_object(service, "/org/gnome/Rhythmbox/Player")
        shell_obj = bus.get_object(service, "/org/gnome/Rhythmbox/Shell")
        self.player = dbus.Interface(player_obj, "org.gnome.Rhythmbox.Player")
        self.shell = dbus.Interface(shell_obj, "org.gnome.Rhythmbox.Shell")

    def play(self):
        # only toggle when currently not playing
        if not bool(self.player.getPlaying()):
            self.player.playPause()

    def stop(self):
        # only toggle when currently playing
        if bool(self.player.getPlaying()):
            self.player.playPause()

    def pause(self):
        self.player.playPause()

    def next(self):
        self.player.next()

    def prev(self):
        self.player.previous()

    def eject(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def open(self):
        print_info("There isn't an easy way to do this in rhythmbox right now.")

    def get_info(self):
        """Fill the info dictionary from the playing song's properties."""
        d = self._empty_dict()
        uri = unicode(self.player.getPlayingUri())
        raw = dict(self.shell.getSongProperties(uri))
        properties = {}
        for key, val in raw.items():
            properties[unicode(key)] = val
        d['length'] = s_to_mmss(int(properties.get('duration', 0)))
        d['elapsed'] = s_to_mmss(int(self.player.getElapsed()))
        for field in ('artist', 'album', 'title'):
            d[field] = unicode(properties.get(field, '')).encode('UTF-8')
        # Rhythmbox reports a 'bitrate', but as far as i can tell it's always 0
        return d
# True until the "install python-dcop" hint has been shown.
JUK_FIRST_RUN = True
DCOP_MESSAGE = """Although juk is supported without them, it is recommended that
you install the python-dcop bindings for increased speed.""".replace("\n", ' ')


class Juk(MediaPlayer):
    """MediaPlayer class for Juk, a Qt/KDE media player.

    Whether to talk DCOP through pydcop or through the `dcop` command is
    decided once in the constructor, which binds the matching callables
    onto the instance.
    """

    def __init__(self):
        global JUK_FIRST_RUN
        if JUK_FIRST_RUN and not pydcop:
            print_info(DCOP_MESSAGE)
            JUK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'juk')
        self._ifcache = {}
        # these functions are selected from _%s_dcop and _%s_nodcop
        self._functions = ['eject', 'open']
        # instance method name -> juk Player dcop call
        self._func_map = {'play': 'play', 'stop': 'stop', 'pause': 'playPause',
                          'next': 'forward', 'prev': 'back'}
        if pydcop:
            # pydcop available: talk to the 'juk' application object directly
            self.juk = pydcop.anyAppCalled("juk")
            self.get_property = (lambda x: self.juk.Player.trackProperty(x))
            self.get_juk = (lambda func: getattr(self.juk.Player, func)())
            template = '_%s_dcop'
        else:
            # no pydcop: same two callables built on the 'command' helper
            self.get_property = (lambda x: command('dcop juk Player trackProperty %s' % (x)).strip())
            self.get_juk = (lambda func: command('dcop juk Player %s' % func))
            template = '_%s_nodcop'
        for action in self._functions:
            setattr(self, action, getattr(self, template % action))
        # bind play/stop/... to lambdas that route through whichever
        # get_juk was chosen above; the default argument freezes the value
        for method_name, dcop_call in self._func_map.items():
            setattr(self, method_name, (lambda prop=dcop_call: self.get_juk(prop)))

    def _eject_dcop(self):
        for verb in ("restore", "raise"):
            pcop.dcop_call("juk", "juk-mainwindow#1", verb, ())

    def _open_dcop(self):
        self._eject_dcop()

    def _eject_nodcop(self):
        command('dcop juk juk-mainwindow#1 restore')
        command('dcop juk juk-mainwindow#1 raise')

    def _open_nodcop(self):
        self._eject_nodcop()

    def get_info(self):
        """Fill the info dictionary from juk's track properties."""
        d = self._empty_dict()
        d['elapsed'] = s_to_mmss(self.get_juk('currentTime'))
        d['title'] = self.get_property('Title')
        d['artist'] = self.get_property('Artist')
        d['album'] = self.get_property('Album')
        d['length'] = s_to_mmss(self.get_property('Seconds'))
        d['bitrate'] = '%s Kbps' % self.get_property('Bitrate')
        return d
# True until the "install python-dcop" hint has been shown.
AMAROK_FIRST_RUN = True
AMAROK_DCOP_MESSAGE = """Although amarok is supported without them, it is recommended that
you install the python-dcop bindings for increased speed.""".replace("\n", ' ')


class Amarok(MediaPlayer):
    """MediaPlayer class for Amarok, a Qt/KDE media player. This implementation is
    a bit messy because it resolves whether or not to use DCOP statically; after
    importing, the comparisons are made and the appropriate functions are used."""

    def __init__(self):
        global AMAROK_FIRST_RUN
        if AMAROK_FIRST_RUN and not pydcop:
            print_info(AMAROK_DCOP_MESSAGE)
            AMAROK_FIRST_RUN = False
        MediaPlayer.__init__(self, 'amarok')
        self._ifcache = {}
        # If pydcop is available, 'self.get_property' is built on it; if not,
        # an equivalent is built on the 'command' helper. The names listed in
        # 'self._functions' are then bound onto the instance through it.
        # 'next' and 'prev' are included so a plain /mp3 next|prev reaches
        # the player instead of the "not yet implemented" stub.
        self._functions = ['play', 'stop', 'pause', 'next', 'prev']
        if pydcop:
            self.amarok = pydcop.anyAppCalled("amarok")
            self.get_property = (lambda x: getattr(self.amarok.player, x)())
            self.get_playlist = (lambda x: getattr(self.amarok.playlist, x)())
            self.set_playlist = (lambda x: self.amarok.playlist.playByIndex(x))
        else:
            self.get_property = (lambda x: command('dcop amarok player %s' % x).strip())
            self.get_playlist = (lambda x: command('dcop amarok playlist %s' % x).strip())
            self.set_playlist = (lambda x: command('dcop amarok playlist playByIndex %s' % x))
        for func in self._functions:
            setattr(self, func, (lambda func=func: self.get_property(func)))

    def open(self): print_info("There isn't an easy way to do this with amarok right now.")
    def eject(self): print_info("There isn't an easy way to do this with amarok right now.")

    def prev_n(self, n):
        """Go backwards 'n' times in the playlist, stopping at the first track."""
        # The command-line path returns text, so normalise to int first.
        position = int(self.get_playlist('getActiveIndex'))
        self.set_playlist(max(position - n, 0))

    def next_n(self, n):
        """Go forwards 'n' times in the playlist, stopping at the last track."""
        position = int(self.get_playlist('getActiveIndex'))
        playlist_length = int(self.get_playlist('getTotalTrackCount'))
        new_position = position + n
        if new_position >= playlist_length:
            new_position = playlist_length - 1
        self.set_playlist(new_position)

    def get_info(self):
        """Fill the info dictionary from amarok's player properties."""
        d = self._empty_dict()
        # this comes back in 'm:ss'
        d['elapsed'] = self.get_property('currentTime')
        d['title'] = self.get_property('title')
        d['artist'] = self.get_property('artist')
        d['album'] = self.get_property('album')
        d['length'] = self.get_property('totalTime')
        d['bitrate'] = '%s Kbps' % self.get_property('bitrate')
        return d
def current_player():
    """Return a MediaPlayer object for the player that is currently running.

    The detected player key is capitalized and looked up as a class name
    in this module (e.g. 'xmms' -> Xmms).

    Raises Exception when no supported player is detected.
    """
    player = which()
    print_debug("detected %s is running" % player)
    # Compare by value: identity ('is') against a string literal only works
    # by accident of interning.
    if player == 'banshee1':
        player = 'banshee'
    if not player:
        raise Exception("Currently not running a supported media player.")
    # Plain name lookup instead of eval() on a constructed string.
    player_class = globals()[player.capitalize()]
    return player_class()
def help(args):
    """Print the list of /mp3 sub-commands; `args` is not used."""
    lines = (
        "Commands:",
        " \002/mp3\002 : announce the currently playing mp3",
        " \002/mp3\002 \00303stop\003 : stop playing",
        " \002/mp3\002 \00303play\003 : start playing",
        " \002/mp3\002 \00303pause\003 : pause playback",
        " \002/mp3\002 \00303next [#]\003 : skip to next (# of) track(s)",
        " \002/mp3\002 \00303prev [#]\003 : skip to prev (# of) track(s)",
        " \002/mp3\002 \00303open\003 : open files",
        "",
    )
    for text in lines:
        weechat.prnt(text)
def usage():
    """Print the short usage hint for /mp3."""
    for text in ("Usage: \002/mp3\002 [cmd]",
                 "\t\002/mp3\002 \037help\037 for commands."):
        weechat.prnt(text)
def announce():
    """Send a /me line describing what the current player is playing."""
    now_playing = current_player()
    weechat.command('/me is listening to: %s' % (now_playing))


# The handlers below accept and ignore whatever arguments dispatch passes.

def stop(*args):
    """Stop the running player."""
    player = current_player()
    player.stop()


def play(*args):
    """Start playback on the running player."""
    player = current_player()
    player.play()


def pause(*args):
    """Pause the running player."""
    player = current_player()
    player.pause()


def open(*args):
    """Invoke the running player's open action."""
    player = current_player()
    player.open()


def eject(*args):
    """Invoke the running player's eject action."""
    player = current_player()
    player.eject()
def _make_num(numstr):
    """Return `numstr` converted to int, or None if it is not a number.

    A message is printed through print_info when the conversion fails.
    """
    try:
        return int(numstr)
    except (TypeError, ValueError):
        # print_error was never defined in this file; print_info is the
        # existing user-facing printer.
        print_info('"%s" must be a number.' % (numstr,))
        return None
def next(argv):
    """Skip forward one track, or argv[2] tracks when exactly 3 args are given."""
    if len(argv) != 3:
        current_player().next()
        return
    count = _make_num(argv[2])
    if count is None:
        # not a number; _make_num already reported it
        return
    current_player().next_n(count)
def prev(argv):
    """Skip back one track, or argv[2] tracks when exactly 3 args are given."""
    if len(argv) != 3:
        current_player().prev()
        return
    count = _make_num(argv[2])
    if count is None:
        # not a number; _make_num already reported it
        return
    current_player().prev_n(count)
#def dispatch(argv, arg_to_eol, c):
def dispatch(server, args):
    """WeeChat command handler for /mp3.

    `args` is the text typed after the command. With no sub-command the
    current track is announced; otherwise the first word selects a handler,
    which receives the full argument list (element 0 is "/mp3"). Any
    failure prints the usage text, or the traceback when debugging.
    Always returns 0.
    """
    # split() with no separator collapses runs of whitespace, so
    # "next  3" parses the same as "next 3".
    argv = ("/mp3 " + args).split()
    print_debug(argv)
    handlers = {
        "help": help,
        "stop": stop,
        "play": play,
        "pause": pause,
        "next": next,
        "prev": prev,
        "eject": eject,
        "open": open,
    }
    try:
        if len(argv) == 1:
            announce()
        else:
            handlers[argv[1]](argv)
    except Exception:
        if __debugging__:
            print_debug(traceback.format_exc())
        else:
            usage()
    # eat in weechat is 0?
    return 0
if weechat:
    # running with the real weechat module: register the script and /mp3
    weechat.register(__module_name__, __module_version__, '', __module_description__)
    weechat.add_command_handler('mp3', 'dispatch')
elif __name__ == '__main__':
    # are we '__main__' when we are imported by weechat? probably not
    announce()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment