-
-
Save inoperable/6dd3b5ea611bb30e60a80b9d61ff8a03 to your computer and use it in GitHub Desktop.
mirror_archives.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import os | |
import re | |
from urllib.parse import urlparse | |
import async_timeout | |
import aiohttp | |
import backoff | |
# Root of the public archive listing that gets mirrored.
ARCHIVE_URL = "https://archive.mozilla.org/pub/"

# Top-level product folders (relative to ARCHIVE_URL) to crawl.
PRODUCTS = (
    "mobile",
    "firefox",
    "thunderbird",
)

# Maximum number of attempts per request before the timeout error propagates.
NB_RETRY_REQUEST = 100

# Deadline for a single request attempt, in seconds (five minutes).
TIMEOUT_SECONDS = 5 * 60
@backoff.on_exception(backoff.expo,
                      asyncio.TimeoutError,
                      max_tries=NB_RETRY_REQUEST)
async def fetch(session, url):
    """Download *url* and return its body together with its content type.

    The call is retried with exponential backoff, up to
    ``NB_RETRY_REQUEST`` attempts, whenever it raises
    ``asyncio.TimeoutError``.

    Args:
        session: HTTP client session whose ``get()`` is used for the request.
        url: URL to download.

    Returns:
        A ``(text, content_type)`` tuple. ``content_type`` is an empty
        string when the response carries no ``Content-Type`` header.

    Raises:
        asyncio.TimeoutError: once every retry attempt has timed out.
    """
    # NOTE(review): backoff.expo is not given a max_value here, so late
    # retries can wait a very long time -- confirm this is intended.
    headers = {
        "Accept": "application/json",
        # NOTE(review): the contact address looks redacted by the page this
        # file was copied from -- confirm the real value before running.
        "User-Agent": "BuildHub;[email protected]",
    }
    # The timeout context manager must be entered with ``async with``; the
    # plain ``with`` form is not supported by current async-timeout releases.
    async with async_timeout.timeout(TIMEOUT_SECONDS):
        print(url)
        # timeout=None switches off the session's own timeout so that the
        # enclosing block is the only deadline for this attempt.
        async with session.get(url, headers=headers, timeout=None) as response:
            body = await response.text()
            # Fall back to "" instead of raising KeyError when the server
            # omits the header; callers only test it for the "json" substring.
            return body, response.headers.get("Content-Type", "")
async def fetch_recursive(session, url):
    """Mirror the listing at *url* to disk, then recurse into its sub-folders.

    The response body is saved as ``index.json`` or ``index.html`` (chosen
    from the content type) inside a local folder named after the URL path,
    relative to the current working directory. When the body is JSON, every
    entry of its ``"prefixes"`` list that does not match ``try-builds`` or
    ``tinder`` is fetched the same way, concurrently.

    Args:
        session: HTTP client session, passed through to ``fetch``.
        url: URL of the folder listing; sub-folder names are appended to it
            directly.

    Returns:
        ``None`` when the local folder already exists or the body is not
        JSON; otherwise the list of results of the recursive calls.
    """
    data, ctype = await fetch(session, url)

    # Parenthesised on purpose: without the parentheses the conditional
    # expression swallows the whole concatenation and HTML listings end up
    # in a file called "html" instead of "index.html".
    index = "index." + ("json" if "json" in ctype else "html")

    # Drop the leading "/" so the folder is created relative to the cwd.
    local_folder = urlparse(url).path[1:]
    try:
        os.makedirs(local_folder)
    except FileExistsError:
        return  # already done.

    # Explicit encoding so the mirror does not depend on the platform locale.
    with open(os.path.join(local_folder, index), "w", encoding="utf-8") as out:
        out.write(data)

    try:
        listing = json.loads(data)
    except ValueError:
        return  # simple html pages.

    skipped = re.compile(r".*(try-builds|tinder)")
    folders = [name for name in listing["prefixes"] if not skipped.match(name)]
    children = [fetch_recursive(session, url + name) for name in folders]
    return await asyncio.gather(*children)
async def main(loop):
    """Mirror every product in ``PRODUCTS``, one product after another.

    Args:
        loop: event loop handed to the ``aiohttp.ClientSession`` that is
            shared by all requests.
    """
    roots = [ARCHIVE_URL + name + "/" for name in PRODUCTS]
    async with aiohttp.ClientSession(loop=loop) as http:
        # Products are crawled sequentially; the fan-out happens inside
        # fetch_recursive.
        for root in roots:
            await fetch_recursive(http, root)
def run():
    """Run the whole mirroring job to completion on a fresh event loop.

    The loop is created explicitly because ``asyncio.get_event_loop()`` is
    deprecated when no loop is running, and it is closed in a ``finally``
    block so it is released even when the crawl fails.
    """
    loop = asyncio.new_event_loop()
    # Make it the current loop so code that looks the loop up implicitly
    # sees the same one that is passed to main().
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
# Script entry point: start the crawl only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.