|
import os |
|
import json |
|
import time |
|
import threading |
|
import json |
|
from telegram import Bot, Message, InputMediaPhoto, InputMediaVideo |
|
from typing import Any, List, Tuple |
|
from typing.re import Pattern |
|
import pickledb |
|
import html |
|
import re |
|
import websocket |
|
|
|
# --- Telegram side ---------------------------------------------------------
# Channel that receives the forwarded toots.
channel: str = "@username"

# --- Mastodon side ---------------------------------------------------------
# Account name and the instance that hosts it.
username: str = "username_on_mastodon"
instance: str = "mastodon.example.com"

# Access token sent as a query parameter when the streaming socket is opened.
access_token: str = ""

# Key/value store used by this script to remember, for each Mastodon status
# id, the list of Telegram message ids that were posted for it.
# NOTE(review): the second argument is presumably pickledb's auto-dump flag
# (persist on every write) -- confirm against the installed pickledb version.
db = pickledb.load("mastodon_mapping.db", True)

# Telegram bot client; replace the placeholder with the real bot token.
bot = Bot("telegram bot token goes here")

# Test Telegram connectivity once at import time so a bad token fails early.
bot.get_me()

# Matches any single HTML tag; used to remove markup from toot content.
match_tags: Pattern = re.compile(r"<[^>]+?>")
|
|
|
|
|
def on_message(ws: websocket.WebSocketApp, source: str) -> None:
    """Handle one frame received from the Mastodon streaming socket.

    ``delete`` events remove the Telegram messages previously recorded for
    that status; ``update`` events are forwarded through ``send_toot`` when
    they pass the filters below. Every other event is ignored.

    Args:
        ws: The websocket application that delivered the frame (unused).
        source: Raw frame payload; anything that is not ``str`` is ignored.
    """
    if not isinstance(source, str):
        return
    source_json = json.loads(source)
    event = source_json.get("event")

    if event == "delete":
        # For delete events the payload is the status id itself.
        status_id: str = source_json["payload"]
        if db.exists(status_id):
            for message_id in db.lgetall(status_id):
                bot.delete_message(channel, message_id)
            db.rem(status_id)
        else:
            print(status_id, "is not found")
        return

    if event != "update":
        return

    # For update events the payload is itself a JSON-encoded status.
    data = json.loads(source_json["payload"])

    # ``or ""`` guards against an explicit null acct in the payload.
    author = get(data, "account", "acct", default="") or ""
    is_own = author.lower() == username.lower()
    # ``.get`` instead of indexing: a payload without these flags must not
    # raise KeyError, it simply counts as "not interacted with".
    interacted = any(
        data.get(flag) for flag in ("favourited", "reblogged", "bookmarked")
    )
    if not is_own and not interacted:
        # Ignore toots that have nothing to do with you.
        return

    # Only public toots are forwarded to the channel.
    if data.get("visibility") in ("unlisted", "private", "direct"):
        return

    # No quoted boost in Mastodon, so only plain boosts and originals remain.
    if data.get("reblog") is not None:
        print("found boosted")
        send_toot(data, "🔁", "boosted")
        return
    send_toot(data, "", "original")
|
|
|
|
|
elephant = "🐘"

# Attachment types that this script knows how to forward as Telegram media.
_SUPPORTED_MEDIA_TYPES = ("image", "video")


def _send_media(media: list, content: str, reply_to) -> list:
    """Send the supported attachments with ``content`` as caption.

    Returns the list of sent Telegram messages, or an empty list when none of
    the attachments is an image or a video (the caller then falls back to a
    plain text message).
    """
    supported = [
        item for item in media if item.get("type") in _SUPPORTED_MEDIA_TYPES
    ]
    if not supported:
        return []

    if len(supported) == 1:
        item = supported[0]
        send = bot.send_photo if item["type"] == "image" else bot.send_video
        return [
            send(channel, item["url"], caption=content, parse_mode="HTML",
                 reply_to_message_id=reply_to)
        ]

    # Each item is classified by its OWN type (the previous code looked at
    # the first attachment's type for every item).
    input_group = [
        InputMediaPhoto(item["url"]) if item["type"] == "image"
        else InputMediaVideo(item["url"])
        for item in supported
    ]
    # The caption of the first item is used as the caption of the album.
    input_group[0].caption = content
    input_group[0].parse_mode = "HTML"
    return bot.send_media_group(channel, input_group,
                                reply_to_message_id=reply_to)


def send_toot(toot: dict, indicator: str, ttype: str):
    """Forward one toot to the Telegram channel and record the mapping.

    Behaviour by ``ttype``:

    * ``"liked"``: send only an emoji link to the toot. Nothing is recorded,
      because there is no update on un-like.
    * ``"boosted"``: send two emoji links, ``indicator`` pointing at the
      boosted toot and the elephant pointing at this toot.
    * anything else: send the toot text (tags stripped, HTML-escaped)
      followed by an elephant link. Image/video attachments are sent as a
      photo, a video or an album with the text as caption; otherwise a text
      message is sent, with link preview enabled only when the toot content
      contains an expanded link.

    If the toot replies to a status that was forwarded earlier, the Telegram
    message is sent as a reply to the first message recorded for that status.

    Args:
        toot: Status dictionary decoded from the streaming payload.
        indicator: Emoji placed in front of the elephant link.
        ttype: Kind of toot, see above.

    Raises:
        Exception: Any error from the Telegram client or the database is
            printed and re-raised unchanged.
    """
    try:
        media = get(toot, "media_attachments", default=[])

        reply_to = None
        reply_to_status = toot.get("in_reply_to_id", None)
        if reply_to_status and db.exists(reply_to_status):
            tg_msg_ids = db.lgetall(reply_to_status)
            # An empty list must not crash; just send without replying.
            if tg_msg_ids:
                reply_to = tg_msg_ids[0]

        if ttype == "liked":
            content = f'<a href="{get_toot_url(toot)}">{indicator}{elephant}</a>'
            bot.send_message(channel, content, parse_mode="HTML",
                             disable_web_page_preview=False,
                             reply_to_message_id=reply_to)
            # No update on un-like, hence no need to enrol.
            return

        if ttype == "boosted":
            print("ttype is boosted")
            content = (
                f'<a href="{get_toot_url(toot["reblog"])}">{indicator}</a>'
                f'<a href="{get_toot_url(toot)}">{elephant}</a>'
            )
            print(content)
            msg = bot.send_message(channel, content, parse_mode="HTML",
                                   disable_web_page_preview=False,
                                   reply_to_message_id=reply_to)
            print(msg)
            enrol_messages(toot['id'], [msg])
            print("enrolled")
            return

        suffix = f'<a href="{get_toot_url(toot)}">{elephant}</a>'
        text = toot["content"]
        # The marker is looked up in the raw HTML, before tags are stripped.
        expanded = "rel=\"nofollow noopener noreferrer\"" in text
        text = html.escape(strip_tags(text))
        content = f"{text}\n\n{suffix}"

        msgs = _send_media(media, content, reply_to) if media else []
        if not msgs:
            # No attachments, or none of a supported type: plain text message.
            msg = bot.send_message(
                channel, content, parse_mode="HTML",
                disable_web_page_preview=not expanded and ttype != "quoted",
                reply_to_message_id=reply_to)
            msgs = [msg]
        enrol_messages(toot['id'], msgs)
    except Exception as e:
        print(e)
        raise
|
|
|
|
|
def strip_tags(text: str) -> str:
    """Convert toot HTML into plain text.

    Paragraph boundaries (``</p><p>``) become blank lines and line-break tags
    become newlines; every remaining tag is removed and HTML entities are
    decoded.

    Args:
        text: HTML content of a toot.

    Returns:
        The plain-text rendering of ``text``.
    """
    text = text.replace("</p><p>", "\n\n")
    # Accept every spelling of a line break (<br>, <br/>, <br />); matching
    # only "<br />" would strip the other forms and glue the lines together.
    text = re.sub(r"<br\s*/?>", "\n", text)
    text = match_tags.sub("", text)
    return html.unescape(text)
|
|
|
|
|
def enrol_messages(toot_id: str, messages: List[Message]):
    """Record the Telegram messages that were posted for a Mastodon status.

    The message ids are appended, as strings, to the database list stored
    under ``toot_id``; the list is created first when it does not exist yet.
    """
    list_missing = not db.exists(toot_id)
    if list_missing:
        db.lcreate(toot_id)

    telegram_ids = []
    for message in messages:
        telegram_ids.append(str(message.message_id))
    db.lextend(toot_id, telegram_ids)
|
|
|
|
|
def get_toot_url(toot: dict) -> str:
    """Return the link to use for a toot.

    Prefers ``toot["url"]`` and falls back to ``toot["uri"]`` when the URL is
    missing, ``None`` or empty.

    Raises:
        KeyError: If no usable ``url`` is present and ``uri`` is missing too.
    """
    # ``or`` makes the fallback lazy: ``uri`` is only read when ``url`` is
    # unusable, and a null ``url`` no longer leaks out as ``None``.
    return toot.get("url") or toot["uri"]
|
|
|
|
|
def get(obj, *args, default=None) -> Any:
    """Get a value from a nested object by following a path of keys.

    Args:
        obj: Container to start from (for example a dict or a list).
        *args: Keys or indexes applied one after another with ``obj[key]``.
        default: Value returned when any step of the path cannot be followed.

    Returns:
        The value found at the end of the path, or ``default``. With an empty
        path ``obj`` itself is returned.
    """
    for key in args:
        try:
            obj = obj[key]
        # Missing key/index, or a step that is not subscriptable (e.g. None).
        # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
        except (LookupError, TypeError):
            return default
    return obj
|
|
|
|
|
def on_open(ws: websocket.WebSocketApp):
    """Report that the websocket connection has been opened."""
    opened_notice = "Web socket opened"
    print(opened_notice)
|
|
|
|
|
def on_error(ws: websocket.WebSocketApp, error):
    """Print an error handed over by the websocket client.

    Args:
        ws: The websocket application reporting the error (unused).
        error: The error object; printed after an ``ERROR`` label.
    """
    label = "ERROR"
    print(label, error)
|
|
|
|
|
def on_close(ws: websocket.WebSocketApp, close_status_code=None, close_msg=None):
    """Report that the connection was closed and terminate the process.

    Args:
        ws: The websocket application that was closed (unused).
        close_status_code: Close status passed by websocket-client releases
            that call ``on_close`` with three arguments; optional so the
            one-argument call style keeps working.
        close_msg: Close reason passed alongside the status code (unused).

    Raises:
        SystemExit: Always, with exit status 0.
    """
    print("Connection closed")
    # ``exit`` is a helper installed by the ``site`` module and may be absent;
    # raising SystemExit is what it does underneath.
    raise SystemExit(0)
|
|
|
|
|
if __name__ == "__main__":
    # Uncomment to trace the websocket traffic while debugging:
    # websocket.enableTrace(True)

    # User stream of the configured instance, authenticated via query string.
    stream_url = (
        f"wss://{instance}/api/v1/streaming?access_token={access_token}&stream=user"
    )
    ws = websocket.WebSocketApp(
        stream_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Blocks until the connection ends; on_close then terminates the process.
    ws.run_forever()
@gadflysu
Thanks for the comment. I’ve fixed the links and the dependency requirements.
Have you tried to listen to the websocket using a GUI client to see if your toots come through?