Created
July 3, 2021 18:06
-
-
Save basilevs/3a01b12cb6ddff47d70004c46040b42a to your computer and use it in GitHub Desktop.
Download and unzip a file via HTTP in Python with disconnect recovery
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
#from jenkins import Jenkins
#from os import environ
from os.path import join, exists
from pprint import pprint
from tempfile import TemporaryFile, gettempdir
from zipfile import ZipFile

from requests import get
from requests.exceptions import ChunkedEncodingError
from requests.exceptions import ConnectionError as RequestsConnectionError
url_template = "https://artifactory-ito.spirenteng.com/artifactory/apt-jenkins-itest-builds/{0}/{1}/{2}/iTest-linux.gtk.x86_64.zip" | |
def build_url(job, number): | |
project, name = job.split("--") | |
return url_template.format(name, project, number) | |
def download_and_unzip(url, dst_dir):
    """Download the zip archive at *url* and extract it into *dst_dir*.

    The archive is written to a temporary file by ``download_file``, checked
    with ``ZipFile.testzip`` and, if no bad member is reported, extracted.

    Raises:
        ValueError: if ``testzip`` names a corrupt archive member.
    """
    with TemporaryFile() as spool:
        download_file(url, spool)
        # Flush pending writes before ZipFile starts reading the same file.
        spool.flush()
        with ZipFile(spool) as archive:
            bad_member = archive.testzip()
            if bad_member:
                raise ValueError("Failed to unzip: " + bad_member)
            archive.extractall(path=dst_dir)
def download_file(url, f):
    """Stream *url* into the binary file object *f*, resuming after drops.

    Every attempt sends ``Range: bytes=<f.tell()>-`` so that an interrupted
    transfer continues from the bytes already written. The total size is
    taken from the ``Content-Length`` of the first response and compared with
    ``f.tell()``, so *f* is expected to start at offset 0 (assumption carried
    over from the original code).

    Args:
        url: address of the resource to fetch.
        f: writable binary file object supporting ``tell``; ``seek`` and
            ``truncate`` are only used if the server ignores ``Range`` on a
            resumed attempt.

    Raises:
        ValueError: if the first response carries no ``Content-Length``.
        requests.exceptions.HTTPError: from ``raise_for_status``.
        requests.exceptions.RequestException: if an attempt fails without
            getting past the position reached before it started.
    """
    size = -1
    while size < 0 or f.tell() < size:
        attempt_start = f.tell()
        headers = {'Range': 'bytes={}-'.format(attempt_start)}
        pprint(headers)
        try:
            # stream=True keeps the body out of memory; it is read in chunks.
            with get(url, stream=True, headers=headers) as r:
                r.raise_for_status()
                if size < 0:
                    content_length = r.headers.get('Content-Length')
                    if content_length is None:
                        raise ValueError(
                            'No Content-Length header in response from ' + url)
                    size = int(content_length)
                elif r.status_code != 206:
                    # The server ignored Range and is resending the whole
                    # body: appending would corrupt the file, so start over.
                    f.seek(0)
                    f.truncate()
                # Roughly 100 progress updates; never a zero-sized chunk.
                chunk_size = max(1, size // 100)
                for chunk in r.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
                    percent = int(f.tell() / size * 100) if size else 100
                    print('\r', percent, '%', end='', flush=True)
        except (ChunkedEncodingError, RequestsConnectionError):
            # A dropped connection surfaces here. Retry only if this attempt
            # got past where it started, so a persistent failure still
            # propagates instead of looping forever.
            if f.tell() <= attempt_start:
                raise
        # Finish the progress line before the next attempt's header dump.
        print('')
def download_build(job, number):
    """Download build *number* of *job* into a fresh temp sub-directory.

    The destination is ``<tempdir>/itest/<job>/<number>``.

    Raises:
        ValueError: if the destination already exists.
    """
    job_root = join(gettempdir(), 'itest', job)
    target = join(job_root, str(number))
    # Refuse to touch an existing destination.
    if exists(target):
        raise ValueError(target + ' already exists')
    artifact_url = build_url(job, number)
    download_and_unzip(artifact_url, target)
if __name__ == "__main__":
    # Script entry point: fetch build 4188 of the 'itest--branches' job.
    download_build(job='itest--branches', number=4188)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment