Created
January 18, 2017 19:58
-
-
Save radzhome/58b6e47956497c657c7a7c0d7b0bce9e to your computer and use it in GitHub Desktop.
local pypi index updater
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import os | |
from argparse import ArgumentParser | |
import requests | |
import BeautifulSoup | |
# Root directory of the local PyPI mirror
PYPI_PATH = '/centos/pypi/web'

# Marker file; its modification time is bumped whenever the index changes
LAST_MODIFIED_FILE = os.path.join(PYPI_PATH, 'last-modified')

# Downloaded distribution files are stored per python version, first letter
# of the package name and package name,
# e.g. /centos/pypi/web/packages/py2.py3/D/Django
PACKAGE_PATH = os.path.join(PYPI_PATH, 'packages')
FULL_PACKAGE_PATH = PACKAGE_PATH + "/{python_version}/{first_letter}/{package_name}"

# Each package gets its own index directory holding an index.html,
# e.g. /centos/pypi/web/simple/Django/index.html
INDEX_PATH = os.path.join(PYPI_PATH, 'simple')
FULL_INDEX_PATH = INDEX_PATH + "/{package_name}"

# Anchor written into a package's index.html for every downloaded file
LINK_HTML = (
    '<a href="../../packages/{python_version}/{first_letter}/{package_name}/{filename}#md5={md5_digest}" '
    'rel="internal">{filename}</a><br/>'
)

# URL template of the JSON document describing a package and its releases
PYPI_API_URL = 'https://pypi.python.org/pypi/{package_name}/json'
def touch(fname):
    """
    Refresh the modification time of a file, creating the file if needed
    :param fname: path of the file to touch
    :return: None
    """
    try:
        # Passing None sets access and modification times to "now"
        os.utime(fname, None)
    except OSError:
        # utime failed (presumably the file is missing): opening in append
        # mode creates it and never truncates existing content
        with open(fname, 'a'):
            pass
def _report(message, log=logging.info):
    """
    Echo a progress message to stdout and record it in the log
    :param message: human readable text
    :param log: logging function to record the message with (logging.info, logging.error, ...)
    :return: None
    """
    print(message)
    log("PyPi updater> {}".format(message))


def _requirement_name(line):
    """
    Extract the bare project name from a package name or a requirements.txt line
    :param line: e.g. "six", "Django>=1.8,<2.0" or "requests[security]~=2.0  # http client"
    :return: the project name, or '' for blank lines, comments and pip option lines such as "-r other.txt"
    """
    line = line.strip()
    if not line or line.startswith(('#', '-')):
        return ''
    # The name stops at the first version operator, extras bracket, environment
    # marker, direct reference, inline comment or whitespace
    for position, char in enumerate(line):
        if char in '<>=!~[;@#' or char.isspace():
            return line[:position]
    return line


def _download(package_url, file_path):
    """
    Fetch a distribution file and store it on disk
    :param package_url: url of the distribution file
    :param file_path: destination path
    :return: bool, True when the file was written
    """
    response = requests.get(package_url)
    if response.status_code != 200:
        # Never store an error page under the name of a package file
        _report("Could not download package {}".format(package_url), logging.error)
        return False
    with open(file_path, 'wb') as package_file:
        package_file.write(response.content)
    return True


def _add_index_link(package_name, package_html):
    """
    Add a link to the package's index.html, creating the index page if it does not exist yet
    :param package_name: package name as reported by the package info document
    :param package_html: anchor markup (see LINK_HTML) for the new file
    :return: None
    """
    cur_index_path = FULL_INDEX_PATH.format(package_name=package_name)
    if not os.path.exists(cur_index_path):
        os.makedirs(cur_index_path)
    index_file_path = os.path.join(cur_index_path, 'index.html')

    if not os.path.exists(index_file_path):
        _report("Creating new index file {}".format(index_file_path))
        first_line = '<html><head><title>Links for {0}</title><meta name="api-version" value="2"/>' \
                     '</head><body><h1>Links for {0}</h1>'.format(package_name)
        with open(index_file_path, 'w') as index_file:
            index_file.write(first_line)
            index_file.write(package_html)
            index_file.write('</body></html>')
        return

    _report("Updating index file {}".format(index_file_path))
    # Parse the existing page and append the new link to its body
    with open(index_file_path) as index_file:
        soup = BeautifulSoup.BeautifulSoup(index_file.read())
    soup.body.append(BeautifulSoup.BeautifulSoup(package_html))
    # Save the file again keeping it pretty
    with open(index_file_path, "w") as index_file:
        index_file.write(str(soup.prettify()))


def process_package(package_name):
    """
    Processes an individual package or line in a requirements.txt file

    Looks the package up via PYPI_API_URL, downloads every distribution file that
    is not on disk yet (pre-releases and windows builds are skipped) and links
    each new file from the package's index.html.
    :param package_name: package name or requirements.txt line
    :return: bool, True if at least one new file was added to the index
    """
    package_name = _requirement_name(package_name)
    # Nothing to look up for blank lines, comments and pip options
    if not package_name:
        return False

    package_details_url = PYPI_API_URL.format(package_name=package_name)
    response = requests.get(package_details_url)
    if response.status_code != 200:
        _report("Could not find package {}".format(package_name), logging.error)
        return False
    try:
        data = json.loads(response.content)
    except ValueError:
        message = "No JSON for package {} at url {}, continuing...".format(package_name, package_details_url)
        logging.error("PyPi updater> {}".format(message))
        return False

    # Updated package name with correct case
    package_name = data['info']['name']
    # A missing "releases" entry is treated as "no releases"
    releases = data.get('releases') or {}
    first_letter = package_name[0]
    index_modified = False

    for release, release_data in releases.items():
        release = release.lower()
        # Skip releases without files and releases marked as pre-release
        if not release_data or any(tag in release for tag in ('rc', 'dev', 'alpha', 'beta')):
            continue
        _report("Checking package {} release {}".format(package_name, release))

        for release_package in release_data:
            # Relevant information about the release
            package_type = release_package['packagetype']
            python_version = release_package['python_version']
            package_url = release_package['url']
            filename = release_package['filename']
            md5_digest = release_package['md5_digest']

            # Skip windows package release
            if '_win' in package_type:
                _report("Skipping windows package {}".format(filename), logging.debug)
                continue

            # Package types: source distribution (sdist) or built distribution (bdist_*)
            logging.debug("The package type is {}, version {}, url {}, file {}, md5 {}"
                          "".format(package_type, python_version, package_url, filename, md5_digest))

            # Make dirs for packages
            cur_package_path = FULL_PACKAGE_PATH.format(python_version=python_version,
                                                        first_letter=first_letter,
                                                        package_name=package_name)
            if not os.path.exists(cur_package_path):
                os.makedirs(cur_package_path)

            # A file that is already on disk is not downloaded or linked again
            file_path = os.path.join(cur_package_path, filename)
            if os.path.exists(file_path):
                _report("Already downloaded package {}, continuing ...".format(file_path))
                continue

            _report("Downloading package {}".format(file_path))
            if not _download(package_url, file_path):
                continue

            package_html = LINK_HTML.format(first_letter=first_letter,
                                            python_version=python_version,
                                            package_name=package_name,
                                            filename=filename,
                                            md5_digest=md5_digest)
            _add_index_link(package_name, package_html)
            index_modified = True

    return index_modified
def main(req_file_path=None, package_name=None):
    """
    Process package or file wrapper

    Processes every line of the requirements file when one is given, otherwise
    the single package, and touches LAST_MODIFIED_FILE when any of them
    reported a change. Does nothing beyond the start message when neither
    argument is given.
    :param req_file_path: path of a requirements file, takes precedence over package_name
    :param package_name: name of a single package
    :return: None
    """
    # Start message
    message = "Staring index update"
    index_modified = False
    print(message)
    logging.info("PyPi updater> {}".format(message))

    # Determine if processing single package or file path
    if req_file_path:
        # Iterate lazily and make sure the requirements file is closed again
        with open(req_file_path) as req_file:
            for line in req_file:
                # Every line is processed, even after a change was detected
                if process_package(package_name=line):
                    index_modified = True
    elif package_name:
        # Process single package
        if process_package(package_name=package_name):
            index_modified = True

    # Update the timestamp for the last-modified file if new packages added to index
    if index_modified:
        message = "Updates detected, touching last-modified file"
        print(message)
        logging.info("PyPi updater> {}".format(message))
        touch(LAST_MODIFIED_FILE)
if __name__ == "__main__":
    # Setup arg parser
    parser = ArgumentParser()
    parser.add_argument('package_name', nargs='?', default=None)
    parser.add_argument('-r', '--requirement', dest='req_file_path', required=False, help='Optional requirement file')
    args = vars(parser.parse_args())
    _req_file_path = args.get('req_file_path')
    _package_name = args.get('package_name')

    # Usage problems go through argparse: it prints the usage text plus the
    # message and exits with status 2 instead of dumping a traceback
    if not (_req_file_path or _package_name):
        parser.error("Requirements file or package name is required, none given")
    if _req_file_path and _package_name:
        parser.error("Requirements file or package name is required, not both")
    if _req_file_path and not os.path.exists(_req_file_path):
        parser.error("Requirements file not found")

    # Setup logging; done after argument validation so that a bad invocation
    # (or --help) does not create the mirror directory or the log file
    if not os.path.exists(PYPI_PATH):
        os.makedirs(PYPI_PATH)
    log_filename = os.path.join(PYPI_PATH, 'indexer.log')
    log_level = logging.INFO
    logging.basicConfig(filename=log_filename, level=log_level)

    # Run main with either requirements file or package name
    main(req_file_path=_req_file_path, package_name=_package_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment