Skip to content

Instantly share code, notes, and snippets.

@gnunicorn
Created December 1, 2011 16:18
Show Gist options
  • Save gnunicorn/1417912 to your computer and use it in GitHub Desktop.
Save gnunicorn/1417912 to your computer and use it in GitHub Desktop.
soundcloud multimedia keys for linux
#!/usr/bin/env python
"""
Toggle play/pause of SoundCloud running in your Chrome (which you obviously started with '--remote-shell-port=9222').
inspired (and largely copied) from:
http://code.google.com/p/mog-mm/
"""
import socket
try:
import json
except ImportError:
import simplejson as json
import os
import sys
try:
import gobject
import dbus
import dbus.service
import dbus.mainloop.glib
except ImportError:
print "Skipping GNOME/Linux imports"
# credit: https://github.com/yappie/chrome-remote-python
class ChromeTab(object):
    """One browser tab as reported by a remote-shell connection.

    ``crs`` is the connection object; every request made here goes through
    its ``send_raw`` method, addressed to the ``V8Debugger`` tool with this
    tab's id as the destination.
    """

    def __init__(self, crs, tab_id, url):
        self.crs = crs
        self.tab_id = int(tab_id)
        self.url = url

    def __repr__(self):
        return "ChromeTab(tab_id = %d, url = '%s')" % (self.tab_id, self.url)

    def v8_cmd(self, cmd, no_response = False):
        """Send ``cmd`` to the V8 debugger of this tab.

        Returns whatever ``send_raw`` returned.  When a response is
        expected, its ``'result'`` field is asserted to be ``0``.
        """
        reply = self.crs.send_raw(
            cmd,
            tool='V8Debugger',
            destination=self.tab_id,
            no_response=no_response,
        )
        if no_response:
            return reply
        assert reply['result'] == 0
        return reply

    def v8_attach(self):
        """Send the debugger ``attach`` command for this tab."""
        return self.v8_cmd({"command": "attach"})

    def v8_detach(self):
        """Send the debugger ``detach`` command for this tab."""
        return self.v8_cmd({"command": "detach"})

    def v8_eval_expr(self, expr):
        """Evaluate ``expr`` via a debugger ``evaluate`` request.

        Prints the tab URL, then returns the ``value`` of the response body.
        Raises ``Exception`` carrying the debugger's message when the
        response is not marked successful.
        """
        request = {
            "command": "debugger_command",
            "data": {
                "type": "request",
                "command": "evaluate",
                "arguments": {"expression": expr},
            },
        }
        print(self.url)
        payload = self.v8_cmd(request)['data']
        if not payload['success']:
            raise Exception('V8 Error: ' + payload['message'])
        return payload['body']['value']

    def v8_evaluate_js(self, js):
        """Send ``js`` as an ``evaluate_javascript`` command; no reply is read."""
        command = {"command": "evaluate_javascript", "data": js}
        return self.v8_cmd(command, no_response=True)
# credit: https://github.com/yappie/chrome-remote-python
class ChromeRemoteShell(object):
    """Client for the Chrome remote shell (``--remote-shell-port``).

    Requests are written as ``Name:value`` header lines, a blank line and a
    JSON body.  Responses are parsed the same way: everything after the
    first blank line is decoded as JSON.

    The socket is used with native ``str`` data, as in the rest of this
    script.
    """

    HANDSHAKE = 'ChromeDevToolsHandshake\r\n'

    def __init__(self, host = '127.0.0.1', port = 9222):
        """Connect to ``host:port`` and perform the handshake.

        Raises ``Exception`` when the TCP connection cannot be made.  A
        failed handshake is only reported on stdout (best effort).
        """
        self.verbose = False
        self.sock = socket.socket()
        try:
            self.sock.connect((host, port))
        except socket.error:
            # Do not leak the descriptor when nobody is listening.
            self.sock.close()
            raise Exception(
                "Can't connect.\n"
                "Did you forget to run shell?\n"
                " google-chrome --remote-shell-port=9222"
            )
        # TODO
        # this logic is rather strange. sometimes Chrome replies
        # with 'ChromeDevToolsHandshake' (i.e. missing CRLF)
        # apparently it always works on the third try if the
        # first try fails
        try:
            for _ in range(3):
                self.sock.sendall(self.HANDSHAKE)
                if self.sock.recv(4096) == self.HANDSHAKE:
                    break
                # Retries use a short timeout so a silent peer cannot
                # block the caller forever.
                self.sock.settimeout(0.1)
            else:
                print("Handshake not acknowledged in CRS __init__")
        except (socket.timeout, socket.error):
            print("Caught Timeout in CRS __init__")

    def close(self):
        """Close the underlying socket; the instance is unusable afterwards."""
        self.sock.close()

    @staticmethod
    def _content_length(head):
        """Return the Content-Length announced in ``head``, or ``None``."""
        for line in head.split('\r\n'):
            name, _, value = line.partition(':')
            if name.strip().lower() == 'content-length':
                try:
                    return int(value.strip())
                except ValueError:
                    return None
        return None

    def _read_message(self):
        """Read one response and return ``(header_text, body_text)``.

        Keeps receiving until the blank line that ends the headers has
        arrived and, if the headers carry a Content-Length, until that many
        body characters are available.  Stops early when the peer closes
        the connection.
        """
        # NOTE(review): assumes responses carry a Content-Length header like
        # the requests built in send_raw do -- confirm against the protocol.
        buf = ''
        while '\r\n\r\n' not in buf:
            chunk = self.sock.recv(40960)
            if not chunk:
                break
            buf += chunk
        head, _, body = buf.partition('\r\n\r\n')
        expected = self._content_length(head)
        while expected is not None and len(body) < expected:
            chunk = self.sock.recv(expected - len(body))
            if not chunk:
                break
            body += chunk
        return head, body

    def send_raw(self, cmd, tool = 'DevToolsService', destination = None,
                 no_response = False):
        """Send ``cmd`` (JSON-serialisable) to ``tool``.

        ``destination`` is added as a ``Destination`` header whenever it is
        not ``None`` (so tab id ``0`` is addressed correctly).  Returns the
        decoded JSON response, or ``None`` when ``no_response`` is true.
        """
        js = json.dumps(cmd)
        headers = {
            'Content-Length': len(js),
            'Tool': tool,
        }
        if destination is not None:
            headers['Destination'] = destination
        header_lines = ['%s:%s\r\n' % (h, v) for h, v in headers.items()]
        if self.verbose:
            print('--- ******************* ---')
            for line in header_lines:
                print('SENT> %s' % line.rstrip('\r\n'))
            print('SENT> ')
            print('SENT> %s' % js)
        # sendall: plain send() may transmit only part of the request.
        self.sock.sendall(''.join(header_lines) + '\r\n' + js)
        if self.verbose:
            print('---\nGot back:')
        if no_response:
            return None
        head, body = self._read_message()
        if self.verbose:
            print(head + '\r\n\r\n' + body)
            print('---')
        return json.loads(body)

    def ping(self):
        """Send the ``ping`` command and return the decoded response."""
        return self.send_raw({"command": "ping"})

    def version(self):
        """Return the ``data`` field of the ``version`` response."""
        return self.send_raw({"command": "version"})['data']

    def tabs(self):
        """Return a ``ChromeTab`` for every ``(tab_id, url)`` pair listed."""
        listing = self.send_raw({"command": "list_tabs"})['data']
        return [ChromeTab(self, tab_id, url) for tab_id, url in listing]

    def tab_by_url(self, url):
        """Return the first tab whose URL equals ``url``.

        Raises ``LookupError`` when no tab matches.
        """
        for tab in self.tabs():
            if tab.url == url:
                return tab
        raise LookupError("Tab not found")

    def attach(self, tab_id):
        # NOTE(review): this ignores ``tab_id`` and issues the same
        # "version" command as version(); it looks like a copy/paste
        # leftover.  Behaviour is kept as-is until the intended command is
        # confirmed.
        return self.send_raw({"command": "version"})['data']
class MogMM(object):
    """Drives the SoundCloud player in a Chrome tab through the remote shell."""

    # Commands implemented by the injected script below.
    COMMANDS = ('play', 'next')

    # JavaScript injected into the SoundCloud tab; ``%s`` receives the
    # (already validated) command name.
    _SCRIPT = """
(function(cmd) {
    function _currently_playing() {
        return $.grep(soundManager.soundIDs,
            function(val) {
                var sound = soundManager.sounds[val];
                if (sound.playState === 1 && sound.paused == false) {
                    return true;
                }
            });
    }
    function _get_latest_playing() {
        var playing = _currently_playing();
        if (playing.length) {
            return playing[0];
        }
        if (soundManager.soundIDs.length) {
            return soundManager.soundIDs[soundManager.soundIDs.length -1];
        }
    }
    function _get_all() {
        return $.map($('.player.mode'),
            function(itm) {
                return Number($(itm).attr("data-sc-track"))
            });
    }
    var commands = {
        play: function() {
            var playing = _currently_playing();
            if (playing.length) {
                soundManager.pauseAll();
            } else {
                soundManager.resume(soundManager.soundIDs[soundManager.soundIDs.length -1]);
            }
        },
        next: function() {
            var all_items = _get_all();
            var cur_idx = all_items.indexOf(_get_latest_playing());
            var next_idx = cur_idx + 1;
            if (cur_idx === -1 || next_idx >= all_items.length) {
                next_idx = 0;
            }
            soundManager.pauseAll();
            /* this broken atm */
            soundManager.start(all_items[next_idx]);
        }
    }[cmd]();
})('%s');
"""

    def evaluate_js(self, cmd):
        """Run player command ``cmd`` (case-insensitive) in the SoundCloud tab.

        Returns ``False`` when ``cmd`` is not one of ``COMMANDS`` (a message
        is printed and no connection is made) or when no open tab has
        'soundcloud.com' in its URL; returns ``None`` after the script has
        been sent.  Errors from ``ChromeRemoteShell`` propagate unchanged.
        """
        name = cmd.lower()
        # Validate here: the dispatch itself happens inside the browser, so
        # an unknown name would otherwise fail there without any feedback.
        if name not in self.COMMANDS:
            print("Command '%s' unknown" % cmd)
            return False

        # connect to the chrome shell
        crs = ChromeRemoteShell()
        try:
            # find the SoundCloud tab by URL
            for tab in crs.tabs():
                if 'soundcloud.com' in tab.url:
                    break
            else:
                return False
            print(tab)
            tab.v8_evaluate_js(self._SCRIPT % name)
        finally:
            # A new connection is opened per call; release it so the daemon
            # does not accumulate sockets.
            crs.sock.close()
def on_mediakey(comes_from, what):
    """
    Callback for pressed multimedia keys.

    ``what`` is the key name.  Only 'Play' triggers an action; 'Next' and
    'Previous' are recognised but currently do nothing, and any other name
    is reported as unknown.  ``comes_from`` is not used.
    """
    if what not in ('Play', 'Next', 'Previous'):
        print ('Got an unknown multimedia key...')
        return

    mog = MogMM()
    if what == 'Play':
        print ('>> Got PlayPause multimedia key')
        mog.evaluate_js('play')
    # FIXME: those break the soundcloud website atm...
    # elif what == 'Next':
    #     print ('>> Got Next multimedia key')
    #     mog.evaluate_js('next')
    # elif what == 'Previous':
    #     print ('>> Got Prev multimedia key')
    #     mog.evaluate_js('previous')
def on_command(cmd):
    """
    Handle a command given on the command line.

    'play', 'next' and 'prev' print a short notice and forward 'play',
    'next' and 'previous' respectively to ``MogMM.evaluate_js``; any other
    value is ignored.
    """
    mog = MogMM()
    table = (
        ('play', '>> Got PlayPause command', 'play'),
        ('next', '>> Got Next command', 'next'),
        ('prev', '>> Got Prev command', 'previous'),
    )
    for name, notice, js_command in table:
        if cmd == name:
            print (notice)
            mog.evaluate_js(js_command)
            break
def gnome_loop():
    """Grab the GNOME media keys and dispatch presses to ``on_mediakey``.

    Runs a glib main loop, so this call blocks until that loop ends.
    """
    service_name = 'org.gnome.SettingsDaemon'
    object_path = '/org/gnome/SettingsDaemon/MediaKeys'
    # this is what gives us the multi media keys.
    media_keys_iface = 'org.gnome.SettingsDaemon.MediaKeys'

    # the glib main loop must be the default before the bus is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    session_bus = dbus.Bus(dbus.Bus.TYPE_SESSION)
    media_keys = session_bus.get_object(service_name, object_path)
    media_keys.GrabMediaPlayerKeys("MyMultimediaThingy", 0,
                                   dbus_interface=media_keys_iface)

    # register our callback for key presses.
    media_keys.connect_to_signal('MediaPlayerKeyPressed', on_mediakey)

    # hand control to the main loop.
    loop = gobject.MainLoop()
    loop.run()
# Script entry point: exactly one argument selects the mode.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print ('Please provide one argument at the command line')
    elif sys.argv[1] == 'daemon':
        # listen for multimedia keys until the main loop ends.
        gnome_loop()
    elif sys.argv[1] in ('play', 'next', 'prev'):
        # one-shot command; the argument is passed through unchanged.
        on_command(sys.argv[1])
    else:
        print ('Unknown command. Please use play (Play/Pause), next (Next Track), prev (Previous Track), or daemon (listen to mm keys, on GNOME)')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment