Skip to content

Instantly share code, notes, and snippets.

@blueset
Last active May 26, 2021 20:00
Show Gist options
  • Save blueset/390e146c322d277b61f99066b5ca4ecf to your computer and use it in GitHub Desktop.
Save blueset/390e146c322d277b61f99066b5ca4ecf to your computer and use it in GitHub Desktop.
Plltxe: Sync Tweets (and Toots) to a Telegram Channel Using Account Activity API (and WebSocket)

Plltxe: Sync Tweets (and Toots) to a Telegram Channel Using Account Activity API (and WebSocket)

plltxe v. say.

This bot is running on https://t.me/plltxe with tweets from https://twitter.com/blueset, and toots from https://mastodon.social/web/accounts/1150683.

For details, see https://blog.1a23.com/2020/03/21/sync-tweets-to-a-telegram-channel-using-account-activity-api/ .

Dependencies

Tweets

pip3 install twitivity pickledb python-telegram-bot

Toots

pip3 install websocket-client pickledb python-telegram-bot
import os
import json
import time
import threading
from telegram import Bot, Message, InputMediaPhoto, InputMediaVideo
# ``typing.re`` was a deprecated pseudo-module and no longer exists on
# Python 3.13+; ``typing.Pattern`` is the same alias and imports everywhere.
from typing import Any, List, Pattern, Tuple
import pickledb
import html
import re
import websocket

# Telegram channel the toots are mirrored to.
channel = "@username"
# Mastodon username and instance id
username = "username_on_mastodon"
instance = "mastodon.example.com"
# Mastodon access token (sent as a query parameter of the streaming URL).
access_token = ""
# Persistent mapping: Mastodon status id -> list of Telegram message ids.
db = pickledb.load("mastodon_mapping.db", True)
# Bot token
bot = Bot("telegram bot token goes here")
# Test telegram connectivity; fails early if the token is unusable.
bot.get_me()
# Matches any single HTML tag; used to strip markup from toot content.
match_tags: Pattern = re.compile(r"<[^>]+?>")
def on_message(ws: websocket.WebSocketApp, source: str) -> None:
    """Handle one frame received from the Mastodon streaming socket.

    * ``delete`` events remove every Telegram message recorded for the
      deleted status and forget the mapping.
    * ``update`` events are relayed through ``send_toot`` when the status
      was written by ``username`` (or is flagged as favourited, reblogged or
      bookmarked) and its visibility is not unlisted/private/direct.
    * Everything else is ignored.

    Args:
        ws: The socket the frame arrived on (unused).
        source: Raw frame; only text frames are processed.
    """
    if not isinstance(source, str):
        return

    source_json = json.loads(source)
    event = source_json.get("event")

    if event == "delete":
        # For delete events the payload is used directly as the status id.
        status_id: str = source_json["payload"]
        if db.exists(status_id):
            for message_id in db.lgetall(status_id):
                bot.delete_message(channel, message_id)
            db.rem(status_id)
        else:
            print(status_id, "is not found")
        return

    if event != "update":
        return

    # For update events the payload is itself a JSON-encoded status.
    data = json.loads(source_json["payload"])

    author = get(data, "account", "acct", default="") or ""
    is_mine = author.lower() == username.lower()
    # These flags are optional in the payload, so read them leniently
    # instead of indexing (which raised KeyError when one was absent).
    interacted = any(
        data.get(flag) for flag in ("favourited", "reblogged", "bookmarked")
    )
    if not is_mine and not interacted:
        # Ignore toots that have nothing to do with the configured account.
        return

    if data["visibility"] in ("unlisted", "private", "direct"):
        return

    # Mastodon has no quoted boosts, so a status is either a boost or original.
    if data.get("reblog") is not None:
        print("found boosted")
        send_toot(data, "🔁", "boosted")
        return
    send_toot(data, "", "original")
elephant = "🐘"

# Attachment types that are relayed as Telegram photos / videos. Anything
# else is not uploaded; the toot is then sent as text with its link.
_RELAYABLE_MEDIA_TYPES = ("image", "video")


def _send_toot_media(media: list, content: str, reply_to) -> list:
    """Send pre-filtered image/video attachments captioned with ``content``.

    Returns the list of Telegram messages that were created.
    """
    if len(media) == 1:
        item = media[0]
        send = bot.send_photo if item["type"] == "image" else bot.send_video
        return [
            send(channel, item["url"], caption=content, parse_mode="HTML",
                 reply_to_message_id=reply_to)
        ]

    # Classify every attachment by its OWN type (the original looked at
    # media[0] for all of them, mislabelling mixed photo/video albums).
    input_group = [
        InputMediaPhoto(item["url"]) if item["type"] == "image"
        else InputMediaVideo(item["url"])
        for item in media
    ]
    # The caption of the first item is used as the caption of the album.
    input_group[0].caption = content
    input_group[0].parse_mode = "HTML"
    return bot.send_media_group(channel, input_group,
                                reply_to_message_id=reply_to)


def send_toot(toot: dict, indicator: str, ttype: str):
    """Relay one toot to the Telegram channel.

    Behaviour by ``ttype``:

    * ``"liked"``: a single link made of ``indicator`` + elephant pointing at
      the toot. Not recorded in the mapping.
    * ``"boosted"``: ``indicator`` links to the boosted toot, the elephant
      links to the boost itself. Recorded in the mapping.
    * anything else: the toot text (HTML stripped, then escaped) followed by
      an elephant link. Image/video attachments are uploaded with the text
      as caption; without relayable attachments a text message is sent, with
      link preview enabled only when the toot contained an expanded link.
      Recorded in the mapping.

    When the toot replies to a status that was relayed earlier, the Telegram
    message is sent as a reply to the first message of that status.

    Args:
        toot: Status payload from the streaming API.
        indicator: Emoji placed in front of the elephant link.
        ttype: ``"liked"``, ``"boosted"`` or ``"original"``.

    Raises:
        Exception: Any error is printed and then re-raised unchanged.
    """
    try:
        reply_to = None
        reply_to_status = toot.get("in_reply_to_id")
        if reply_to_status and db.exists(reply_to_status):
            tg_msg_ids = db.lgetall(reply_to_status)
            if tg_msg_ids:
                reply_to = tg_msg_ids[0]

        if ttype == "liked":
            content = f'<a href="{get_toot_url(toot)}">{indicator}{elephant}</a>'
            bot.send_message(channel, content, parse_mode="HTML",
                             disable_web_page_preview=False,
                             reply_to_message_id=reply_to)
            # No update on un-like, hence no need to enrol.
            return

        if ttype == "boosted":
            content = (
                f'<a href="{get_toot_url(toot["reblog"])}">{indicator}</a>'
                f'<a href="{get_toot_url(toot)}">{elephant}</a>'
            )
            msg = bot.send_message(channel, content, parse_mode="HTML",
                                   disable_web_page_preview=False,
                                   reply_to_message_id=reply_to)
            enrol_messages(toot['id'], [msg])
            return

        suffix = f'<a href="{get_toot_url(toot)}">{elephant}</a>'
        text = toot["content"]
        # Links rendered by the server carry this rel attribute; its presence
        # is used as the signal that a link preview is worthwhile.
        expanded = "rel=\"nofollow noopener noreferrer\"" in text
        content = f"{html.escape(strip_tags(text))}\n\n{suffix}"

        attachments = get(toot, "media_attachments", default=[]) or []
        relayable = [
            item for item in attachments
            if item.get("type") in _RELAYABLE_MEDIA_TYPES
        ]

        if relayable:
            msgs = _send_toot_media(relayable, content, reply_to)
        else:
            msg = bot.send_message(
                channel, content, parse_mode="HTML",
                disable_web_page_preview=not expanded and ttype != "quoted",
                reply_to_message_id=reply_to)
            msgs = [msg]
        enrol_messages(toot['id'], msgs)
    except Exception as e:
        print(e)
        raise
# Paragraph boundary, tolerant of whitespace and attributes on the <p>.
_PARAGRAPH_BREAK = re.compile(r"</p>\s*<p[^>]*>", re.IGNORECASE)
# Any spelling of a line break: <br>, <br/>, <br />.
_LINE_BREAK = re.compile(r"<br\s*/?>", re.IGNORECASE)
# Any remaining tag (same pattern as the module-level ``match_tags``).
_ANY_TAG = re.compile(r"<[^>]+?>")


def strip_tags(text: str) -> str:
    """Convert toot HTML to plain text.

    Paragraph boundaries become blank lines, line breaks become newlines,
    every other tag is dropped and HTML entities are decoded.

    Args:
        text: HTML fragment.

    Returns:
        The plain-text rendering (not HTML-escaped).
    """
    text = _PARAGRAPH_BREAK.sub("\n\n", text)
    text = _LINE_BREAK.sub("\n", text)
    text = _ANY_TAG.sub("", text)
    return html.unescape(text)
def enrol_messages(toot_id: str, messages: List[Message]):
    """Remember which Telegram messages were sent for a toot.

    The ids of ``messages`` are appended, as strings, to the list stored in
    ``db`` under ``toot_id``; the list is created on first use.
    """
    already_tracked = db.exists(toot_id)
    if not already_tracked:
        db.lcreate(toot_id)
    telegram_ids = [str(message.message_id) for message in messages]
    db.lextend(toot_id, telegram_ids)
def get_toot_url(toot: dict) -> str:
    """Return the link to use for a toot.

    Prefers ``url`` and falls back to ``uri`` when ``url`` is missing, null
    or empty. ``uri`` is only looked up when it is actually needed.

    Raises:
        KeyError: If neither a usable ``url`` nor a ``uri`` is present.
    """
    return toot.get("url") or toot["uri"]
def get(obj, *args, default=None) -> Any:
    """Get a nested value by following ``args`` as a path through ``obj``.

    Each element of ``args`` is applied as a subscript (``obj[key]``) in
    turn.

    Args:
        obj: Root container (dict, list, ...); may be ``None``.
        *args: Keys / indices forming the path.
        default: Returned when any step of the path cannot be resolved.

    Returns:
        The value at the end of the path, or ``default``.
    """
    for key in args:
        try:
            obj = obj[key]
        except (LookupError, TypeError):
            # Missing key / index, or a non-subscriptable step such as None.
            # Deliberately narrower than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are never swallowed.
            return default
    return obj
def on_open(ws: websocket.WebSocketApp):
    """Callback for the socket's open event; just logs it to stdout."""
    print("Web socket opened")
def on_error(ws: websocket.WebSocketApp, error):
    """Callback for socket errors; prints the error prefixed with ``ERROR``."""
    print("ERROR", error)
def on_close(ws: "websocket.WebSocketApp", close_status_code=None, close_msg=None):
    """Callback for the socket's close event: log it and exit the process.

    Newer websocket-client releases call this with the close status code and
    message in addition to the socket; both are optional here so the callback
    works whether or not they are supplied.

    Args:
        ws: The socket that was closed (unused).
        close_status_code: Close code reported by the library, if any.
        close_msg: Close reason reported by the library, if any.

    Raises:
        SystemExit: Always, with exit status 0.
    """
    print("Connection closed")
    # Equivalent to exit(0) without relying on the ``site``-provided helper.
    raise SystemExit(0)
if __name__ == "__main__":
    # websocket.enableTrace(True)
    # Stream of the authenticated user on the configured instance.
    stream_url = f"wss://{instance}/api/v1/streaming?access_token={access_token}&stream=user"
    ws = websocket.WebSocketApp(
        stream_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()
import os
import json
import time
import threading
from telegram import Bot, Message, InputMediaPhoto
from typing import Any, List, Tuple
import pickledb
import html
# Define environment variables before importing twitivity
# (presumably the Twitter API credentials it authenticates with — fill in
# real values before running).
os.environ["consumer_key"] = ""
os.environ["consumer_secret"] = ""
os.environ["access_token"] = ""
os.environ["access_token_secret"] = ""
# Environment name; register_webhook() uses it to build the
# "all/<env_name>/webhooks" endpoints.
os.environ["env_name"] = "env_name"
from twitivity import Event, Activity
# Callback URL that register_webhook() registers as the webhook.
url = "https://example.com/callback"
def register_webhook():
    """Re-register the webhook for the configured environment.

    Lists the webhooks that currently exist, deletes each of them, registers
    ``url`` as the new callback and finally subscribes. Every API response
    is printed to stdout.
    """
    time.sleep(1)
    activity = Activity()
    webhooks_root = f"all/{os.environ['env_name']}/webhooks"

    # Get a list of existing hooks
    listing = activity.api(method="GET", endpoint=f"{webhooks_root}.json")
    existing = listing.json()
    print("existing", existing)

    # Deregister existing hooks
    for hook in existing:
        wid = hook["id"]
        resp = activity.api(
            method="DELETE",
            endpoint=f"{webhooks_root}/{wid}.json",
        )
        print("delete", wid, resp, resp.content)

    # Register new hook
    print("register webhook")
    registration = activity.register_webhook(callback_url=url)
    print(registration.json())

    print("subscribe")
    subscription = activity.subscribe()
    print(subscription.content)
if __name__ == "__main__":
    # Only (re-)register the webhook when run as a script, not on import.
    register_webhook()
import os
import json
import time
import threading
from telegram import Bot, Message, InputMediaPhoto
from typing import Any, List, Tuple
import pickledb
import html
# Define environment variables before importing twitivity
# (presumably the Twitter API credentials it authenticates with — fill in
# real values before running).
os.environ["consumer_key"] = ""
os.environ["consumer_secret"] = ""
os.environ["access_token"] = ""
os.environ["access_token_secret"] = ""
os.environ["env_name"] = "env_name"
from twitivity import Event, Activity
# Callback URL; assigned to StreamEvent.CALLBACK_URL below.
url = "https://example.com/callback"
# Telegram channel the tweets are mirrored to.
channel = "@username"
# Persistent mapping: tweet id -> list of Telegram message ids
# (written by enrol_messages, read when replying to / deleting tweets).
db = pickledb.load("mapping.db", True)
# Telegram bot token.
bot = Bot("telegram bot token goes here")
# Test telegram connectivity
bot.get_me()
class StreamEvent(Event):
    """Webhook handler that mirrors the account's activity to Telegram."""

    # URL the webhook events are delivered to.
    CALLBACK_URL: str = url

    def on_data(self, data: dict) -> None:
        """Dispatch one activity payload.

        * ``tweet_delete_events``: delete the Telegram messages recorded for
          each deleted tweet of the subscribed user.
        * ``favorite_events``: relay tweets the subscribed user liked.
        * ``tweet_create_events``: relay the subscribed user's own tweets as
          retweet, quote or original.

        Events that belong to other users are skipped; payloads of any other
        kind are ignored.
        """
        for_user_id = data["for_user_id"]

        if "tweet_delete_events" in data:
            for event in data["tweet_delete_events"]:
                if event["status"]["user_id"] != for_user_id:
                    continue
                status_id: str = event["status"]["id"]
                if db.exists(status_id):
                    for message_id in db.lgetall(status_id):
                        bot.delete_message(channel, message_id)
                    db.rem(status_id)
                else:
                    print(status_id, "is not found")
            return

        if "favorite_events" in data:
            for event in data["favorite_events"]:
                if event["user"]["id_str"] != for_user_id:
                    continue
                send_tweet(event["favorited_status"], "❤️", "liked")
        elif "tweet_create_events" in data:
            for tweet in data["tweet_create_events"]:
                if tweet["user"]["id_str"] != for_user_id:
                    continue
                # Check for a retweet first: a retweet of a quote tweet
                # carries both keys and must not be relayed as the user's
                # own quote.
                if "retweeted_status" in tweet:
                    send_tweet(tweet, "🔁", "retweeted")
                elif "quoted_status" in tweet:
                    send_tweet(tweet, "🔁", "quoted")
                else:
                    send_tweet(tweet, "", "original")
bird = "🐦"


def send_tweet(tweet: dict, indicator: str, ttype: str):
    """Relay one tweet to the Telegram channel.

    Behaviour by ``ttype``:

    * ``"liked"``: a single link made of ``indicator`` + bird pointing at the
      tweet. Not recorded in the mapping.
    * ``"retweeted"``: ``indicator`` links to the retweeted tweet, the bird
      links to the retweet itself. Recorded in the mapping.
    * ``"quoted"`` / anything else: the tweet text with expanded URLs
      followed by a bird link (preceded by an ``indicator`` link to the
      quoted tweet for quotes). An mp4 video, a single photo or a photo
      album is uploaded with the text as caption when present; otherwise a
      text message is sent. Recorded in the mapping.

    When the tweet replies to a tweet that was relayed earlier, the Telegram
    message is sent as a reply to the first message of that tweet.

    Args:
        tweet: Tweet payload.
        indicator: Emoji placed in front of the bird link.
        ttype: ``"liked"``, ``"retweeted"``, ``"quoted"`` or ``"original"``.
    """
    # Long tweets keep their complete media list under ``extended_tweet``;
    # fall back to the top-level entities for ordinary tweets.
    media = (
        get(tweet, "extended_tweet", "extended_entities", "media")
        or get(tweet, "extended_entities", "media")
    )
    video = get(media, 0, "video_info")

    reply_to = None
    reply_to_status = tweet.get("in_reply_to_status_id_str")
    if reply_to_status and db.exists(reply_to_status):
        tg_msg_ids = db.lgetall(reply_to_status)
        if tg_msg_ids:
            reply_to = tg_msg_ids[0]

    if ttype == "liked":
        content = f'<a href="{get_tweet_url(tweet)}">{indicator}{bird}</a>'
        bot.send_message(channel, content, parse_mode="HTML",
                         disable_web_page_preview=False,
                         reply_to_message_id=reply_to)
        # No update on un-like, hence no need to enrol.
        return

    if ttype == "retweeted":
        content = (
            f'<a href="{get_tweet_url(tweet["retweeted_status"])}">{indicator}</a>'
            f'<a href="{get_tweet_url(tweet)}">{bird}</a>'
        )
        msg = bot.send_message(channel, content, parse_mode="HTML",
                               disable_web_page_preview=False,
                               reply_to_message_id=reply_to)
        enrol_messages(tweet['id_str'], [msg])
        return

    suffix = f'<a href="{get_tweet_url(tweet)}">{bird}</a>'
    if ttype == "quoted":
        suffix = f'<a href="{get_tweet_url(tweet["quoted_status"])}">{indicator}</a>' + suffix
    text, expanded = expand_url(tweet)
    content = f"{html.escape(text)}\n\n{suffix}"

    # First mp4 rendition, if the first media item is a video.
    variants = video['variants'] if video else []
    video_url = next(
        (v['url'] for v in variants if v['content_type'] == "video/mp4"),
        None,
    )

    if video_url:
        msgs = [bot.send_video(channel, video_url, caption=content,
                               parse_mode="HTML",
                               reply_to_message_id=reply_to)]
    elif media:
        if len(media) == 1:
            msgs = [bot.send_photo(channel, media[0]['media_url_https'],
                                   caption=content, parse_mode="HTML",
                                   reply_to_message_id=reply_to)]
        else:
            input_group = [InputMediaPhoto(i['media_url_https']) for i in media]
            # The caption of the first item is used as the album caption.
            input_group[0].caption = content
            input_group[0].parse_mode = "HTML"
            msgs = bot.send_media_group(channel, input_group,
                                        reply_to_message_id=reply_to)
    else:
        msgs = [bot.send_message(
            channel, content, parse_mode="HTML",
            disable_web_page_preview=not expanded and ttype != "quoted",
            reply_to_message_id=reply_to)]
    enrol_messages(tweet['id_str'], msgs)
def expand_url(tweet: dict) -> Tuple[str, bool]:
    """Return the tweet text with shortened links expanded.

    For long tweets the complete text and its URL entities are taken from
    ``extended_tweet`` (``full_text``); otherwise the top-level
    ``full_text`` / ``text`` and ``entities`` are used.

    Args:
        tweet: Tweet payload.

    Returns:
        ``(text, edited)`` where ``edited`` is True when at least one URL
        entity was processed.

    Raises:
        KeyError: If the payload carries no text at all.
    """
    extended = tweet.get("extended_tweet") or {}
    # The long form is stored under ``full_text``; reading ``text`` there
    # always missed and silently fell back to the truncated text.
    text = extended.get("full_text") or tweet.get("full_text") or tweet["text"]
    # URL entities must come from the same object the text was taken from.
    source = extended or tweet
    urls: List[dict] = (source.get("entities") or {}).get("urls") or []

    edited = False
    for entry in urls:
        text = text.replace(entry['url'], entry['expanded_url'])
        edited = True
    return text, edited
def enrol_messages(tweet_id: str, messages: List[Message]):
    """Remember which Telegram messages were sent for a tweet.

    The ids of ``messages`` are appended, as strings, to the list stored in
    ``db`` under ``tweet_id``; the list is created on first use.
    """
    already_tracked = db.exists(tweet_id)
    if not already_tracked:
        db.lcreate(tweet_id)
    telegram_ids = [str(message.message_id) for message in messages]
    db.lextend(tweet_id, telegram_ids)
def get_tweet_url(tweet: dict) -> str:
    """Build the twitter.com status URL from the author's screen name and the tweet id."""
    screen_name = get(tweet, 'user', 'screen_name')
    tweet_id = get(tweet, 'id_str')
    return f"https://twitter.com/{screen_name}/status/{tweet_id}"
def get(obj, *args, default=None) -> Any:
    """Get a nested value by following ``args`` as a path through ``obj``.

    Each element of ``args`` is applied as a subscript (``obj[key]``) in
    turn.

    Args:
        obj: Root container (dict, list, ...); may be ``None``.
        *args: Keys / indices forming the path.
        default: Returned when any step of the path cannot be resolved.

    Returns:
        The value at the end of the path, or ``default``.
    """
    for key in args:
        try:
            obj = obj[key]
        except (LookupError, TypeError):
            # Missing key / index, or a non-subscriptable step such as None.
            # Deliberately narrower than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are never swallowed.
            return default
    return obj
# Created at import time so the handler object is available as a module
# attribute even when this file is not run as a script.
stream_events = StreamEvent()

if __name__ == "__main__":
    # Start handling incoming webhook events.
    stream_events.listen()
@blueset
Copy link
Author

blueset commented Jul 8, 2020

@gadflysu
Thanks for the comment. I’ve fixed the links and the dependency requirements.

Have you tried to listen to the websocket using a GUI client to see if your toots come through?

@ibnunowshad
Copy link

I am quite new to this, I have my Telegram Channel and I need to push my toots to that Telegram Channel. I have my Telegram Bot as well.

I had created access token in Mastodon, so this .py will listen always in a server where I'm running the script? I don't know Python scripting, hence trying to understand the codes.

```python
if __name__ == "__main__":
            # websocket.enableTrace(True)
            ws = websocket.WebSocketApp(
                f"wss://{instance}/api/v1/streaming?access_token={access_token}&stream=user",
                on_message=on_message,
                on_error=on_error,
                on_close=on_close
            )
            ws.on_open = on_open
            ws.run_forever()
```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment