Skip to content

Instantly share code, notes, and snippets.

@junkmechanic
Last active December 29, 2015 03:48
Show Gist options
  • Save junkmechanic/cae82755b531cd76a149 to your computer and use it in GitHub Desktop.
Save junkmechanic/cae82755b531cd76a149 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# Copyright (C) 2010, 2011, 2012, 2013, 2014, 2015 The spop contributors
#
# This file is part of spop.
#
# spop is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# spop is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# spop. If not, see <http://www.gnu.org/licenses/>.
#
# Additional permission under GNU GPL version 3 section 7
#
# If you modify this Program, or any covered work, by linking or combining it
# with libspotify (or a modified version of that library), containing parts
# covered by the terms of the Libspotify Terms of Use, the licensors of this
# Program grant you additional permission to convey the resulting work.
# Changes made:
# 1. In the prompt displaying the tracks in a playlist, selecting a track would
# present another prompt to select from the following options:
# i. Add Album to Queue (adds the album around the track to the queue)
# ii. Add Track to Queue
# iii. Replace Queue with Album
# iv. Replace Queue with Track
# This makes sense if the playlists consist of albums stacked one after the
# other. If the playlist consists of songs in random order then the album
# related options above would add all the songs in that playlist that belong
# to the same album as the selected track (if there are any) in the order
# they appear in that playlist.
# 2. The script would send error notifications via libnotify.
# This fork requires (python-)notify2 and (python-)plumbum libraries for the
# notifications.
import os
import sys
import json
import errno
import socket
import os.path
import notify2
import plumbum as pb
import subprocess
import collections
# {{{ Parameters
# Row templates for the menus; each one is filled in with str.format() by the
# matching format_* helper below.
# Possible values: {index}, {artist}, {track}, {album}, {duration}, {uri}, {playing}
TRACK_FMT = "{playing}{index} - {artist} - {album} - {track} ({duration})"
# Possible values: {album}, {artist}, {year}, {type}, {tracks}
SEARCH_ALBUM_FMT = "({year}) {artist} - {album} ({tracks} tracks)"
# Possible values: {name}, {owner}, {tracks}
SEARCH_PLAYLIST_FMT = "{name} (by {owner}, {tracks} tracks)"
# Possible values: {album}, {artist}, {duration}, {index}, {track}, {uri}
SEARCH_TRACK_FMT = "{artist} - {album} - {track} ({duration})"
# Extra command-line arguments appended when launching dmenu.
DMENU_OPTS = ["-i", "-l", "40"]
# Extra command-line arguments appended when launching rofi instead of dmenu.
ROFI_OPTS = ["-dmenu", "-columns", "1", "-i"]
# Prefer rofi over dmenu whenever a readable, executable regular file named
# "rofi" exists in one of the directories searched for executables.
USE_ROFI = any(
    os.path.isfile(_candidate) and os.access(_candidate, os.R_OK | os.X_OK)
    for _candidate in (
        os.path.join(_directory, "rofi") for _directory in os.get_exec_path()
    )
)
# }}}
# {{{ Spop client
class SpopClient:
    """Small client for a spopd daemon reachable over TCP.

    Commands are sent as a single text line; the reply is read until the
    bytes received so far parse as one JSON document.  Every name listed in
    ``_COMMANDS`` is exposed as a method, e.g. ``client.ls(3)`` sends
    ``ls 3`` and returns the decoded JSON reply.
    """

    # Method names forwarded to the daemon.  Underscores become dashes on the
    # wire, so ``offline_toggle`` is sent as ``offline-toggle``.
    _COMMANDS = frozenset((
        "repeat", "shuffle", "qclear", "qls", "ls", "goto", "add", "next",
        "prev", "toggle", "play", "offline_toggle", "search", "status",
        "uadd", "uinfo", "uplay",
    ))

    def __init__(self, host, port):
        """Connect to ``host``:``port`` and store the daemon's greeting line.

        Raises:
            OSError: if the connection cannot be established.
        """
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.connect((host, port))
            self.greeting = self._sock.recv(1024).decode().strip()
        except Exception:
            # Do not leak the descriptor when the daemon is unreachable.
            self._sock.close()
            raise

    def close(self):
        """Close the connection to the daemon."""
        self._sock.close()

    def _command(self, command, *args):
        """Send ``command`` with ``args`` and return the decoded JSON reply.

        String arguments are wrapped in double quotes with embedded double
        quotes backslash-escaped; any other argument is sent as ``str(arg)``.

        Raises:
            ConnectionError: if the peer closes the connection before a
                complete JSON document has been received.
        """
        parts = [command]
        for arg in args:
            if isinstance(arg, str):
                parts.append('"' + arg.replace('"', '\\"') + '"')
            else:
                parts.append(str(arg))
        # sendall() keeps writing until the whole line is out; send() may
        # stop after a partial write.
        self._sock.sendall((" ".join(parts) + "\n").encode())

        buf = b""
        while True:
            chunk = self._sock.recv(1024)
            if not chunk:
                # An empty read means the peer closed the socket; without
                # this check the loop would spin forever.
                raise ConnectionError(
                    "connection closed before a complete reply to %r arrived"
                    % command)
            buf += chunk
            try:
                return json.loads(buf.decode())
            except ValueError:
                # Incomplete JSON or a multi-byte character split across
                # reads (both errors derive from ValueError): read more.
                continue

    def __getattr__(self, name):
        """Return a callable that sends the daemon command called ``name``.

        Raises:
            AttributeError: if ``name`` is not one of the known commands.
        """
        if name not in self._COMMANDS:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))

        def func(*attrs):
            return self._command(name.replace("_", "-"), *attrs)
        return func
# }}}
# {{{ Dmenu interaction
def dmenu(items, highlight=None, prompt="dspop > "):
    """Show ``items`` in dmenu (or rofi) and return the index of the pick.

    Args:
        items: list of row strings, one menu entry each.
        highlight: optional list of row numbers (as strings) passed to rofi
            via ``-a``; ignored when plain dmenu is used.
        prompt: prompt text; only passed on when rofi is used.

    Returns:
        The position of the selected row in ``items``, or ``None`` when the
        menu produced no output or the returned text is not one of the rows
        (for example free text typed by the user).
    """
    args = ["dmenu"] + DMENU_OPTS
    if USE_ROFI:
        args = ["rofi", "-p", prompt] + ROFI_OPTS
        if highlight:
            args += ["-a", ",".join(highlight)]
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Hand the rows to communicate() so that stdin is written and closed
    # while stdout is being drained.
    out, _ = proc.communicate("".join(it + "\n" for it in items).encode())
    if not out:
        return None
    selection = out.decode()
    # Drop only the line terminator, and only when it is really there.
    if selection.endswith("\n"):
        selection = selection[:-1]
    try:
        return items.index(selection)
    except ValueError:
        # Text that matches no row is treated like a cancelled menu instead
        # of crashing the whole script.
        return None
def dmenu_input(prompt):
    """Ask for free text through an empty dmenu/rofi menu.

    Returns the launcher's output decoded to ``str`` with its final byte
    (the line terminator) removed; an empty string when nothing was output.
    """
    if USE_ROFI:
        launcher, extra_opts = "rofi", ROFI_OPTS
    else:
        launcher, extra_opts = "dmenu", DMENU_OPTS
    cmd = [launcher, "-p", prompt]
    cmd.extend(extra_opts)
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # No rows are written: communicate() closes stdin straight away.
    raw, _ = proc.communicate()
    return raw[:-1].decode()
def format_search_album(album):
    """Render one album dict as a search-result row using SEARCH_ALBUM_FMT.

    Reads the "title", "artist", "year", "album_type" and "tracks" keys; the
    track list is reported by its length.
    """
    return SEARCH_ALBUM_FMT.format(
        album=album["title"],
        artist=album["artist"],
        year=album["year"],
        type=album["album_type"],
        tracks=len(album["tracks"]),
    )
def format_search_playlist(playlist):
    """Render one playlist dict as a search-result row using SEARCH_PLAYLIST_FMT.

    Reads the "name", "owner" and "tracks" keys; the track list is reported
    by its length.
    """
    return SEARCH_PLAYLIST_FMT.format(
        name=playlist["name"],
        owner=playlist["owner"],
        tracks=len(playlist["tracks"]),
    )
def format_search_track(track):
    """Render one track dict as a search-result row using SEARCH_TRACK_FMT.

    "duration" is divided by 1000 and shown as minutes:seconds.
    """
    minutes, seconds = divmod(track["duration"] / 1000, 60)
    return SEARCH_TRACK_FMT.format(
        album=track["album"],
        artist=track["artist"],
        duration="%d:%02d" % (minutes, seconds),
        index=track["index"],
        track=track["title"],
        uri=track["uri"],
    )
def format_tracks(tracks, current_track):
    """Render a list of track dicts as menu rows using TRACK_FMT.

    Args:
        tracks: list of track dicts ("album", "artist", "duration", "index",
            "title", "uri").
        current_track: status dict; when it has a "uri" equal to a track's
            "uri", that row gets the "* " marker (plain dmenu only).

    Returns:
        A list of formatted rows, one per track, in the same order.
    """
    # Right-align every index to the width of the largest possible one.
    width = len(str(len(tracks)))
    rows = []
    for entry in tracks:
        is_current = ("uri" in current_track
                      and current_track["uri"] == entry["uri"])
        if USE_ROFI:
            # No marker under rofi; the caller highlights rows instead.
            marker = ""
        elif is_current:
            marker = "* "
        else:
            marker = " "
        minutes, seconds = divmod(entry["duration"] / 1000, 60)
        rows.append(TRACK_FMT.format(
            album=entry["album"],
            artist=entry["artist"],
            duration="%d:%02d" % (minutes, seconds),
            index="%*d" % (width, entry["index"]),
            track=entry["title"],
            uri=entry["uri"],
            playing=marker,
        ))
    return rows
def format_folder(items, indent=0, offline_status=False):
    """Flatten a (possibly nested) playlist tree into menu rows.

    Args:
        items: list of dicts whose "type" is "separator", "folder" or
            "playlist"; falsy entries are skipped.
        indent: number of leading spaces for this nesting level.
        offline_status: when true, playlist rows start with "[X] " or "[ ] "
            depending on their "offline" flag.

    Returns:
        ``(rows, indices)``: the row strings and, in parallel, the playlist
        "index" of each row (``None`` for separators and folder headers).
    """
    # Rows without a checkbox get four extra spaces when checkboxes are shown.
    pad = " " * (indent + (4 if offline_status else 0))
    rows, playlist_ids = [], []
    for entry in items:
        if not entry:
            continue
        kind = entry["type"]
        if kind == "separator":
            rows.append(pad + "-" * 40)
            playlist_ids.append(None)
        elif kind == "folder":
            rows.append(pad + "+ " + entry["name"])
            playlist_ids.append(None)
            # Children are rendered two columns deeper.
            child_rows, child_ids = format_folder(
                entry["playlists"], indent + 2, offline_status)
            rows += child_rows
            playlist_ids += child_ids
        elif kind == "playlist":
            checkbox = ""
            if offline_status:
                checkbox = "[X] " if entry["offline"] else "[ ] "
            label = "{0[name]} ({0[tracks]})".format(entry)
            rows.append(checkbox + " " * indent + label)
            playlist_ids.append(entry["index"])
    return rows, playlist_ids
# }}}
# {{{ Menus
def main_menu(sc):
    """Run the top-level menu loop until it is dismissed.

    The menu is shown again after every action.  It ends when the menu is
    cancelled or when a queue, playlists or search submenu returns a true
    value.
    """
    entries = [
        ("queue", "[ Queue ]>"),
        ("playlists", "[ Playlists ]>"),
        ("repeat", "[ Repeat ]"),
        ("shuffle", "[ Shuffle ]"),
        ("sep", "-"*40),
        ("search", "[ Search ]>"),
        ("search_pl", "[ Search (with playlists) ]>"),
        ("offline", "[ Offline playlists ]>"),
        ("sep", "-"*40),
        ("toggle", "[ Toggle Playback ]"),
        ("next", "[ Next Track ]"),
        ("prev", "[ Previous Track ]"),
    ]
    # Submenus whose return value decides whether the whole UI closes.
    submenus = {
        "queue": lambda: menu_queue(sc),
        "playlists": lambda: menu_playlists(sc),
        "search": lambda: menu_search(sc),
        "search_pl": lambda: menu_search(sc, include_playlists=True),
    }
    # Entries that map one-to-one onto an argument-less client command.
    direct_commands = ("repeat", "shuffle", "toggle", "next", "prev")

    while True:
        picked = dmenu([label for _, label in entries])
        if picked is None:
            return
        key = entries[picked][0]
        if key in submenus:
            if submenus[key]():
                return
        elif key == "offline":
            # The offline menu never closes the main menu.
            menu_offline(sc)
        elif key in direct_commands:
            getattr(sc, key)()
        # Separator rows ("sep") fall through and just redisplay the menu.
def menu_queue(sc):
    """Show the play queue and jump to, or clear, tracks until dismissed.

    Row 0 clears the queue and leaves the menu; any other row is passed to
    ``sc.goto`` and the refreshed queue is shown again.  Always returns
    ``False``.
    """
    while True:
        queue = sc.qls()["tracks"]
        status = sc.status()
        # Row numbers line up with the status' "current_track" value because
        # the "[ Clear queue ]" row occupies position 0.
        active = []
        if "current_track" in status:
            active = [str(status["current_track"])]
        rows = ["[ Clear queue ]"] + format_tracks(queue, status)
        picked = dmenu(rows, active, "Play Queue > ")
        if picked is None:
            break
        if picked == 0:
            sc.qclear()
            break
        sc.goto(picked)
    return False
def menu_playlists(sc):
    """Show the playlist tree and open the playlist that gets picked.

    Returns ``False`` when the menu is cancelled and ``True`` once a playlist
    submenu returns a true value.  Rows that are not playlists (folders,
    separators) simply redisplay the menu.
    """
    rows, playlist_ids = format_folder(sc.ls()["playlists"])
    while True:
        picked = dmenu(rows)
        if picked is None:
            return False
        playlist_id = playlist_ids[picked]
        if playlist_id is None:
            continue
        if menu_playlist(sc, playlist_id):
            return True
def menu_playlist(sc, idx):
    """Show the tracks of playlist ``idx`` together with whole-playlist actions.

    Row 0 calls ``sc.play(idx)`` and row 1 calls ``sc.add(idx)``; both then
    return ``False``, as does cancelling.  Picking a track opens the
    per-track action menu and afterwards shows this menu again.
    """
    # TODO: handle unavailable tracks
    rows = ["[ Set as queue ]", "[ Add to queue ]"]
    playlist_tracks = sc.ls(idx)["tracks"]
    rows += format_tracks(playlist_tracks, sc.status())
    while True:
        picked = dmenu(rows)
        if picked is None:
            return False
        if picked == 0:
            sc.play(idx)
            return False
        if picked == 1:
            sc.add(idx)
            return False
        # Two action rows precede the tracks, so row 2 becomes track number 1.
        menu_track(sc, idx, picked - 1)
def menu_track(sc, idx, track):
    """Ask what to do with one track of a playlist and carry it out.

    Args:
        sc: spop client.
        idx: index of the playlist the track belongs to.
        track: number of the selected track within that playlist.

    The row order matches the option list in the file header (album before
    track for both "add" and "replace"), so that each label triggers the
    action it names.
    """
    items = ["[ Add Album to Queue ]",
             "[ Add Track to Queue ]",
             "[ Replace Queue with Album ]",
             "[ Replace Queue with Track ]"]
    choice = dmenu(items)
    if choice is None:
        return
    if choice == 0:
        for tno in sorrounding_album(sc, idx, track):
            sc.add(idx, tno)
    elif choice == 1:
        sc.add(idx, track)
    elif choice == 2:
        album_tracks = sorrounding_album(sc, idx, track)
        if not album_tracks:
            # Nothing to play; avoid indexing into an empty list.
            return
        # Start the queue with the album's first track, then append the rest.
        sc.play(idx, album_tracks[0])
        for tno in album_tracks[1:]:
            sc.add(idx, tno)
    elif choice == 3:
        sc.play(idx, track)
def sorrounding_album(sc, idx, track):
    """Return the track numbers in playlist ``idx`` sharing ``track``'s album.

    Args:
        sc: spop client; only ``sc.ls(idx)`` is used.
        idx: playlist index.
        track: track number as passed to ``sc.add``/``sc.play``, i.e. a value
            from the tracks' "index" field rather than a list position.

    Returns:
        The "index" values of every track with the same "album" as the
        selected one, in playlist order; an empty list when no track carries
        the requested number.
    """
    # Fetch the playlist once and reuse it for lookup and filtering.
    tracks = sc.ls(idx)["tracks"]
    # Match on the "index" field: using ``track`` as a list position would be
    # off by one for 1-based numbering and overflow on the last track.
    selected = next((t for t in tracks if t["index"] == track), None)
    if selected is None:
        return []
    return [t["index"] for t in tracks if t["album"] == selected["album"]]
def menu_offline(sc):
    """Show the playlist tree with offline checkboxes and toggle the picks.

    The list is re-read from the client before each display.  Returns
    ``False`` when the menu is cancelled.
    """
    while True:
        rows, playlist_ids = format_folder(sc.ls()["playlists"],
                                           offline_status=True)
        picked = dmenu(rows)
        if picked is None:
            return False
        playlist_id = playlist_ids[picked]
        # Folder and separator rows carry no playlist index: just redisplay.
        if playlist_id is not None:
            sc.offline_toggle(playlist_id)
def menu_search(sc, include_playlists=False):
    """Prompt for a query, list the results and act on what gets picked.

    Args:
        sc: spop client.
        include_playlists: also list playlists from the search result.

    Results are grouped as albums, tracks, singles and (optionally)
    playlists.  Picking a result opens a second menu to set it as the queue
    or add it to the queue; for albums and playlists that menu also lists
    the available tracks, which can be played individually.

    Returns ``False`` when the query is empty, or when the result menu is
    cancelled or a heading row is picked.
    """
    query = dmenu_input("Query > ")
    if not query:
        return False
    results = sc.search(query)

    SearchItem = collections.namedtuple("SearchItem", ["uri", "title", "is_container"])
    containers_by_uri = {}
    menu = []

    def load_container(uri, formatter):
        # Fetch full details for an album/playlist and keep them for later.
        info = sc.uinfo(uri)
        containers_by_uri[uri] = info
        return info, SearchItem(uri, " " + formatter(info), True)

    def add_section(heading, entries):
        # Append a heading plus its entries sorted by title, if there are any.
        if entries:
            entries.sort(key=lambda entry: entry.title)
            menu.append(SearchItem(None, heading, None))
            menu.extend(entries)

    albums, singles = [], []
    for album in results.get("albums", ()):
        if not album["available"]:
            continue
        info, entry = load_container(album["uri"], format_search_album)
        (singles if info["album_type"] == "single" else albums).append(entry)
    add_section("Albums:", albums)

    if "tracks" in results and results["tracks"]:
        # The heading is added before availability filtering.
        menu.append(SearchItem(None, "Tracks:", None))
        menu.extend(
            SearchItem(found["uri"], " " + format_search_track(found), False)
            for found in results["tracks"] if found["available"])

    add_section("Singles:", singles)

    if include_playlists and "playlists" in results and results["playlists"]:
        playlists = [
            load_container(playlist["uri"], format_search_playlist)[1]
            for playlist in results["playlists"] if "error" not in playlist]
        add_section("Playlists:", playlists)

    while True:
        picked = dmenu([entry.title for entry in menu])
        if picked is None or menu[picked].uri is None:
            return False
        sel_item = menu[picked]

        sel_items = ["[ Set as queue ]", "[ Add to queue ]"]
        highlight = []
        if sel_item.is_container:
            container = containers_by_uri[sel_item.uri]
            sel_cont_tracks = [entry for entry in container["tracks"]
                               if entry["available"]]
            current_track = sc.status()
            if "uri" in current_track:
                for position, entry in enumerate(sel_cont_tracks):
                    if entry["uri"] == current_track["uri"]:
                        # Skip the two action rows above the track list.
                        highlight = [str(position + 2)]
                        break
            sel_items.extend(format_tracks(sel_cont_tracks, current_track))

        sel_choice = dmenu(sel_items, highlight)
        if sel_choice is None:
            continue
        if sel_choice == 0:
            sc.uplay(sel_item.uri)
        elif sel_choice == 1:
            sc.uadd(sel_item.uri)
        else:
            sc.uplay(sel_cont_tracks[sel_choice - 2]["uri"])
# }}}
def check_failure_reason(icon='/home/ankur/.files/images/spotify_black.png'):
    """Send a desktop notification explaining why spopd was unreachable.

    Scans the output of ``ps aux`` for a process whose executable is named
    ``spopd`` and reports either "running, check the port" or "not running".

    Args:
        icon: path of the image shown in the notification.
    """
    # Filter in Python instead of piping through grep: grep's exit status 1
    # on "no match" would otherwise surface as an exception.
    ps_output = pb.local['ps']['aux']()
    running = False
    for line in ps_output.splitlines():
        # The command line is the 11th column of ``ps aux``; split at most
        # ten times so that it stays in one piece.
        fields = line.split(None, 10)
        if len(fields) < 11:
            continue
        executable = fields[10].split()[0]
        # Comparing the basename covers both "spopd" and "/any/path/spopd",
        # with no need to resolve the binary through ``which``.
        if os.path.basename(executable) == 'spopd':
            running = True
            break
    notify2.init('dspop')
    summary = "Dspop - Couldn't connect to spopd"
    if running:
        body = ('\nSpopd seems to be running.' +
                '\nCheck your config for the correct port.')
    else:
        body = "\nCan't find spopd process running."
    notify2.Notification(summary, body, icon).show()
if __name__ == "__main__":
    try:
        sc = SpopClient("localhost", 6602)
    except ConnectionRefusedError:
        # ConnectionRefusedError is the OSError subclass raised for
        # ECONNREFUSED; any other OSError still propagates unchanged.
        check_failure_reason()
        # Exit with a non-zero status so that callers can detect the failure.
        sys.exit(1)
    main_menu(sc)
# Local Variables:
# mode: python
# End:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment