Created
March 21, 2019 01:24
-
-
Save zhangolve/438e8f265aec7cc0075d541cb457ddb3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Utility functions.""" | |
| import sys | |
| import os | |
| import base64 | |
| import json | |
| import hashlib | |
| try: | |
| from collections import OrderedDict | |
| except ImportError: | |
| OrderedDict = dict | |
| __all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8', | |
| 'to_json', 'from_json', 'matches_requirement'] | |
| def urlsafe_b64encode(data): | |
| """urlsafe_b64encode without padding""" | |
| return base64.urlsafe_b64encode(data).rstrip(binary('=')) | |
| def urlsafe_b64decode(data): | |
| """urlsafe_b64decode without padding""" | |
| pad = b'=' * (4 - (len(data) & 3)) | |
| return base64.urlsafe_b64decode(data + pad) | |
| def to_json(o): | |
| '''Convert given data to JSON.''' | |
| return json.dumps(o, sort_keys=True) | |
| def from_json(j): | |
| '''Decode a JSON payload.''' | |
| return json.loads(j) | |
| def open_for_csv(name, mode): | |
| if sys.version_info[0] < 3: | |
| nl = {} | |
| bin = 'b' | |
| else: | |
| nl = { 'newline': '' } | |
| bin = '' | |
| return open(name, mode + bin, **nl) | |
| try: | |
| unicode | |
| def utf8(data): | |
| '''Utf-8 encode data.''' | |
| if isinstance(data, unicode): | |
| return data.encode('utf-8') | |
| return data | |
| except NameError: | |
| def utf8(data): | |
| '''Utf-8 encode data.''' | |
| if isinstance(data, str): | |
| return data.encode('utf-8') | |
| return data | |
| try: | |
| # For encoding ascii back and forth between bytestrings, as is repeatedly | |
| # necessary in JSON-based crypto under Python 3 | |
| unicode | |
| def native(s): | |
| return s | |
| def binary(s): | |
| if isinstance(s, unicode): | |
| return s.encode('ascii') | |
| return s | |
| except NameError: | |
| def native(s): | |
| if isinstance(s, bytes): | |
| return s.decode('ascii') | |
| return s | |
| def binary(s): | |
| if isinstance(s, str): | |
| return s.encode('ascii') | |
| class HashingFile(object): | |
| def __init__(self, fd, hashtype='sha256'): | |
| self.fd = fd | |
| self.hashtype = hashtype | |
| self.hash = hashlib.new(hashtype) | |
| self.length = 0 | |
| def write(self, data): | |
| self.hash.update(data) | |
| self.length += len(data) | |
| self.fd.write(data) | |
| def close(self): | |
| self.fd.close() | |
| def digest(self): | |
| if self.hashtype == 'md5': | |
| return self.hash.hexdigest() | |
| digest = self.hash.digest() | |
| return self.hashtype + '=' + native(urlsafe_b64encode(digest)) | |
| class OrderedDefaultDict(OrderedDict): | |
| def __init__(self, *args, **kwargs): | |
| if not args: | |
| self.default_factory = None | |
| else: | |
| if not (args[0] is None or callable(args[0])): | |
| raise TypeError('first argument must be callable or None') | |
| self.default_factory = args[0] | |
| args = args[1:] | |
| super(OrderedDefaultDict, self).__init__(*args, **kwargs) | |
| def __missing__ (self, key): | |
| if self.default_factory is None: | |
| raise KeyError(key) | |
| self[key] = default = self.default_factory() | |
| return default | |
| if sys.platform == 'win32': | |
| import ctypes.wintypes | |
| # CSIDL_APPDATA for reference - not used here for compatibility with | |
| # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order | |
| csidl = dict(CSIDL_APPDATA=26, CSIDL_LOCAL_APPDATA=28, | |
| CSIDL_COMMON_APPDATA=35) | |
| def get_path(name): | |
| SHGFP_TYPE_CURRENT = 0 | |
| buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) | |
| ctypes.windll.shell32.SHGetFolderPathW(0, csidl[name], 0, SHGFP_TYPE_CURRENT, buf) | |
| return buf.value | |
| def save_config_path(*resource): | |
| appdata = get_path("CSIDL_LOCAL_APPDATA") | |
| path = os.path.join(appdata, *resource) | |
| if not os.path.isdir(path): | |
| os.makedirs(path) | |
| return path | |
| def load_config_paths(*resource): | |
| ids = ["CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"] | |
| for id in ids: | |
| base = get_path(id) | |
| path = os.path.join(base, *resource) | |
| if os.path.exists(path): | |
| yield path | |
| else: | |
| def save_config_path(*resource): | |
| import xdg.BaseDirectory | |
| return xdg.BaseDirectory.save_config_path(*resource) | |
| def load_config_paths(*resource): | |
| import xdg.BaseDirectory | |
| return xdg.BaseDirectory.load_config_paths(*resource) | |
| def matches_requirement(req, wheels): | |
| """List of wheels matching a requirement. | |
| :param req: The requirement to satisfy | |
| :param wheels: List of wheels to search. | |
| """ | |
| try: | |
| from pkg_resources import Distribution, Requirement | |
| except ImportError: | |
| raise RuntimeError("Cannot use requirements without pkg_resources") | |
| req = Requirement.parse(req) | |
| selected = [] | |
| for wf in wheels: | |
| f = wf.parsed_filename | |
| dist = Distribution(project_name=f.group("name"), version=f.group("ver")) | |
| if dist in req: | |
| selected.append(wf) | |
| return selected |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment