Created
October 30, 2024 22:09
-
-
Save j2kun/970472bad4be034f6992fc21ccba4e0a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import fire | |
import marko | |
from scripts.webmention.utils import send_webmention | |
from scripts import utils as utils | |
# Newline-delimited record of posts already processed for outgoing
# webmentions, keyed by post path (see main() below).
DATABASE_PATH = "scripts/webmention/outgoing_processed.txt"
# Source file of the blogroll page and its canonical public URL, used as
# the webmention "from" URL for blogroll links.
BLOGROLL_PATH = "content/blogroll/_index.md"
BLOGROLL_URL = "https://www.jeremykun.com/blogroll"
# Destination URLs containing any of these substrings are skipped —
# presumably large platforms/reference sites that don't accept webmentions
# (TODO confirm rationale).
SKIP_TERMS = [
    "arxiv.org",
    "docs.python.org",
    "doi.org",
    "eprint.iacr.org",
    "github.com",
    "linkedin.com",
    "mathstodon.xyz",
    "reddit.com",
    "stackoverflow.com",
    "twitter.com",
    "web.archive.org",
    "wikipedia.org",
    "x.com",
    "youtube.com",
]
# Destination URLs ending in these extensions are skipped — they point at
# downloadable assets rather than HTML pages.
SKIP_ENDINGS = [
    ".dmg",
    ".mp4",
    ".msi",
    ".pdf",
    ".zip",
]
def get_links_in_paragraph(paragraph):
    """Return the http(s) destination URLs of the Link children of *paragraph*.

    Non-Link children and links whose destination does not start with
    "http" (e.g. relative or mailto links) are ignored.
    """
    return [
        node.dest
        for node in paragraph.children
        if node.get_type() == "Link" and node.dest.startswith("http")
    ]
def get_outgoing_links(post_text):
    """Parse a markdown post and return its outgoing paragraph links.

    The Hugo frontmatter at the top of the post is fenced by a pair of
    `---` lines, which marko parses as ThematicBreak nodes; everything up
    to and including the second such break is skipped. Only links inside
    top-level paragraphs after the frontmatter are collected.
    """
    document = marko.Markdown().parse(post_text)
    fence_count = 0
    links = []
    for node in document.children:
        node_type = node.get_type()
        if node_type == "ThematicBreak" and fence_count < 2:
            # Still inside (or entering/leaving) the frontmatter fences.
            fence_count += 1
            continue
        if fence_count < 2:
            # Frontmatter content between the fences — skip it.
            continue
        if node_type == "Paragraph":
            links.extend(get_links_in_paragraph(node))
    return links
def main(since_days=7, dry_run=False):
    """Send webmentions for outgoing links in recent posts and the blogroll.

    Args:
        since_days: only scan posts published within this many days.
        dry_run: if True, webmentions are still sent but the processed-posts
            database is not persisted (matching the original behavior).
    """
    db = utils.load_database(DATABASE_PATH)
    blog_posts = utils.get_blog_posts()

    # Handle normal blog posts and shortform.
    for post_type, post_paths in blog_posts.items():
        for post_path in post_paths:
            if post_path in db:
                print(f"Skipping {post_path}, already processed")
                continue
            post_date = utils.get_post_date(post_path)
            # Use the post's own tzinfo so aware/naive datetimes never mix.
            age = datetime.datetime.now(tz=post_date.tzinfo) - post_date
            if age.days > since_days:
                continue
            print(f"Scanning {post_path} for outgoing links")
            from_url = utils.canonical_url(post_path, post_type=post_type)
            print(f"{from_url=}")
            with open(post_path, "r") as infile:
                outgoing_links = get_outgoing_links(infile.read())
            for to_url in outgoing_links:
                if any(term in to_url for term in SKIP_TERMS) or to_url.endswith(
                    tuple(SKIP_ENDINGS)
                ):
                    continue
                send_webmention(from_url, to_url)
            db[post_path] = str(len(outgoing_links))

    # BUG FIX: database values are stored as strings (see str(len(...))
    # above), so the old `db.get(BLOGROLL_PATH, 0)` compared a str against
    # an int below and the "no new links" short-circuit never fired once
    # the entry existed. Coerce to int for a meaningful comparison.
    blogroll_count = int(db.get(BLOGROLL_PATH, 0))
    blogroll_links = []
    with open(BLOGROLL_PATH, "r") as infile:
        for line in infile:
            if "{{< blogroll" in line:
                # Assumes the shortcode line's 4th whitespace token is the
                # url="..." attribute — TODO confirm against the blogroll
                # source format.
                url_token = line.split()[3]
                url = url_token.split("=")[-1].strip('"')
                # Raise instead of assert: asserts are stripped under -O.
                if not url.startswith("http"):
                    raise ValueError(f"Invalid URL: {url}")
                blogroll_links.append(url)

    if len(blogroll_links) == blogroll_count:
        print("No new blogroll links to process")
    else:
        try:
            for to_url in blogroll_links:
                send_webmention(BLOGROLL_URL, to_url)
        finally:
            # Record progress even if a send fails partway through.
            db[BLOGROLL_PATH] = str(len(blogroll_links))

    if not dry_run:
        utils.dump_database(db, DATABASE_PATH)
if __name__ == "__main__":
    # Fire exposes main's keyword arguments as CLI flags
    # (e.g. --since_days, --dry_run).
    fire.Fire(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment