Download Slack Files from an Export
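The script walks a Slack export directory, collects every "url_private_download" URL from the per-channel, per-day JSON message logs, and downloads the attachments with throttled concurrency. It assumes the usual Slack export layout, i.e. one directory per channel containing one JSON file per day, and mirrors that layout under a "__files__" sub-directory. The tree below is illustrative, not taken from an actual export:

    colour-science Slack export Sep 16 2014 - Aug 18 2024/
        general/
            2024-08-01.json
        __files__/
            general/
                2024-08-01/
                    image.png__F07ABCDEF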
import asyncio
import json
import logging
from functools import partial, reduce
from pathlib import Path

import aiofiles
import aiohttp
import aiometer

LOGGER = logging.getLogger(__name__)


async def download_file(url, destination, retries=5, delay=1):
    for attempt in range(retries):
        LOGGER.info(
            'Downloading "%s" to "%s"... (Attempt %s of %s)',
            url,
            destination,
            attempt + 1,
            retries,
        )
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        async with aiofiles.open(destination, "wb") as f:
                            await f.write(await response.read())
                        LOGGER.info('Downloaded "%s" successfully!', url)
                        return True
                    else:
                        LOGGER.error(
                            'Failed to download "%s". Status code: %s',
                            url,
                            response.status,
                        )
        except Exception as error:
            LOGGER.error("An exception occurred: %s. Retrying...", error)
        # Back off before the next attempt.
        await asyncio.sleep(delay)
    LOGGER.error('Failed to download "%s" after %s attempts!', url, retries)
    return False


async def parse_json_file_slack(json_file_path):
    LOGGER.info('Processing "%s" JSON file...', json_file_path)

    async with aiofiles.open(json_file_path, mode="r") as f:
        content = await f.read()

    files = {}
    for message in json.loads(content):
        if "files" in message:
            for file_data in message["files"]:
                if "url_private_download" in file_data:
                    # Key on (source JSON file, file id, file name) so that
                    # identically named attachments do not collide.
                    key = (
                        json_file_path,
                        file_data.get("id"),
                        file_data.get("name"),
                    )
                    files[key] = file_data["url_private_download"]
    return files
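
# For reference, the parser above only reads three fields from each attachment;
# an abbreviated message from an export looks roughly like this:
#
#     {
#         "text": "...",
#         "files": [
#             {
#                 "id": "F07ABCDEF",
#                 "name": "image.png",
#                 "url_private_download": "https://files.slack.com/..."
#             }
#         ]
#     }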


async def parse_json_files_from_directory(directory):
    LOGGER.info('Starting "%s" directory walk...', directory)

    tasks = [
        asyncio.create_task(parse_json_file_slack(json_file_path))
        for json_file_path in directory.rglob("*.json")
    ]
    LOGGER.info("Found %s JSON files.", len(tasks))

    return await asyncio.gather(*tasks)


async def main(directory):
    # Merge the per-file dictionaries into a single {key: url} mapping.
    results = reduce(
        lambda a, b: {**a, **b}, await parse_json_files_from_directory(directory)
    )

    tasks = []
    for (json_file_path, file_id, file_name), url in results.items():
        channel = json_file_path.parent.name
        date = json_file_path.stem
        destination_directory = directory / "__files__" / channel / date
        destination_directory.mkdir(parents=True, exist_ok=True)
        destination = destination_directory / f"{file_name}__{file_id}"
        tasks.append((url, destination))

    results = await aiometer.run_all(
        [
            partial(download_file, url, destination, retries=10, delay=5)
            for url, destination in tasks
        ],
        max_at_once=3,
        max_per_second=2,
    )

    failed_tasks = [task for result, task in zip(results, tasks) if not result]
    for url, _destination in failed_tasks:
        LOGGER.critical('"%s" URL could not be downloaded!', url)


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
    )

    asyncio.run(main(Path("colour-science Slack export Sep 16 2014 - Aug 18 2024")))
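
To run this against another export, install the three third-party dependencies and point main() at the export directory; the script filename below is just an example:

    pip install aiofiles aiohttp aiometer
    python download_slack_files.py

aiometer caps the transfer at 3 concurrent downloads and 2 new requests per second; tune max_at_once and max_per_second as needed.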