Last active
December 29, 2015 03:48
-
-
Save junkmechanic/cae82755b531cd76a149 to your computer and use it in GitHub Desktop.
A fork of https://github.com/Schnouki/spop/blob/master/dspop/dspop to suit my needs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015 The spop contributors | |
# | |
# This file is part of spop. | |
# | |
# spop is free software: you can redistribute it and/or modify it under the | |
# terms of the GNU General Public License as published by the Free Software | |
# Foundation, either version 3 of the License, or (at your option) any later | |
# version. | |
# | |
# spop is distributed in the hope that it will be useful, but WITHOUT ANY | |
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR | |
# A PARTICULAR PURPOSE. See the GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License along with | |
# spop. If not, see <http://www.gnu.org/licenses/>. | |
# | |
# Additional permission under GNU GPL version 3 section 7 | |
# | |
# If you modify this Program, or any covered work, by linking or combining it | |
# with libspotify (or a modified version of that library), containing parts | |
# covered by the terms of the Libspotify Terms of Use, the licensors of this | |
# Program grant you additional permission to convey the resulting work. | |
# Changes made:
# 1. In the prompt displaying the tracks of a playlist, selecting a track
#    presents another prompt offering the following options:
#      i.   Add Album to Queue (adds the album surrounding the track to the queue)
#      ii.  Add Track to Queue
#      iii. Replace Queue with Album
#      iv.  Replace Queue with Track
#    This makes sense if the playlist consists of albums stacked one after the
#    other. If the playlist consists of songs in random order, the album-related
#    options above add every song in that playlist that belongs to the same
#    album as the selected track (if there are any), in the order they appear
#    in that playlist.
# 2. The script sends error notifications via libnotify.
# This fork requires the (python-)notify2 and (python-)plumbum libraries for
# the notifications.
import os | |
import sys | |
import json | |
import errno | |
import socket | |
import os.path | |
import notify2 | |
import plumbum as pb | |
import subprocess | |
import collections | |
# {{{ Parameters | |
# Placeholders understood by TRACK_FMT:
#   {index}, {artist}, {track}, {album}, {duration}, {uri}, {playing}
TRACK_FMT = "{playing}{index} - {artist} - {album} - {track} ({duration})"

# Templates used to render the three kinds of search results.
SEARCH_ALBUM_FMT = "({year}) {artist} - {album} ({tracks} tracks)"
SEARCH_PLAYLIST_FMT = "{name} (by {owner}, {tracks} tracks)"
SEARCH_TRACK_FMT = "{artist} - {album} - {track} ({duration})"

# Extra command-line options handed to the menu program that ends up in use.
DMENU_OPTS = ["-i", "-l", "40"]
ROFI_OPTS = ["-dmenu", "-columns", "1", "-i"]

# Use rofi if it's available: true as soon as one directory of the executable
# search path holds a regular file named "rofi" that is readable and
# executable.
USE_ROFI = any(
    os.path.isfile(_candidate) and os.access(_candidate, os.R_OK | os.X_OK)
    for _candidate in (os.path.join(_dir, "rofi") for _dir in os.get_exec_path())
)
# }}} | |
# {{{ Spop client | |
class SpopClient:
    """Small TCP client that sends text commands and reads back JSON replies.

    Every name listed in ``_COMMANDS`` is available as a method; calling
    ``client.offline_toggle(3)`` sends ``offline-toggle 3`` and returns the
    decoded JSON reply.
    """

    # Commands exposed as methods. Underscores are turned into dashes before
    # the command is put on the wire.
    _COMMANDS = frozenset((
        "repeat", "shuffle", "qclear", "qls", "ls", "goto", "add", "next",
        "prev", "toggle", "play", "offline_toggle", "search", "status",
        "uadd", "uinfo", "uplay",
    ))

    def __init__(self, host, port):
        """Connect to ``host:port`` and store the server greeting.

        Raises:
            OSError: if the connection cannot be established; the socket is
                closed before the exception propagates.
        """
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.connect((host, port))
            self.greeting = self._sock.recv(1024).decode().strip()
        except BaseException:
            # Do not leak the file descriptor when the server is unreachable.
            self._sock.close()
            raise

    def _command(self, command, *args):
        """Send ``command`` with ``args`` and return the decoded JSON reply.

        String arguments are wrapped in double quotes with embedded double
        quotes backslash-escaped; every other argument is sent as ``str(arg)``.

        Raises:
            ConnectionError: if the peer closes the connection before a
                complete JSON document has been received.
        """
        words = [command]
        for arg in args:
            if isinstance(arg, str):
                words.append('"' + arg.replace('"', '\\"') + '"')
            else:
                words.append(str(arg))
        # sendall() keeps writing until the whole command is out; send() may
        # stop after a partial write.
        self._sock.sendall((" ".join(words) + "\n").encode())

        buf = b""
        while True:
            chunk = self._sock.recv(1024)
            if not chunk:
                # recv() returning nothing means EOF; without this check the
                # loop would spin forever on a closed connection.
                raise ConnectionError(
                    "connection closed before a complete reply was received")
            buf += chunk
            try:
                return json.loads(buf.decode())
            except ValueError:
                # Incomplete JSON, or a multi-byte character split across
                # chunks (UnicodeDecodeError is a ValueError): read more.
                continue

    def __getattr__(self, name):
        """Return a callable that issues the command called ``name``.

        Raises:
            AttributeError: if ``name`` is not one of the known commands.
        """
        if name not in self._COMMANDS:
            raise AttributeError(name)

        def func(*attrs):
            return self._command(name.replace("_", "-"), *attrs)
        return func
# }}} | |
# {{{ Dmenu interaction | |
def dmenu(items, highlight=(), prompt="dspop > "):
    """Show ``items`` in rofi (when available) or dmenu and return the choice.

    Args:
        items: list of strings, one menu row each.
        highlight: row numbers (as strings) passed to rofi's ``-a`` option;
            ignored when plain dmenu is used.
        prompt: prompt text passed with ``-p``.

    Returns:
        The index of the selected row in ``items``, or ``None`` when the menu
        was cancelled or the user entered text that is not one of the rows.
    """
    if USE_ROFI:
        args = ["rofi", "-p", prompt] + ROFI_OPTS
        if highlight:
            args += ["-a", ",".join(highlight)]
    else:
        # dmenu_input() already passes -p to dmenu; do the same here so the
        # prompt is not silently dropped.
        args = ["dmenu", "-p", prompt] + DMENU_OPTS

    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Hand all rows to communicate() in one go; it also closes stdin for us.
    out, _ = proc.communicate("".join(it + "\n" for it in items).encode())
    if not out:
        return None

    selection = out.decode()
    if selection.endswith("\n"):
        selection = selection[:-1]
    try:
        return items.index(selection)
    except ValueError:
        # Free text typed by the user: treat it like a cancelled menu rather
        # than crashing the whole script.
        return None
def dmenu_input(prompt):
    """Ask for a line of free text through rofi (when available) or dmenu.

    Args:
        prompt: prompt text passed with ``-p``.

    Returns:
        The text printed by the menu program without its trailing newline;
        an empty string when nothing was printed.
    """
    if USE_ROFI:
        args = ["rofi", "-p", prompt] + ROFI_OPTS
    else:
        args = ["dmenu", "-p", prompt] + DMENU_OPTS
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # No rows are offered: communicate() closes stdin right away.
    out, _ = proc.communicate()
    text = out.decode()
    # Only drop a newline that is really there instead of blindly chopping
    # the last character.
    return text[:-1] if text.endswith("\n") else text
def format_search_album(album):
    """Render an album dict as one menu row using SEARCH_ALBUM_FMT.

    Reads the "title", "artist", "year", "album_type" and "tracks" keys and
    offers them to the template as {album}, {artist}, {year}, {type} and
    {tracks} (the number of tracks).
    """
    return SEARCH_ALBUM_FMT.format(
        album=album["title"],
        artist=album["artist"],
        year=album["year"],
        type=album["album_type"],
        tracks=len(album["tracks"]),
    )
def format_search_playlist(playlist):
    """Render a playlist dict as one menu row using SEARCH_PLAYLIST_FMT.

    Reads the "name", "owner" and "tracks" keys; {tracks} in the template is
    the number of tracks.
    """
    track_count = len(playlist["tracks"])
    return SEARCH_PLAYLIST_FMT.format(
        name=playlist["name"],
        owner=playlist["owner"],
        tracks=track_count,
    ) if False else SEARCH_PLAYLIST_FMT.format(**{
        "name": playlist["name"],
        "owner": playlist["owner"],
        "tracks": track_count,
    })
def format_search_track(track):
    """Render a track dict as one menu row using SEARCH_TRACK_FMT.

    Reads the "album", "artist", "duration", "index", "title" and "uri" keys.
    The duration is divided by 1000 and shown as minutes:seconds.
    """
    minutes, seconds = divmod(track["duration"] / 1000, 60)
    return SEARCH_TRACK_FMT.format(
        album=track["album"],
        artist=track["artist"],
        duration="%d:%02d" % (minutes, seconds),
        index=track["index"],
        track=track["title"],
        uri=track["uri"],
    )
def format_tracks(tracks, current_track):
    """Render a list of track dicts as menu rows using TRACK_FMT.

    Args:
        tracks: list of dicts with "album", "artist", "duration", "index",
            "title" and "uri" keys.
        current_track: dict describing what is playing; a row whose "uri"
            equals ``current_track["uri"]`` gets the "* " marker.

    Returns:
        One formatted string per track, in the same order.

    Track numbers are right-aligned to the width of the largest possible
    number. With rofi no marker column is emitted at all (callers highlight
    the row through ``dmenu(..., highlight)`` instead).
    """
    index_format = "%{}d".format(len(str(len(tracks))))
    items = []
    for track in tracks:
        if USE_ROFI:
            playing = ""
        elif "uri" in current_track and current_track["uri"] == track["uri"]:
            playing = "* "
        else:
            # Same width as the "* " marker so that the padded track numbers
            # stay aligned between playing and non-playing rows.
            playing = "  "
        minutes, seconds = divmod(track["duration"] // 1000, 60)
        items.append(TRACK_FMT.format(
            album=track["album"],
            artist=track["artist"],
            duration="%d:%02d" % (minutes, seconds),
            index=index_format % track["index"],
            track=track["title"],
            uri=track["uri"],
            playing=playing,
        ))
    return items
def format_folder(items, indent=0, offline_status=False):
    """Flatten a (possibly nested) playlist tree into menu rows.

    Args:
        items: list of dicts whose "type" is "separator", "folder" or
            "playlist"; falsy entries and unknown types are skipped.
        indent: number of spaces put in front of each row at this level.
        offline_status: when true, playlists get a "[X] " / "[ ] " prefix
            taken from their "offline" key, and separators/folders are
            shifted right by four columns to line up with them.

    Returns:
        A ``(rows, indices)`` pair of equal length; ``indices`` holds the
        playlist's "index" value for playlist rows and ``None`` otherwise.
    """
    rows = []
    playlist_ids = []
    # Separators and folder headers have no checkbox, so they are padded by
    # its width instead.
    padding = " " * (indent + 4) if offline_status else " " * indent
    for entry in items:
        if not entry:
            continue
        kind = entry["type"]
        if kind == "playlist":
            checkbox = ""
            if offline_status:
                checkbox = "[X] " if entry["offline"] else "[ ] "
            rows.append(checkbox + " " * indent
                        + "{0[name]} ({0[tracks]})".format(entry))
            playlist_ids.append(entry["index"])
        elif kind == "folder":
            rows.append(padding + "+ " + entry["name"])
            playlist_ids.append(None)
            # Children are rendered two columns deeper, right after the header.
            child_rows, child_ids = format_folder(
                entry["playlists"], indent + 2, offline_status)
            rows += child_rows
            playlist_ids += child_ids
        elif kind == "separator":
            rows.append(padding + "-" * 40)
            playlist_ids.append(None)
    return rows, playlist_ids
# }}} | |
# {{{ Menus | |
def main_menu(sc):
    """Show the top-level menu until it is cancelled or a sub-menu asks to quit.

    Args:
        sc: client object providing the repeat/shuffle/toggle/next/prev
            commands and passed on to the sub-menus.
    """
    entries = (
        ("queue", "[ Queue ]>"),
        ("playlists", "[ Playlists ]>"),
        ("repeat", "[ Repeat ]"),
        ("shuffle", "[ Shuffle ]"),
        ("sep", "-"*40),
        ("search", "[ Search ]>"),
        ("search_pl", "[ Search (with playlists) ]>"),
        ("offline", "[ Offline playlists ]>"),
        ("sep", "-"*40),
        ("toggle", "[ Toggle Playback ]"),
        ("next", "[ Next Track ]"),
        ("prev", "[ Previous Track ]"),
    )
    labels = [label for _key, label in entries]

    # Sub-menus whose return value decides whether the main menu closes.
    submenus = {
        "queue": lambda: menu_queue(sc),
        "playlists": lambda: menu_playlists(sc),
        "search": lambda: menu_search(sc),
        "search_pl": lambda: menu_search(sc, include_playlists=True),
    }
    # Actions whose result is ignored: the main menu is shown again afterwards.
    actions = {
        "offline": lambda: menu_offline(sc),
        "repeat": lambda: sc.repeat(),
        "shuffle": lambda: sc.shuffle(),
        "toggle": lambda: sc.toggle(),
        "next": lambda: sc.next(),
        "prev": lambda: sc.prev(),
    }

    while True:
        picked = dmenu(labels)
        if picked is None:
            return
        key = entries[picked][0]
        if key in submenus:
            if submenus[key]():
                return
        elif key in actions:
            actions[key]()
        # Anything else (the separators) just redisplays the menu.
def menu_queue(sc):
    """Show the play queue; jump to a selected track or clear the queue.

    The queue is re-read before every display. Row 0 is the "clear" entry, so
    the row number of a track is passed to ``sc.goto`` unchanged.

    Returns:
        Always ``False``.
    """
    while True:
        queued = sc.qls()["tracks"]
        status = sc.status()
        active_rows = []
        if "current_track" in status:
            active_rows = [str(status["current_track"])]
        rows = ["[ Clear queue ]"] + format_tracks(queued, status)

        picked = dmenu(rows, active_rows, "Play Queue > ")
        if picked is None:
            break
        if picked == 0:
            sc.qclear()
            break
        sc.goto(picked)
    return False
def menu_playlists(sc):
    """Show the playlist tree and open the playlist that gets selected.

    Rows without a playlist behind them (folders, separators) simply
    redisplay the menu.

    Returns:
        ``False`` when the menu is cancelled, ``True`` once ``menu_playlist``
        returns a true value.
    """
    rows, playlist_ids = format_folder(sc.ls()["playlists"])
    while True:
        picked = dmenu(rows)
        if picked is None:
            return False
        playlist = playlist_ids[picked]
        if playlist is None:
            continue
        if menu_playlist(sc, playlist):
            return True
def menu_playlist(sc, idx):
    """Show the tracks of playlist ``idx`` and act on the selection.

    The first two rows play or enqueue the whole playlist and close the menu.
    Any track row opens ``menu_track`` for that track and then shows this
    menu again.

    Returns:
        Always ``False``.
    """
    # TODO: handle unavailable tracks
    tracks = sc.ls(idx)["tracks"]
    status = sc.status()
    rows = ["[ Set as queue ]", "[ Add to queue ]"] + format_tracks(tracks, status)
    while True:
        picked = dmenu(rows)
        if picked is None:
            break
        if picked == 0:
            sc.play(idx)
            break
        if picked == 1:
            sc.add(idx)
            break
        # Two header rows precede the tracks, so row N is track number N - 1.
        menu_track(sc, idx, picked - 1)
    return False
def menu_track(sc, idx, track):
    """Ask what to do with one track of a playlist and do it.

    Args:
        sc: client object providing ``add`` and ``play``.
        idx: playlist number.
        track: track number inside that playlist, as passed to
            ``sc.add`` / ``sc.play``.

    The labels are listed in the same order as the actions below (album
    before track in both groups); previously the two "Replace" labels were
    swapped relative to what they did.
    """
    items = ["[ Add Album to Queue ]",
             "[ Add Track to Queue ]",
             "[ Replace Queue with Album ]",
             "[ Replace Queue with Track ]"]
    choice = dmenu(items)
    if choice is None:
        return
    if choice == 0:
        for tno in sorrounding_album(sc, idx, track):
            sc.add(idx, tno)
    elif choice == 1:
        sc.add(idx, track)
    elif choice == 2:
        album_tracks = sorrounding_album(sc, idx, track)
        if album_tracks:
            # Start playback with the first album track, then queue the rest.
            sc.play(idx, album_tracks[0])
            for tno in album_tracks[1:]:
                sc.add(idx, tno)
    elif choice == 3:
        sc.play(idx, track)
def sorrounding_album(sc, idx, track):
    """Return the numbers of all tracks in a playlist sharing one track's album.

    Args:
        sc: client object whose ``ls(idx)`` returns a dict with a "tracks"
            list of dicts carrying "album" and "index" keys.
        idx: playlist number.
        track: track number of the reference track, counted from 1 (this is
            what ``menu_playlist`` passes: the menu row minus the two header
            rows, plus one).

    Returns:
        The "index" values of every track in the playlist whose "album"
        equals the reference track's album, in playlist order.

    Raises:
        IndexError: if ``track`` is not a valid track number for the playlist.
    """
    # Fetch the playlist once and reuse it for both the lookup and the scan.
    tracks = sc.ls(idx)["tracks"]
    if not 1 <= track <= len(tracks):
        raise IndexError(
            "track number {!r} is outside playlist {!r}".format(track, idx))
    # Track numbers start at 1 while the list is 0-based.
    album = tracks[track - 1]["album"]
    return [t["index"] for t in tracks if t["album"] == album]
def menu_offline(sc):
    """Show the playlist tree with offline checkboxes and toggle selections.

    The tree is re-read before every display so the checkboxes reflect the
    latest state. Rows without a playlist behind them are ignored.

    Returns:
        ``False`` once the menu is cancelled.
    """
    while True:
        rows, playlist_ids = format_folder(sc.ls()["playlists"],
                                           offline_status=True)
        picked = dmenu(rows)
        if picked is None:
            return False
        playlist = playlist_ids[picked]
        if playlist is not None:
            sc.offline_toggle(playlist)
# One row of the search-result menu. ``uri`` is None for section headers;
# ``is_container`` is True for albums/playlists and False for single tracks.
_SearchItem = collections.namedtuple("SearchItem", ["uri", "title", "is_container"])


def _search_section(title, entries):
    """Return a header row followed by ``entries``, or nothing if empty."""
    if not entries:
        return []
    return [_SearchItem(None, title, None)] + entries


def _search_album_items(sc, res, containers_by_uri):
    """Build sorted (albums, singles) rows; record album details by URI."""
    albums, singles = [], []
    for album in res.get("albums") or []:
        if not album["available"]:
            continue
        ainfo = sc.uinfo(album["uri"])
        containers_by_uri[album["uri"]] = ainfo
        item = _SearchItem(album["uri"], " " + format_search_album(ainfo), True)
        if ainfo["album_type"] == "single":
            singles.append(item)
        else:
            albums.append(item)
    albums.sort(key=lambda item: item.title)
    singles.sort(key=lambda item: item.title)
    return albums, singles


def _search_track_items(res):
    """Build rows for the available tracks of a search result."""
    return [_SearchItem(track["uri"], " " + format_search_track(track), False)
            for track in res.get("tracks") or []
            if track["available"]]


def _search_playlist_items(sc, res, containers_by_uri):
    """Build sorted playlist rows; record playlist details by URI."""
    playlists = []
    for playlist in res.get("playlists") or []:
        if "error" in playlist:
            continue
        pinfo = sc.uinfo(playlist["uri"])
        containers_by_uri[playlist["uri"]] = pinfo
        playlists.append(
            _SearchItem(playlist["uri"], " " + format_search_playlist(pinfo), True))
    playlists.sort(key=lambda item: item.title)
    return playlists


def menu_search(sc, include_playlists=False):
    """Ask for a query, show the results and play/enqueue what is selected.

    Args:
        sc: client object providing ``search``, ``uinfo``, ``status``,
            ``uplay`` and ``uadd``.
        include_playlists: also list playlists found by the search.

    Returns:
        ``False`` when the query is empty, the result menu is cancelled or a
        section header is selected. After an action on a result the result
        menu is shown again.

    Results are listed as Albums, Tracks, Singles and (optionally) Playlists;
    a section header is only shown when the section has at least one row.
    """
    query = dmenu_input("Query > ")
    if not query:
        return False
    res = sc.search(query)

    containers_by_uri = {}
    albums, singles = _search_album_items(sc, res, containers_by_uri)
    items = _search_section("Albums:", albums)
    items += _search_section("Tracks:", _search_track_items(res))
    items += _search_section("Singles:", singles)
    if include_playlists:
        items += _search_section(
            "Playlists:", _search_playlist_items(sc, res, containers_by_uri))
    titles = [it.title for it in items]

    while True:
        idx = dmenu(titles)
        sel_item = None if idx is None else items[idx]
        if sel_item is None or sel_item.uri is None:
            return False

        sel_items = ["[ Set as queue ]", "[ Add to queue ]"]
        highlight = []
        # Reset on every pass so a track row can never pick up the track
        # list of a previously opened container.
        sel_cont_tracks = []
        if sel_item.is_container:
            sel_cont = containers_by_uri[sel_item.uri]
            sel_cont_tracks = [track for track in sel_cont["tracks"]
                               if track["available"]]
            current_track = sc.status()
            if "uri" in current_track:
                for pos, track in enumerate(sel_cont_tracks):
                    if track["uri"] == current_track["uri"]:
                        # Two action rows precede the tracks.
                        highlight = [str(pos + 2)]
                        break
            sel_items.extend(format_tracks(sel_cont_tracks, current_track))

        sel_choice = dmenu(sel_items, highlight)
        if sel_choice == 0:
            sc.uplay(sel_item.uri)
        elif sel_choice == 1:
            sc.uadd(sel_item.uri)
        elif sel_choice is not None:
            sc.uplay(sel_cont_tracks[sel_choice - 2]["uri"])
# }}} | |
def check_failure_reason(icon='/home/ankur/.files/images/spotify_black.png'):
    """Send a desktop notification explaining why the connection failed.

    Looks through the output of ``ps aux`` for a process whose executable is
    named ``spopd`` and words the notification accordingly.

    Args:
        icon: path of the image shown in the notification. When the file
            does not exist the themed "dialog-error" icon name is used
            instead, so the notification still works on other machines.
    """
    running = False
    # The process list is filtered here rather than with `which` and `grep`:
    # both exit non-zero when they find nothing, which plumbum reports as an
    # exception, and `which` output ends in a newline that would defeat a
    # plain string comparison.
    for line in pb.local['ps']('aux').splitlines():
        fields = line.split(None, 10)
        if len(fields) < 11:
            # Blank or truncated line: no command column to inspect.
            continue
        executable = fields[10].split()[0]
        if os.path.basename(executable) == 'spopd':
            running = True
            break

    if not os.path.isfile(icon):
        icon = 'dialog-error'

    notify2.init('dspop')
    summary = "Dspop - Couldn't connect to spopd"
    if running:
        message = ('\nSpopd seems to be running.' +
                   '\nCheck your config for the correct port.')
    else:
        message = "\nCan't find spopd process running."
    notify2.Notification(summary, message, icon).show()
if __name__ == "__main__":
    try:
        sc = SpopClient("localhost", 6602)
    except OSError as err:
        # Only a refused connection gets the friendly notification; every
        # other error is left to propagate with its traceback.
        if err.errno != errno.ECONNREFUSED:
            raise
        check_failure_reason()
        # Exit with a non-zero status so callers can tell the start-up failed.
        sys.exit(1)
    main_menu(sc)
# Local Variables: | |
# mode: python | |
# End: |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment