Download files from GH Archive using Modal
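
This script fans hourly downloads of GH Archive's `.json.gz` files out across Modal containers: each file is fetched with pycurl onto local SSD, then published atomically into a shared Modal Volume. It imports its Modal app, Volume, and mount path from a sibling `.app` module that is not part of this gist; below is a minimal sketch of what that module might look like, with all names, paths, and image build steps being assumptions.

# app.py — hypothetical companion module (not included in this gist). The
# download script imports `app`, `gharchive`, and `GHARCHIVE_DATA_PATH` from it.
import modal

# The containers need pycurl; building it from source typically requires
# libcurl headers and a compiler (exact packages are an assumption)
image = (
    modal.Image.debian_slim()
    .apt_install("libcurl4-openssl-dev", "build-essential")
    .pip_install("pycurl")
)

app = modal.App("gharchive-downloader", image=image)

# Persistent Volume that receives the downloaded archives (name assumed)
gharchive = modal.Volume.from_name("gharchive", create_if_missing=True)

# Mount point for the Volume inside containers (path assumed)
GHARCHIVE_DATA_PATH = "/gharchive"
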
import modal
from datetime import date

from .app import app, gharchive, GHARCHIVE_DATA_PATH


@app.function(
    volumes={GHARCHIVE_DATA_PATH: gharchive},
    timeout=36000,  # 10 hours, in seconds
    retries=modal.Retries(
        max_retries=8,
        backoff_coefficient=2,
        initial_delay=1,
        max_delay=30,
    ),
    ephemeral_disk=800 * 1024,  # in MiB, so ~800 GiB of scratch SSD for /tmp
)
@modal.concurrent(max_inputs=12)
def download_file(year: int, month: int, day: int, hour: int) -> tuple[str, float, int]:
    import os, time, tempfile, shutil, pycurl, random, io

    # Tiny jitter to avoid synchronized bursts (helps with WAF/rate shaping)
    time.sleep(random.uniform(0.01, 0.05))

    # URLs & paths (GH Archive does not zero-pad the hour in the URL)
    url = f"https://data.gharchive.org/{year}-{month:02d}-{day:02d}-{hour}.json.gz"
    vol_path = f"{GHARCHIVE_DATA_PATH}/{year}/{month:02d}/{day:02d}/{hour}.json.gz"

    # Stage to a temporary file in /tmp, which is on the attached SSD, as
    # recommended in the Modal docs: https://modal.com/docs/guide/dataset-ingestion
    tmp_dir = f"/tmp/{year}/{month:02d}/{day:02d}"
    os.makedirs(tmp_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(
        dir=tmp_dir, prefix=f"{year}-{month:02d}-{day:02d}-{hour}.json.gz."
    )
    os.close(fd)

    # Configure curl
    c = pycurl.Curl()
    c.setopt(c.URL, url)
    c.setopt(c.FOLLOWLOCATION, 1)
    c.setopt(c.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2TLS)  # allow HTTP/2
    c.setopt(
        c.USERAGENT, "gharchive-downloader/1.0 (contact: [email protected])"
    )  # may help with rate limiting
    c.setopt(c.NOSIGNAL, 1)
    c.setopt(c.CONNECTTIMEOUT, 10)
    c.setopt(
        c.TIMEOUT, 60 * 5
    )  # total timeout - most downloads should complete within this
    c.setopt(c.LOW_SPEED_LIMIT, 20_000)  # bytes/sec
    c.setopt(c.LOW_SPEED_TIME, 20)  # if below limit for 20s -> timeout
    hdr_buf = io.BytesIO()
    c.setopt(c.HEADERFUNCTION, hdr_buf.write)

    # Execute the download → /tmp, then fsync for durability
    with open(tmp_path, "wb", buffering=1024 * 1024) as f:
        c.setopt(c.WRITEDATA, f)
        c.perform()
        f.flush()
        os.fsync(f.fileno())

    # Telemetry from curl
    status = int(c.getinfo(pycurl.RESPONSE_CODE))
    size_b = int(c.getinfo(pycurl.SIZE_DOWNLOAD))
    total_s = float(c.getinfo(pycurl.TOTAL_TIME))
    mbps = float(c.getinfo(pycurl.SPEED_DOWNLOAD)) / (1024 * 1024)
    ttfb_s = float(c.getinfo(pycurl.STARTTRANSFER_TIME))
    headers_s = hdr_buf.getvalue().decode("latin1", errors="replace")
    c.close()

    # Handle non-200s
    if status != 200:
        # Clean up the partial temp file
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
        # Some hours are genuinely missing upstream, but raise on 404 anyway
        # so Modal retries and the failure gets reported
        if status == 404:
            time.sleep(random.uniform(0.5, 2.0))
            raise RuntimeError(f"HTTP 404 for {url} (retryable)")
        # Transient / retry-worthy codes -> raise so Modal retries
        retry_statuses = {403, 429, 500, 502, 503, 504}
        if status in retry_statuses:
            first_hdr = headers_s.splitlines()[0] if headers_s else f"HTTP {status}"
            raise RuntimeError(f"HTTP {status} for {url} ({first_hdr})")
        # Other client errors: surface as failures (Modal will retry)
        raise RuntimeError(f"HTTP {status} for {url}")

    # Publish into the Modal Volume atomically: copy to a .part file, then rename
    vol_dir = os.path.dirname(vol_path)
    os.makedirs(vol_dir, exist_ok=True)
    tmp_dest = vol_path + ".part"
    with (
        open(tmp_path, "rb", buffering=1024 * 1024) as rf,
        open(tmp_dest, "wb", buffering=1024 * 1024) as wf,
    ):
        shutil.copyfileobj(rf, wf, length=8 * 1024 * 1024)
        wf.flush()
        os.fsync(wf.fileno())
    os.replace(tmp_dest, vol_path)  # atomic within the same FS
    dfd = os.open(vol_dir, os.O_DIRECTORY)
    try:
        os.fsync(dfd)  # persist the directory entry for the rename
    finally:
        os.close(dfd)

    # Clean up the local temp file
    try:
        os.remove(tmp_path)
    except FileNotFoundError:
        pass

    print(
        f"{os.path.basename(vol_path)} — {size_b / 1_048_576:.1f} MB "
        f"in {total_s:.2f}s @ {mbps:.2f} MB/s (TTFB {ttfb_s:.2f}s) → {vol_path}"
    )
    return vol_path, total_s, size_b
@app.function(timeout=36000, volumes={GHARCHIVE_DATA_PATH: gharchive})
def download_range(start: date, end: date | None = None):
    from datetime import timedelta
    import time

    # Resolve the default at call time (a `date.today()` default in the
    # signature would be frozen at module import)
    if end is None:
        end = date.today()

    # Build hour-level inputs
    inputs = []
    delta = end - start
    for d in range(delta.days + 1):
        day = start + timedelta(days=d)
        for hour in range(24):
            inputs.append((day.year, day.month, day.day, hour))

    print(
        f"Downloading events from {start} to {end} — {len(inputs)} files over {delta.days + 1} days"
    )

    t0 = time.time()
    total_size = 0
    ok = 0
    failures = []
    # Fan the hourly downloads out across containers; collect exceptions
    # instead of aborting so one bad hour doesn't kill the whole range
    for result in download_file.starmap(
        inputs,
        return_exceptions=True,
        wrap_returned_exceptions=False,
        order_outputs=False,
    ):
        if isinstance(result, Exception):
            failures.append(result)
        else:
            _, _, sz = result
            total_size += sz
            ok += 1

    elapsed = time.time() - t0
    total_gb = total_size / (1024**3)
    agg_gbps = (total_gb / elapsed) if elapsed > 0 else 0.0
    print(
        f"Done: {ok}/{len(inputs)} files in {elapsed:.1f}s — {total_gb:.2f} GB total, avg {agg_gbps:.2f} GB/s"
    )
    if failures:
        print(f"Encountered {len(failures)} failures (Modal handled retries):")
        for f in failures:
            print(f"[FAIL] {f!s}")

    gharchive.commit()
@app.local_entrypoint()
def main():
    download_range.remote(date(2020, 1, 1))
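
To kick off the backfill from 2020-01-01 to today, run the local entrypoint with the Modal CLI. Because the script uses a relative import (`from .app import ...`), both files must live in a package and be run by module path; the package and module names here are assumptions:

modal run -m gharchive.download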