Last active
March 23, 2022 01:41
-
-
Save tassaron/2e1c1bd5d67202d9daf191eec93b1f6a to your computer and use it in GitHub Desktop.
turn Mastodon posts into paginated json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
import datetime
import json
import os
import sys
from typing import List, Tuple

from mastodon import Mastodon
"""
This script looks for posts tagged #blog by the USER_ID on INSTANCE_URL.
It ignores reblogs, but not private posts or DMs.
    However, the URL will be blank if a post is not public or unlisted.
It fetches a MAX of 5 new posts per execution and paginates in groups of 5.

Requires `blogapp_usercred.secret` to exist. See the Mastodon.py docs:
    - https://mastodonpy.readthedocs.io/en/stable/

The output of this script is chronological:
    - The newest five posts go into `posts.json`, newest at the bottom
    - The 6th newest post is at the bottom of the last page in `/pages`
    - The oldest post is at the top of `/pages/1/posts.json`
"""
# Account ID passed to `account_statuses` when fetching posts.
USER_ID = 1
# Base URL of the Mastodon instance the API client talks to.
INSTANCE_URL = "https://tassaron.com"
# Set by `--verbose` / `-v`: print the content of every post that is considered.
__VERBOSE__ = False
# Set by `--debug`: fetch up to 10 posts and keep all of them, skipping the
# reblog and #blog-tag filters.
__DEBUG__ = False
def paginate(up_to_ten_posts: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Split posts into the newest five and everything older than those.

    Each post must carry a ``created_at`` string in the form produced by
    ``str(datetime)``, e.g. ``2022-03-23 01:41:00.123456+00:00``. The
    fractional seconds and the UTC offset are both optional, but all posts
    must agree on whether an offset is present (naive and aware datetimes
    cannot be compared with each other).

    Returns:
        ``(new_posts, old_posts)``, both in chronological order (oldest
        first). ``new_posts`` holds at most the five newest posts and
        ``old_posts`` holds every remaining, older post. With five or fewer
        posts ``old_posts`` is empty.

    Raises:
        ValueError: if a ``created_at`` value is not an ISO-style timestamp.
    """
    def created_at(post: dict) -> datetime.datetime:
        # fromisoformat() keeps sub-second precision and also accepts
        # timestamps that have no fractional part at all, which str(datetime)
        # emits whenever the microseconds are exactly zero.
        return datetime.datetime.fromisoformat(post["created_at"])

    # sorted() is stable and keeps every element, so posts that share a
    # timestamp are all retained in their incoming relative order.
    ordered = sorted(up_to_ten_posts, key=created_at)
    return ordered[-5:], ordered[:-5]
def make_next_page(up_to_five_posts: List[dict], page_size: int = 5) -> None:
    """Append posts to the last page under ``pages/``, rolling over when full.

    Posts are appended, in the order given, to ``posts.json`` inside the
    lexicographically last directory of ``pages/``. As soon as a page holds
    ``page_size`` posts, the next zero-padded directory (e.g. ``000009``) is
    created with an empty ``posts.json`` and any remaining posts continue
    there.

    e.g., if page 8 has 2 posts and this function receives 5 posts,
    3 posts are added to page 8 and 2 to page 9.

    Args:
        up_to_five_posts: posts to archive, oldest first. Any number of posts
            is accepted; as many pages as needed are filled.
        page_size: maximum number of posts per page.

    Raises:
        ValueError: if ``page_size`` is smaller than 1.
        IndexError: if ``pages/`` contains no page directory.
    """
    if page_size < 1:
        raise ValueError("page_size must be at least 1")

    pending = list(up_to_five_posts)
    while True:
        # Page directories are zero-padded, so a plain sort finds the newest.
        last_page = sorted(os.listdir("pages"))[-1]
        path = f"pages/{last_page}/posts.json"
        if os.path.exists(path):
            with open(path, "r") as f:
                page_posts = json.load(f)
        else:
            page_posts = []

        # A page that is already full (or overfull) takes no more posts.
        room = max(page_size - len(page_posts), 0)
        page_posts.extend(pending[:room])
        pending = pending[room:]
        with open(path, "w") as f:
            json.dump(page_posts, f)

        if len(page_posts) < page_size:
            # The page still has room, so every pending post was placed.
            return

        # The page is full: make the next directory and a blank json file inside.
        next_page_path = f"pages/{int(last_page) + 1:06d}"
        os.mkdir(next_page_path)
        with open(f"{next_page_path}/posts.json", "w") as f:
            json.dump([], f)
        if not pending:
            return
def main():
    """Fetch new posts from Mastodon and update ``posts.json`` and ``pages/``.

    Command-line flags: ``--verbose`` / ``-v`` prints every post considered;
    ``--debug`` fetches up to 10 posts and keeps all of them unfiltered.

    Working files (created in the current directory when missing):
        - ``last_count.txt``: the account's status count seen by the last
          successful run
        - ``posts.json``: the newest (up to five) posts, oldest first
        - ``pages/000001`` onwards: archive pages of older posts

    The new status count is only recorded once all output has been written,
    so a run that fails part-way is retried by the next execution.
    """
    global __VERBOSE__
    global __DEBUG__
    # sys.argv[0] is the script path, not an argument.
    for arg in sys.argv[1:]:
        if arg in ("--verbose", "-v"):
            __VERBOSE__ = True
        elif arg == "--debug":
            __DEBUG__ = True
        else:
            print(f"Unknown argument: {arg}")

    mastodon = Mastodon(
        access_token="blogapp_usercred.secret",
        api_base_url=INSTANCE_URL,
    )

    def make_needed_files() -> None:
        """Create the first page directory and the state files if missing."""
        os.makedirs("pages/000001", exist_ok=True)
        if not os.path.exists("posts.json"):
            with open("posts.json", "w") as f:
                json.dump([], f)
        if not os.path.exists("last_count.txt"):
            with open("last_count.txt", "w") as f:
                f.write("0")

    def save_count(count: int) -> None:
        """Record the status count that has been fully processed."""
        with open("last_count.txt", "w") as f:
            f.write(str(count))

    def to_entry(status: dict) -> dict:
        """Reduce an API status to the fields stored in the json files."""
        is_linkable = status["visibility"] in ("public", "unlisted")
        return {
            "created_at": str(status["created_at"]),
            "url": status["url"] if is_linkable else "",
            "content": status["content"],
            "images": [
                (attachment["url"], attachment["description"])
                for attachment in status["media_attachments"]
                if attachment["type"] == "image"
            ],
        }

    make_needed_files()
    with open("last_count.txt", "r") as f:
        last_count = int(f.readline().strip())
    new_count = mastodon.me()["statuses_count"]

    if new_count <= last_count:
        # The count shrinks when statuses are deleted; store the new baseline
        # so that later posts are still detected.
        if new_count != last_count:
            save_count(new_count)
        print("No new statuses")
        sys.exit(0)

    with open("posts.json", "r") as f:
        data = json.load(f)

    # Never fetch more than fits: paginate() handles at most 10 posts, and
    # debug mode would otherwise add 10 on top of the stored ones.
    fetch_cap = 10 if __DEBUG__ else 5
    new_posts = mastodon.account_statuses(
        USER_ID,
        limit=min(fetch_cap, new_count - last_count, 10 - len(data)),
    )

    # NOTE(review): assumes the API returns statuses newest first - confirm.
    # Walking them in reverse and appending keeps `data` oldest-first even
    # when no pagination (and therefore no sorting) happens below.
    for new_post in reversed(list(new_posts)):
        if __VERBOSE__:
            print(f"Considering the following post:\n{new_post['content']}")
        if not __DEBUG__:
            # NOTE(review): "reblogged" may also be true for an own post that
            # was boosted by the account itself - confirm this is intended.
            if new_post["reblogged"]:
                continue
            if not any(tag["name"] == "blog" for tag in new_post["tags"] or []):
                continue
        data.append(to_entry(new_post))

    # data can now be 5-10 in length
    assert len(data) < 11
    if len(data) > 5:
        data, next_page = paginate(data)
        make_next_page(next_page)

    # save newest 5 posts to posts.json
    with open("posts.json", "w") as f:
        json.dump(data, f)

    save_count(new_count)
# Run the update only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment