Download CloudApp Data
import argparse
import concurrent.futures
import os
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
import requests
from requests.adapters import HTTPAdapter, Retry
from tqdm import tqdm


def process_headers(headers):
    """Return (content length, Last-Modified POSIX timestamp) from response headers."""
    total_length = int(headers.get("content-length", 0))
    last_modified = headers.get("Last-Modified")
    last_modified_timestamp = None
    if last_modified:
        # HTTP dates are GMT, so attach the UTC timezone before converting;
        # calling .timestamp() on a naive datetime would interpret it as local time.
        last_modified_date = datetime.strptime(
            last_modified, "%a, %d %b %Y %H:%M:%S GMT"
        ).replace(tzinfo=timezone.utc)
        last_modified_timestamp = last_modified_date.timestamp()
    return total_length, last_modified_timestamp
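
# Example: "Wed, 21 Oct 2015 07:28:00 GMT" parses to 1445412480.0, the POSIX
# timestamp that os.utime later applies to the downloaded file.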


def setup_progress_bar(total_length, file_name):
    """Set up and return a tqdm progress bar."""
    return tqdm(total=total_length, unit="B", unit_scale=True, desc=file_name)


def create_retry_session(
    retries=3,
    backoff_factor=1,
    status_forcelist=(429, 500, 502, 503, 504),
    allowed_methods=("HEAD", "GET", "OPTIONS"),
):
    """Create a requests.Session with a retry strategy.

    Currently not used by the download flow below.
    """
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        # "method_whitelist" was deprecated in urllib3 1.26 and removed in 2.0;
        # "allowed_methods" is the current name for the same parameter.
        allowed_methods=allowed_methods,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
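
# Sketch (an assumption, not part of the original flow): to make the retry
# strategy take effect, the session could be built once and used in
# download_file in place of the bare requests.get call:
#
#     session = create_retry_session()
#     with session.get(file_url, stream=True, headers=headers, timeout=30) as r:
#         r.raise_for_status()
#         ...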


def download_file(file_url: str, file_path: Path):
    """Download a file with a progress bar and set its last modified time."""
    if pd.isna(file_url):
        return f"Skipped (no URL found): {file_path.name}"
    if file_path.exists():
        return f"Skipped (exists): {file_path.name}"
    # session = create_retry_session()  # optional: enable retries (see sketch above)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    try:
        # The timeout keeps a stalled connection from hanging a worker thread forever.
        with requests.get(file_url, stream=True, headers=headers, timeout=30) as r:
            r.raise_for_status()
            total_length, last_modified_timestamp = process_headers(r.headers)
            with open(file_path, "wb") as f, setup_progress_bar(
                total_length, file_path.name
            ) as bar:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
                        bar.update(len(chunk))
            if last_modified_timestamp is not None:
                os.utime(file_path, (last_modified_timestamp, last_modified_timestamp))
        return f"Downloaded {file_path}"
    except Exception as e:
        error_msg = f"Error downloading {file_url}: {e}"
        print(error_msg)
        return error_msg


def safe_file_name(file_path, max_length=255):
    """Truncate the file name to a safe length."""
    directory, file_name = os.path.split(file_path)
    if len(file_name) > max_length:
        # Preserve the file extension
        extension = os.path.splitext(file_name)[1]
        # Truncate the file name and append the extension
        file_name = file_name[: max_length - len(extension)] + extension
    return os.path.join(directory, file_name)
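
# Example: a 300-character ".png" file name is truncated so the whole name fits
# in 255 characters (a common filesystem limit), keeping the ".png" extension.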


def prepare_download_tasks(df, files_dir):
    """Build (url, path) pairs; each file name combines the private slug and name."""
    download_tasks = []
    for _, row in df.iterrows():
        file_url = row["remote_url"]
        file_name = row["name"]
        private_slug = (
            row["private_slug"] if not pd.isna(row["private_slug"]) else "no_slug"
        )
        file_path = files_dir / f"{private_slug}_{file_name}"
        safe_path = Path(safe_file_name(str(file_path)))
        download_tasks.append((file_url, safe_path))
    return download_tasks


def perform_downloads(download_tasks, max_workers):
    """Download files using a thread pool and return results in task order."""
    # as_completed yields futures in completion order, not submission order, so
    # track each task's index to keep results aligned with the DataFrame rows.
    results = [None] * len(download_tasks)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(download_file, url, path): (index, url)
            for index, (url, path) in enumerate(download_tasks)
        }
        for future in concurrent.futures.as_completed(future_to_task):
            index, url = future_to_task[future]
            try:
                results[index] = future.result()
            except Exception as e:
                results[index] = f"Error downloading {url}: {e}"
    return results


def main(csv_path, files_dir_path, output_csv_path, max_workers=5):
    """Read the CSV, download every file, and write per-row download statuses."""
    df = pd.read_csv(csv_path)
    files_dir = Path(files_dir_path)
    files_dir.mkdir(parents=True, exist_ok=True)
    download_tasks = prepare_download_tasks(df, files_dir)
    results = perform_downloads(download_tasks, max_workers)
    df["download_status"] = results
    df.to_csv(output_csv_path, index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Download files from a CSV file with multi-threading."
    )
    parser.add_argument(
        "csv_path",
        type=str,
        help="Path to the CSV file containing the file URLs and names.",
    )
    parser.add_argument(
        "files_dir", type=str, help="Directory path where files will be saved."
    )
    parser.add_argument(
        "output_csv_path",
        type=str,
        help="Path to save the CSV file with download statuses.",
    )
    parser.add_argument(
        "--max_workers",
        type=int,
        default=5,
        help="Maximum number of worker threads to use for downloading files.",
    )
    args = parser.parse_args()
    main(args.csv_path, args.files_dir, args.output_csv_path, args.max_workers)
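
Example usage (a sketch; the script and file names are hypothetical, but the CSV columns remote_url, name, and private_slug are the ones prepare_download_tasks reads):

    python download_cloudapp_data.py cloudapp_export.csv ./downloads download_report.csv --max_workers 8

A minimal input CSV could look like this; an empty private_slug falls back to the "no_slug" prefix:

    remote_url,name,private_slug
    https://example.com/items/abc123/screenshot.png,screenshot.png,abc123
    https://example.com/items/xyz/clip.mov,clip.mov,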