@patrickdevivo (last active August 28, 2025)
Download files from GH Archive using Modal
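
The script imports app, gharchive, and GHARCHIVE_DATA_PATH from a companion app module that isn't included in the gist. A minimal sketch of what that module might look like follows; the app name, volume name, mount path, and image setup are all assumptions (pycurl compiles against libcurl, so the image needs the dev headers and a toolchain):

# app.py (hypothetical companion module; only the three exported names come from the script below)
import modal

GHARCHIVE_DATA_PATH = "/data/gharchive"  # assumed mount path for the volume

# pycurl builds against libcurl, so install the headers and a compiler first
image = (
    modal.Image.debian_slim()
    .apt_install("libcurl4-openssl-dev", "build-essential")
    .pip_install("pycurl")
)

app = modal.App("gharchive-downloader", image=image)
gharchive = modal.Volume.from_name("gharchive", create_if_missing=True)
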
import modal
from datetime import date

from .app import app, gharchive, GHARCHIVE_DATA_PATH

@app.function(
    volumes={GHARCHIVE_DATA_PATH: gharchive},
    timeout=36000,
    retries=modal.Retries(
        max_retries=8,
        backoff_coefficient=2,
        initial_delay=1,
        max_delay=30,
    ),
    ephemeral_disk=800 * 1024,  # MiB -> 800 GiB of local SSD scratch
)
@modal.concurrent(max_inputs=12)
def download_file(year: int, month: int, day: int, hour: int) -> tuple[str, float, int]:
    import os, time, tempfile, shutil, pycurl, random, io

    # Tiny jitter to avoid synchronized bursts (helps with WAF/rate shaping)
    time.sleep(random.uniform(0.01, 0.05))

    # URLs & paths
    url = f"https://data.gharchive.org/{year}-{month:02d}-{day:02d}-{hour}.json.gz"
    vol_path = f"{GHARCHIVE_DATA_PATH}/{year}/{month:02d}/{day:02d}/{hour}.json.gz"

    # Stage to a temporary file in /tmp, which is on the attached SSD, as recommended
    # in the Modal docs: https://modal.com/docs/guide/dataset-ingestion
    tmp_dir = f"/tmp/{year}/{month:02d}/{day:02d}"
    os.makedirs(tmp_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(
        dir=tmp_dir, prefix=f"{year}-{month:02d}-{day:02d}-{hour}.json.gz."
    )
    os.close(fd)
    # Configure curl
    c = pycurl.Curl()
    c.setopt(c.URL, url)
    c.setopt(c.FOLLOWLOCATION, 1)
    c.setopt(c.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2TLS)  # allow HTTP/2
    c.setopt(
        c.USERAGENT, "gharchive-downloader/1.0 (contact: [email protected])"
    )  # may help with rate limiting
    c.setopt(c.NOSIGNAL, 1)
    c.setopt(c.CONNECTTIMEOUT, 10)
    c.setopt(
        c.TIMEOUT, 60 * 5
    )  # total timeout - most downloads should finish within this
    c.setopt(c.LOW_SPEED_LIMIT, 20_000)  # bytes/sec
    c.setopt(c.LOW_SPEED_TIME, 20)  # if below limit for 20s -> timeout
    hdr_buf = io.BytesIO()
    c.setopt(c.HEADERFUNCTION, hdr_buf.write)
    # Execute the download → /tmp, then fsync for durability
    with open(tmp_path, "wb", buffering=1024 * 1024) as f:
        c.setopt(c.WRITEDATA, f)
        c.perform()
        f.flush()
        os.fsync(f.fileno())
    # Telemetry from curl
    status = int(c.getinfo(pycurl.RESPONSE_CODE))
    size_b = int(c.getinfo(pycurl.SIZE_DOWNLOAD))
    total_s = float(c.getinfo(pycurl.TOTAL_TIME))
    mbps = float(c.getinfo(pycurl.SPEED_DOWNLOAD)) / (1024 * 1024)
    ttfb_s = float(c.getinfo(pycurl.STARTTRANSFER_TIME))
    headers_s = hdr_buf.getvalue().decode("latin1", errors="replace")
    c.close()
    # Handle non-200s
    if status != 200:
        # Clean up the partial temp file
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
        # Some hours are missing from the archive, but raise on 404 anyway so the
        # input is retried and reported
        if status == 404:
            time.sleep(random.uniform(0.5, 2.0))
            raise RuntimeError(f"HTTP 404 for {url} (retryable)")
        # Transient / retry-worthy codes → raise so Modal retries
        retry_statuses = {403, 429, 500, 502, 503, 504}
        if status in retry_statuses:
            first_hdr = headers_s.splitlines()[0] if headers_s else f"HTTP {status}"
            raise RuntimeError(f"HTTP {status} for {url} ({first_hdr})")
        # Other client errors: surface as failures (Modal will retry)
        raise RuntimeError(f"HTTP {status} for {url}")
    # Publish into the Modal volume atomically
    vol_dir = os.path.dirname(vol_path)
    os.makedirs(vol_dir, exist_ok=True)
    tmp_dest = vol_path + ".part"
    with (
        open(tmp_path, "rb", buffering=1024 * 1024) as rf,
        open(tmp_dest, "wb", buffering=1024 * 1024) as wf,
    ):
        shutil.copyfileobj(rf, wf, length=8 * 1024 * 1024)
        wf.flush()
        os.fsync(wf.fileno())
    os.replace(tmp_dest, vol_path)  # atomic within the same FS
    # fsync the parent directory so the rename itself is durable
    dfd = os.open(vol_dir, os.O_DIRECTORY)
    try:
        os.fsync(dfd)
    finally:
        os.close(dfd)

    # Clean local temp
    try:
        os.remove(tmp_path)
    except FileNotFoundError:
        pass
    print(
        f"{os.path.basename(vol_path)}: {size_b / 1_048_576:.1f} MB "
        f"in {total_s:.2f}s @ {mbps:.2f} MB/s (TTFB {ttfb_s:.2f}s) → {vol_path}"
    )
    return vol_path, total_s, size_b

@app.function(timeout=36000, volumes={GHARCHIVE_DATA_PATH: gharchive})
def download_range(start: date, end: date = date.today()):
    from datetime import timedelta
    import time

    # Build hour-level inputs
    inputs = []
    delta = end - start
    for d in range(delta.days + 1):
        day = start + timedelta(days=d)
        for hour in range(24):
            inputs.append((day.year, day.month, day.day, hour))
    print(
        f"Downloading events from {start} to {end}: {len(inputs)} files over {delta.days + 1} days"
    )

    t0 = time.time()
    total_size = 0
    ok = 0
    failures = []
    for result in download_file.starmap(
        inputs,
        return_exceptions=True,
        wrap_returned_exceptions=False,
        order_outputs=False,
    ):
        if isinstance(result, Exception):
            failures.append(result)
        else:
            _, _, sz = result
            total_size += sz
            ok += 1

    elapsed = time.time() - t0
    total_gb = total_size / (1024**3)
    agg_gbps = (total_gb / elapsed) if elapsed > 0 else 0.0
    print(
        f"Done: {ok}/{len(inputs)} files in {elapsed:.1f}s — {total_gb:.2f} GB total, avg {agg_gbps:.2f} GB/s"
    )
    if failures:
        print(f"Encountered {len(failures)} failures (Modal handled retries):")
        for f in failures:
            print(f"[FAIL] {f!s}")
    gharchive.commit()

@app.local_entrypoint()
def main():
    download_range.remote(date(2020, 1, 1))
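
Assuming the two files live together in a package (the relative import requires one), say gharchive/app.py and gharchive/download.py, the entrypoint can be invoked with the Modal CLI as "modal run -m gharchive.download"; the -m flag runs it as a module so the relative import resolves. Each worker container handles up to 12 downloads concurrently per the @modal.concurrent decorator, and with the defaults main() fetches every hourly archive from 2020-01-01 through today into the volume.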