Last active
March 17, 2020 17:03
-
-
Save daniel-j/e2f49507a77cc96b9719c4111860ced4 to your computer and use it in GitHub Desktop.
Linux updater for My Little Karaoke song and theme packages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
LD_LIBRARY_PATH="" ./mlk-updater.py --check | |
./launch.sh $@ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import tarfile | |
import json | |
import shutil | |
import argparse | |
import gi | |
gi.require_version('Gtk', '3.0') | |
gi.require_version('Soup', '2.4') | |
from gi.repository import Gtk, Soup, GLib, GObject | |
parser = argparse.ArgumentParser(description='My Little Karaoke Updater') | |
parser.add_argument('--data-dir', help='Path to UltraStar Deluxe data directory') | |
parser.add_argument('--update', action='store_true', help='Don\'t ask about updating') | |
parser.add_argument('--check', action='store_true', help='Ask to check for updates') | |
args = parser.parse_args() | |
DATA_DIR = args.data_dir if args.data_dir else './data' | |
TEMP_DIR = os.path.join(DATA_DIR, 'temp') | |
def sizeof_fmt(num, suffix='B'): | |
# https://stackoverflow.com/a/1094933 | |
for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: | |
if abs(num) < 1024.0: | |
return "%3.1f %s%s" % (num, unit, suffix) | |
num /= 1024.0 | |
return "%.1f %s%s" % (num, 'Yi', suffix) | |
class ProgressBarWindow(Gtk.Window): | |
def __init__(self): | |
Gtk.Window.__init__( | |
self, | |
title="My Little Karaoke Updater", | |
default_width=500 | |
) | |
self.set_border_width(50) | |
self.set_position(Gtk.WindowPosition.CENTER) | |
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=0) | |
self.add(vbox) | |
self.progressbar_package = Gtk.ProgressBar() | |
self.progressbar_package.set_show_text(True) | |
self.progressbar_total = Gtk.ProgressBar() | |
# self.progressbar_total.set_text('') | |
self.progressbar_total.set_show_text(True) | |
vbox.pack_start(self.progressbar_package, False, False, 5) | |
vbox.pack_start(self.progressbar_total, False, False, 5) | |
self.show_all() | |
self.progressbar_total.hide() | |
# button = Gtk.CheckButton("Button") | |
# vbox.pack_start(button, True, True, 0) | |
# button = Gtk.CheckButton("Activity mode") | |
# button.connect("toggled", self.on_activity_mode_toggled) | |
# vbox.pack_start(button, True, True, 0) | |
# button = Gtk.CheckButton("Right to Left") | |
# button.connect("toggled", self.on_right_to_left_toggled) | |
# vbox.pack_start(button, True, True, 0) | |
self.queue = [] | |
self.downloaded_part = 0 | |
self.part_size = 0 | |
self.progress_total = 0 | |
self.downloaded_total = 0 | |
self.soup = Soup.SessionAsync() | |
logger = Soup.Logger.new(Soup.LoggerLogLevel.HEADERS, -1) | |
self.soup.add_feature(logger) | |
try: | |
with open(os.path.join(DATA_DIR, 'mlkcache.json')) as cache_file: | |
self.cache = json.load(cache_file) | |
except: | |
self.cache = {} | |
self.progressbar_package.set_fraction(0) | |
self.fetch( | |
'https://www.mylittlekaraoke.com/store/webinst/linux.webinst', | |
self.on_manifest | |
) | |
# def got_chunk(msg, chunk): | |
# print('data', msg.props.response_body_data.get_size()) | |
# print(msg.props.response_headers.get_content_length()) | |
# msg.connect('got-chunk', got_chunk) | |
# msg.connect('finished', callback) | |
def fetch(self, uri, callback=None, method='GET', | |
accumulate=True, headers={}, set_range=None): | |
preheaders = { | |
'User-Agent': 'mlkupdater/Linux' | |
} | |
preheaders.update(headers) | |
message = Soup.Message.new(method, uri) | |
for key, value in preheaders.items(): | |
message.props.request_headers.append(key, value) | |
if set_range: | |
message.props.request_headers.set_range(set_range[0], set_range[1]) | |
if not accumulate: | |
message.response_body.set_accumulate(False) | |
def on_finished(session, message): | |
callback(message.props.response_body_data.get_data()) | |
print(method, uri) | |
self.soup.queue_message(message, on_finished if callback else None) | |
return message | |
def save_cache(self): | |
os.makedirs(os.path.join(DATA_DIR), exist_ok=True) | |
with open(os.path.join(DATA_DIR, 'mlkcache.json'), 'w') as cache_file: | |
json.dump(self.cache, cache_file) | |
def on_manifest(self, data): | |
data = data.decode('utf-8').strip().split('\n') | |
i = iter(data) | |
self.manifest = list(zip(i, i)) | |
# print(self.manifest) | |
self.package_iter = iter(self.manifest) | |
self.package_counter = 0 | |
self.progressbar_package.set_text('Fetching package info - 0/' + str(len(self.manifest))) | |
self.check_next_package() | |
def check_next_package(self): | |
try: | |
(url, size) = next(self.package_iter) | |
except StopIteration: | |
print('Check complete', flush=True) | |
self.save_cache() | |
if len(self.queue) != 0 and not args.update: | |
print(self.queue, flush=True) | |
dialog = Gtk.MessageDialog(parent=self, flags=0, message_type=Gtk.MessageType.QUESTION, buttons=Gtk.ButtonsType.YES_NO, text="Updates available!") | |
dialog.format_secondary_text("My Little Karaoke packages are available to download. Continue?") | |
response = dialog.run() | |
dialog.destroy() | |
if response != Gtk.ResponseType.YES: | |
self.close() | |
return | |
self.queue_iter = iter(self.queue) | |
GLib.timeout_add(200, self.on_timeout) | |
self.download_next_package() | |
return False | |
size = int(size) | |
filename = os.path.basename(url) | |
self.progressbar_package.set_text('Fetching package info - ' + str(self.package_counter + 1) + '/' + str(len(self.manifest))) | |
def on_get_headers(data): | |
headers = msgheaders.props.response_headers | |
content_length = headers.get_content_length() | |
if content_length != size: | |
print('Size mismatch!', url, content_length, size, flush=True) | |
GLib.timeout_add_seconds(2, self.check_next_package) | |
return | |
etag = headers.get('etag') | |
haslatest = False | |
cacheInfo = self.cache.get(filename, {}) | |
if (etag is None or cacheInfo.get('etag') == etag) and cacheInfo.get('size') == content_length and cacheInfo.get('complete'): | |
haslatest = True | |
if not haslatest: | |
self.cache[filename] = { | |
'complete': False, | |
'etag': etag, | |
'size': content_length | |
} | |
filepath = os.path.join(TEMP_DIR, filename) | |
downloaded = 0 | |
if os.path.isfile(filepath): | |
downloaded = os.path.getsize(filepath) | |
self.queue.append([url, filename, filepath, content_length, downloaded]) | |
self.downloaded_total += downloaded | |
else: | |
self.downloaded_total += content_length | |
self.progress_total += content_length | |
# print(filename, etag, sizeof_fmt(content_length), haslatest, flush=True) | |
self.package_counter += 1 | |
self.progressbar_package.set_fraction(self.package_counter / (len(self.manifest) - 1)) | |
GLib.timeout_add_seconds(0.1, self.check_next_package) | |
msgheaders = self.fetch(url, on_get_headers, method='HEAD') | |
return False | |
def download_next_package(self): | |
print('Download next') | |
try: | |
(url, filename, filepath, size, downloaded) = next(self.queue_iter) | |
except StopIteration: | |
print('Everything is up to date!', flush=True) | |
self.close() | |
try: | |
os.rmdir(TEMP_DIR) | |
except OSError: | |
pass | |
return False | |
self.part_filename = filename | |
self.downloaded_part = downloaded | |
self.part_size = size | |
free_space = shutil.disk_usage(DATA_DIR).free | |
required_space = self.progress_total - self.downloaded_total + 1024 * 200 | |
if free_space <= required_space: | |
print("Not enough free space!", flush=True) | |
dialog = Gtk.MessageDialog(self, 0, Gtk.MessageType.ERROR, Gtk.ButtonsType.YES_NO, "Not enough free space!") | |
dialog.format_secondary_text("You have " + sizeof_fmt(free_space) + " of free space left on your device. The remaining updates require " + sizeof_fmt(required_space) + " free space to complete.\nDo you want to continue downloading anyway?") | |
response = dialog.run() | |
dialog.destroy() | |
if response != Gtk.ResponseType.YES: | |
self.close() | |
return False | |
os.makedirs(os.path.join(TEMP_DIR), exist_ok=True) | |
if downloaded > 0: | |
f = open(filepath, 'a+b') | |
print('Resuming download of ' + url, flush=True) | |
else: | |
f = open(filepath, 'w+b') | |
print('Downloading ' + url, flush=True) | |
self.progressbar_package.set_fraction(0) | |
self.progressbar_total.show() | |
# def on_headers(message): | |
# headers = message.props.response_headers | |
# i = Soup.MessageHeadersIter.init(headers) | |
# while True: | |
# h = i.next() | |
# if not h or h.name is None: | |
# break | |
# print(h) | |
def on_chunk(message, chunk): | |
nonlocal downloaded | |
f.write(chunk.get_data()) | |
downloaded += chunk.length | |
self.downloaded_part = downloaded | |
self.downloaded_total += chunk.length | |
def on_complete_refresh(message): | |
self.on_timeout() | |
GLib.timeout_add(200, on_complete, message) | |
def on_complete(message): | |
f.close() | |
filetype = os.path.splitext(filename)[1] | |
dest = None | |
istar = False | |
if filetype == '.mlk': | |
dest = os.path.join(DATA_DIR, 'songs') | |
istar = True | |
elif filetype == '.mlt': | |
dest = os.path.join(DATA_DIR, 'themes') | |
istar = True | |
elif filetype == '.mlu': | |
dest = DATA_DIR # already includes avatars folder inside | |
istar = True | |
if istar and dest and tarfile.is_tarfile(filepath): | |
print('Extracting ' + filepath + ' to ' + dest, flush=True) | |
with tarfile.open(filepath, 'r:') as tar: | |
for file in tar: | |
name = os.path.join(dest, file.name) | |
try: | |
tar.extract(file, path=dest) | |
except: | |
os.remove(name) | |
tar.extract(file, path=dest) | |
finally: | |
os.chmod(name, file.mode) | |
else: | |
print('Unknown filetype ' + filename, flush=True) | |
self.download_next_package() | |
return False | |
os.remove(filepath) | |
self.cache[filename]['complete'] = True | |
self.save_cache() | |
self.download_next_package() | |
msg = self.fetch(url, accumulate=False, set_range=[downloaded, size]) | |
# msg.connect('got-headers', on_headers) | |
msg.connect('got-chunk', on_chunk) | |
msg.connect('finished', on_complete_refresh) | |
return False | |
def on_timeout(self): | |
""" | |
Update value on the progress bar | |
""" | |
self.progressbar_package.set_fraction(self.downloaded_part / self.part_size) | |
self.progressbar_package.set_text('Downloading ' + self.part_filename + ' - ' + '{0:.2f}'.format((self.downloaded_part / self.part_size) * 100) + ' % ' + sizeof_fmt(self.part_size)) | |
self.progressbar_total.set_fraction(self.downloaded_total / self.progress_total) | |
self.progressbar_total.set_text('Total progress - ' + '{0:.2f}'.format((self.downloaded_total / self.progress_total) * 100) + ' % ' + sizeof_fmt(self.progress_total)) | |
# As this is a timeout function, return True so that it | |
# continues to get called | |
return True | |
def start_update(): | |
win = ProgressBarWindow() | |
win.connect("delete-event", Gtk.main_quit) | |
Gtk.main() | |
if args.check: | |
dialog = Gtk.MessageDialog(None, 0, Gtk.MessageType.QUESTION, Gtk.ButtonsType.YES_NO, "Check for updates") | |
dialog.format_secondary_text("Do you want to check for My Little Karaoke updates?") | |
response = dialog.run() | |
dialog.destroy() | |
if response == Gtk.ResponseType.YES: | |
start_update() | |
else: | |
start_update() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment