#!/usr/bin/env python3

import argparse
import datetime
import gzip
import hashlib
import os
import tarfile
import tempfile

import requests

# Check the checksums of locally stored Arch Linux package files:
# if a locally stored package's checksum differs from the checksum listed
# for that file in the database downloaded from an official mirror, a
# warning is emitted. Mismatching files can additionally be deleted with
# the --delete flag.

DB_SOURCE_URI = 'https://mirror.osbeck.com/archlinux'
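
# Example invocations (the script name is illustrative; adjust it to
# wherever you saved this file):
#
#   ./check_checksums.py --flexo-pkg-dir /var/cache/flexo/pkg
#   ./check_checksums.py --delete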

def fetch_package_metadata(url):
    # Download the repository database (a gzip-compressed tar archive) and
    # parse the desc file of each package it contains.
    response = requests.get(url)
    if response.status_code != 200:
        print('Skipping URL {} due to status code {}'.format(url, response.status_code))
        return []
    else:
        tmpfile = tempfile.TemporaryFile()
        tmpfile.write(response.content)
        tmpfile.seek(0)
        gzipfile = gzip.GzipFile(fileobj=tmpfile)
        tar = tarfile.open(fileobj=gzipfile)
        members = [member for member in tar.getmembers() if member.name.endswith('/desc')]
        return [package_desc_from_member(tar, member) for member in members]

def package_desc_from_member(tar, member):
    extracted_file = tar.extractfile(member)
    content = extracted_file.read().decode('utf-8')
    return parse_package_desc(content)

def parse_package_desc(package_desc):
    # The desc format consists of %FIELD% markers, each followed by the
    # field's value on the next line.
    sha256sum = None
    filename = None
    csize = None
    lines = package_desc.split('\n')
    for idx, line in enumerate(lines):
        if line.startswith('%SHA256SUM%'):
            sha256sum = lines[idx + 1]
        elif line.startswith('%FILENAME%'):
            filename = lines[idx + 1]
        elif line.startswith('%CSIZE%'):
            csize = lines[idx + 1]
    return {
        'sha256sum': sha256sum,
        'filename': filename,
        'csize': int(csize),
    }
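
# For illustration, a (hypothetical, abbreviated) desc entry looks like this:
#
#   %FILENAME%
#   example-1.0-1-x86_64.pkg.tar.zst
#
#   %CSIZE%
#   478329
#
#   %SHA256SUM%
#   <64 hex digits>
#
# for which parse_package_desc() returns:
#   {'sha256sum': '<64 hex digits>',
#    'filename': 'example-1.0-1-x86_64.pkg.tar.zst',
#    'csize': 478329}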

def verify_checksums(directory, packages):
    # Compare each package's local sha256sum against the sum recorded in the
    # remote database. Packages that have not been downloaded yet are skipped.
    result = empty_result()
    for package in packages:
        path = os.path.join(directory, package['filename'])
        try:
            with open(path, 'rb') as f:
                local_sha256sum = get_sha256sum(f)
            stat = os.stat(path)
            local_filesize = stat.st_size
            local_mtime = datetime.datetime.fromtimestamp(stat.st_mtime)
            cfs_filesize = get_cfs_filesize(path)
            if local_sha256sum != package['sha256sum']:
                result['mismatches'].append({
                    'filename': package['filename'],
                    'path': path,
                    'local_sha256sum': local_sha256sum,
                    'remote_sha256sum': package['sha256sum'],
                    'local_filesize': local_filesize,
                    'remote_filesize': package['csize'],
                    'local_mtime': local_mtime,
                    'cfs_filesize': cfs_filesize,
                })
            else:
                result['num_matches'] += 1
        except FileNotFoundError:
            # The file does not exist because it hasn't been downloaded yet,
            # so it's safe to ignore this exception.
            pass
    return result

def get_cfs_filesize(path):
    # Read the expected file size from the hidden ".<filename>.cfs" file
    # that Flexo stores next to the package. Returns -1 if no such file
    # exists.
    directory, basename = os.path.split(path)
    cfs_basename = '.{}.cfs'.format(basename)
    cfs_path = os.path.join(directory, cfs_basename)
    try:
        with open(cfs_path, 'r') as f:
            content_length = int(f.read().rstrip())
    except FileNotFoundError:
        return -1
    return content_length
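
# A hypothetical example: for core/os/x86_64/example-1.0-1-x86_64.pkg.tar.zst,
# the sidecar file would be core/os/x86_64/.example-1.0-1-x86_64.pkg.tar.zst.cfs.
# Judging from the code above, its sole content is an integer, presumably the
# Content-Length recorded by Flexo when the package was downloaded.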

def aggregate_results(results):
    aggregated_result = empty_result()
    for result in results:
        aggregated_result['num_matches'] += result['num_matches']
        aggregated_result['mismatches'] += result['mismatches']
    return aggregated_result

def empty_result():
    return {
        'num_matches': 0,
        'mismatches': [],
    }

def get_sha256sum(fileobject):
    # Hash the file in 4 KiB chunks to avoid reading it into memory at once.
    sha256_hash = hashlib.sha256()
    for chunk in iter(lambda: fileobject.read(4096), b''):
        sha256_hash.update(chunk)
    return sha256_hash.hexdigest()

def main():
    parser = argparse.ArgumentParser(description='Check pacman files for sha256sum mismatches.')
    parser.add_argument(
        '--delete',
        action='store_true',
        help='Delete all files that do not match',
    )
    parser.add_argument(
        '--flexo-pkg-dir',
        action='store',
        help='The directory where Flexo stores its packages',
        default='/var/cache/flexo/pkg',
        type=str,
    )
    args = parser.parse_args()
    results = []
    repos_to_check = os.listdir(args.flexo_pkg_dir)
    for repo in repos_to_check:
        url = '{}/{}/os/x86_64/{}.db'.format(DB_SOURCE_URI, repo, repo)
        local_path = '{}/{}/os/x86_64/'.format(args.flexo_pkg_dir, repo)
        package_metadata = fetch_package_metadata(url)
        results.append(verify_checksums(local_path, package_metadata))
    result = aggregate_results(results)
    if len(result['mismatches']) == 0:
        print('No mismatches were detected. Matching files: {}'.format(result['num_matches']))
    else:
        print('Mismatches were detected for the following files:', end='\n\n')
        for mismatch in result['mismatches']:
            print('Filename:\t\t{}'.format(mismatch['filename']))
            print('File path:\t\t{}'.format(mismatch['path']))
            print('Expected sha256sum:\t{}'.format(mismatch['remote_sha256sum']))
            print('Actual sha256sum:\t{}'.format(mismatch['local_sha256sum']))
            print('Expected filesize:\t{}'.format(mismatch['remote_filesize']))
            print('Actual filesize:\t{}'.format(mismatch['local_filesize']))
            print('CFS filesize:\t\t{}'.format(mismatch['cfs_filesize']))
            print('Modification time:\t{}'.format(mismatch['local_mtime']))
    if args.delete:
        for mismatch in result['mismatches']:
            print('Deleting file ' + mismatch['path'])
            os.remove(mismatch['path'])

if __name__ == "__main__":
    main()