@KelSolaar
Created August 18, 2024 02:58
Download Slack Files from an Export
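
The script below walks a Slack export directory, collects every
"url_private_download" URL referenced in the per-day channel JSON files, and
downloads the corresponding attachments into a "__files__" sub-directory,
throttling the requests with aiometer. It depends on the third-party
aiofiles, aiohttp and aiometer packages, installable with, e.g.,
"pip install aiofiles aiohttp aiometer".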
import asyncio
import json
import logging
from functools import partial, reduce
from pathlib import Path

import aiofiles
import aiohttp
import aiometer

LOGGER = logging.getLogger(__name__)

async def download_file(url, destination, retries=5, delay=1):
    """Download given url to given destination, retrying upon failure."""

    for attempt in range(retries):
        LOGGER.info(
            'Downloading "%s" to "%s"... (Attempt %s of %s)',
            url,
            destination,
            attempt + 1,
            retries,
        )

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        async with aiofiles.open(destination, "wb") as f:
                            await f.write(await response.read())

                        LOGGER.info('Downloaded "%s" successfully!', url)

                        return True
                    else:
                        LOGGER.error(
                            'Failed to download "%s". Status code: %s',
                            url,
                            response.status,
                        )
        except Exception as error:
            LOGGER.error("An exception occurred: %s. Retrying...", error)

        # Back off before the next attempt, whether the failure was a non-200
        # status or an exception.
        await asyncio.sleep(delay)

    LOGGER.error('Failed to download "%s" after %s attempts!', url, retries)

    return False
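
# The coroutine above can also be exercised on its own; the URL and
# destination below are illustrative placeholders, not part of the export
# workflow:
#
#     asyncio.run(download_file("https://example.com/file.png", Path("file.png")))
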
async def parse_json_file_slack(json_file_path):
    """Collect the downloadable file URLs from given Slack export JSON file."""

    LOGGER.info('Processing "%s" JSON file...', json_file_path)

    async with aiofiles.open(json_file_path, mode="r") as f:
        content = await f.read()

    files = {}
    for message in json.loads(content):
        if "files" in message:
            for file_data in message["files"]:
                if "url_private_download" in file_data:
                    key = (
                        json_file_path,
                        file_data.get("id"),
                        file_data.get("name"),
                    )
                    files[key] = file_data["url_private_download"]

    return files
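
# For reference, a message entry carrying attachments looks roughly like the
# trimmed sketch below; the field names are the ones read above, the values
# are illustrative:
#
#     {
#         "type": "message",
#         "files": [
#             {
#                 "id": "F0123456789",
#                 "name": "image.png",
#                 "url_private_download": "https://files.slack.com/...",
#             },
#         ],
#     }
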
async def parse_json_files_from_directory(directory):
    """Parse all the JSON files under given directory concurrently."""

    LOGGER.info('Starting "%s" directory walk...', directory)

    tasks = []
    for json_file_path in directory.rglob("*.json"):
        tasks.append(asyncio.create_task(parse_json_file_slack(json_file_path)))

    LOGGER.info("Found %s JSON files.", len(tasks))

    return await asyncio.gather(*tasks)

async def main(directory):
    # Merge the per-file mappings into a single {key: url} mapping; the "{}"
    # initialiser guards against a directory without any JSON files.
    files = reduce(
        lambda a, b: {**a, **b},
        await parse_json_files_from_directory(directory),
        {},
    )

    tasks = []
    for (json_file_path, file_id, file_name), url in files.items():
        channel = json_file_path.parent.name
        date = json_file_path.stem

        destination_directory = directory / "__files__" / channel / date
        destination_directory.mkdir(parents=True, exist_ok=True)

        # Suffix the file id so that same-named files do not overwrite
        # each other.
        destination = destination_directory / f"{file_name}__{file_id}"

        tasks.append((url, destination))

    # Throttle the downloads: at most 3 in flight and 2 started per second,
    # each with 10 retries and a 5 second delay.
    results = await aiometer.run_all(
        [
            partial(download_file, url, destination, 10, 5)
            for url, destination in tasks
        ],
        max_at_once=3,
        max_per_second=2,
    )

    for result, (url, _destination) in zip(results, tasks):
        if not result:
            LOGGER.critical('"%s" url could not be downloaded!', url)

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
    )

    asyncio.run(main(Path("colour-science Slack export Sep 16 2014 - Aug 18 2024")))
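
For reference, a Slack export stores each channel as a directory of per-day
JSON files, which is the layout the script relies on: the channel and date in
the destination path are recovered from each JSON file's parent directory
name and stem respectively. With the export above, the resulting tree would
look roughly like this (the channel and file names are illustrative):

colour-science Slack export Sep 16 2014 - Aug 18 2024/
    general/
        2024-08-17.json
    __files__/
        general/
            2024-08-17/
                image.png__F0123456789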