Last active
September 11, 2024 22:09
-
-
Save cgranade/767f8bec3cc69ab9d4eb21871e4c25cd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S uv run | |
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "feedgen", | |
# "click", | |
# "gevent", | |
# "flask", | |
# "gusmobile @ git+https://git.sr.ht/~rwa/gusmobile#egg=0.4.0" | |
# ] | |
# /// | |
# Note: types in this script are not checked, and are | |
# intended for documentation only. This is due to | |
# mypy not supporting generic aliases and unit | |
# newtype. Generic aliases are in progress as | |
# experimental, but unit newtype is just a bug. | |
# Minimum reproduction at: | |
# https://mypy-play.net/?mypy=latest&python=3.12&enable-incomplete-feature=NewGenericSyntax&gist=ae72d526c43d5299e6b5450e793bbb68 | |
from gevent.pywsgi import WSGIServer | |
import click | |
from flask import Flask, make_response | |
from feedgen.feed import FeedGenerator | |
import gusmobile | |
import logging | |
logger = logging.getLogger(__name__) | |
from dataclasses import dataclass | |
from typing import Optional, NewType, TypeVar | |
T = TypeVar('T') | |
import re | |
from datetime import date | |
from urllib.parse import urljoin, urlsplit, urlunsplit | |
## Unit type ## | |
NotFoundType = NewType("NotFoundType", ()) | |
NotFound = NotFoundType(()) | |
type Findable[T] = NotFound | T | |
## Dataclasses ## | |
@dataclass(frozen=True)
class Gemlog:
    """Immutable description of a parsed gemlog index page."""

    # Feed title.
    title: str
    # Dated entries, in the order they were collected.
    items: "list[GemlogItem]"
    # Optional secondary heading; None when the page has none.
    subtitle: "str | None" = None
@dataclass(frozen=True)
class GemlogItem:
    """Immutable description of one dated entry of a gemlog."""

    # Calendar date taken from the entry's link label.
    date: date
    # URI of the entry.
    uri: str
    # Text following the date in the link label; None when there is none.
    title: "str | None"
    # Body of the entry; None when it was not (or could not be) fetched.
    content: "str | None"
## Actual logic ## | |
def gemjoin(uri: str, relative_uri: str):
    """Resolve a link target found on the page at ``uri``.

    ``urllib.parse.urljoin`` does not resolve relative references against
    ``gemini://`` bases, so the path is joined on its own and the base
    scheme and host are re-attached afterwards.

    Args:
        uri: Absolute URI of the page containing the link.
        relative_uri: Link target as written on the page; may be relative,
            absolute, or a network-path reference (``//host/path``).

    Returns:
        The absolute URI of the link target, as a string.
    """
    target = urlsplit(relative_uri)
    if target.scheme:
        # Already absolute (possibly another host or scheme): joining it
        # onto the base path would produce "gemini://base/gemini://...".
        return relative_uri

    scheme, netloc, path, _query, _fragment = urlsplit(uri)
    if target.netloc:
        # Network-path reference: keep the base scheme, switch host.
        return urlunsplit(
            (scheme, target.netloc, target.path, target.query, target.fragment)
        )

    path = urljoin(path, relative_uri)
    return urlunsplit((scheme, netloc, path, "", ""))
def parse_gemlog(gemtext: str, uri: str, fetch_items: bool = True):
    """Parse the gemtext of a gemlog index page into a ``Gemlog``.

    The first non-blank line supplies the title when it is a heading
    (``#``); otherwise ``uri`` is used as the title.  A ``##`` heading that
    follows directly (blank lines aside) becomes the subtitle.  Every link
    line (``=>``) whose label starts with a ``YYYY-MM-DD`` date becomes a
    ``GemlogItem``; all other lines are ignored.

    Args:
        gemtext: Raw text of the index page.
        uri: Absolute URI the page was fetched from; used as the fallback
            title and as the base for resolving link targets.
        fetch_items: When true, each entry is fetched with
            ``gusmobile.fetch`` and its body stored as the item content.

    Returns:
        A ``Gemlog`` with the title, subtitle (or None) and dated items.
    """
    title: Findable[str] = NotFound
    subtitle: Findable[Optional[str]] = NotFound
    items: list[GemlogItem] = []
    dated_label = re.compile(r'^(\d\d\d\d)-(\d\d)-(\d\d)\s*-*\s*(.*)$')

    for line in (raw.strip() for raw in gemtext.split("\n")):
        # Blank lines never carry information at any stage.
        if not line:
            continue

        # Are we looking for a title?
        if title is NotFound:
            if line.startswith("#"):
                # A bare "#" would otherwise yield an empty title.
                title = line.removeprefix("#").strip() or uri
                continue
            # No heading: use the URI as the title, but keep processing
            # this line -- it may already be the first dated link.
            title = uri

        # Are we looking for a subtitle?
        if subtitle is NotFound:
            if line.startswith("##"):
                subtitle = line.removeprefix("##").strip()
                continue
            # First nontrivial line after the title is not a subtitle:
            # record that there is none and keep processing this line.
            subtitle = None

        if not line.startswith("=>"):
            # Ignore all other lines!
            continue

        # Gemtext separates target and label with any run of spaces/tabs.
        parts = line.removeprefix("=>").split(None, 1)
        if len(parts) < 2:  # No label, just a URI (or nothing at all).
            continue
        target, label = parts

        match = dated_label.match(label)
        if not match:  # Label does not start with an 8601 date.
            continue
        year, month, day, item_title = match.groups()
        try:
            item_date = date(year=int(year), month=int(month), day=int(day))
        except ValueError:
            # Looks like a date but is not one (e.g. 2024-13-45): skip the
            # entry instead of failing the whole feed.
            continue

        item_title = item_title.strip()
        item_uri = gemjoin(uri, target)

        content = None
        if fetch_items:
            item_resp = gusmobile.fetch(item_uri)
            # NOTE(review): the None check assumes fetch may return None on
            # a failed request -- confirm against gusmobile.
            if item_resp is not None and item_resp.status == "20":
                content = item_resp.content

        items.append(GemlogItem(
            date=item_date,
            title=item_title if item_title else None,
            uri=item_uri,
            content=content,
        ))

    return Gemlog(
        title=title if title is not NotFound else "malformed gemlog",
        # The text may end before the subtitle stage is resolved; never let
        # the sentinel escape into the result.
        subtitle=subtitle if subtitle is not NotFound else None,
        items=items,
    )
def create_app(base_url: str, pretty: bool = False):
    """Build the Flask application that serves gemlogs as Atom feeds.

    The app exposes ``/atom/<host-and-path>``; the part after ``/atom/`` is
    fetched as ``gemini://<host-and-path>``, parsed with ``parse_gemlog``
    and rendered as an Atom document.

    Args:
        base_url: Public URL this service is reachable at; used to build
            the feed id.  A trailing slash is ignored.
        pretty: Whether to pretty-print the generated XML.

    Returns:
        The configured ``Flask`` application.
    """
    # Local import: the module itself only imports ``date`` from datetime.
    from datetime import datetime, time, timezone

    app = Flask(__name__)
    base_url = base_url.removesuffix("/")

    @app.route("/atom/<path:remote>")
    def atom(remote):
        remote_gem = f"gemini://{remote}"
        logger.info("Fetching gemlog at %s.", remote_gem)
        gem_resp = gusmobile.fetch(remote_gem)

        # NOTE(review): assumes fetch may return None on a failed request --
        # confirm against gusmobile.
        if gem_resp is None:
            return make_response("Remote gemlog could not be fetched.", 503)
        if gem_resp.status != "20":
            return make_response(
                f"Remote gemlog returned status {gem_resp.status}.", 503
            )

        # OK! Let's go on and parse feed.
        gemlog = parse_gemlog(gem_resp.content, remote_gem)
        fg = FeedGenerator()
        fg.id(f"{base_url}/atom/{remote}")
        fg.title(gemlog.title)
        if gemlog.subtitle:
            fg.subtitle(gemlog.subtitle)

        for item in gemlog.items:
            fe = fg.add_entry()
            fe.id(item.uri)
            # Atom requires a title on every entry; links labelled with
            # only a date have none, so fall back to the entry URI.
            fe.title(item.title or item.uri)
            fe.link(href=item.uri)
            # Publish the date parsed from the link label.  The label holds
            # no time of day, so noon UTC is used.
            # NOTE(review): noon UTC follows the Gemini subscription
            # convention as I understand it -- confirm.
            fe.updated(
                datetime.combine(item.date, time(12), tzinfo=timezone.utc)
            )
            # TODO: make this nicer by parsing gemtext
            if item.content:
                fe.content(item.content, type="text/plain")

        resp = make_response(fg.atom_str(pretty=pretty), 200)
        resp.headers["Content-Type"] = "application/atom+xml"
        return resp

    return app
@click.command()
@click.argument("base_url", type=str)
@click.option(
    "--log-level",
    default="DEBUG",
    # Validate up front (case-insensitively) so that e.g. "debug" works and
    # an unknown name is a usage error instead of a ValueError traceback
    # from logging.basicConfig.
    type=click.Choice(
        ["CRITICAL", "FATAL", "ERROR", "WARNING", "WARN", "INFO", "DEBUG", "NOTSET"],
        case_sensitive=False,
    ),
)
@click.option("--port", default=8000, type=int)
@click.option("--bind", default="127.0.0.1", type=str)
@click.option("--pretty/--no-pretty", default=False)
def main(
    base_url,
    bind, port,
    log_level,
    pretty
):
    """Serve Gemini gemlogs as Atom feeds over HTTP.

    BASE_URL is the public URL of this service; it is used to build feed ids.
    """
    logging.basicConfig(level=log_level)
    app = create_app(base_url=base_url, pretty=pretty)
    logger.info("Starting server at %s:%s.", bind, port)
    http_server = WSGIServer((bind, port), app)
    # Blocks until the process is terminated.
    http_server.serve_forever()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment