Skip to content

Instantly share code, notes, and snippets.

@zmwangx
Created November 15, 2019 10:25
Show Gist options
  • Select an option

  • Save zmwangx/7fbaafbb3e9f2833324c1ac6291b9ef1 to your computer and use it in GitHub Desktop.

Select an option

Save zmwangx/7fbaafbb3e9f2833324c1ac6291b9ef1 to your computer and use it in GitHub Desktop.
googler installation script (working copy)
#!/usr/bin/env python3
import builtins
import collections
import hashlib
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile
import textwrap
import time
import urllib.request
try:
import readline
except ImportError:
pass
# Version of googler that this installer downloads and installs.
GOOGLER_VERSION = "3.8"

# TODO: switch to jarun/googler and publish deterministic tarball with releases
# Release assets are fetched from github.com directly; the content is pinned
# by ARCHIVE_SHA256 below.
ARCHIVE_URL = "https://github.com/zmwangx/googler/releases/download/v{version}/googler-{version}.tar.gz".format(
    version=GOOGLER_VERSION
)
# Expected SHA-256 hex digest of the tarball downloaded from ARCHIVE_URL.
ARCHIVE_SHA256 = "d4c4c7552aade4013183d51ffbafe324b0f562a3af52ac13718457ab98956d77"
# Top-level directory that the tarball's files are looked up under.
ARCHIVE_PREFIX = "googler-{version}".format(version=GOOGLER_VERSION)

# Maps each file's path inside the archive to its destination relative to
# the installation prefix. Ordered so files are installed in a stable order.
FILE_MAPPING = collections.OrderedDict(
    [
        ("googler", "bin/googler"),
        ("googler.1", "share/man/man1/googler.1"),
        (
            "auto-completion/bash/googler-completion.bash",
            "etc/bash_completion.d/googler-completion.bash",
        ),
        (
            "auto-completion/fish/googler.fish",
            "share/fish/vendor_completions.d/googler.fish",
        ),
        ("auto-completion/zsh/_googler", "share/zsh/site-functions/_googler"),
    ]
)

# ANSI escape sequences used to style terminal output.
BOLD = "\x1b[1m"
BOLD_BLUE = "\x1b[1;34m"
BOLD_GREEN = "\x1b[1;32m"
BOLD_RED = "\x1b[1;31m"
BOLD_UNDERLINE = "\x1b[1;4m"
RESET = "\x1b[0m"
UNDERLINE = "\x1b[4m"
YELLOW = "\x1b[33m"

# NOTE(review): not read anywhere in the visible code; presumably reserved
# for the planned non-interactive (-y/--yes) mode -- confirm.
BATCH = False
def pstep(msg):
    """Print a progress-step message to stderr in bold blue."""
    styled = "{BOLD_BLUE}{msg}{RESET}".format(
        msg=msg, BOLD_BLUE=BOLD_BLUE, RESET=RESET
    )
    print(styled, file=sys.stderr)
def psuccess(msg):
    """Print a success message to stderr in bold green."""
    styled = "{BOLD_GREEN}{msg}{RESET}".format(
        msg=msg, BOLD_GREEN=BOLD_GREEN, RESET=RESET
    )
    print(styled, file=sys.stderr)
def pwarn(msg):
    """Print a warning to stderr in yellow, prefixed with "[WARNING] "."""
    styled = "{YELLOW}[WARNING] {msg}{RESET}".format(
        msg=msg, YELLOW=YELLOW, RESET=RESET
    )
    print(styled, file=sys.stderr)
def perror(msg):
    """Print an error to stderr in bold red, prefixed with "[ERROR] "."""
    styled = "{BOLD_RED}[ERROR] {msg}{RESET}".format(
        msg=msg, BOLD_RED=BOLD_RED, RESET=RESET
    )
    print(styled, file=sys.stderr)
def errexit(msg):
    """Report *msg* through perror() and terminate with exit status 1."""
    perror(msg)
    # Equivalent to sys.exit(1), which raises SystemExit(1).
    raise SystemExit(1)
def run_command(args, sudo=False):
    """Print a system command to stderr, then run it.

    args: iterable of command-line arguments; each one is converted with
        str(), so pathlib.Path objects may be passed directly.
    sudo: when true, the command is run through "sudo".

    Raises RuntimeError (after printing an error message) if the command
    exits with a nonzero status or cannot be started at all.
    """
    cmd = [str(arg) for arg in args]
    if sudo:
        cmd.insert(0, "sudo")
    print(
        "{BOLD_BLUE}> {cmdline}{RESET}".format(
            cmdline=" ".join(cmd), BOLD_BLUE=BOLD_BLUE, RESET=RESET
        ),
        file=sys.stderr,
    )
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError as err:
        perror("The above command failed with code {code}.".format(code=err.returncode))
        raise RuntimeError(
            "command failed with code {code}".format(code=err.returncode)
        ) from err
    except OSError as err:
        # The program could not be launched (e.g. it is missing or not
        # executable). Surface this as RuntimeError too, so callers that
        # handle command failures also handle this case.
        perror("The above command could not be run: {error}".format(error=err))
        raise RuntimeError("command could not be run") from err
def ynq_prompt(prompt, warn=False):
    """Ask a yes/no/quit question where yes is the default.

    prompt: the question text.
    warn: when true, the question text is shown in yellow.

    Returns True for yes (or an empty answer) and False for no. Raises
    KeyboardInterrupt for quit or on end of input. Any other answer
    repeats the prompt.
    """
    if warn:
        question = "{YELLOW}{prompt}{RESET}".format(
            prompt=prompt, YELLOW=YELLOW, RESET=RESET
        )
    else:
        question = prompt
    choices = " [{BOLD_UNDERLINE}y{RESET}{BOLD}es{RESET}/{UNDERLINE}n{RESET}o/{UNDERLINE}q{RESET}uit] ".format(
        BOLD=BOLD, BOLD_UNDERLINE=BOLD_UNDERLINE, RESET=RESET, UNDERLINE=UNDERLINE
    )
    full_prompt = question + choices
    while True:
        try:
            reply = input(full_prompt).strip()
        except EOFError:
            raise KeyboardInterrupt
        # Only the first character matters; an empty reply selects the default.
        initial = reply[:1].lower()
        if initial in ("", "y"):
            return True
        if initial == "n":
            return False
        if initial == "q":
            raise KeyboardInterrupt
class ValidationError(Exception):
    """Raised by a value_prompt() validator to reject the user's input.

    The exception's message is printed to the user before re-prompting.
    """
def value_prompt(prompt, validator=None, default=None):
    """Prompt for a free-form value until an acceptable one is entered.

    prompt: text shown to the user; ": " is appended.
    validator: optional callable that validates the string from user
        input and optionally transforms it. It should return the value
        as is if it passes validation and no transformation is needed.
        If validation fails, it should raise ValidationError with the
        error message as the argument (the error message is printed and
        the prompt repeats).
    default: if not None, shown in brackets after the prompt and returned
        as is (without validation) when the user enters nothing.

    Raises KeyboardInterrupt on end of input.
    """
    default_hint = "" if default is None else " [{default}]".format(default=default)
    full_prompt = prompt + default_hint + ": "
    while True:
        try:
            answer = input(full_prompt).strip()
        except EOFError:
            raise KeyboardInterrupt
        if not answer and default is not None:
            return default
        if not validator:
            return answer
        try:
            return validator(answer)
        except ValidationError as err:
            print(
                "{YELLOW}{error}{RESET}".format(
                    error=err, YELLOW=YELLOW, RESET=RESET
                ),
                file=sys.stderr,
            )
def check_python_requirement():
    """Exit with an error message unless running on Python 3.4 or later."""
    info = sys.version_info
    if info >= (3, 4):
        return
    ver_str = "%d.%d.%d" % (info.major, info.minor, info.micro)
    errexit("Your Python %s is too old; Python 3.4 or later is required." % ver_str)
class DownloadProgressReporter:
    """Context manager that shows an in-place byte counter on stderr.

    update() has the (blocks, block size, total size) signature and is
    passed as the ``reporthook`` of urllib.request.urlretrieve(). Entering
    the context hides the terminal cursor; leaving it writes a newline and
    shows the cursor again.
    """

    # Minimum number of seconds between two redraws of the counter.
    REFRESH_INTERVAL = 0.1

    def __init__(self):
        # Last positive total size reported to update(); 0 if none yet.
        self.total_size = 0
        # time.monotonic() reading of the last redraw. -inf guarantees the
        # first update() is never throttled, whatever the clock's origin.
        self.last_refresh = float("-inf")

    def __enter__(self):
        sys.stderr.write("\x1b[?25l")  # hide cursor
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stderr.write("\n\x1b[?25h")  # end the counter line, show cursor

    def update(self, blocks_transferred, block_size, total_size):
        """Redraw the counter, at most once per REFRESH_INTERVAL.

        Does nothing when total_size is not positive.
        """
        if total_size <= 0:
            return
        # A monotonic clock keeps throttling correct even if the system
        # clock is adjusted during the download.
        if time.monotonic() - self.last_refresh < self.REFRESH_INTERVAL:
            return
        self.total_size = total_size
        # The last block may overshoot the total; clamp for display.
        transferred_size = min(blocks_transferred * block_size, total_size)
        total_size_width = len(str(total_size))
        sys.stderr.write(
            "[{transferred_size:{width}d}/{total_size}]\r".format(
                transferred_size=transferred_size,
                width=total_size_width,
                total_size=total_size,
            )
        )
        self.last_refresh = time.monotonic()

    def done(self):
        """Draw the final, complete counter if a total size was ever seen."""
        if self.total_size > 0:
            sys.stderr.write(
                "[{total_size}/{total_size}] Done.\r".format(total_size=self.total_size)
            )
def download_tarball_and_unpack(workdir):
    """Download the googler release tarball into *workdir* and unpack it.

    workdir: pathlib.Path of an existing directory. The tarball is saved
        as workdir / "googler.tar.gz" and extracted into workdir.

    Exits through errexit() if the download fails, if the SHA-256 checksum
    does not match ARCHIVE_SHA256, or if any file listed in FILE_MAPPING
    is missing from the extracted archive.
    """
    download_dest = workdir / "googler.tar.gz"
    pstep("Downloading {url} to {dest} ...".format(url=ARCHIVE_URL, dest=download_dest))
    try:
        with DownloadProgressReporter() as reporter:
            urllib.request.urlretrieve(
                ARCHIVE_URL, filename=str(download_dest), reporthook=reporter.update
            )
            reporter.done()
    except Exception as err:
        # Report the real cause instead of continuing: otherwise the failure
        # would resurface below as a missing file or a checksum mismatch.
        errexit(
            "Failed to download {url}: {error}".format(url=ARCHIVE_URL, error=err)
        )

    pstep("Verifying SHA-256 checksum...")
    digest = hashlib.sha256()
    with download_dest.open("rb") as fp:
        # Hash in chunks so the whole tarball is never held in memory.
        for chunk in iter(lambda: fp.read(65536), b""):
            digest.update(chunk)
    sha256 = digest.hexdigest()
    if sha256 != ARCHIVE_SHA256:
        errexit(
            "{dest}: SHA-256 checksum mismatch (expected {expected}, got {actual})".format(
                dest=download_dest, expected=ARCHIVE_SHA256, actual=sha256
            )
        )

    pstep("Extracting...")
    shutil.unpack_archive(str(download_dest), str(workdir))
    # Make sure every file we intend to install is actually present.
    for src in FILE_MAPPING:
        path = workdir / ARCHIVE_PREFIX / src
        if not path.is_file():
            errexit("'{path}' not found.".format(path=path))
def directory_in_search_path(dir):
    """Return True if directory *dir* is listed in the PATH environment variable.

    dir: pathlib.Path of the directory to look for.

    Both *dir* and the PATH entries are compared after resolving symlinks.
    Entries that cannot be resolved are ignored; if *dir* itself cannot be
    resolved, the result is False. Returns False when PATH is unset or
    empty.
    """
    search_path = os.getenv("PATH")
    if not search_path:
        return False

    def resolve_or_none(path):
        # Path.resolve() raises for nonexistent paths before Python 3.6 and
        # for symlink loops; one bad PATH entry must not abort the check.
        try:
            return path.resolve()
        except (OSError, RuntimeError):
            return None

    target = resolve_or_none(dir)
    if target is None:
        return False
    return any(
        resolve_or_none(pathlib.Path(entry)) == target
        for entry in search_path.split(os.pathsep)
        if entry
    )
class RinseAndRepeat(Exception):
    """Raised within prompt_and_install() to restart its prompt loop."""
def prompt_and_install(workdir):
    """Interactively pick an installation prefix and install googler there.

    workdir: pathlib.Path of the directory the archive was unpacked into;
        files are copied from workdir / ARCHIVE_PREFIX.

    The prompts start over whenever the user declines a confirmation or a
    destination is unusable. Installation is first attempted as the
    current user and, if that fails and the user agrees, retried through
    sudo. Exits through errexit() if the sudo attempt fails as well.
    KeyboardInterrupt raised by the prompts propagates to the caller.
    """

    def absolute_dir_validator(s):
        # Accept an absolute path that is either a directory or does not
        # exist yet; return it as a pathlib.Path.
        path = pathlib.Path(s)
        if not path.is_absolute():
            raise ValidationError("'{path}' is not an absolute path.".format(path=s))
        if path.exists() and not path.is_dir():
            raise ValidationError("'{path}' is not a directory.".format(path=s))
        return path

    while True:
        try:
            prefix = value_prompt(
                "Installation prefix",
                validator=absolute_dir_validator,
                default=pathlib.Path("/usr/local"),
            )
            if not prefix.exists():
                if not ynq_prompt(
                    "'{prefix}' does not exist; create directory?".format(
                        prefix=prefix
                    ),
                    warn=True,
                ):
                    raise RinseAndRepeat

            googler_dest = prefix / "bin/googler"
            if googler_dest.exists():
                if googler_dest.is_dir():
                    perror(
                        "'{googler}' is an existing directory.".format(
                            googler=googler_dest
                        )
                    )
                    raise RinseAndRepeat
                else:
                    if not ynq_prompt(
                        "'{googler}' already exists; overwrite?".format(
                            googler=googler_dest
                        ),
                        warn=True,
                    ):
                        raise RinseAndRepeat

            # Build the absolute src => dest mappings.
            abs_file_mapping = collections.OrderedDict()
            for src, dest in FILE_MAPPING.items():
                abs_src = workdir / ARCHIVE_PREFIX / src
                if prefix == pathlib.Path("/usr") and dest.startswith("etc/"):
                    # sysconfdir has a special case: sysconfdir
                    # corresponding to /usr is /etc, not /usr/etc.
                    abs_dest = pathlib.Path("/") / dest
                else:
                    abs_dest = prefix / dest
                if abs_dest.is_dir():
                    perror("'{dest}' is an existing directory.".format(dest=abs_dest))
                    raise RinseAndRepeat
                abs_file_mapping[abs_src] = abs_dest

            dests = abs_file_mapping.values()

            # These are functions rather than precomputed lists because the
            # filesystem state changes between the first installation
            # attempt and a possible retry as root.
            def get_clobber_list():
                return [p for p in dests if p.exists()]

            def get_parent_dirs_to_create():
                return [p.parent for p in dests if not p.parent.exists()]

            def printable_file_list(paths):
                return textwrap.indent("\n".join(str(p) for p in paths), " ")

            if get_clobber_list():
                clobber_msg = "Existing files to be removed:\n\n{clobber_list}\n\n".format(
                    clobber_list=printable_file_list(get_clobber_list())
                )
            else:
                clobber_msg = ""
            install_msg = "New files to be installed:\n\n{install_list}\n\n".format(
                install_list=printable_file_list(dests)
            )
            if not ynq_prompt(clobber_msg + install_msg + "Continue?"):
                raise RinseAndRepeat

            def install(sudo=False):
                # Create missing parent directories, remove files about to
                # be replaced, then copy every file into place. Raises
                # RuntimeError (from run_command) on the first failure.
                clobber_list = get_clobber_list()
                new_dirs = get_parent_dirs_to_create()
                if new_dirs:
                    run_command(["mkdir", "-p"] + new_dirs, sudo=sudo)
                if clobber_list:
                    run_command(["rm", "-f"] + clobber_list, sudo=sudo)
                for src, dest in abs_file_mapping.items():
                    run_command(["cp", "-f", src, dest], sudo=sudo)

            try:
                install(sudo=False)
            except RuntimeError:
                if not ynq_prompt(
                    "Installation as current user failed. "
                    "Retry as root (you may be prompted for your password)?",
                    warn=True,
                ):
                    raise RinseAndRepeat
                try:
                    install(sudo=True)
                except RuntimeError:
                    errexit("Installation as root failed.")

            psuccess("Installed googler.")
            # Smoke-test the installed executable; a failure is only a warning.
            try:
                run_command([googler_dest, "--version"])
            except RuntimeError:
                pwarn("There are problems with your googler installation.")
            psuccess(
                "In the future, you may upgrade your googler installation by downloading and running the installer script again, or executing:\n"
                " googler --upgrade"
            )
            if not directory_in_search_path(googler_dest.parent):
                pwarn(
                    "'{dir}' is not found in PATH. It is advised that you add it to PATH in the relevant shell runcoms.".format(
                        dir=googler_dest.parent
                    )
                )
            break
        except RinseAndRepeat:
            # Start over from the prefix prompt.
            # TODO: break if automated
            pass
def main():
    """Run the installer: check Python, download, then prompt and install.

    All work happens inside a temporary directory that is removed
    afterwards. A KeyboardInterrupt (including a "quit" answer at a
    prompt) ends the program with the message "Aborted.".
    """
    # TODO: argparse: -y, --yes
    try:
        check_python_requirement()
        with tempfile.TemporaryDirectory() as tmp_name:
            unpack_root = pathlib.Path(tmp_name)
            download_tarball_and_unpack(unpack_root)
            prompt_and_install(unpack_root)
    except KeyboardInterrupt:
        errexit("Aborted.")
# Run the installer only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment