Created
September 4, 2022 08:41
-
-
Save moyix/1bf820837930ec56d214952b0cce2d32 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import os | |
import re | |
import json | |
import zipfile | |
from collections import defaultdict, namedtuple | |
from collections.abc import Mapping | |
from email.parser import HeaderParser | |
from email.policy import compat32 | |
from base64 import urlsafe_b64decode | |
import csv | |
import configparser | |
# For entry points: ConfigParser lower-cases option names by default via
# optionxform; entry-point names are case-sensitive, so keep them intact.
class CaseSensitiveConfigParser(configparser.ConfigParser):
    """ConfigParser variant that preserves the case of option names."""

    def optionxform(self, optionstr):
        # Identity transform: return the option name unchanged.
        return optionstr
# From PEP 566: | |
# It may be necessary to store metadata in a data structure which does not | |
# allow for multiple repeated keys, such as JSON. | |
# | |
# The canonical method to transform metadata fields into such a data structure | |
# is as follows: | |
# | |
# 1. The original key-value format should be read with email.parser.HeaderParser; | |
# 2. All transformed keys should be reduced to lower case. Hyphens should be | |
# replaced with underscores, but otherwise should retain all other characters; | |
# 3. The transformed value for any field marked with "(Multiple use)" should be a
# single list containing all the original values for the given key; | |
# 4. The Keywords field should be converted to a list by splitting the original | |
# value on whitespace characters; | |
# 5. The message body, if present, should be set to the value of the description key. | |
# 6. The result should be stored as a string-keyed dictionary. | |
# Core-metadata fields that may appear multiple times in METADATA; their
# values are collected into lists (PEP 566 step 3 / the core metadata spec).
multiple_use = {
    "dynamic",
    "platform",
    "supported-platform",  # added: also multiple-use per the core metadata spec
    "classifier",
    "requires-dist",
    "requires-external",
    "project-url",
    "provides-extra",
    "provides-dist",  # bug fix: was misspelled "provides-pist"
    "obsoletes-dist",
}

def msg_to_json(msg):
    """Convert an email.message.Message of core metadata to a JSON-able dict.

    Implements the PEP 566 transformation: keys are lower-cased with hyphens
    replaced by underscores, multiple-use fields become lists, Keywords is
    split on whitespace, and the message body (if any) is stored under
    'description'.
    """
    d = defaultdict(list)
    for k, v in msg.items():
        key = k.lower()
        if key in multiple_use:
            d[key.replace('-', '_')].append(v)
        elif key == 'keywords':
            # Keywords becomes a list split on whitespace (PEP 566 step 4).
            d['keywords'] = v.split()
        else:
            d[key.replace('-', '_')] = v
    # A multipart message keeps the description in its first part.
    if msg.is_multipart():
        desc = msg.get_payload(0).get_payload()
    else:
        desc = msg.get_payload()
    if desc:
        d['description'] = desc
    return dict(d)
def record_to_json(record):
    """Parse a RECORD file (CSV rows of path, hash, size) into a list of dicts.

    Each row's 'hash' column is replaced by 'hash_algorithm' and 'hash_value'
    (the base64url digest decoded to a hex string); 'size' becomes an int, or
    None when absent.
    """
    reader = csv.DictReader(
        record.get_payload().splitlines(),
        fieldnames=['path', 'hash', 'size'],
    )
    rows = []
    for row in reader:
        size = row['size']
        row['size'] = int(size) if size and size.strip() else None
        digest = row['hash']
        if digest:
            parts = digest.split('=', 1)
            if len(parts) == 2:
                algorithm, value = parts
            else:
                # Bare digest with no "algo=" prefix: assume sha256.
                algorithm, value = 'sha256', parts[0]
            row['hash_algorithm'] = algorithm
            # Extra '==' padding is tolerated by urlsafe_b64decode.
            row['hash_value'] = urlsafe_b64decode(value + '==').hex()
        else:
            row['hash_algorithm'] = None
            row['hash_value'] = None
        del row['hash']
        rows.append(row)
    return rows
# Decoders: turn raw zip-entry bytes into text or parsed messages.

def msg_decode(b):
    """Decode bytes and parse them as RFC 822-style headers."""
    return HeaderParser(policy=compat32).parsestr(utf8_decode(b))

def utf8_decode(b):
    """Decode bytes, trying utf-8, then windows-1252, then latin1.

    latin1 maps every byte value, so in practice this never fails; the raise
    below is only a guard in case the codec list ever changes.
    """
    for codec in ('utf-8', 'windows-1252', 'latin1'):
        try:
            return b.decode(codec)
        except UnicodeDecodeError:
            continue
    # Bug fix: the original raised UnicodeDecodeError with a single argument,
    # which is itself a TypeError (the constructor needs 5 arguments).
    raise UnicodeDecodeError(
        'utf-8', bytes(b), 0, len(b),
        'could not decode as utf-8, windows-1252, or latin1')
def ident(s):
    """Identity function: return the argument unchanged (no-op parser/decoder)."""
    return s
def mapping2dict(mapping):
    """Recursively convert a Mapping to a plain dict, dropping falsy values."""
    result = {}
    for key, value in mapping.items():
        if not value:
            continue  # skip empty sections/values
        result[key] = mapping2dict(value) if isinstance(value, Mapping) else value
    return result
def parse_entry_points(s):
    """Parse entry_points.txt (INI format) into a nested plain dict."""
    parser = CaseSensitiveConfigParser()
    parser.read_string(s)
    return mapping2dict(parser)
def parse_true(s):
    """Parser for marker files whose mere presence means True (e.g. zip-safe)."""
    return True

def parse_false(s):
    """Parser for marker files whose mere presence means False (not-zip-safe)."""
    return False
# File extensions that carry no "kind" information for license/notice files.
excluded_kinds = {'txt', 'rst', 'md'}

def parse_kind(s, **kwargs):
    """Wrap text *s* in a one-entry dict keyed by its 'kind' (or 'default')."""
    kind = kwargs.get('kind')
    if not kind or kind in excluded_kinds:
        kind = 'default'
    return {kind: s}
def parse_lines(s):
    """Split text into a list of lines with line endings removed."""
    return s.splitlines()
# Basic actions: each takes (dict, key, value) and stores the value.

def action_insert(d, k, v):
    """Store v under k, overwriting any previous value."""
    d[k] = v

def action_append(d, k, v):
    """Append v to the list at k, creating the list on first use."""
    d.setdefault(k, []).append(v)

def action_update(d, k, v):
    """Merge mapping v into the dict at k, creating the dict on first use."""
    d.setdefault(k, {}).update(v)
# Extended name matchers

def filter_kind_args(args):
    """Normalize a matcher's 'kind' argument in place and return args.

    Empty or generic kinds (txt/rst/md) collapse to 'default'; otherwise a
    trailing .txt/.rst/.md extension is stripped from the kind.
    """
    if 'kind' not in args:
        return args
    kind = args['kind']
    if not kind or kind in excluded_kinds:
        args['kind'] = 'default'
        return args
    for ext in excluded_kinds:
        suffix = '.' + ext
        if kind.endswith(suffix):
            kind = kind[:-len(suffix)]
            break
    args['kind'] = kind
    return args
def regex_name(regex, debug_name):
    """Build an extended name matcher from a regex.

    The returned matcher yields the regex's (kind-normalized) groupdict on a
    search hit, or None on a miss; *debug_name* labels it for debug output.
    """
    pattern = re.compile(regex)

    def matcher(s):
        match = pattern.search(s)
        if match is None:
            return None
        return filter_kind_args(match.groupdict())

    matcher.__debug_name__ = debug_name
    return matcher
def name_with_kind(name, kind):
    """Build an exact-filename matcher that supplies a fixed 'kind' argument."""
    def matcher(s):
        return {'kind': kind} if s == name else None
    matcher.__debug_name__ = name
    return matcher
def parse_json(s):
    """Parse a JSON string; empty, whitespace-only, or None input yields {}."""
    if not s or not s.strip():
        return {}
    return json.loads(s)
# Describes how to handle one file inside a wheel's .dist-info directory.
# For a simple (string) name match:
#     value = parser(decoder(zf.read(name)))
# For an extended (callable) name match:
#     value = parser(decoder(zf.read(name)), **name_matcher(name))
# then: action(info_dict, key, value)
#
# Fields:
#   name    -- filename to match: a plain string, or a callable returning
#              parser kwargs on a match and None on a miss
#   key     -- key in the info dict; None means "recognize but ignore"
#   decoder -- bytes -> text/message
#   parser  -- decoded value -> stored value
#   action  -- how to store the value (insert/append/update)
WheelMeta = namedtuple('WheelMeta', 'name key decoder parser action')
# Handler table: one WheelMeta per known .dist-info file.  Order matters for
# the extended (callable) matchers below: the first match wins.
handlers = [
    WheelMeta('METADATA', 'metadata', msg_decode, msg_to_json, action_insert),
    WheelMeta('WHEEL', 'wheel', msg_decode, msg_to_json, action_insert),
    WheelMeta('RECORD', 'record', msg_decode, record_to_json, action_insert),
    WheelMeta('RECORD.jws', 'record_signature', utf8_decode, parse_json, action_insert),
    WheelMeta('DESCRIPTION.rst', 'description', utf8_decode, ident, action_insert),
    WheelMeta('metadata.json', 'metadata_json', utf8_decode, parse_json, action_insert),
    WheelMeta('pbr.json', 'pbr', utf8_decode, parse_json, action_insert),
    WheelMeta('top_level.txt', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('top_level.txt.orig', 'top_level', utf8_decode, ident, action_insert),
    WheelMeta('entry_points.txt', 'entry_points', utf8_decode, parse_entry_points, action_insert),
    WheelMeta('zip-safe', 'zip_safe', ident, parse_true, action_insert),
    WheelMeta('not-zip-safe', 'zip_safe', ident, parse_false, action_insert),
    WheelMeta('namespace_packages.txt', 'namespace_packages', utf8_decode, ident, action_insert),
    WheelMeta('direct_url.json', 'direct_url', utf8_decode, parse_json, action_insert),
    WheelMeta('INSTALLER', 'installer', utf8_decode, ident, action_insert),
    WheelMeta('REQUESTED', 'requested', utf8_decode, ident, action_insert),
    WheelMeta('dependency_links.txt', 'dependency_links', utf8_decode, parse_lines, action_insert),
    WheelMeta('eager_resources.txt', 'eager_resources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('SOURCES.txt.orig', 'sources', utf8_decode, parse_lines, action_insert),
    WheelMeta('top_list.txt', 'top_list', utf8_decode, ident, action_insert),
    WheelMeta('CHANGES.rst', 'changes', utf8_decode, ident, action_insert),
    WheelMeta('RELEASE-NOTES.rst', 'release_notes', utf8_decode, ident, action_insert),
    # Exact license/authors file names with a fixed "kind".
    WheelMeta(name_with_kind('Apache-2.0.txt', 'Apache-2.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('BSD-3-Clause.txt', 'BSD-3-Clause'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-BY-4.0.txt', 'CC-BY-4.0'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('MIT.txt', 'MIT'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('CC-PDDC.txt', 'CC-PDDC'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('dep5', 'dep5'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('LI_en.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('legal.txt', 'default'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('open_source_license.txt', 'open_source'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('EULA.txt', 'EULA'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('REDIST.txt', 'REDIST'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('THIRDPARTY.txt', 'THIRDPARTY'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('HYPER_API_OSS_disclosure.txt', 'HYPER_API_OSS_disclosure'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(name_with_kind('AUTHORS.google-crc32c', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    # Files to recognize and deliberately ignore (key is None).
    WheelMeta(regex_name(r'^\..*$', 'dotfiles'), None, None, None, None),  # Ignore dotfiles
    WheelMeta(regex_name(r'(~|\.bak)$', 'backups'), None, None, None, None),  # Ignore backup files
    WheelMeta(regex_name(r'conflicted copy', 'dropbox'), None, None, None, None),  # Ignore Dropbox conflict files
    WheelMeta(regex_name(r'/$', 'dirs'), None, None, None, None),  # Ignore directories
    WheelMeta(regex_name(r'WHEELe', 'spelling'), None, None, None, None),  # Ignore misspelled WHEEL
    WheelMeta('LICENSE.pdf', None, None, None, None),  # A PDF File?? Really???
    # Regex matchers that extract a "kind" from the filename.
    WheelMeta(regex_name(r'(?i)((?P<kind>[\w.-]+)[_-])?third-party-programs\.txt$', 'third_party_programs'), 'third_party_programs', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?\.ABOUT$', 'about'), 'about', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)NOTICES?(-WHEEL)?(\.txt|\.rst|\.md|)$', 'notice'), 'notice', utf8_decode, parse_kind, action_insert),
    # Bug fix: debug label was mislabeled 'notice'; this is the README matcher.
    WheelMeta(regex_name(r'(?i)(README|misc)(\.txt|\.rst|\.md|)$', 'readme'), 'readme', utf8_decode, ident, action_insert),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)?AUTHORS(\.txt|\.rst|\.md|\.py|)$', 'authors'), 'authors', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(?P<kind>[\w.-]+)[-._](LICEN[CS]ES?)?$', 'license_before'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)(LICEN[SC]ES?)[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|)?$', 'license_after'), 'license', utf8_decode, parse_kind, action_update),
    WheelMeta(regex_name(r'(?i)COPYING[-._]?(?P<kind>[\w.-]+)?(\.txt|\.rst|\.md)?$', 'copying'), 'copying', utf8_decode, parse_kind, action_update),
]
# Index the handler table: exact-string names get O(1) dict lookup; callable
# (extended) names are tried in declaration order.
simple_handlers = {handler.name: handler for handler in handlers}
extended_handlers = [
    handler for handler in handlers if not isinstance(handler.name, str)
]
# Matches the path of any file inside a .dist-info directory.
distinfo_re = re.compile(r'\.dist-info/(?P<name>.+)$')
def wheel_info(file):
    """Extract metadata from a wheel's .dist-info files into one dict.

    file: a path or file object accepted by zipfile.ZipFile.
    Unrecognized .dist-info entries are reported on stderr.  Keys for
    handlers that matched nothing are filled with None so every result
    has a uniform shape.
    """
    info = {}
    with zipfile.ZipFile(file) as zf:
        for name in zf.namelist():
            # Only entries inside a .dist-info directory are metadata.
            if m := distinfo_re.search(name):
                data = zf.read(name)
                d_name = m.group('name')
                if d_name in simple_handlers:
                    h = simple_handlers[d_name]
                    # key None means "recognized but deliberately ignored".
                    if h.key is None: continue
                    h.action(info, h.key, h.parser(h.decoder(data)))
                else:
                    for h in extended_handlers:
                        if (args := h.name(d_name)) is not None:
                            # Breaking when key is None still counts as
                            # handled, so the for-else below is skipped.
                            if h.key is None: break
                            # Extended matchers pass extra kwargs (e.g. kind=)
                            # to the parser.
                            value = h.parser(h.decoder(data), **args)
                            h.action(info, h.key, value)
                            break
                    else:
                        print(f'{file}: Unknown file in .dist-info: {name}', file=sys.stderr)
            else:
                # Not a .dist-info entry: package contents, ignored here.
                pass
    # Ensure every handler key is present in the output, defaulting to None.
    for h in handlers:
        if h.key and h.key not in info:
            info[h.key] = None
    return info
def debug_name_handler(name):
    """Print which handler (if any) would process a given zip entry path.

    Mirrors the dispatch logic of wheel_info without reading any data:
    simple (exact-name) handlers first, then extended matchers in order.
    """
    if m := distinfo_re.search(name):
        d_name = m.group('name')
        if d_name in simple_handlers:
            h = simple_handlers[d_name]
            if h.key is None:
                print(f'{d_name} -> simple: ignored')
            else:
                print(f"{d_name} -> simple: {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
        else:
            for h in extended_handlers:
                if (args := h.name(d_name)) is not None:
                    if h.key is None:
                        # Bug fix: removed a stray ')' that appeared in the
                        # original message after the debug name.
                        print(f'{d_name} -> extended: ignored by {h.name.__debug_name__}')
                    else:
                        argstr = ', '.join(f'{k}={v}' for k, v in args.items())
                        print(f"{d_name} -> extended: {h.name.__debug_name__}({argstr}) {h.key} {h.parser.__name__} {h.decoder.__name__} {h.action.__name__}")
                    break
            else:
                print(f'{d_name} -> Unknown')
def main():
    """CLI entry point: for each wheel path on argv, extract its metadata
    and write it to wheel_meta/<basename>.json (one JSON object per file)."""
    # Robustness fix: ensure the output directory exists (the original
    # crashed with FileNotFoundError if wheel_meta/ was missing).  The
    # redundant local `import sys` was dropped; sys is imported at the top.
    os.makedirs('wheel_meta', exist_ok=True)
    for file in sys.argv[1:]:
        print(f"Working on {file}", file=sys.stderr)
        info = wheel_info(file)
        out_name = os.path.join('wheel_meta', os.path.basename(file) + '.json')
        with open(out_name, 'w') as out:
            json.dump(info, out)
            out.write('\n')
def debug_main():
    """Debug CLI: read .dist-info entry paths from stdin, one per line, and
    print which handler would match each (see debug_name_handler)."""
    # Dropped the redundant local `import sys`; sys is imported at the top.
    for line in sys.stdin:
        debug_name_handler(line.strip())

if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment