Last active
June 1, 2024 17:22
-
-
Save zhaoweizhong/72db7214895d9737275429d8dc2a0f61 to your computer and use it in GitHub Desktop.
Bot for sending formatted Twitter/Pixiv images to Telegram channel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import requests | |
import json | |
import logging | |
from telegram import InputMediaVideo, Update, InputMediaPhoto | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
ContextTypes, | |
MessageHandler, | |
filters, | |
) | |
from telegram.constants import ParseMode | |
from pixivpy3 import AppPixivAPI | |
TELEGRAM_BOT_TOKEN = os.environ["TELEGRAM_BOT_TOKEN"] | |
TELEGRAM_CHANNEL_ID = os.environ["TELEGRAM_CHANNEL_ID"] | |
TELEGRAM_USER_ID = int(os.environ["TELEGRAM_USER_ID"]) | |
PIXIV_REFRESH_TOKEN = os.environ["PIXIV_REFRESH_TOKEN"] | |
bot = Application.builder().token(TELEGRAM_BOT_TOKEN).build() | |
def escape(text): | |
chars_to_escape = "[]-_*\\[]()~`>#+=|{}.!" | |
return "".join("\\" + char if char in chars_to_escape else char for char in text) | |
async def authorize(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
chat_id = update.effective_user.id | |
if chat_id != TELEGRAM_USER_ID: | |
await update.message.reply_text("You are not authorized to use this bot.") | |
logging.warning(f"Unauthorized user {chat_id} tried to use this bot.") | |
return False | |
else: | |
logging.info(f"Authorized user {chat_id} used this bot.") | |
return True | |
async def handle_pixiv(id, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
logging.info(f"Fetching pixiv artwork {id}...") | |
api = AppPixivAPI() | |
api.auth(refresh_token=PIXIV_REFRESH_TOKEN) | |
json_result = api.illust_detail(id) | |
illust = json_result.illust | |
logging.info(f"Got pixiv artwork {id}: {illust}") | |
if illust is not None: | |
illust_url = escape(f"https://www.pixiv.net/artworks/{illust.id}") | |
image_url = illust.image_urls.large | |
user_name = escape(illust.user.name) | |
user_url = escape(f"https://www.pixiv.net/users/{illust.user.id}") | |
title = escape(illust.title) | |
caption = escape(illust.caption) | |
tags = escape(" ".join([f"#{tag.name}" for tag in illust.tags])) | |
page_count = illust.page_count | |
result = None | |
if page_count == 1: | |
result = await update.get_bot().send_photo( | |
chat_id=TELEGRAM_CHANNEL_ID, | |
photo=image_url, | |
caption=f"*[{user_name}]({user_url})*:\n\n*{title}*\n{caption}\n\n{tags}\n\n{illust_url}", | |
parse_mode=ParseMode.MARKDOWN_V2, | |
read_timeout=60, | |
write_timeout=60, | |
) | |
else: | |
results = await update.get_bot().send_media_group( | |
chat_id=TELEGRAM_CHANNEL_ID, | |
media=[InputMediaPhoto(media=re.sub(r"_p\d+_master", "_p{}_master".format(i), image_url)) for i in range(min(page_count, 10))], | |
caption=f"*[{user_name}]({user_url})*:\n\n*{title}*\n{caption}\n\n{tags}\n\n{illust_url}", | |
parse_mode=ParseMode.MARKDOWN_V2, | |
read_timeout=240, | |
write_timeout=240, | |
) | |
result = results[0] | |
if result is not None: | |
message_url = escape(f"https://t.me/c/{TELEGRAM_CHANNEL_ID[4:]}/{result.message_id}") | |
await update.message.reply_text( | |
f"Pixiv \([{illust.id}]({illust_url})\) message sent: [{result.message_id}]({message_url})", parse_mode=ParseMode.MARKDOWN_V2, disable_web_page_preview=True | |
) | |
await update.message.delete() | |
logging.info(f"Sent pixiv message {result.message_id} for illust {illust.id}.") | |
else: | |
await update.message.reply_text("Failed to send pixiv message.") | |
logging.warning(f"Failed to send pixiv message for illust {illust.id}.") | |
else: | |
await update.message.reply_text("Failed to fetch pixiv artwork.") | |
logging.warning(f"Failed to fetch pixiv artwork {id}.") | |
async def handle_twitter(id, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
logging.info(f"Fetching tweet {id}...") | |
url = "https://api.fxtwitter.com/status/" + id | |
response = json.loads(requests.get(url).text) | |
logging.info(f"Got tweet {id}: {response}") | |
if response["code"] == 200: | |
tweet_url = escape(response["tweet"]["url"]) | |
tweet_text = escape(response["tweet"]["text"]) | |
medias = response["tweet"]["media"]["all"] | |
author_name = escape(response["tweet"]["author"]["name"]) | |
author_url = escape(response["tweet"]["author"]["url"]) | |
result = None | |
if len(medias) == 1: | |
if medias[0]["type"] == "photo": | |
file = requests.get(medias[0]["url"]) | |
result = await update.get_bot().send_photo( | |
chat_id=TELEGRAM_CHANNEL_ID, | |
photo=file.content, | |
caption=f"*[{author_name}]({author_url})*:\n\n{tweet_text}\n\n{tweet_url}", | |
parse_mode=ParseMode.MARKDOWN_V2, | |
read_timeout=60, | |
write_timeout=60, | |
) | |
elif medias[0]["type"] == "video" or medias[0]["type"] == "gif": | |
file = requests.get(medias[0]["url"]) | |
result = await update.get_bot().send_video( | |
chat_id=TELEGRAM_CHANNEL_ID, | |
video=file.content, | |
width=medias[0]["width"], | |
height=medias[0]["height"], | |
caption=f"*[{author_name}]({author_url})*:\n\n{tweet_text}\n\n{tweet_url}", | |
parse_mode=ParseMode.MARKDOWN_V2, | |
read_timeout=240, | |
write_timeout=240, | |
) | |
else: | |
media_processed = [] | |
for media in medias: | |
if media["type"] == "photo": | |
file = requests.get(media["url"]) | |
media_processed.append(InputMediaPhoto(media=file.content)) | |
elif media["type"] == "video" or media["type"] == "gif": | |
file = requests.get(media["url"]) | |
media_processed.append(InputMediaVideo(media=file.content, width=media["width"], height=media["height"])) | |
results = await update.get_bot().send_media_group( | |
chat_id=TELEGRAM_CHANNEL_ID, | |
media=media_processed, | |
caption=f"*[{author_name}]({author_url})*:\n\n{tweet_text}\n\n{tweet_url}", | |
parse_mode=ParseMode.MARKDOWN_V2, | |
read_timeout=240, | |
write_timeout=240, | |
) | |
result = results[0] | |
if result is not None: | |
message_url = escape(f"https://t.me/c/{TELEGRAM_CHANNEL_ID[4:]}/{result.message_id}") | |
await update.message.reply_text( | |
f"Twitter \([src]({tweet_url})\) message sent: [{result.message_id}]({message_url})", parse_mode=ParseMode.MARKDOWN_V2, disable_web_page_preview=True | |
) | |
await update.message.delete() | |
logging.info(f"Sent Twitter message {result.message_id} for tweet {id}.") | |
else: | |
await update.message.reply_text("Failed to send Twitter message.") | |
logging.warning(f"Failed to send Twitter message for tweet {id}.") | |
else: | |
await update.message.reply_text("Failed to fetch tweet.") | |
logging.warning(f"Failed to fetch tweet {id}.") | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
if not await authorize(update, context): | |
return | |
await update.message.reply_text("Hello, Mira.") | |
async def handle(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
if not await authorize(update, context): | |
return | |
msg = update.message.text | |
match_pixiv = re.match(r"^https:\/\/www.pixiv.net\/artworks\/(\d+)", msg) | |
if match_pixiv: | |
await handle_pixiv(match_pixiv.group(1), update, context) | |
else: | |
match_twitter = re.match( | |
r"^https:\/\/(?:twitter|mobile\.twitter|x|mobile\.x|fxtwitter|vxtwitter|fixupx).com\/\w+\/status\/(\d+)", | |
msg, | |
) | |
if match_twitter: | |
await handle_twitter(match_twitter.group(1), update, context) | |
else: | |
await update.message.reply_text("Unknown command.") | |
def main(): | |
logging.basicConfig( | |
level=logging.INFO, | |
) | |
logging.info("Starting bot...") | |
bot.add_handler(CommandHandler("start", start)) | |
bot.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle)) | |
bot.run_polling(allowed_updates=Update.ALL_TYPES) | |
logging.info("Bot started.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment