A Python 3 script for assisting in verifying Apache NiFi release candidates
#!/usr/bin/env python3
"""A Python 3 script for assisting in verifying Apache release candidates."""
import argparse
import logging
import os
import subprocess
import sys

logger = logging.getLogger(__name__)

NIFI_KEYS_DEV_URL = "https://dist.apache.org/repos/dist/dev/nifi/KEYS"
NIFI_KEYS_REL_URL = "https://dist.apache.org/repos/dist/release/nifi/KEYS"

class Configuration:
    def __init__(self, bypass_cache=False, keys_url=None):
        # Assign unconditionally so the attributes always exist on an instance
        self.bypass_cache = bypass_cache
        self.keys_url = keys_url

class HashUtil:
    @staticmethod
    def get_expected_hash_from_remote_file(hash_file_url):
        try:
            wget_out = subprocess.check_output(["wget", "-O-", "-q", hash_file_url])
            expected_hash = wget_out.decode('ascii').strip(' \t\n\r')
            return expected_hash
        except subprocess.CalledProcessError as err:
            if err.returncode == 8:
                # wget exit code 8 indicates a server error response (e.g., 404)
                raise FileNotFoundError("File not found at url='{}'".format(hash_file_url))
            else:
                raise err

    @staticmethod
    def md5(source_archive):
        # Uses the BSD/macOS 'md5' command; on Linux this would be 'md5sum' instead
        md5_out = subprocess.check_output(["md5", "-q", source_archive])
        md5_hash = md5_out.decode('ascii').strip(' \t\n\r')
        return md5_hash

    @staticmethod
    def sha1(source_archive):
        return HashUtil._shasum(source_archive, "1")

    @staticmethod
    def sha256(source_archive):
        return HashUtil._shasum(source_archive, "256")

    @staticmethod
    def sha512(source_archive):
        return HashUtil._shasum(source_archive, "512")

    @staticmethod
    def _shasum(source_archive, algorithm):
        algorithm_arg = "-a{}".format(algorithm)
        shasum_out = subprocess.check_output(["shasum", algorithm_arg, source_archive])
        sha_hash = shasum_out.decode('ascii').split()[0]
        return sha_hash

def verify_signature(source_tarball, source_tarball_url):
    logger.info("Verifying signature for {}".format(source_tarball))
    # import latest signing keys
    subprocess.run(["wget", NIFI_KEYS_DEV_URL])
    subprocess.run(["gpg", "--import", "KEYS"])
    os.remove('KEYS')
    subprocess.run(["wget", NIFI_KEYS_REL_URL])
    subprocess.run(["gpg", "--import", "KEYS"])
    os.remove('KEYS')
    # verify source signature
    source_asc_url = source_tarball_url + ".asc"
    source_asc = source_tarball + ".asc"
    subprocess.run(["wget", source_asc_url])
    subprocess.run(["gpg", "--verify", "-v", source_asc])
    # cleanup
    os.remove(source_asc)

def get_hashes(algorithm, source_archive, source_archive_url):
    actual_hash = None
    expected_hash_url = None
    if algorithm == "md5":
        actual_hash = HashUtil.md5(source_archive)
        expected_hash_url = source_archive_url + ".md5"
    elif algorithm == "sha1":
        actual_hash = HashUtil.sha1(source_archive)
        expected_hash_url = source_archive_url + ".sha1"
    elif algorithm == "sha256":
        actual_hash = HashUtil.sha256(source_archive)
        expected_hash_url = source_archive_url + ".sha256"
    elif algorithm == "sha512":
        actual_hash = HashUtil.sha512(source_archive)
        expected_hash_url = source_archive_url + ".sha512"
    else:
        raise Exception("Invalid hash algorithm '{}'".format(algorithm))
    expected_hash = None
    try:
        expected_hash = HashUtil.get_expected_hash_from_remote_file(expected_hash_url)
    except FileNotFoundError:
        logger.debug("Hash file not found at %s", expected_hash_url)
    return actual_hash, expected_hash

def verify_hashes(source_tarball, source_tarball_url):
    logger.info("Verifying hashes for {}".format(source_tarball))
    for hash_algorithm in ["md5", "sha1", "sha256", "sha512"]:
        actual_hash, expected_hash = get_hashes(hash_algorithm, source_tarball, source_tarball_url)
        print("{} hash: {}".format(hash_algorithm, actual_hash))
        if expected_hash:
            print("Expected: {}".format(expected_hash))
            assert expected_hash == actual_hash
            print("Equality Check: PASSED")
        else:
            print("Remote hash file does not exist for {}. Cannot perform automated check.".format(hash_algorithm))
            print("Equality Check: UNKNOWN")

def download_source_tarball(source_tarball_url, bypass_cache=False):
    logger.info("Downloading {}".format(source_tarball_url))
    source_tarball = source_tarball_url.split("/")[-1]
    if os.path.isfile(source_tarball) and not bypass_cache:
        logger.info("Found local source tarball. Using local copy without re-downloading")
    else:
        subprocess.run(["wget", source_tarball_url])
    return source_tarball

def verify_rc(source_tarball_url, bypass_cache=False):
    # Thread the cache preference through to the download step
    source_tarball = download_source_tarball(source_tarball_url, bypass_cache=bypass_cache)
    verify_hashes(source_tarball, source_tarball_url)
    verify_signature(source_tarball, source_tarball_url)

def init_logging(verbose=False):
    if verbose:
        log_level = logging.DEBUG
        logging.basicConfig(format="%(asctime)s %(levelname)s:\t%(message)s", level=log_level)
    else:
        log_level = logging.INFO
        logging.basicConfig(format="%(message)s", level=log_level)
    logger.debug("Logging initialized.")

def process_args(raw_args=None):
    parser = argparse.ArgumentParser(
        description="Verifies Apache release candidates.",
        epilog="As an alternative to the commandline, params can be placed in a file, one per line, and specified on the commandline like '%(prog)s @params.conf'.",
        fromfile_prefix_chars='@')
    # required, positional arguments
    parser.add_argument(
        "source_archive_url",
        help="The URL to the source tarball/zip file",
        metavar="SOURCE_ARCHIVE_URL")
    # optional, flag arguments
    parser.add_argument(
        "-v",
        "--verbose",
        help="increase output verbosity",
        action="store_true")
    parser.add_argument(
        "-n",
        "--no-cache",
        help="don't use cached archive",
        action="store_true")
    # Parse the explicitly supplied args rather than always falling back to sys.argv
    args = parser.parse_args(raw_args)
    return args

def main(raw_args=None):
    args = process_args(raw_args)
    init_logging(args.verbose)
    # Honor the --no-cache flag, which was previously parsed but never applied
    config = Configuration(bypass_cache=args.no_cache)
    verify_rc(args.source_archive_url, bypass_cache=config.bypass_cache)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
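
Typical command-line usage follows directly from the argparse setup above: ./verify_rc.py SOURCE_ARCHIVE_URL, optionally with -v for verbose logging and -n to force a fresh download. The snippet below is a minimal sketch of driving the same checks from another Python script; the module name verify_rc and the release candidate URL are placeholders I am assuming for illustration, not something defined by this gist.

# Usage sketch (assumptions: the gist is saved locally as verify_rc.py, and the
# URL below is a placeholder for a real staged source archive from a release vote).
from verify_rc import init_logging, verify_rc

init_logging(verbose=True)
verify_rc(
    "https://dist.apache.org/repos/dist/dev/nifi/example-rc/example-source-release.zip",
    bypass_cache=True,  # re-download even if a local copy of the archive exists
)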