Created
October 30, 2024 06:00
-
-
Save j2kun/900bb4aec70032ae726c47e22f511700 to your computer and use it in GitHub Desktop.
Utils for social media syndication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque, defaultdict | |
from itertools import zip_longest | |
import datetime | |
import os | |
import pathlib | |
import re | |
import subprocess | |
import marko | |
# Filenames under content/ that should never be syndicated (Hugo section
# index pages, not actual posts).
FILES_TO_IGNORE = set(
    [
        "_index.md",
    ]
)
# Root URL all canonical post links are built from.
BLOG_URL_BASE = "https://www.jeremykun.com"
# Display math delimited by $$...$$ (non-greedy, no nested $).
OFFSET_MATH_DOLLAR_REGEX = re.compile(r"\$\$([^$]+?)\$\$")
# Display math delimited by \[...\].
OFFSET_MATH_BRACKET_REGEX = re.compile(r"\\\[([^$]+?)\\\]")
# Inline math delimited by single $...$.
INLINE_MATH_DOLLAR_REGEX = re.compile(r"\$([^$]+?)\$")
def get_git_root():
    """Return the repository root directory as a pathlib.Path.

    Shells out to `git rev-parse --show-toplevel`.

    Raises:
        subprocess.CalledProcessError: if git exits nonzero (e.g. the
            current directory is not inside a git repository). The original
            Popen-based version silently returned Path("") in that case.
    """
    # check=True surfaces git failures instead of silently producing an
    # empty path; text=True replaces the manual decode("utf-8").
    result = subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        stdout=subprocess.PIPE,
        check=True,
        text=True,
    )
    return pathlib.Path(result.stdout.rstrip())
def canonical_url(filename: pathlib.Path, post_type="shortform") -> str:
    """Return the canonical blog URL for a markdown post.

    Prefers an explicit `url:` line in the file; otherwise falls back to
    the URL Hugo derives from the filename and section.

    Args:
        filename: path to the markdown source of the post.
        post_type: the Hugo section, "shortform" or "posts" (falsy values
            are treated like "posts").

    Returns:
        The absolute URL of the post.

    Raises:
        ValueError: if post_type is not a supported section.
    """
    try:
        with open(filename, "r") as infile:
            for line in infile:
                if line.startswith("url:"):
                    return BLOG_URL_BASE + line.strip().split(": ")[1]
    except OSError:
        # Unreadable/missing file: fall through to the default URL scheme.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # programming errors like IndexError from a malformed url: line.)
        pass
    stripped_filename = filename.with_suffix("").name
    if post_type == "shortform":
        default = f"{BLOG_URL_BASE}/{post_type}/{stripped_filename}/"
    elif post_type == "posts" or not post_type:
        default = f"{BLOG_URL_BASE}/{stripped_filename}/"
    else:
        raise ValueError(f"Unsupported post type {post_type}")
    # Include the actual filename in the message (was a literal "(unknown)").
    print(f"Could not find url metadata in {filename}, using default {default}")
    return default
def url_to_filepath(url: str) -> str:
    """Map a blog URL back to its markdown source path under content/.

    Example urls:
      https://www.jeremykun.com/shortform/2024-09-12-1502
      https://www.jeremykun.com/2024/09/06/packing-matrix-vector-multiplication-in-fhe
      https://www.jeremykun.com/fhe-in-production

    Args:
        url: the full URL of a post on the blog.

    Returns:
        The path (relative to the repo root) of the post's markdown file.

    Raises:
        ValueError: if no file under content/ declares the URL.
    """
    # removeprefix, not lstrip: lstrip("https://") strips any leading run of
    # the *characters* h,t,p,s,:,/ and would mangle hosts starting with them.
    stripped = url.removeprefix("https://").removeprefix("http://").strip("/")
    tokens = stripped.split("/")
    # tokens[0] is the host; tokens[1:] is the URL path.
    if tokens[1] == "shortform":
        return f"content/shortform/{tokens[2]}.md"
    else:
        url_suffix = "/".join(tokens[1:])
        search_string = f"url: /{url_suffix}"
        for root, _, files in os.walk("content"):
            for file in files:
                with open(os.path.join(root, file), "r") as infile:
                    if search_string in infile.read():
                        return os.path.join(root, file)
        raise ValueError(
            f"Could not find file for url {url}; search_string: {search_string}"
        )
def get_text_children(node):
    """Concatenate the plain text of a node's children into one string.

    RawText children contribute their stripped text; LineBreak children
    contribute a single space.

    Raises:
        ValueError: for any other child node type.
    """
    pieces = []
    for child in node.children:
        kind = child.get_type()
        if kind == "RawText":
            # For RawText nodes, `children` is the string itself.
            pieces.append(child.children.strip())
        elif kind == "LineBreak":
            pieces.append(" ")
        else:
            raise ValueError(f"Unsupported child type {kind}: {child}")
    return "".join(pieces)
def convert_paragraph(node, convert_math=True): | |
if node.get_type() != "Paragraph": | |
raise ValueError(f"Invalid input node of type {node.get_type()}") | |
post_str = "" | |
for child in node.children: | |
match child.get_type(): | |
case "LineBreak": | |
post_str += " " | |
case "RawText": | |
# in this case, child.children is a single string, despite the | |
# name "children". | |
text = child.children | |
# Convert to mathstodon-compatible inline mathmode | |
if convert_math: | |
if text.startswith("$$"): | |
text = OFFSET_MATH_DOLLAR_REGEX.sub(r"\\(\1\\)", text) | |
elif text.startswith("\\["): | |
text = OFFSET_MATH_BRACKET_REGEX.sub(r"\\(\1\\)", text) | |
else: | |
text = INLINE_MATH_DOLLAR_REGEX.sub(r"\\(\1\\)", text) | |
post_str += text | |
case "Link": | |
if child.dest.startswith("http"): | |
url = child.dest | |
elif child.dest.startswith("/"): | |
url = f"{BLOG_URL_BASE}{child.dest}" | |
else: | |
raise ValueError(f"Unsupported link destination f{child.dest}") | |
link_text = get_text_children(child) | |
post_str += f"{link_text} ({url})" | |
case "CodeSpan": | |
post_str += f"`{child.children}`" | |
case _: | |
raise ValueError( | |
f"Unsupported paragraph node type: {child.get_type()}: {child.children}" | |
) | |
return post_str | |
def convert_code_block(node, post_permalink):
    """Replace a fenced code block with a pointer to the full post.

    Code blocks make for bad toots, so the code itself is omitted and the
    reader is directed to the original post instead.
    """
    node_type = node.get_type()
    if node_type != "FencedCode":
        raise ValueError(f"Invalid input node of type {node_type}")
    return f"(Code omitted for brevity. See: {post_permalink})"
def convert_post_to_thread(content, post_permalink, convert_math=True):
    """Converts blog post to character-unlimited threads, one post per paragraph."""
    doc = marko.Markdown().parse(content)
    toots = []
    in_metadata = False
    for node in doc.children:
        node_type = node.get_type()
        # Skip over hugo frontmatter, which uses ------ to demarcate it, and
        # marko parses this as a ThematicBreak.
        if node_type == "ThematicBreak":
            in_metadata = not in_metadata
            continue
        if in_metadata:
            continue
        if node_type in ("LineBreak", "BlankLine"):
            continue
        if node_type == "Paragraph":
            toots.append(convert_paragraph(node, convert_math=convert_math))
        elif node_type == "FencedCode":
            toots.append(convert_code_block(node, post_permalink))
        else:
            raise ValueError(
                f"Unsupported doc node type {node_type}: {node}"
            )
    return toots
def title_and_link_as_post(post_contents: str, blog_post_permalink: str):
    """Converts a blog post to a title and link.

    Args:
        post_contents: raw markdown of the post including its frontmatter.
        blog_post_permalink: the canonical URL of the post.

    Returns:
        A post body of the form "<title>\\n\\n<permalink>".

    Raises:
        ValueError: if no `title:` line is found.
    """
    title = None
    for line in post_contents.split("\n"):
        if line.startswith("title:"):
            # maxsplit=1 keeps titles that themselves contain ": " intact
            # (the unbounded split truncated them at the first ": ").
            title = line.strip().split(": ", 1)[1].strip("'").strip('"')
            break
    if title is None:
        raise ValueError(f"Could not find title in post {blog_post_permalink}")
    return f"{title}\n\n{blog_post_permalink}"
def load_database(path):
    """Load a database where each line consists of a single key-value pair.

    Args:
        path: path to the database file. A missing file yields an empty dict.

    Returns:
        A dict mapping each key (first whitespace-delimited token) to the
        rest of the line.
    """
    if not os.path.exists(path):
        return {}
    mapping = {}
    with open(path, "r") as infile:
        for line in infile:
            line = line.strip()
            # Tolerate blank lines (the unguarded split crashed on them).
            if not line:
                continue
            # maxsplit=1 keeps values containing whitespace intact.
            key, value = line.split(maxsplit=1)
            mapping[key] = value
    return mapping
def dump_database(mapping, path):
    """Write a database where each line consists of a single key-value pair."""
    lines = [
        f"{blog_url} {thread_url}\n" for blog_url, thread_url in mapping.items()
    ]
    with open(path, "w") as outfile:
        outfile.writelines(lines)
def get_blog_posts():
    """Collect the markdown files for all blog posts, grouped by section.

    Returns:
        A dict with keys "shortform" and "posts", each mapping to a set of
        pathlib.Path objects for that section's markdown files (minus
        FILES_TO_IGNORE).

    Raises:
        ValueError: if the git root or the shortform directory is missing.
    """
    git_root = pathlib.Path(get_git_root())
    if not os.path.isdir(git_root / ".git"):
        raise ValueError(f"Could not find git root, looked at {git_root}")
    print(f"Found {git_root=}")
    shortform_path = git_root / "content" / "shortform"
    # Distinct name for the directory: the original rebound `normal_posts`
    # from a path to a set built from that same path, which was confusing.
    posts_path = git_root / "content" / "posts"
    if not os.path.isdir(shortform_path):
        raise ValueError(f"Could not find shortform_path at {shortform_path}")
    shortform_posts = {
        shortform_path / x
        for x in os.listdir(shortform_path)
        if x not in FILES_TO_IGNORE
    }
    normal_posts = {
        posts_path / x for x in os.listdir(posts_path) if x not in FILES_TO_IGNORE
    }
    return {"shortform": shortform_posts, "posts": normal_posts}
def get_post_date(filepath):
    """Parse the publication date from a post's frontmatter.

    Example frontmatter lines:
      date: '2024-08-04T07:00:00-0700'
      date: '2024-08-04T07:00:00Z'

    Args:
        filepath: path to the markdown file.

    Returns:
        The parsed timezone-aware datetime, or the epoch when no date line
        exists or the date string cannot be parsed — a date long ago, to
        avoid accidentally triggering a syndication.
    """
    with open(filepath, "r") as infile:
        for line in infile:
            if line.startswith("date:"):
                datestr = line.strip().split(": ")[1].strip("'").strip('"')
                # strptime raises ValueError on failure rather than
                # returning None, so the fallback must be an except clause.
                # (The original's `if parsed_date:` was always true and its
                # "Could not parse" branch was unreachable — a bad date
                # string crashed instead of falling back.)
                try:
                    return datetime.datetime.strptime(
                        datestr, "%Y-%m-%dT%H:%M:%S%z"
                    )
                except ValueError:
                    print(f"Could not parse date string {datestr} in {filepath}")
                    return datetime.datetime.fromtimestamp(0)
    return datetime.datetime.fromtimestamp(0)
def get_posts_without_mapping(posts_by_key, mapping, since_days=1):
    """Find recent posts that have no syndicated thread recorded yet.

    A post qualifies when it was published within `since_days` days and its
    canonical URL is not already a key of `mapping`.
    """
    posts_to_publish = defaultdict(list)
    for key, candidates in posts_by_key.items():
        print(f"Checking key='{key}'")
        for filepath in candidates:
            print(f"Checking if {filepath} should be published")
            blog_post_permalink = canonical_url(filepath, post_type=key)
            post_date = get_post_date(filepath)
            # Compare in the post's own timezone to keep the subtraction valid.
            age = datetime.datetime.now(tz=post_date.tzinfo) - post_date
            if age.days > since_days:
                continue
            if blog_post_permalink in mapping:
                print(
                    f"{filepath} has existing social media thread at "
                    f"{mapping[blog_post_permalink]}, skipping."
                )
                continue
            posts_to_publish[key].append(filepath)
    return posts_to_publish
def split_post(post, max_char_len=300):
    """Split a post into chunks of at most max_char_len characters.

    Splits on sentence/clause boundaries (". " and ", ") and then greedily
    re-merges adjacent pieces while the merge still fits within the limit.

    Args:
        post: the post text to split.
        max_char_len: the per-chunk character limit.

    Returns:
        A list of chunks, each at most max_char_len characters long.

    Raises:
        ValueError: if a single sentence exceeds max_char_len.
    """
    # <= rather than <: a post of exactly max_char_len still fits in one
    # chunk and should not be split.
    if len(post) <= max_char_len:
        return [post]
    # weird because re.split keeps the separators as list items;
    # re_joined glues each piece back onto its trailing separator.
    re_split = [p.strip() for p in re.split(r"(\. |, )", post)]
    re_joined = [
        i + j for i, j in zip_longest(re_split[::2], re_split[1::2], fillvalue="")
    ]
    subposts = deque(re_joined)
    for subpost in subposts:
        if len(subpost) > max_char_len:
            raise ValueError(f"Sentence is too long: {subpost}")
    accumulated_subposts = []
    while subposts:
        next_subpost = subposts.popleft()
        if not accumulated_subposts:
            accumulated_subposts.append(next_subpost)
            continue
        merged = accumulated_subposts[-1] + " " + next_subpost
        if len(merged) > max_char_len:
            accumulated_subposts.append(next_subpost)
        else:
            accumulated_subposts[-1] = merged
    return accumulated_subposts
def add_link(post_lines: list[str], frontmatter_key: str, value: str) -> str:
    """Add a link to a post's frontmatter.

    Inserts `frontmatter_key: "value"` as the last entry of the post's
    frontmatter (the block delimited by the first two `---` lines), unless
    a line starting with frontmatter_key already exists there.

    Args:
        post_lines: the post's lines, each including its trailing newline.
        frontmatter_key: the frontmatter field name to add.
        value: the value to store under frontmatter_key.

    Returns:
        The full post re-joined into a single string, with the new entry
        added if it was not already present.
    """
    # find the second line containing `---`
    found_one = False
    for i, line in enumerate(post_lines):
        if line.startswith("---"):
            if found_one:
                break
            found_one = True
    # NOTE(review): `i` is reused after the loop. If there is no closing
    # `---`, `i` points at the last line and the splice below will insert
    # in the wrong place — assumes well-formed frontmatter; confirm callers
    # only pass posts with a complete frontmatter block.
    frontmatter = post_lines[: i + 1]
    # do nothing if the frontmatter_key is already in the frontmatter
    for line in frontmatter:
        if line.startswith(frontmatter_key):
            return "".join(post_lines)
    # otherwise add it as the last line in the frontmatter (i.e. just
    # before the closing `---` delimiter)
    frontmatter.insert(len(frontmatter) - 1, f'{frontmatter_key}: "{value}"\n')
    # post_lines[1] is presumably the title line of the frontmatter — this
    # is only used for logging; verify against actual post layout.
    print(
        f"Added {frontmatter_key} link to {post_lines[1].strip()}; "
        f"frontmatter is now:\n{''.join(frontmatter)}"
    )
    # nb., input lines already end in newlines
    return "".join(frontmatter + post_lines[i + 1 :])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment