Skip to content

Instantly share code, notes, and snippets.

@zodman
Created February 19, 2026 02:39
Show Gist options
  • Select an option

  • Save zodman/498be9c88d059f2e24dd993cc0e87a63 to your computer and use it in GitHub Desktop.

Select an option

Save zodman/498be9c88d059f2e24dd993cc0e87a63 to your computer and use it in GitHub Desktop.
irgramd patch
diff --git a/irgramd b/irgramd
old mode 100644
new mode 100755
diff --git a/telegram.py b/telegram.py
index 39e02aa..8d727e3 100644
--- a/telegram.py
+++ b/telegram.py
@@ -15,57 +15,74 @@ import asyncio
import collections
import telethon
from telethon import types as tgty, utils as tgutils
-from telethon.tl.functions.messages import GetMessagesReactionsRequest, GetFullChatRequest
+from telethon.tl.functions.messages import (
+ GetMessagesReactionsRequest,
+ GetFullChatRequest,
+)
from telethon.tl.functions.channels import GetFullChannelRequest
+from telethon.errors import SessionPasswordNeededError
# Local modules
from include import CHAN_MAX_LENGTH, NICK_MAX_LENGTH
from irc import IRCUser
-from utils import sanitize_filename, add_filename, is_url_equiv, extract_url, get_human_size, get_human_duration
+from utils import (
+ sanitize_filename,
+ add_filename,
+ is_url_equiv,
+ extract_url,
+ get_human_size,
+ get_human_duration,
+)
from utils import get_highlighted, fix_braces, format_timestamp, pretty, current_date
import emoji2emoticon as e
# Test IP table
-TEST_IPS = { 1: '149.154.175.10',
- 2: '149.154.167.40',
- 3: '149.154.175.117',
- }
+TEST_IPS = {
+ 1: "149.154.175.10",
+ 2: "149.154.167.40",
+ 3: "149.154.175.117",
+}
+
+# Telegram
- # Telegram
class TelegramHandler(object):
def __init__(self, irc, settings):
- self.logger = logging.getLogger()
- self.config_dir = settings['config_dir']
- self.cache_dir = settings['cache_dir']
- self.download = settings['download_media']
- self.notice_size = settings['download_notice'] * 1048576
- self.media_dir = settings['media_dir']
- self.media_url = settings['media_url']
- self.upload_dir = settings['upload_dir']
- self.api_id = settings['api_id']
- self.api_hash = settings['api_hash']
- self.phone = settings['phone']
- self.test = settings['test']
- self.test_dc = settings['test_datacenter']
- self.test_ip = settings['test_host'] if settings['test_host'] else TEST_IPS[self.test_dc]
- self.test_port = settings['test_port']
- self.ask_code = settings['ask_code']
- self.quote_len = settings['quote_length']
- self.hist_fmt = settings['hist_timestamp_format']
- self.timezone = settings['timezone']
- self.geo_url = settings['geo_url']
- if not settings['emoji_ascii']:
+ self.logger = logging.getLogger()
+ self.config_dir = settings["config_dir"]
+ self.cache_dir = settings["cache_dir"]
+ self.download = settings["download_media"]
+ self.notice_size = settings["download_notice"] * 1048576
+ self.media_dir = settings["media_dir"]
+ self.media_url = settings["media_url"]
+ self.upload_dir = settings["upload_dir"]
+ self.api_id = settings["api_id"]
+ self.api_hash = settings["api_hash"]
+ self.phone = settings["phone"]
+ self.test = settings["test"]
+ self.test_dc = settings["test_datacenter"]
+ self.test_ip = (
+ settings["test_host"] if settings["test_host"] else TEST_IPS[self.test_dc]
+ )
+ self.test_port = settings["test_port"]
+ self.ask_code = settings["ask_code"]
+ self.quote_len = settings["quote_length"]
+ self.hist_fmt = settings["hist_timestamp_format"]
+ self.timezone = settings["timezone"]
+ self.geo_url = settings["geo_url"]
+ if not settings["emoji_ascii"]:
e.emo = {}
- self.media_cn = 0
- self.irc = irc
+ self.media_cn = 0
+ self.irc = irc
self.authorized = False
- self.id = None
+ self.id = None
self.tg_username = None
self.channels_date = {}
- self.mid = mesg_id('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~')
+ self.mid = mesg_id(
+ "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~"
+ )
self.webpending = {}
self.refwd_me = False
self.cache = collections.OrderedDict()
@@ -78,66 +95,84 @@ class TelegramHandler(object):
async def initialize_telegram(self):
# Setup media folder
- self.telegram_media_dir = os.path.expanduser(self.media_dir or os.path.join(self.cache_dir, 'media'))
+ self.telegram_media_dir = os.path.expanduser(
+ self.media_dir or os.path.join(self.cache_dir, "media")
+ )
if not os.path.exists(self.telegram_media_dir):
os.makedirs(self.telegram_media_dir)
# Setup upload folder
- self.telegram_upload_dir = os.path.expanduser(self.upload_dir or os.path.join(self.cache_dir, 'upload'))
+ self.telegram_upload_dir = os.path.expanduser(
+ self.upload_dir or os.path.join(self.cache_dir, "upload")
+ )
if not os.path.exists(self.telegram_upload_dir):
os.makedirs(self.telegram_upload_dir)
# Setup session folder
- self.telegram_session_dir = os.path.join(self.config_dir, 'session')
+ self.telegram_session_dir = os.path.join(self.config_dir, "session")
if not os.path.exists(self.telegram_session_dir):
os.makedirs(self.telegram_session_dir)
# Construct Telegram client
if self.test:
- self.telegram_client = telethon.TelegramClient(None, self.api_id, self.api_hash)
- self.telegram_client.session.set_dc(self.test_dc, self.test_ip, self.test_port)
+ self.telegram_client = telethon.TelegramClient(
+ None, self.api_id, self.api_hash
+ )
+ self.telegram_client.session.set_dc(
+ self.test_dc, self.test_ip, self.test_port
+ )
else:
- telegram_session = os.path.join(self.telegram_session_dir, 'telegram')
- self.telegram_client = telethon.TelegramClient(telegram_session, self.api_id, self.api_hash)
+ telegram_session = os.path.join(self.telegram_session_dir, "telegram")
+ self.telegram_client = telethon.TelegramClient(
+ telegram_session, self.api_id, self.api_hash
+ )
# Initialize Telegram ID to IRC nick mapping
self.tid_to_iid = {}
# Register Telegram callbacks
callbacks = (
- (self.handle_telegram_message , telethon.events.NewMessage),
- (self.handle_raw , telethon.events.Raw),
+ (self.handle_telegram_message, telethon.events.NewMessage),
+ (self.handle_raw, telethon.events.Raw),
(self.handle_telegram_chat_action, telethon.events.ChatAction),
- (self.handle_telegram_deleted , telethon.events.MessageDeleted),
- (self.handle_telegram_edited , telethon.events.MessageEdited),
+ (self.handle_telegram_deleted, telethon.events.MessageDeleted),
+ (self.handle_telegram_edited, telethon.events.MessageEdited),
)
for handler, event in callbacks:
self.telegram_client.add_event_handler(handler, event)
# Start Telegram client
if self.test:
- await self.telegram_client.start(self.phone, code_callback=lambda: str(self.test_dc) * 5)
+ await self.telegram_client.start(
+ self.phone, code_callback=lambda: str(self.test_dc) * 5
+ )
else:
await self.telegram_client.connect()
while not await self.telegram_client.is_user_authorized():
- self.logger.info('Telegram account not authorized')
- await self.telegram_client.send_code_request(self.phone)
+ self.logger.info("Telegram account not authorized")
+ resp = await self.telegram_client.send_code_request(self.phone)
+ self.logger.info(f"{resp}")
self.auth_checked.set()
if not self.ask_code:
return
- self.logger.info('You must provide the Login code that Telegram will '
- 'sent you via SMS or another connected client')
- code = await aioconsole.ainput('Login code: ')
+ self.logger.info(
+ "You must provide the Login code that Telegram will "
+ "sent you via SMS or another connected client"
+ )
+ code = await aioconsole.ainput("Login code: ")
try:
await self.telegram_client.sign_in(code=code)
- except:
- pass
+ except SessionPasswordNeededError:
+ password = await aioconsole.ainput(
+ "Two-step verification enabled. Enter password: ",
+ )
+ await self.telegram_client.sign_in(password=password)
await self.continue_auth()
async def continue_auth(self):
- self.logger.info('Telegram account authorized')
+ self.logger.info("Telegram account authorized")
self.authorized = True
self.auth_checked.set()
await self.init_mapping()
@@ -161,7 +196,13 @@ class TelegramHandler(object):
tg_nick = self.get_telegram_nick(user)
tg_ni = tg_nick.lower()
if not user.is_self:
- irc_user = IRCUser(None, ('Telegram',''), tg_nick, user.id, self.get_telegram_display_name(user))
+ irc_user = IRCUser(
+ None,
+ ("Telegram", ""),
+ tg_nick,
+ user.id,
+ self.get_telegram_display_name(user),
+ )
self.irc.users[tg_ni] = irc_user
self.add_sorted_len_usernames(tg_ni)
self.tid_to_iid[user.id] = tg_nick
@@ -183,28 +224,30 @@ class TelegramHandler(object):
if not user.is_self:
self.irc.irc_channels[chan].add(user_nick)
# Add admin users as ops in irc
- if isinstance(user.participant, tgty.ChatParticipantAdmin) or \
- isinstance(user.participant, tgty.ChannelParticipantAdmin):
+ if isinstance(
+ user.participant, tgty.ChatParticipantAdmin
+ ) or isinstance(user.participant, tgty.ChannelParticipantAdmin):
self.irc.irc_channels_ops[chan].add(user_nick)
# Add creator users as founders in irc
- elif isinstance(user.participant, tgty.ChatParticipantCreator) or \
- isinstance(user.participant, tgty.ChannelParticipantCreator):
+ elif isinstance(
+ user.participant, tgty.ChatParticipantCreator
+ ) or isinstance(user.participant, tgty.ChannelParticipantCreator):
self.irc.irc_channels_founder[chan].add(user_nick)
except:
- self.logger.warning('Not possible to get participants of channel %s', channel)
+ self.logger.warning(
+ "Not possible to get participants of channel %s", channel
+ )
def get_telegram_nick(self, user):
- nick = (user.username
- or self.get_telegram_display_name(user)
- or str(user.id))
+ nick = user.username or self.get_telegram_display_name(user) or str(user.id)
nick = nick[:NICK_MAX_LENGTH]
while nick in self.irc.iid_to_tid:
- nick += '_'
+ nick += "_"
return nick
def get_telegram_display_name(self, user):
name = telethon.utils.get_display_name(user)
- name = name.replace(' ', '_')
+ name = name.replace(" ", "_")
return name
async def get_telegram_display_name_me(self):
@@ -212,21 +255,22 @@ class TelegramHandler(object):
return self.get_telegram_display_name(tg_user)
def get_telegram_channel(self, chat):
- chan = '#' + chat.title.replace(' ', '-').replace(',', '-')
+ chan = "#" + chat.title.replace(" ", "-").replace(",", "-")
while chan.lower() in self.irc.iid_to_tid:
- chan += '_'
+ chan += "_"
return chan
def get_irc_user_from_telegram(self, tid):
nick = self.tid_to_iid[tid]
- if nick == self.tg_username: return None
+ if nick == self.tg_username:
+ return None
return self.irc.users[nick.lower()]
def get_irc_name_from_telegram_id(self, tid):
if tid in self.tid_to_iid.keys():
name_in_irc = self.tid_to_iid[tid]
else:
- name_in_irc = '<Unknown>'
+ name_in_irc = "<Unknown>"
return name_in_irc
async def get_irc_name_from_telegram_forward(self, fwd, saved):
@@ -237,14 +281,14 @@ class TelegramHandler(object):
name = fwd.from_name
else:
peer_id, type = self.get_peer_id_and_type(from_id)
- if type == 'user':
+ if type == "user":
try:
user = self.get_irc_user_from_telegram(peer_id)
except:
name = str(peer_id)
else:
if user is None:
- name = '{}'
+ name = "{}"
self.refwd_me = True
else:
name = user.irc_nick
@@ -252,14 +296,14 @@ class TelegramHandler(object):
try:
name = await self.get_irc_channel_from_telegram_id(peer_id)
except:
- name = ''
+ name = ""
return name
async def get_irc_nick_from_telegram_id(self, tid, entity=None):
if tid not in self.tid_to_iid:
user = entity or await self.telegram_client.get_entity(tid)
nick = self.get_telegram_nick(user)
- self.tid_to_iid[tid] = nick
+ self.tid_to_iid[tid] = nick
self.irc.iid_to_tid[nick] = tid
return self.tid_to_iid[tid]
@@ -267,16 +311,16 @@ class TelegramHandler(object):
async def get_irc_channel_from_telegram_id(self, tid, entity=None):
rtid, type = tgutils.resolve_id(tid)
if rtid not in self.tid_to_iid:
- chat = entity or await self.telegram_client.get_entity(tid)
+ chat = entity or await self.telegram_client.get_entity(tid)
channel = self.get_telegram_channel(chat)
- self.tid_to_iid[rtid] = channel
+ self.tid_to_iid[rtid] = channel
self.irc.iid_to_tid[channel] = rtid
return self.tid_to_iid[rtid]
async def get_telegram_channel_participants(self, tid):
channel = self.tid_to_iid[tid]
- nicks = []
+ nicks = []
async for user in self.telegram_client.iter_participants(tid):
user_nick = await self.get_irc_nick_from_telegram_id(user.id, user)
@@ -290,16 +334,17 @@ class TelegramHandler(object):
return None
tid = self.get_tid(irc_nick, tid)
user = await self.telegram_client.get_entity(tid)
- if isinstance(user.status,tgty.UserStatusRecently) or \
- isinstance(user.status,tgty.UserStatusOnline):
+ if isinstance(user.status, tgty.UserStatusRecently) or isinstance(
+ user.status, tgty.UserStatusOnline
+ ):
idle = 0
- elif isinstance(user.status,tgty.UserStatusOffline):
+ elif isinstance(user.status, tgty.UserStatusOffline):
last = user.status.was_online
current = current_date()
idle = int((current - last).total_seconds())
- elif isinstance(user.status,tgty.UserStatusLastWeek):
+ elif isinstance(user.status, tgty.UserStatusLastWeek):
idle = 604800
- elif isinstance(user.status,tgty.UserStatusLastMonth):
+ elif isinstance(user.status, tgty.UserStatusLastMonth):
idle = 2678400
else:
idle = None
@@ -313,15 +358,15 @@ class TelegramHandler(object):
else:
entity = await self.telegram_client.get_entity(tid)
entity_cache[0] = entity
- if isinstance(entity, tgty.Channel):
+ if isinstance(entity, tgty.Channel):
full = await self.telegram_client(GetFullChannelRequest(channel=entity))
elif isinstance(entity, tgty.Chat):
full = await self.telegram_client(GetFullChatRequest(chat_id=tid))
else:
- return ''
- entity_type = self.get_entity_type(entity, format='long')
+ return ""
+ entity_type = self.get_entity_type(entity, format="long")
topic = full.full_chat.about
- sep = ': ' if topic else ''
+ sep = ": " if topic else ""
return entity_type + sep + topic
async def get_channel_creation(self, channel, entity_cache):
@@ -351,36 +396,36 @@ class TelegramHandler(object):
def get_entity_type(self, entity, format):
if isinstance(entity, tgty.User):
- short = long = 'User'
+ short = long = "User"
elif isinstance(entity, tgty.Chat):
- short = 'Chat'
- long = 'Chat/Basic Group'
+ short = "Chat"
+ long = "Chat/Basic Group"
elif isinstance(entity, tgty.Channel):
if entity.broadcast:
- short = 'Broad'
- long = 'Broadcast Channel'
+ short = "Broad"
+ long = "Broadcast Channel"
elif entity.megagroup:
- short = 'Mega'
- long = 'Super/Megagroup Channel'
+ short = "Mega"
+ long = "Super/Megagroup Channel"
elif entity.gigagroup:
- short = 'Giga'
- long = 'Broadcast Gigagroup Channel'
+ short = "Giga"
+ long = "Broadcast Gigagroup Channel"
- return short if format == 'short' else long
+ return short if format == "short" else long
def get_peer_id_and_type(self, peer):
if isinstance(peer, tgty.PeerChannel):
id = peer.channel_id
- type = 'chan'
+ type = "chan"
elif isinstance(peer, tgty.PeerChat):
id = peer.chat_id
- type = 'chan'
+ type = "chan"
elif isinstance(peer, tgty.PeerUser):
id = peer.user_id
- type = 'user'
+ type = "user"
else:
id = peer
- type = ''
+ type = ""
return id, type
async def is_bot(self, irc_nick, tid=None):
@@ -398,56 +443,61 @@ class TelegramHandler(object):
async def edition_case(self, msg):
def msg_edited(m):
- return m.id in self.cache and \
- ( m.message != self.cache[m.id]['text']
- or m.media != self.cache[m.id]['media']
- )
+ return m.id in self.cache and (
+ m.message != self.cache[m.id]["text"]
+ or m.media != self.cache[m.id]["media"]
+ )
+
async def get_reactions(m):
- react = await self.telegram_client(GetMessagesReactionsRequest(m.peer_id, id=[m.id]))
+ react = await self.telegram_client(
+ GetMessagesReactionsRequest(m.peer_id, id=[m.id])
+ )
updates = react.updates
- r = next((x for x in updates if type(x) is tgty.UpdateMessageReactions), None)
+ r = next(
+ (x for x in updates if type(x) is tgty.UpdateMessageReactions), None
+ )
return r.reactions.recent_reactions if r else None
react = None
if msg.reactions is None:
- case = 'edition'
+ case = "edition"
elif (reactions := await get_reactions(msg)) is None:
if msg_edited(msg):
- case = 'edition'
+ case = "edition"
else:
- case = 'react-del'
+ case = "react-del"
elif react := max(reactions, key=lambda y: y.date):
- case = 'react-add'
+ case = "react-add"
else:
if msg_edited(msg):
- case = 'edition'
+ case = "edition"
else:
- case = 'react-del'
+ case = "react-del"
react = None
return case, react
def to_cache(self, id, mid, message, proc_message, user, chan, media):
self.limit_cache(self.cache)
self.cache[id] = {
- 'mid': mid,
- 'text': message,
- 'rendered_text': proc_message,
- 'user': user,
- 'channel': chan,
- 'media': media,
- }
+ "mid": mid,
+ "text": message,
+ "rendered_text": proc_message,
+ "user": user,
+ "channel": chan,
+ "media": media,
+ }
def to_volatile_cache(self, prev_id, id, ev, user, chan, date):
if chan in prev_id:
prid = prev_id[chan] if chan else prev_id[user]
self.limit_cache(self.volatile_cache)
elem = {
- 'id': id,
- 'rendered_event': ev,
- 'user': user,
- 'channel': chan,
- 'date': date,
- }
+ "id": id,
+ "rendered_event": ev,
+ "user": user,
+ "channel": chan,
+ "date": date,
+ }
if prid not in self.volatile_cache:
self.volatile_cache[prid] = [elem]
else:
@@ -457,10 +507,11 @@ class TelegramHandler(object):
if len(cache) >= 10000:
cache.popitem(last=False)
- def replace_mentions(self, text, me_nick='', received=True):
+ def replace_mentions(self, text, me_nick="", received=True):
# For received replace @mention to ~mention~
# For sent replace mention: to @mention
rargs = {}
+
def repl_mentioned(text, me_nick, received, mark, repl_pref, repl_suff):
new_text = text
@@ -476,7 +527,7 @@ class TelegramHandler(object):
if received:
mention = mark + user
mention_case = mark + username
- else: # sent
+ else: # sent
mention = user + mark
mention_case = username + mark
replcmnt = repl_pref + username + repl_suff
@@ -487,21 +538,23 @@ class TelegramHandler(object):
new_text = new_text.replace(ment, replcmnt, 1)
# Next words (with space as separator)
- mention = ' ' + mention
- mention_case = ' ' + mention_case
- replcmnt = ' ' + replcmnt
- new_text = new_text.replace(mention, replcmnt).replace(mention_case, replcmnt)
+ mention = " " + mention
+ mention_case = " " + mention_case
+ replcmnt = " " + replcmnt
+ new_text = new_text.replace(mention, replcmnt).replace(
+ mention_case, replcmnt
+ )
return new_text
if received:
- mark = '@'
- rargs['repl_pref'] = '~'
- rargs['repl_suff'] = '~'
- else: # sent
- mark = ':'
- rargs['repl_pref'] = '@'
- rargs['repl_suff'] = ''
+ mark = "@"
+ rargs["repl_pref"] = "~"
+ rargs["repl_suff"] = "~"
+ else: # sent
+ mark = ":"
+ rargs["repl_pref"] = "@"
+ rargs["repl_suff"] = ""
if text.find(mark) != -1:
text_replaced = repl_mentioned(text, me_nick, received, mark, **rargs)
@@ -521,38 +574,40 @@ class TelegramHandler(object):
def format_reaction(self, msg, message_rendered, edition_case, reaction):
react_quote_len = self.quote_len * 2
if len(message_rendered) > react_quote_len:
- text_old = '{}...'.format(message_rendered[:react_quote_len])
+ text_old = "{}...".format(message_rendered[:react_quote_len])
text_old = fix_braces(text_old)
else:
text_old = message_rendered
- if edition_case == 'react-add':
+ if edition_case == "react-add":
user = self.get_irc_user_from_telegram(reaction.peer_id.user_id)
emoji = reaction.reaction.emoticon
- react_action = '+'
+ react_action = "+"
react_icon = e.emo[emoji] if emoji in e.emo else emoji
- elif edition_case == 'react-del':
+ elif edition_case == "react-del":
user = self.get_irc_user_from_telegram(msg.sender_id)
- react_action = '-'
- react_icon = ''
- return text_old, '{}{}'.format(react_action, react_icon), user
+ react_action = "-"
+ react_icon = ""
+ return text_old, "{}{}".format(react_action, react_icon), user
async def handle_telegram_edited(self, event):
- self.logger.debug('Handling Telegram Message Edited: %s', pretty(event))
+ self.logger.debug("Handling Telegram Message Edited: %s", pretty(event))
id = event.message.id
mid = self.mid.num_to_id_offset(event.message.peer_id, id)
- fmid = '[{}]'.format(mid)
+ fmid = "[{}]".format(mid)
message = self.filters(event.message.message)
- message_rendered = await self.render_text(event.message, mid, upd_to_webpend=None)
+ message_rendered = await self.render_text(
+ event.message, mid, upd_to_webpend=None
+ )
edition_case, reaction = await self.edition_case(event.message)
- if edition_case == 'edition':
- action = 'Edited'
+ if edition_case == "edition":
+ action = "Edited"
user = self.get_irc_user_from_telegram(event.sender_id)
if id in self.cache:
- t = self.filters(self.cache[id]['text'])
- rt = self.cache[id]['rendered_text']
+ t = self.filters(self.cache[id]["text"])
+ rt = self.cache[id]["rendered_text"]
ht, is_ht = get_highlighted(t, message)
else:
@@ -574,18 +629,24 @@ class TelegramHandler(object):
if self.last_reaction == reaction.date:
return
self.last_reaction = reaction.date
- action = 'React'
- text_old, edition_react, user = self.format_reaction(event.message, message_rendered, edition_case, reaction)
+ action = "React"
+ text_old, edition_react, user = self.format_reaction(
+ event.message, message_rendered, edition_case, reaction
+ )
- text = '|{} {}| {}'.format(action, text_old, edition_react)
+ text = "|{} {}| {}".format(action, text_old, edition_react)
chan = await self.relay_telegram_message(event, user, text)
- self.to_cache(id, mid, message, message_rendered, user, chan, event.message.media)
+ self.to_cache(
+ id, mid, message, message_rendered, user, chan, event.message.media
+ )
self.to_volatile_cache(self.prev_id, id, text, user, chan, current_date())
async def handle_next_reaction(self, event):
- self.logger.debug('Handling Telegram Next Reaction (2nd, 3rd, ...): %s', pretty(event))
+ self.logger.debug(
+ "Handling Telegram Next Reaction (2nd, 3rd, ...): %s", pretty(event)
+ )
reactions = event.reactions.recent_reactions
react = max(reactions, key=lambda y: y.date) if reactions else None
@@ -598,9 +659,11 @@ class TelegramHandler(object):
message = self.filters(msg.message)
message_rendered = await self.render_text(msg, mid, upd_to_webpend=None)
- text_old, edition_react, user = self.format_reaction(msg, message_rendered, edition_case='react-add', reaction=react)
+ text_old, edition_react, user = self.format_reaction(
+ msg, message_rendered, edition_case="react-add", reaction=react
+ )
- text = '|React {}| {}'.format(text_old, edition_react)
+ text = "|React {}| {}".format(text_old, edition_react)
chan = await self.relay_telegram_message(msg, user, text)
@@ -608,33 +671,43 @@ class TelegramHandler(object):
self.to_volatile_cache(self.prev_id, id, text, user, chan, current_date())
async def handle_telegram_deleted(self, event):
- self.logger.debug('Handling Telegram Message Deleted: %s', pretty(event))
+ self.logger.debug("Handling Telegram Message Deleted: %s", pretty(event))
for deleted_id in event.original_update.messages:
if deleted_id in self.cache:
- recovered_text = self.cache[deleted_id]['rendered_text']
- text = '|Deleted| {}'.format(recovered_text)
- user = self.cache[deleted_id]['user']
- chan = self.cache[deleted_id]['channel']
- await self.relay_telegram_message(message=None, user=user, text=text, channel=chan)
- self.to_volatile_cache(self.prev_id, deleted_id, text, user, chan, current_date())
+ recovered_text = self.cache[deleted_id]["rendered_text"]
+ text = "|Deleted| {}".format(recovered_text)
+ user = self.cache[deleted_id]["user"]
+ chan = self.cache[deleted_id]["channel"]
+ await self.relay_telegram_message(
+ message=None, user=user, text=text, channel=chan
+ )
+ self.to_volatile_cache(
+ self.prev_id, deleted_id, text, user, chan, current_date()
+ )
else:
- text = 'Message id {} deleted not in cache'.format(deleted_id)
+ text = "Message id {} deleted not in cache".format(deleted_id)
await self.relay_telegram_private_message(self.irc.service_user, text)
async def handle_raw(self, update):
- self.logger.debug('Handling Telegram Raw Event: %s', pretty(update))
+ self.logger.debug("Handling Telegram Raw Event: %s", pretty(update))
- if isinstance(update, tgty.UpdateWebPage) and isinstance(update.webpage, tgty.WebPage):
+ if isinstance(update, tgty.UpdateWebPage) and isinstance(
+ update.webpage, tgty.WebPage
+ ):
message = self.webpending.pop(update.webpage.id, None)
if message:
- await self.handle_telegram_message(event=None, message=message, upd_to_webpend=update.webpage)
+ await self.handle_telegram_message(
+ event=None, message=message, upd_to_webpend=update.webpage
+ )
elif isinstance(update, tgty.UpdateMessageReactions):
await self.handle_next_reaction(update)
- async def handle_telegram_message(self, event, message=None, upd_to_webpend=None, history=False):
- self.logger.debug('Handling Telegram Message: %s', pretty(event or message))
+ async def handle_telegram_message(
+ self, event, message=None, upd_to_webpend=None, history=False
+ ):
+ self.logger.debug("Handling Telegram Message: %s", pretty(event or message))
msg = event.message if event else message
@@ -667,11 +740,11 @@ class TelegramHandler(object):
elif message.forward:
refwd_text = await self.handle_telegram_forward(message)
else:
- refwd_text = ''
+ refwd_text = ""
target_mine = self.handle_target_mine(message.peer_id, user)
- final_text = '[{}] {}{}{}'.format(mid, target_mine, refwd_text, text)
+ final_text = "[{}] {}{}{}".format(mid, target_mine, refwd_text, text)
final_text = self.filters(final_text)
return final_text
@@ -679,9 +752,9 @@ class TelegramHandler(object):
if history and self.hist_fmt:
timestamp = format_timestamp(self.hist_fmt, self.timezone, date)
if action:
- res = '{} {}'.format(text, timestamp)
+ res = "{} {}".format(text, timestamp)
else:
- res = '{} {}'.format(timestamp, text)
+ res = "{} {}".format(timestamp, text)
else:
res = text
return res
@@ -690,39 +763,45 @@ class TelegramHandler(object):
if history:
if id in self.volatile_cache:
for item in self.volatile_cache[id]:
- user = item['user']
- text = item['rendered_event']
- chan = item['channel']
- date = item['date']
- text_send = self.set_history_timestamp(text, history=True, date=date, action=False)
+ user = item["user"]
+ text = item["rendered_event"]
+ chan = item["channel"]
+ date = item["date"]
+ text_send = self.set_history_timestamp(
+ text, history=True, date=date, action=False
+ )
await self.relay_telegram_message(None, user, text_send, chan)
async def relay_telegram_message(self, message, user, text, channel=None):
private = (message and message.is_private) or (not message and not channel)
- action = (message and message.action)
+ action = message and message.action
if private:
await self.relay_telegram_private_message(user, text, action)
chan = None
else:
- chan = await self.relay_telegram_channel_message(message, user, text, channel, action)
+ chan = await self.relay_telegram_channel_message(
+ message, user, text, channel, action
+ )
return chan
async def relay_telegram_private_message(self, user, message, action=None):
- self.logger.debug('Relaying Telegram Private Message: %s, %s', user, message)
+ self.logger.debug("Relaying Telegram Private Message: %s, %s", user, message)
if action:
await self.irc.send_action(user, None, message)
else:
await self.irc.send_msg(user, None, message)
- async def relay_telegram_channel_message(self, message, user, text, channel, action):
+ async def relay_telegram_channel_message(
+ self, message, user, text, channel, action
+ ):
if message:
entity = await message.get_chat()
chan = await self.get_irc_channel_from_telegram_id(message.chat_id, entity)
else:
chan = channel
- self.logger.debug('Relaying Telegram Channel Message: %s, %s', chan, text)
+ self.logger.debug("Relaying Telegram Channel Message: %s, %s", chan, text)
if action:
await self.irc.send_action(user, chan, text)
@@ -732,7 +811,7 @@ class TelegramHandler(object):
return chan
async def handle_telegram_chat_action(self, event):
- self.logger.debug('Handling Telegram Chat Action: %s', pretty(event))
+ self.logger.debug("Handling Telegram Chat Action: %s", pretty(event))
try:
tid = event.action_message.to_id.channel_id
@@ -742,13 +821,19 @@ class TelegramHandler(object):
irc_channel = await self.get_irc_channel_from_telegram_id(tid)
await self.get_telegram_channel_participants(tid)
- try: # Join Chats
- irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.users[0])
+ try: # Join Chats
+ irc_nick = await self.get_irc_nick_from_telegram_id(
+ event.action_message.action.users[0]
+ )
except (IndexError, AttributeError):
- try: # Kick
- irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.user_id)
- except (IndexError, AttributeError): # Join Channels
- irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.sender_id)
+ try: # Kick
+ irc_nick = await self.get_irc_nick_from_telegram_id(
+ event.action_message.action.user_id
+ )
+ except (IndexError, AttributeError): # Join Channels
+ irc_nick = await self.get_irc_nick_from_telegram_id(
+ event.action_message.sender_id
+ )
if event.user_added or event.user_joined:
await self.irc.join_irc_channel(irc_nick, irc_channel, full_join=False)
@@ -762,24 +847,26 @@ class TelegramHandler(object):
channel = self.get_telegram_channel(chat)
self.tid_to_iid[chat.id] = channel
self.irc.iid_to_tid[channel] = chat.id
- await self.irc.join_irc_channel(self.irc.irc_nick, channel, full_join=True)
+ await self.irc.join_irc_channel(
+ self.irc.irc_nick, channel, full_join=True
+ )
async def handle_telegram_action(self, message, mid):
if isinstance(message.action, tgty.MessageActionPinMessage):
replied = await message.get_reply_message()
cid = self.mid.num_to_id_offset(replied.peer_id, replied.id)
- action_text = 'has pinned message [{}]'.format(cid)
+ action_text = "has pinned message [{}]".format(cid)
elif isinstance(message.action, tgty.MessageActionChatEditPhoto):
_, media_type = self.scan_photo_attributes(message.action.photo)
photo_url = await self.download_telegram_media(message, mid)
- action_text = 'has changed chat [{}] {}'.format(media_type, photo_url)
+ action_text = "has changed chat [{}] {}".format(media_type, photo_url)
else:
- action_text = ''
+ action_text = ""
return action_text
async def handle_telegram_reply(self, message):
- space = ' '
- trunc = ''
+ space = " "
+ trunc = ""
replied = await message.get_reply_message()
if replied:
replied_msg = replied.message
@@ -789,51 +876,66 @@ class TelegramHandler(object):
replied_id = message.reply_to.reply_to_msg_id
cid = self.mid.num_to_id_offset(message.peer_id, replied_id)
if replied_id in self.cache:
- text = self.cache[replied_id]['text']
- replied_user = self.cache[replied_id]['user']
- sp = ' '
+ text = self.cache[replied_id]["text"]
+ replied_user = self.cache[replied_id]["user"]
+ sp = " "
else:
- text = ''
- replied_user = ''
- sp = ''
- replied_msg = '|Deleted|{}{}'.format(sp, text)
+ text = ""
+ replied_user = ""
+ sp = ""
+ replied_msg = "|Deleted|{}{}".format(sp, text)
if not replied_msg:
- replied_msg = ''
- space = ''
+ replied_msg = ""
+ space = ""
elif len(replied_msg) > self.quote_len:
- replied_msg = replied_msg[:self.quote_len]
- trunc = '...'
+ replied_msg = replied_msg[: self.quote_len]
+ trunc = "..."
if replied_user is None:
- replied_nick = '{}'
+ replied_nick = "{}"
self.refwd_me = True
- elif replied_user == '':
- replied_nick = ''
+ elif replied_user == "":
+ replied_nick = ""
else:
replied_nick = replied_user.irc_nick
- return '|Re {}: [{}]{}{}{}| '.format(replied_nick, cid, space, replied_msg, trunc)
+ return "|Re {}: [{}]{}{}{}| ".format(
+ replied_nick, cid, space, replied_msg, trunc
+ )
async def handle_telegram_forward(self, message):
- space = space2 = ' '
- if not (forwarded_peer_name := await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=False)):
- space = ''
- saved_peer_name = await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=True)
+ space = space2 = " "
+ if not (
+ forwarded_peer_name := await self.get_irc_name_from_telegram_forward(
+ message.fwd_from, saved=False
+ )
+ ):
+ space = ""
+ saved_peer_name = await self.get_irc_name_from_telegram_forward(
+ message.fwd_from, saved=True
+ )
if saved_peer_name and saved_peer_name != forwarded_peer_name:
secondary_name = saved_peer_name
else:
# if it's from me I want to know who was the destination of a message (user)
- if self.refwd_me and (saved_from_peer := message.fwd_from.saved_from_peer) is not None:
- secondary_name = self.get_irc_user_from_telegram(saved_from_peer.user_id).irc_nick
+ if (
+ self.refwd_me
+ and (saved_from_peer := message.fwd_from.saved_from_peer) is not None
+ ):
+ secondary_name = self.get_irc_user_from_telegram(
+ saved_from_peer.user_id
+ ).irc_nick
else:
- secondary_name = ''
- space2 = ''
+ secondary_name = ""
+ space2 = ""
- return '|Fwd{}{}{}{}| '.format(space, forwarded_peer_name, space2, secondary_name)
+ return "|Fwd{}{}{}{}| ".format(
+ space, forwarded_peer_name, space2, secondary_name
+ )
async def handle_telegram_media(self, message, user, mid):
- caption = ' | {}'.format(message.message) if message.message else ''
+ caption = " | {}".format(message.message) if message.message else ""
to_download = True
- media_url_or_data = ''
+ media_url_or_data = ""
size = 0
filename = None
@@ -842,7 +944,9 @@ class TelegramHandler(object):
size = document.size
h_size = get_human_size(size)
for x in document.attributes:
- if isinstance(x, tgty.DocumentAttributeVideo) or isinstance(x, tgty.DocumentAttributeAudio):
+ if isinstance(x, tgty.DocumentAttributeVideo) or isinstance(
+ x, tgty.DocumentAttributeAudio
+ ):
attrib_av = x
if isinstance(x, tgty.DocumentAttributeFilename):
attrib_file = x
@@ -856,154 +960,169 @@ class TelegramHandler(object):
# web
return await self.handle_webpage(message.media.webpage, message, mid)
elif isinstance(message.media.webpage, tgty.WebPagePending):
- media_type = 'webpending'
+ media_type = "webpending"
media_url_or_data = message.message
- caption = ''
+ caption = ""
self.webpending[message.media.webpage.id] = message
else:
- media_type = 'webunknown'
+ media_type = "webunknown"
media_url_or_data = message.message
- caption = ''
+ caption = ""
elif message.photo:
size, media_type = self.scan_photo_attributes(message.media.photo)
elif message.audio:
- size, h_size, attrib_audio, filename = scan_doc_attributes(message.media.document)
- dur = get_human_duration(attrib_audio.duration) if attrib_audio else ''
- per = attrib_audio.performer or ''
- tit = attrib_audio.title or ''
- theme = ',{}/{}'.format(per, tit) if per or tit else ''
- media_type = 'audio:{},{}{}'.format(h_size, dur, theme)
+ size, h_size, attrib_audio, filename = scan_doc_attributes(
+ message.media.document
+ )
+ dur = get_human_duration(attrib_audio.duration) if attrib_audio else ""
+ per = attrib_audio.performer or ""
+ tit = attrib_audio.title or ""
+ theme = ",{}/{}".format(per, tit) if per or tit else ""
+ media_type = "audio:{},{}{}".format(h_size, dur, theme)
elif message.voice:
- size, _, attrib_audio, filename = scan_doc_attributes(message.media.document)
- dur = get_human_duration(attrib_audio.duration) if attrib_audio else ''
- media_type = 'rec:{}'.format(dur)
+ size, _, attrib_audio, filename = scan_doc_attributes(
+ message.media.document
+ )
+ dur = get_human_duration(attrib_audio.duration) if attrib_audio else ""
+ media_type = "rec:{}".format(dur)
elif message.video:
- size, h_size, attrib_video, filename = scan_doc_attributes(message.media.document)
- dur = get_human_duration(attrib_video.duration) if attrib_video else ''
- media_type = 'video:{},{}'.format(h_size, dur)
- elif message.video_note: media_type = 'videorec'
- elif message.gif: media_type = 'anim'
- elif message.sticker: media_type = 'sticker'
+ size, h_size, attrib_video, filename = scan_doc_attributes(
+ message.media.document
+ )
+ dur = get_human_duration(attrib_video.duration) if attrib_video else ""
+ media_type = "video:{},{}".format(h_size, dur)
+ elif message.video_note:
+ media_type = "videorec"
+ elif message.gif:
+ media_type = "anim"
+ elif message.sticker:
+ media_type = "sticker"
elif message.document:
size, h_size, _, filename = scan_doc_attributes(message.media.document)
- media_type = 'file:{}'.format(h_size)
+ media_type = "file:{}".format(h_size)
elif message.contact:
- media_type = 'contact'
- caption = ''
+ media_type = "contact"
+ caption = ""
to_download = False
if message.media.first_name:
- media_url_or_data += message.media.first_name + ' '
+ media_url_or_data += message.media.first_name + " "
if message.media.last_name:
- media_url_or_data += message.media.last_name + ' '
+ media_url_or_data += message.media.last_name + " "
if message.media.phone_number:
media_url_or_data += message.media.phone_number
elif message.game:
- media_type = 'game'
- caption = ''
+ media_type = "game"
+ caption = ""
to_download = False
if message.media.game.title:
media_url_or_data = message.media.game.title
elif message.geo:
- media_type = 'geo'
- caption = ''
+ media_type = "geo"
+ caption = ""
to_download = False
if self.geo_url:
- geo_url = ' | ' + self.geo_url
+ geo_url = " | " + self.geo_url
else:
- geo_url = ''
- lat_long_template = 'lat: {lat}, long: {long}' + geo_url
- media_url_or_data = lat_long_template.format(lat=message.media.geo.lat, long=message.media.geo.long)
+ geo_url = ""
+ lat_long_template = "lat: {lat}, long: {long}" + geo_url
+ media_url_or_data = lat_long_template.format(
+ lat=message.media.geo.lat, long=message.media.geo.long
+ )
elif message.invoice:
- media_type = 'invoice'
- caption = ''
+ media_type = "invoice"
+ caption = ""
to_download = False
- media_url_or_data = ''
+ media_url_or_data = ""
elif message.poll:
- media_type = 'poll'
- caption = ''
+ media_type = "poll"
+ caption = ""
to_download = False
media_url_or_data = self.handle_poll(message.media.poll)
elif message.venue:
- media_type = 'venue'
- caption = ''
+ media_type = "venue"
+ caption = ""
to_download = False
- media_url_or_data = ''
+ media_url_or_data = ""
else:
- media_type = 'unknown'
- caption = ''
+ media_type = "unknown"
+ caption = ""
to_download = False
media_url_or_data = message.message
if to_download:
relay_attr = (message, user, mid, media_type)
- media_url_or_data = await self.download_telegram_media(message, mid, filename, size, relay_attr)
+ media_url_or_data = await self.download_telegram_media(
+ message, mid, filename, size, relay_attr
+ )
return self.format_media(media_type, media_url_or_data, caption)
def handle_poll(self, poll):
text = poll.question
for ans in poll.answers:
- text += '\n* ' + ans.text
+ text += "\n* " + ans.text
return text
def handle_target_mine(self, target, user):
# Add the target of messages sent by self user (me)
# received in other clients
target_id, target_type = self.get_peer_id_and_type(target)
- if user is None and target_type == 'user' and target_id != self.id:
- # self user^
- # as sender
+ if user is None and target_type == "user" and target_id != self.id:
+ # self user^
+ # as sender
irc_id = self.get_irc_name_from_telegram_id(target_id)
- target_mine = '[T: {}] '.format(irc_id)
+ target_mine = "[T: {}] ".format(irc_id)
else:
- target_mine = ''
+ target_mine = ""
return target_mine
async def handle_webpage(self, webpage, message, mid):
- media_type = 'web'
+ media_type = "web"
logo = await self.download_telegram_media(message, mid)
if is_url_equiv(webpage.url, webpage.display_url):
url_data = webpage.url
else:
- url_data = '{} | {}'.format(webpage.url, webpage.display_url)
+ url_data = "{} | {}".format(webpage.url, webpage.display_url)
if message:
# sometimes the 1st line of message contains the title, don't repeat it
message_line = message.message.splitlines()[0]
if message_line != webpage.title:
title = webpage.title
else:
- title = ''
+ title = ""
# extract the URL in the message, don't repeat it
message_url = extract_url(message.message)
if is_url_equiv(message_url, webpage.url):
if is_url_equiv(message_url, webpage.display_url):
media_url_or_data = message.message
else:
- media_url_or_data = '{} | {}'.format(message.message, webpage.display_url)
+ media_url_or_data = "{} | {}".format(
+ message.message, webpage.display_url
+ )
else:
- media_url_or_data = '{} | {}'.format(message.message, url_data)
+ media_url_or_data = "{} | {}".format(message.message, url_data)
else:
title = webpage.title
media_url_or_data = url_data
if title and logo:
- caption = ' | {} | {}'.format(title, logo)
+ caption = " | {} | {}".format(title, logo)
elif title:
- caption = ' | {}'.format(title)
+ caption = " | {}".format(title)
elif logo:
- caption = ' | {}'.format(logo)
+ caption = " | {}".format(logo)
else:
- caption = ''
+ caption = ""
return self.format_media(media_type, media_url_or_data, caption)
def format_media(self, media_type, media_url_or_data, caption):
- return '[{}] {}{}'.format(media_type, media_url_or_data, caption)
+ return "[{}] {}{}".format(media_type, media_url_or_data, caption)
def scan_photo_attributes(self, photo):
size = 0
@@ -1017,16 +1136,18 @@ class TelegramHandler(object):
if x.size > size:
size = x.size
ph_size = x
- if hasattr(ph_size, 'w') and hasattr(ph_size, 'h'):
- media_type = 'photo:{}x{}'.format(ph_size.w, ph_size.h)
+ if hasattr(ph_size, "w") and hasattr(ph_size, "h"):
+ media_type = "photo:{}x{}".format(ph_size.w, ph_size.h)
else:
- media_type = 'photo'
+ media_type = "photo"
return size, media_type
- async def download_telegram_media(self, message, mid, filename=None, size=0, relay_attr=None):
+ async def download_telegram_media(
+ self, message, mid, filename=None, size=0, relay_attr=None
+ ):
if not self.download:
- return ''
+ return ""
if filename:
idd_file = add_filename(filename, mid)
new_file = sanitize_filename(idd_file)
@@ -1036,11 +1157,13 @@ class TelegramHandler(object):
else:
await self.notice_downloading(size, relay_attr)
local_path = await message.download_media(new_path)
- if not local_path: return ''
+ if not local_path:
+ return ""
else:
await self.notice_downloading(size, relay_attr)
local_path = await message.download_media(self.telegram_media_dir)
- if not local_path: return ''
+ if not local_path:
+ return ""
filetype = os.path.splitext(local_path)[1]
gen_file = str(self.media_cn) + filetype
idd_file = add_filename(gen_file, mid)
@@ -1050,24 +1173,28 @@ class TelegramHandler(object):
if local_path != new_path:
os.replace(local_path, new_path)
- if self.media_url[-1:] != '/':
- self.media_url += '/'
+ if self.media_url[-1:] != "/":
+ self.media_url += "/"
return self.media_url + new_file
async def notice_downloading(self, size, relay_attr):
if relay_attr and size > self.notice_size:
message, user, mid, media_type = relay_attr
- await self.relay_telegram_message(message, user, '[{}] [{}] [Downloading]'.format(mid, media_type))
+ await self.relay_telegram_message(
+ message, user, "[{}] [{}] [Downloading]".format(mid, media_type)
+ )
+
class mesg_id:
def __init__(self, alpha):
self.alpha = alpha
self.base = len(alpha)
- self.alphaval = { i:v for v, i in enumerate(alpha) }
+ self.alphaval = {i: v for v, i in enumerate(alpha)}
self.mesg_base = {}
- def num_to_id(self, num, neg=''):
- if num < 0: return self.num_to_id(-num, '-')
+ def num_to_id(self, num, neg=""):
+ if num < 0:
+ return self.num_to_id(-num, "-")
(high, low) = divmod(num, self.base)
if high >= self.base:
aux = self.num_to_id(high)
@@ -1083,7 +1210,8 @@ class mesg_id:
def id_to_num(self, id, n=1):
if id:
- if id[0] == '-': return self.id_to_num(id[1:], -1)
+ if id[0] == "-":
+ return self.id_to_num(id[1:], -1)
aux = self.alphaval[id[-1:]] * n
sum = self.id_to_num(id[:-1], n * self.base)
return sum + aux
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment