# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "httpx>=0.23.0",
# ]
# ///
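"""Download every document and attachment from a published Obsidian vault.

Given the public URL of an Obsidian Publish site, the script locates the
preloaded options/cache/page data linked from the landing page, mirrors every
file listed in the cache into a local folder, and writes the raw cache to
publish_data.json.
"""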

import argparse
import asyncio
import json
import logging
import os
import re

from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, NamedTuple
from urllib.parse import SplitResult, urljoin, urlsplit, urlunsplit

import httpx
VAULT_PATH = Path("vault") |
|
client = httpx.AsyncClient(limits=httpx.Limits(max_connections=5), timeout=httpx.Timeout(timeout=15.0, pool=None)) |
|
|
|
logging.basicConfig(level="INFO") |
|
|
|
|
|


async def get_initial_content(url: str) -> str:
    """Fetch the HTML of the published site's landing page."""
    return (await client.get(url=url)).text


@dataclass
class PublishURLs:
    """URLs of the preloaded options, cache, and page payloads."""

    options: str
    cache: str
    page: str

    @property
    def access(self) -> str:
        # Base URL from which individual vault files are fetched.
        return remove_last_path_segment(self.page)


class VaultFile(NamedTuple):
    access_url: str
    path: str

    @property
    def url(self) -> str:
        # Equivalent to urljoin(self.access_url, self.path).
        return urljoin(*self)


@dataclass
class PublishData:
    options: dict[str, Any]
    cache: dict[str, Any]
    page: str
    urls: PublishURLs

    files: list[VaultFile] = field(init=False)
    documents: list[VaultFile] = field(init=False)
    attachments: list[VaultFile] = field(init=False)

    def __post_init__(self) -> None:
        self.files: list[VaultFile] = []
        self.documents: list[VaultFile] = []
        self.attachments: list[VaultFile] = []

        # Cache keys are vault-relative file paths; split them into Markdown
        # documents and other attachments.
        for key in sorted(self.cache):
            file = VaultFile(self.urls.access, key)
            self.files.append(file)
            if file.path.endswith(".md"):
                self.documents.append(file)
            else:
                self.attachments.append(file)


def remove_last_path_segment(url: str) -> str:
    """Drop the last path segment and return the URL with a trailing slash.

    For example, "https://example.com/a/b/Page.md" becomes
    "https://example.com/a/b/".
    """
    split_url = urlsplit(url)
    path_segments = split_url.path.split('/')
    if len(path_segments) > 1:
        path_segments = path_segments[:-1]  # Remove the last path segment
        new_path = '/'.join(path_segments)
        new_url = urlunsplit(SplitResult(split_url.scheme, split_url.netloc, new_path, split_url.query, split_url.fragment))
        return new_url + '/'
    return url


def extract_urls(content: str) -> PublishURLs:
    """Find the options, cache, and page data URLs in the landing page HTML.

    The page inlines them as window.preloadOptions=f("..."),
    window.preloadCache=f("...") and window.preloadPage=f("...").
    """
    urls: list[str] = []
    for name in PublishURLs.__dataclass_fields__:
        pattern = fr'window\.preload{name.capitalize()}=f\("(.+?)"\)'
        match = re.search(pattern, content)
        if not match:
            raise LookupError(f"Can't find {name} in {content}")
        urls.append(match.group(1))
    return PublishURLs(*urls)


async def extract_data(content: str) -> PublishData:
    urls = extract_urls(content)
    # Fetch the three payloads concurrently; the TaskGroup waits for all of them.
    async with asyncio.TaskGroup() as task_group:
        tasks = [
            task_group.create_task(client.get(url)) for url in (urls.options, urls.cache, urls.page)
        ]

    options, cache, page = [task.result() for task in tasks]
    return PublishData(options=options.json(), cache=cache.json(), page=page.text, urls=urls)


async def save_file(file: VaultFile, path: Path, force: bool) -> None:
    file_path = path / file.path
    if file_path.exists() and not force:
        logging.info(f"{file_path} already exists, skipping, use --force to overwrite")
        return

    # Stream the response to disk so large attachments are never held fully in memory.
    async with client.stream("GET", file.url) as response:
        os.makedirs(file_path.parent, exist_ok=True)
        with open(file_path, "wb") as local_file:
            async for chunk in response.aiter_bytes():
                local_file.write(chunk)


async def get_pages(publish_data: PublishData, path: Path, force: bool) -> None:
    # Allow at most six downloads in flight at once; the semaphore is
    # acquired once per file inside limited_save.
    semaphore = asyncio.Semaphore(6)

    async def limited_save(file: VaultFile) -> None:
        async with semaphore:
            await save_file(file, path, force)

    async with asyncio.TaskGroup() as task_group:
        for file in publish_data.documents:
            task_group.create_task(limited_save(file))

    logging.info("Finished downloading documents, downloading attachments")

    async with asyncio.TaskGroup() as task_group:
        for file in publish_data.attachments:
            task_group.create_task(limited_save(file))


async def main(url: str, path: Path, force: bool = False) -> None:
    root_html = await get_initial_content(url)
    publish_data = await extract_data(root_html)
    await get_pages(publish_data, path, force)
    # Keep a copy of the raw cache listing (written to the working directory).
    with open("publish_data.json", "w") as f:
        f.write(json.dumps(publish_data.cache, ensure_ascii=False, indent=4, sort_keys=True))
    logging.info(f"Done!\n\nVault saved in {path.absolute()}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download all documents and attachments from a published vault.")
    parser.add_argument("url", help="URL to fetch data from, e.g. https://notesonai.com/Notes+on+AI")
    parser.add_argument("-p", "--path", type=Path, default=VAULT_PATH, help="Path to save files")
    parser.add_argument("-f", "--force", action="store_true", help="Whether to re-download existing files")
    args = parser.parse_args()

    asyncio.run(main(args.url, args.path, args.force))
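
# The inline script metadata above declares the httpx dependency, so a
# PEP 723-aware runner can execute this file directly, for example
# (the filename here is illustrative):
#   uv run download_publish.py https://notesonai.com/Notes+on+AI --path vault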