Skip to content

Instantly share code, notes, and snippets.

@j2kun
Created October 30, 2024 06:00
Show Gist options
  • Save j2kun/900bb4aec70032ae726c47e22f511700 to your computer and use it in GitHub Desktop.
Save j2kun/900bb4aec70032ae726c47e22f511700 to your computer and use it in GitHub Desktop.
Utils for social media syndication
from collections import deque, defaultdict
from itertools import zip_longest
import datetime
import os
import pathlib
import re
import subprocess
import marko
# File names that are left out when the content directories are listed.
FILES_TO_IGNORE = {"_index.md"}

# Scheme and host prepended to site-relative paths to form absolute URLs.
BLOG_URL_BASE = "https://www.jeremykun.com"

# Math delimited by a pair of double dollar signs: $$ ... $$
OFFSET_MATH_DOLLAR_REGEX = re.compile(r"\$\$([^$]+?)\$\$")
# Math delimited by escaped square brackets: \[ ... \]
OFFSET_MATH_BRACKET_REGEX = re.compile(r"\\\[([^$]+?)\\\]")
# Math delimited by single dollar signs: $ ... $
INLINE_MATH_DOLLAR_REGEX = re.compile(r"\$([^$]+?)\$")
def get_git_root():
    """Return the top-level directory of the enclosing git repository.

    Runs ``git rev-parse --show-toplevel`` and wraps its standard output
    (trailing whitespace removed, decoded as UTF-8) in a ``pathlib.Path``.
    The command's exit status is not inspected.
    """
    completed = subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        stdout=subprocess.PIPE,
        check=False,
    )
    toplevel = completed.stdout.rstrip().decode("utf-8")
    return pathlib.Path(toplevel)
def canonical_url(filename: pathlib.Path, post_type="shortform") -> str:
    """Return the public URL of the blog post stored in ``filename``.

    The file is scanned for the first line starting with ``url:``; if one is
    found, the text after ``": "`` is appended to ``BLOG_URL_BASE``. Otherwise
    a default URL is derived from the file name and ``post_type``.

    Args:
        filename: Path of the post's markdown file.
        post_type: ``"shortform"`` or ``"posts"``; a falsy value is treated
            like ``"posts"``.

    Returns:
        The absolute URL of the post.

    Raises:
        ValueError: If no usable ``url:`` line was found and ``post_type`` is
            not one of the supported values.
    """
    try:
        with open(filename, "r") as infile:
            for line in infile:
                if line.startswith("url:"):
                    return BLOG_URL_BASE + line.strip().split(": ")[1]
    except (OSError, UnicodeDecodeError, IndexError):
        # Reading the url metadata is best effort: an unreadable file or a
        # "url:" line without a value falls through to the default below.
        # Only these failures are swallowed, so KeyboardInterrupt, SystemExit
        # and genuine programming errors still propagate.
        pass

    # No url metadata could be read from the file, so build the default
    # location from the file name (without its suffix).
    stripped_filename = filename.with_suffix("").name
    if post_type == "shortform":
        default = f"{BLOG_URL_BASE}/{post_type}/{stripped_filename}/"
    elif post_type == "posts" or not post_type:
        default = f"{BLOG_URL_BASE}/{stripped_filename}/"
    else:
        raise ValueError(f"Unsupported post type {post_type}")

    print(f"Could not find url metadata in {filename}, using default {default}")
    return default
def url_to_filepath(url: str) -> str:
    """Map a blog URL back to the path of the markdown file that produces it.

    Shortform URLs map directly to ``content/shortform/<slug>.md``. For any
    other URL, every file under ``content`` (relative to the current working
    directory) is searched for the text ``url: /<path>`` and the first file
    containing it is returned.

    Args:
        url: An absolute blog URL, with or without a trailing slash.

    Returns:
        The relative path of the matching markdown file.

    Raises:
        ValueError: If the URL has no path component, a shortform URL has no
            slug, or no file under ``content`` declares the URL.
    """
    # example urls are:
    # https://www.jeremykun.com/shortform/2024-09-12-1502
    # https://www.jeremykun.com/2024/09/06/packing-matrix-vector-multiplication-in-fhe"
    # https://www.jeremykun.com/fhe-in-production

    # Drop the scheme by splitting on "://". str.lstrip("https://") would
    # strip any leading run of the characters h, t, p, s, ':' and '/', which
    # can eat into the host (and beyond) instead of removing just the prefix.
    without_scheme = url.split("://", 1)[-1]
    # tokens[0] is the host; the remaining tokens are the path segments.
    tokens = without_scheme.strip("/").split("/")
    if len(tokens) < 2:
        raise ValueError(f"Could not find a post path in url {url}")

    if tokens[1] == "shortform":
        if len(tokens) < 3:
            raise ValueError(f"Could not find a shortform post name in url {url}")
        return f"content/shortform/{tokens[2]}.md"

    url_suffix = "/".join(tokens[1:])
    search_string = f"url: /{url_suffix}"
    for root, _, files in os.walk("content"):
        for file in files:
            candidate = os.path.join(root, file)
            with open(candidate, "r") as infile:
                if search_string in infile.read():
                    return candidate

    raise ValueError(
        f"Could not find file for url {url}; search_string: {search_string}"
    )
def get_text_children(node):
    """Flatten the direct children of ``node`` into one plain-text string.

    ``RawText`` children contribute their text with surrounding whitespace
    removed, and ``LineBreak`` children contribute a single space.

    Raises:
        ValueError: If a child has any other type.
    """
    pieces = []
    for child in node.children:
        kind = child.get_type()
        if kind == "RawText":
            # For RawText, ``children`` holds the text itself.
            pieces.append(child.children.strip())
        elif kind == "LineBreak":
            pieces.append(" ")
        else:
            raise ValueError(f"Unsupported child type {child.get_type()}: {child}")
    return "".join(pieces)
def convert_paragraph(node, convert_math=True):
if node.get_type() != "Paragraph":
raise ValueError(f"Invalid input node of type {node.get_type()}")
post_str = ""
for child in node.children:
match child.get_type():
case "LineBreak":
post_str += " "
case "RawText":
# in this case, child.children is a single string, despite the
# name "children".
text = child.children
# Convert to mathstodon-compatible inline mathmode
if convert_math:
if text.startswith("$$"):
text = OFFSET_MATH_DOLLAR_REGEX.sub(r"\\(\1\\)", text)
elif text.startswith("\\["):
text = OFFSET_MATH_BRACKET_REGEX.sub(r"\\(\1\\)", text)
else:
text = INLINE_MATH_DOLLAR_REGEX.sub(r"\\(\1\\)", text)
post_str += text
case "Link":
if child.dest.startswith("http"):
url = child.dest
elif child.dest.startswith("/"):
url = f"{BLOG_URL_BASE}{child.dest}"
else:
raise ValueError(f"Unsupported link destination f{child.dest}")
link_text = get_text_children(child)
post_str += f"{link_text} ({url})"
case "CodeSpan":
post_str += f"`{child.children}`"
case _:
raise ValueError(
f"Unsupported paragraph node type: {child.get_type()}: {child.children}"
)
return post_str
def convert_code_block(node, post_permalink):
    """Return a one-line placeholder for a fenced code block.

    The code itself is dropped because code blocks make for bad toots; the
    placeholder points the reader at ``post_permalink`` instead.

    Raises:
        ValueError: If ``node`` is not a ``FencedCode`` node.
    """
    node_type = node.get_type()
    if node_type == "FencedCode":
        return f"(Code omitted for brevity. See: {post_permalink})"
    raise ValueError(f"Invalid input node of type {node.get_type()}")
def convert_post_to_thread(content, post_permalink, convert_math=True):
    """Turn a markdown blog post into a list of posts, one per paragraph.

    The resulting posts are not length-limited. Paragraphs are converted with
    ``convert_paragraph`` and fenced code blocks are replaced by a pointer to
    ``post_permalink``; any other top-level node type raises ``ValueError``.
    """
    document = marko.Markdown().parse(content)
    thread = []
    # Hugo frontmatter is fenced by "---" lines, which marko reports as
    # ThematicBreak nodes; each one flips whether we are inside metadata.
    inside_frontmatter = False
    for child in document.children:
        node_type = child.get_type()
        if node_type == "ThematicBreak":
            inside_frontmatter = not inside_frontmatter
            continue
        if inside_frontmatter or node_type in ("LineBreak", "BlankLine"):
            continue

        if node_type == "Paragraph":
            thread.append(convert_paragraph(child, convert_math=convert_math))
        elif node_type == "FencedCode":
            thread.append(convert_code_block(child, post_permalink))
        else:
            raise ValueError(
                f"Unsupported doc node type {child.get_type()}: {child}"
            )
    return thread
def title_and_link_as_post(post_contents: str, blog_post_permalink: str):
    """Build a "title, blank line, link" post from a blog post's source text.

    The title is taken from the first line of ``post_contents`` that starts
    with ``title:``, with surrounding whitespace and quote characters removed.

    Args:
        post_contents: Full text of the post, including its frontmatter.
        blog_post_permalink: URL to place under the title.

    Returns:
        ``"<title>\\n\\n<permalink>"``.

    Raises:
        ValueError: If no line starts with ``title:``.
    """
    title = None
    for line in post_contents.split("\n"):
        if line.startswith("title:"):
            # Split on the first colon only, so a title that itself contains
            # ": " (e.g. "Foo: Bar") is kept whole rather than truncated.
            raw_title = line.partition(":")[2]
            title = raw_title.strip().strip("'").strip('"')
            # The frontmatter comes first; do not let a later body line that
            # happens to start with "title:" override it.
            break

    if title is None:
        raise ValueError(f"Could not find title in post {blog_post_permalink}")
    return f"{title}\n\n{blog_post_permalink}"
def load_database(path):
    """Load a database where each line consists of a single key-value pair.

    Each non-blank line is split at its first run of whitespace into a key
    and a value. Blank lines are skipped.

    Args:
        path: Location of the database file.

    Returns:
        A dict mapping keys to values; empty if ``path`` does not exist.

    Raises:
        ValueError: If a non-blank line has a key but no value.
    """
    if not os.path.exists(path):
        return {}

    mapping = {}
    with open(path, "r") as infile:
        for line in infile:
            entry = line.strip()
            if not entry:
                # Tolerate blank lines (e.g. a trailing empty line left by a
                # manual edit) instead of failing on tuple unpacking.
                continue
            # Split once so that everything after the key is the value.
            key, value = entry.split(maxsplit=1)
            mapping[key] = value
    return mapping
def dump_database(mapping, path):
    """Write ``mapping`` to ``path`` as one ``key value`` pair per line.

    The file is overwritten. Each entry is written as the key, a single
    space, the value, and a newline.
    """
    with open(path, "w") as outfile:
        outfile.writelines(
            f"{blog_url} {thread_url_root}\n"
            for blog_url, thread_url_root in mapping.items()
        )
def get_blog_posts():
    """Collect the paths of all blog post files, grouped by post type.

    Looks in ``content/shortform`` and ``content/posts`` under the git root
    and leaves out any file name listed in ``FILES_TO_IGNORE``.

    Returns:
        A dict with keys ``"shortform"`` and ``"posts"``, each mapping to a
        set of paths.

    Raises:
        ValueError: If the git root has no ``.git`` directory or the
            shortform directory does not exist.
    """
    git_root = pathlib.Path(get_git_root())
    if not os.path.isdir(git_root / ".git"):
        raise ValueError(f"Could not find git root, looked at {git_root}")
    print(f"Found {git_root=}")

    content_dir = git_root / "content"
    shortform_path = content_dir / "shortform"
    posts_path = content_dir / "posts"
    if not os.path.isdir(shortform_path):
        raise ValueError(f"Could not find shortform_path at {shortform_path}")

    def _listing(directory):
        # Full paths of the directory's entries, minus the ignored names.
        return {
            directory / entry
            for entry in os.listdir(directory)
            if entry not in FILES_TO_IGNORE
        }

    return {"shortform": _listing(shortform_path), "posts": _listing(posts_path)}
def get_post_date(filepath):
    """Return the publication date declared in a post's ``date:`` line.

    Only the first line starting with ``date:`` is considered. Its value must
    match ``%Y-%m-%dT%H:%M:%S%z``.

    Args:
        filepath: Path of the post's markdown file.

    Returns:
        The parsed, timezone-aware datetime. If the file has no ``date:`` line
        or its value cannot be parsed, a naive
        ``datetime.datetime.fromtimestamp(0)`` is returned instead.
    """
    # example line:
    # date: '2024-08-04T07:00:00-0700'
    # date: '2024-08-04T07:00:00Z'
    with open(filepath, "r") as infile:
        for line in infile:
            if not line.startswith("date:"):
                continue
            # Everything after the first colon, minus whitespace and quotes.
            datestr = line.partition(":")[2].strip().strip("'").strip('"')
            try:
                return datetime.datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S%z")
            except ValueError:
                # strptime signals failure by raising, never by returning a
                # falsy value, so the fallback has to live in this handler.
                print(f"Could not parse date string {datestr} in {filepath}")
                return datetime.datetime.fromtimestamp(0)

    # return a date long ago to avoid accidentally triggering a syndication
    return datetime.datetime.fromtimestamp(0)
def get_posts_without_mapping(posts_by_key, mapping, since_days=1):
    """Select the recent posts that have no recorded social media thread.

    Args:
        posts_by_key: Dict from post type to an iterable of post file paths.
        mapping: Dict from blog post URL to the URL of its existing thread.
        since_days: Posts whose age in whole days exceeds this are skipped.

    Returns:
        A ``defaultdict(list)`` from post type to the file paths to publish.
    """
    posts_to_publish = defaultdict(list)
    for key, posts_to_try in posts_by_key.items():
        print(f"Checking key='{key}'")
        for filepath in posts_to_try:
            print(f"Checking if {filepath} should be published")
            blog_post_permalink = canonical_url(filepath, post_type=key)
            post_date = get_post_date(filepath)
            # "now" is taken in the post date's own tzinfo so the two
            # datetimes can be subtracted.
            age = datetime.datetime.now(tz=post_date.tzinfo) - post_date
            if age.days <= since_days:
                if blog_post_permalink not in mapping:
                    posts_to_publish[key].append(filepath)
                else:
                    print(
                        f"{filepath} has existing social media thread at "
                        f"{mapping[blog_post_permalink]}, skipping."
                    )
    return posts_to_publish
def split_post(post, max_char_len=300):
    """Split ``post`` into chunks of at most ``max_char_len`` characters.

    A post that already fits is returned unchanged as a single-element list.
    Otherwise the text is cut after each ``". "`` or ``", "`` separator and
    the resulting fragments are greedily re-joined, with a single space, into
    chunks that stay within the limit.

    Args:
        post: The text to split.
        max_char_len: Maximum length of each returned chunk.

    Returns:
        The list of chunks, in order.

    Raises:
        ValueError: If a single fragment is longer than ``max_char_len``.
    """
    # "<=" matches the limit used when merging below: a post of exactly
    # max_char_len characters fits and needs no splitting.
    if len(post) <= max_char_len:
        return [post]

    # re.split keeps the (captured) separators as list items at the odd
    # indices; glue each one back onto the text that precedes it.
    pieces = [piece.strip() for piece in re.split(r"(\. |, )", post)]
    fragments = [
        text + separator
        for text, separator in zip_longest(pieces[::2], pieces[1::2], fillvalue="")
    ]
    # A post ending in a separator leaves an empty trailing fragment, which
    # could otherwise be emitted as an empty post of its own.
    fragments = [fragment for fragment in fragments if fragment]

    for fragment in fragments:
        if len(fragment) > max_char_len:
            raise ValueError(f"Sentence is too long: {fragment}")

    chunks = []
    for fragment in fragments:
        # +1 accounts for the joining space.
        if chunks and len(chunks[-1]) + 1 + len(fragment) <= max_char_len:
            chunks[-1] = f"{chunks[-1]} {fragment}"
        else:
            chunks.append(fragment)
    return chunks
def add_link(post_lines: list[str], frontmatter_key: str, value: str) -> str:
    """Add a link to a post's frontmatter.

    The frontmatter is taken to be everything up to and including the second
    line that starts with ``---``. If it already has a line for
    ``frontmatter_key`` the post is returned unchanged; otherwise
    ``<frontmatter_key>: "<value>"`` is inserted just before the closing
    ``---``. ``post_lines`` itself is not modified.

    Args:
        post_lines: The post's lines, each still ending in its newline.
        frontmatter_key: Name of the frontmatter entry to add.
        value: Value (the link) to store under that key.

    Returns:
        The full text of the post, with the entry added if it was missing.

    Raises:
        ValueError: If ``post_lines`` has fewer than two ``---`` lines, i.e.
            the frontmatter is not delimited.
    """
    # Indices of the "---" fence lines; the second one closes the frontmatter.
    fences = [i for i, line in enumerate(post_lines) if line.startswith("---")]
    if len(fences) < 2:
        # Without a closing fence there is no frontmatter to add to; inserting
        # anyway would drop the entry somewhere in the post body.
        raise ValueError(
            f"Could not find frontmatter delimited by '---' when adding {frontmatter_key}"
        )
    closing = fences[1]
    frontmatter = post_lines[: closing + 1]

    # do nothing if the frontmatter_key is already in the frontmatter.
    # Match "key:" rather than the bare key so that a longer key sharing the
    # same prefix is not mistaken for this one.
    key_prefix = f"{frontmatter_key}:"
    if any(line.startswith(key_prefix) for line in frontmatter):
        return "".join(post_lines)

    # otherwise add it as the last line in the frontmatter
    frontmatter.insert(len(frontmatter) - 1, f'{frontmatter_key}: "{value}"\n')
    print(
        f"Added {frontmatter_key} link to {post_lines[1].strip()}; "
        f"frontmatter is now:\n{''.join(frontmatter)}"
    )
    # nb., input lines already end in newlines
    return "".join(frontmatter + post_lines[closing + 1 :])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment