-
-
Save zodman/498be9c88d059f2e24dd993cc0e87a63 to your computer and use it in GitHub Desktop.
irgramd patch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/irgramd b/irgramd | |
| old mode 100644 | |
| new mode 100755 | |
| diff --git a/telegram.py b/telegram.py | |
| index 39e02aa..8d727e3 100644 | |
| --- a/telegram.py | |
| +++ b/telegram.py | |
| @@ -15,57 +15,74 @@ import asyncio | |
| import collections | |
| import telethon | |
| from telethon import types as tgty, utils as tgutils | |
| -from telethon.tl.functions.messages import GetMessagesReactionsRequest, GetFullChatRequest | |
| +from telethon.tl.functions.messages import ( | |
| + GetMessagesReactionsRequest, | |
| + GetFullChatRequest, | |
| +) | |
| from telethon.tl.functions.channels import GetFullChannelRequest | |
| +from telethon.errors import SessionPasswordNeededError | |
| # Local modules | |
| from include import CHAN_MAX_LENGTH, NICK_MAX_LENGTH | |
| from irc import IRCUser | |
| -from utils import sanitize_filename, add_filename, is_url_equiv, extract_url, get_human_size, get_human_duration | |
| +from utils import ( | |
| + sanitize_filename, | |
| + add_filename, | |
| + is_url_equiv, | |
| + extract_url, | |
| + get_human_size, | |
| + get_human_duration, | |
| +) | |
| from utils import get_highlighted, fix_braces, format_timestamp, pretty, current_date | |
| import emoji2emoticon as e | |
| # Test IP table | |
| -TEST_IPS = { 1: '149.154.175.10', | |
| - 2: '149.154.167.40', | |
| - 3: '149.154.175.117', | |
| - } | |
| +TEST_IPS = { | |
| + 1: "149.154.175.10", | |
| + 2: "149.154.167.40", | |
| + 3: "149.154.175.117", | |
| +} | |
| + | |
| +# Telegram | |
| - # Telegram | |
| class TelegramHandler(object): | |
| def __init__(self, irc, settings): | |
| - self.logger = logging.getLogger() | |
| - self.config_dir = settings['config_dir'] | |
| - self.cache_dir = settings['cache_dir'] | |
| - self.download = settings['download_media'] | |
| - self.notice_size = settings['download_notice'] * 1048576 | |
| - self.media_dir = settings['media_dir'] | |
| - self.media_url = settings['media_url'] | |
| - self.upload_dir = settings['upload_dir'] | |
| - self.api_id = settings['api_id'] | |
| - self.api_hash = settings['api_hash'] | |
| - self.phone = settings['phone'] | |
| - self.test = settings['test'] | |
| - self.test_dc = settings['test_datacenter'] | |
| - self.test_ip = settings['test_host'] if settings['test_host'] else TEST_IPS[self.test_dc] | |
| - self.test_port = settings['test_port'] | |
| - self.ask_code = settings['ask_code'] | |
| - self.quote_len = settings['quote_length'] | |
| - self.hist_fmt = settings['hist_timestamp_format'] | |
| - self.timezone = settings['timezone'] | |
| - self.geo_url = settings['geo_url'] | |
| - if not settings['emoji_ascii']: | |
| + self.logger = logging.getLogger() | |
| + self.config_dir = settings["config_dir"] | |
| + self.cache_dir = settings["cache_dir"] | |
| + self.download = settings["download_media"] | |
| + self.notice_size = settings["download_notice"] * 1048576 | |
| + self.media_dir = settings["media_dir"] | |
| + self.media_url = settings["media_url"] | |
| + self.upload_dir = settings["upload_dir"] | |
| + self.api_id = settings["api_id"] | |
| + self.api_hash = settings["api_hash"] | |
| + self.phone = settings["phone"] | |
| + self.test = settings["test"] | |
| + self.test_dc = settings["test_datacenter"] | |
| + self.test_ip = ( | |
| + settings["test_host"] if settings["test_host"] else TEST_IPS[self.test_dc] | |
| + ) | |
| + self.test_port = settings["test_port"] | |
| + self.ask_code = settings["ask_code"] | |
| + self.quote_len = settings["quote_length"] | |
| + self.hist_fmt = settings["hist_timestamp_format"] | |
| + self.timezone = settings["timezone"] | |
| + self.geo_url = settings["geo_url"] | |
| + if not settings["emoji_ascii"]: | |
| e.emo = {} | |
| - self.media_cn = 0 | |
| - self.irc = irc | |
| + self.media_cn = 0 | |
| + self.irc = irc | |
| self.authorized = False | |
| - self.id = None | |
| + self.id = None | |
| self.tg_username = None | |
| self.channels_date = {} | |
| - self.mid = mesg_id('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~') | |
| + self.mid = mesg_id( | |
| + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~" | |
| + ) | |
| self.webpending = {} | |
| self.refwd_me = False | |
| self.cache = collections.OrderedDict() | |
| @@ -78,66 +95,84 @@ class TelegramHandler(object): | |
| async def initialize_telegram(self): | |
| # Setup media folder | |
| - self.telegram_media_dir = os.path.expanduser(self.media_dir or os.path.join(self.cache_dir, 'media')) | |
| + self.telegram_media_dir = os.path.expanduser( | |
| + self.media_dir or os.path.join(self.cache_dir, "media") | |
| + ) | |
| if not os.path.exists(self.telegram_media_dir): | |
| os.makedirs(self.telegram_media_dir) | |
| # Setup upload folder | |
| - self.telegram_upload_dir = os.path.expanduser(self.upload_dir or os.path.join(self.cache_dir, 'upload')) | |
| + self.telegram_upload_dir = os.path.expanduser( | |
| + self.upload_dir or os.path.join(self.cache_dir, "upload") | |
| + ) | |
| if not os.path.exists(self.telegram_upload_dir): | |
| os.makedirs(self.telegram_upload_dir) | |
| # Setup session folder | |
| - self.telegram_session_dir = os.path.join(self.config_dir, 'session') | |
| + self.telegram_session_dir = os.path.join(self.config_dir, "session") | |
| if not os.path.exists(self.telegram_session_dir): | |
| os.makedirs(self.telegram_session_dir) | |
| # Construct Telegram client | |
| if self.test: | |
| - self.telegram_client = telethon.TelegramClient(None, self.api_id, self.api_hash) | |
| - self.telegram_client.session.set_dc(self.test_dc, self.test_ip, self.test_port) | |
| + self.telegram_client = telethon.TelegramClient( | |
| + None, self.api_id, self.api_hash | |
| + ) | |
| + self.telegram_client.session.set_dc( | |
| + self.test_dc, self.test_ip, self.test_port | |
| + ) | |
| else: | |
| - telegram_session = os.path.join(self.telegram_session_dir, 'telegram') | |
| - self.telegram_client = telethon.TelegramClient(telegram_session, self.api_id, self.api_hash) | |
| + telegram_session = os.path.join(self.telegram_session_dir, "telegram") | |
| + self.telegram_client = telethon.TelegramClient( | |
| + telegram_session, self.api_id, self.api_hash | |
| + ) | |
| # Initialize Telegram ID to IRC nick mapping | |
| self.tid_to_iid = {} | |
| # Register Telegram callbacks | |
| callbacks = ( | |
| - (self.handle_telegram_message , telethon.events.NewMessage), | |
| - (self.handle_raw , telethon.events.Raw), | |
| + (self.handle_telegram_message, telethon.events.NewMessage), | |
| + (self.handle_raw, telethon.events.Raw), | |
| (self.handle_telegram_chat_action, telethon.events.ChatAction), | |
| - (self.handle_telegram_deleted , telethon.events.MessageDeleted), | |
| - (self.handle_telegram_edited , telethon.events.MessageEdited), | |
| + (self.handle_telegram_deleted, telethon.events.MessageDeleted), | |
| + (self.handle_telegram_edited, telethon.events.MessageEdited), | |
| ) | |
| for handler, event in callbacks: | |
| self.telegram_client.add_event_handler(handler, event) | |
| # Start Telegram client | |
| if self.test: | |
| - await self.telegram_client.start(self.phone, code_callback=lambda: str(self.test_dc) * 5) | |
| + await self.telegram_client.start( | |
| + self.phone, code_callback=lambda: str(self.test_dc) * 5 | |
| + ) | |
| else: | |
| await self.telegram_client.connect() | |
| while not await self.telegram_client.is_user_authorized(): | |
| - self.logger.info('Telegram account not authorized') | |
| - await self.telegram_client.send_code_request(self.phone) | |
| + self.logger.info("Telegram account not authorized") | |
| + resp = await self.telegram_client.send_code_request(self.phone) | |
| + self.logger.info(f"{resp}") | |
| self.auth_checked.set() | |
| if not self.ask_code: | |
| return | |
| - self.logger.info('You must provide the Login code that Telegram will ' | |
| - 'sent you via SMS or another connected client') | |
| - code = await aioconsole.ainput('Login code: ') | |
| + self.logger.info( | |
| + "You must provide the Login code that Telegram will " | |
| + "sent you via SMS or another connected client" | |
| + ) | |
| + code = await aioconsole.ainput("Login code: ") | |
| try: | |
| await self.telegram_client.sign_in(code=code) | |
| - except: | |
| - pass | |
| + except SessionPasswordNeededError: | |
| + password = await aioconsole.ainput( | |
| + "Two-step verification enabled. Enter password: ", | |
| + ) | |
| + await self.telegram_client.sign_in(password=password) | |
| await self.continue_auth() | |
| async def continue_auth(self): | |
| - self.logger.info('Telegram account authorized') | |
| + self.logger.info("Telegram account authorized") | |
| self.authorized = True | |
| self.auth_checked.set() | |
| await self.init_mapping() | |
| @@ -161,7 +196,13 @@ class TelegramHandler(object): | |
| tg_nick = self.get_telegram_nick(user) | |
| tg_ni = tg_nick.lower() | |
| if not user.is_self: | |
| - irc_user = IRCUser(None, ('Telegram',''), tg_nick, user.id, self.get_telegram_display_name(user)) | |
| + irc_user = IRCUser( | |
| + None, | |
| + ("Telegram", ""), | |
| + tg_nick, | |
| + user.id, | |
| + self.get_telegram_display_name(user), | |
| + ) | |
| self.irc.users[tg_ni] = irc_user | |
| self.add_sorted_len_usernames(tg_ni) | |
| self.tid_to_iid[user.id] = tg_nick | |
| @@ -183,28 +224,30 @@ class TelegramHandler(object): | |
| if not user.is_self: | |
| self.irc.irc_channels[chan].add(user_nick) | |
| # Add admin users as ops in irc | |
| - if isinstance(user.participant, tgty.ChatParticipantAdmin) or \ | |
| - isinstance(user.participant, tgty.ChannelParticipantAdmin): | |
| + if isinstance( | |
| + user.participant, tgty.ChatParticipantAdmin | |
| + ) or isinstance(user.participant, tgty.ChannelParticipantAdmin): | |
| self.irc.irc_channels_ops[chan].add(user_nick) | |
| # Add creator users as founders in irc | |
| - elif isinstance(user.participant, tgty.ChatParticipantCreator) or \ | |
| - isinstance(user.participant, tgty.ChannelParticipantCreator): | |
| + elif isinstance( | |
| + user.participant, tgty.ChatParticipantCreator | |
| + ) or isinstance(user.participant, tgty.ChannelParticipantCreator): | |
| self.irc.irc_channels_founder[chan].add(user_nick) | |
| except: | |
| - self.logger.warning('Not possible to get participants of channel %s', channel) | |
| + self.logger.warning( | |
| + "Not possible to get participants of channel %s", channel | |
| + ) | |
| def get_telegram_nick(self, user): | |
| - nick = (user.username | |
| - or self.get_telegram_display_name(user) | |
| - or str(user.id)) | |
| + nick = user.username or self.get_telegram_display_name(user) or str(user.id) | |
| nick = nick[:NICK_MAX_LENGTH] | |
| while nick in self.irc.iid_to_tid: | |
| - nick += '_' | |
| + nick += "_" | |
| return nick | |
| def get_telegram_display_name(self, user): | |
| name = telethon.utils.get_display_name(user) | |
| - name = name.replace(' ', '_') | |
| + name = name.replace(" ", "_") | |
| return name | |
| async def get_telegram_display_name_me(self): | |
| @@ -212,21 +255,22 @@ class TelegramHandler(object): | |
| return self.get_telegram_display_name(tg_user) | |
| def get_telegram_channel(self, chat): | |
| - chan = '#' + chat.title.replace(' ', '-').replace(',', '-') | |
| + chan = "#" + chat.title.replace(" ", "-").replace(",", "-") | |
| while chan.lower() in self.irc.iid_to_tid: | |
| - chan += '_' | |
| + chan += "_" | |
| return chan | |
| def get_irc_user_from_telegram(self, tid): | |
| nick = self.tid_to_iid[tid] | |
| - if nick == self.tg_username: return None | |
| + if nick == self.tg_username: | |
| + return None | |
| return self.irc.users[nick.lower()] | |
| def get_irc_name_from_telegram_id(self, tid): | |
| if tid in self.tid_to_iid.keys(): | |
| name_in_irc = self.tid_to_iid[tid] | |
| else: | |
| - name_in_irc = '<Unknown>' | |
| + name_in_irc = "<Unknown>" | |
| return name_in_irc | |
| async def get_irc_name_from_telegram_forward(self, fwd, saved): | |
| @@ -237,14 +281,14 @@ class TelegramHandler(object): | |
| name = fwd.from_name | |
| else: | |
| peer_id, type = self.get_peer_id_and_type(from_id) | |
| - if type == 'user': | |
| + if type == "user": | |
| try: | |
| user = self.get_irc_user_from_telegram(peer_id) | |
| except: | |
| name = str(peer_id) | |
| else: | |
| if user is None: | |
| - name = '{}' | |
| + name = "{}" | |
| self.refwd_me = True | |
| else: | |
| name = user.irc_nick | |
| @@ -252,14 +296,14 @@ class TelegramHandler(object): | |
| try: | |
| name = await self.get_irc_channel_from_telegram_id(peer_id) | |
| except: | |
| - name = '' | |
| + name = "" | |
| return name | |
| async def get_irc_nick_from_telegram_id(self, tid, entity=None): | |
| if tid not in self.tid_to_iid: | |
| user = entity or await self.telegram_client.get_entity(tid) | |
| nick = self.get_telegram_nick(user) | |
| - self.tid_to_iid[tid] = nick | |
| + self.tid_to_iid[tid] = nick | |
| self.irc.iid_to_tid[nick] = tid | |
| return self.tid_to_iid[tid] | |
| @@ -267,16 +311,16 @@ class TelegramHandler(object): | |
| async def get_irc_channel_from_telegram_id(self, tid, entity=None): | |
| rtid, type = tgutils.resolve_id(tid) | |
| if rtid not in self.tid_to_iid: | |
| - chat = entity or await self.telegram_client.get_entity(tid) | |
| + chat = entity or await self.telegram_client.get_entity(tid) | |
| channel = self.get_telegram_channel(chat) | |
| - self.tid_to_iid[rtid] = channel | |
| + self.tid_to_iid[rtid] = channel | |
| self.irc.iid_to_tid[channel] = rtid | |
| return self.tid_to_iid[rtid] | |
| async def get_telegram_channel_participants(self, tid): | |
| channel = self.tid_to_iid[tid] | |
| - nicks = [] | |
| + nicks = [] | |
| async for user in self.telegram_client.iter_participants(tid): | |
| user_nick = await self.get_irc_nick_from_telegram_id(user.id, user) | |
| @@ -290,16 +334,17 @@ class TelegramHandler(object): | |
| return None | |
| tid = self.get_tid(irc_nick, tid) | |
| user = await self.telegram_client.get_entity(tid) | |
| - if isinstance(user.status,tgty.UserStatusRecently) or \ | |
| - isinstance(user.status,tgty.UserStatusOnline): | |
| + if isinstance(user.status, tgty.UserStatusRecently) or isinstance( | |
| + user.status, tgty.UserStatusOnline | |
| + ): | |
| idle = 0 | |
| - elif isinstance(user.status,tgty.UserStatusOffline): | |
| + elif isinstance(user.status, tgty.UserStatusOffline): | |
| last = user.status.was_online | |
| current = current_date() | |
| idle = int((current - last).total_seconds()) | |
| - elif isinstance(user.status,tgty.UserStatusLastWeek): | |
| + elif isinstance(user.status, tgty.UserStatusLastWeek): | |
| idle = 604800 | |
| - elif isinstance(user.status,tgty.UserStatusLastMonth): | |
| + elif isinstance(user.status, tgty.UserStatusLastMonth): | |
| idle = 2678400 | |
| else: | |
| idle = None | |
| @@ -313,15 +358,15 @@ class TelegramHandler(object): | |
| else: | |
| entity = await self.telegram_client.get_entity(tid) | |
| entity_cache[0] = entity | |
| - if isinstance(entity, tgty.Channel): | |
| + if isinstance(entity, tgty.Channel): | |
| full = await self.telegram_client(GetFullChannelRequest(channel=entity)) | |
| elif isinstance(entity, tgty.Chat): | |
| full = await self.telegram_client(GetFullChatRequest(chat_id=tid)) | |
| else: | |
| - return '' | |
| - entity_type = self.get_entity_type(entity, format='long') | |
| + return "" | |
| + entity_type = self.get_entity_type(entity, format="long") | |
| topic = full.full_chat.about | |
| - sep = ': ' if topic else '' | |
| + sep = ": " if topic else "" | |
| return entity_type + sep + topic | |
| async def get_channel_creation(self, channel, entity_cache): | |
| @@ -351,36 +396,36 @@ class TelegramHandler(object): | |
| def get_entity_type(self, entity, format): | |
| if isinstance(entity, tgty.User): | |
| - short = long = 'User' | |
| + short = long = "User" | |
| elif isinstance(entity, tgty.Chat): | |
| - short = 'Chat' | |
| - long = 'Chat/Basic Group' | |
| + short = "Chat" | |
| + long = "Chat/Basic Group" | |
| elif isinstance(entity, tgty.Channel): | |
| if entity.broadcast: | |
| - short = 'Broad' | |
| - long = 'Broadcast Channel' | |
| + short = "Broad" | |
| + long = "Broadcast Channel" | |
| elif entity.megagroup: | |
| - short = 'Mega' | |
| - long = 'Super/Megagroup Channel' | |
| + short = "Mega" | |
| + long = "Super/Megagroup Channel" | |
| elif entity.gigagroup: | |
| - short = 'Giga' | |
| - long = 'Broadcast Gigagroup Channel' | |
| + short = "Giga" | |
| + long = "Broadcast Gigagroup Channel" | |
| - return short if format == 'short' else long | |
| + return short if format == "short" else long | |
| def get_peer_id_and_type(self, peer): | |
| if isinstance(peer, tgty.PeerChannel): | |
| id = peer.channel_id | |
| - type = 'chan' | |
| + type = "chan" | |
| elif isinstance(peer, tgty.PeerChat): | |
| id = peer.chat_id | |
| - type = 'chan' | |
| + type = "chan" | |
| elif isinstance(peer, tgty.PeerUser): | |
| id = peer.user_id | |
| - type = 'user' | |
| + type = "user" | |
| else: | |
| id = peer | |
| - type = '' | |
| + type = "" | |
| return id, type | |
| async def is_bot(self, irc_nick, tid=None): | |
| @@ -398,56 +443,61 @@ class TelegramHandler(object): | |
| async def edition_case(self, msg): | |
| def msg_edited(m): | |
| - return m.id in self.cache and \ | |
| - ( m.message != self.cache[m.id]['text'] | |
| - or m.media != self.cache[m.id]['media'] | |
| - ) | |
| + return m.id in self.cache and ( | |
| + m.message != self.cache[m.id]["text"] | |
| + or m.media != self.cache[m.id]["media"] | |
| + ) | |
| + | |
| async def get_reactions(m): | |
| - react = await self.telegram_client(GetMessagesReactionsRequest(m.peer_id, id=[m.id])) | |
| + react = await self.telegram_client( | |
| + GetMessagesReactionsRequest(m.peer_id, id=[m.id]) | |
| + ) | |
| updates = react.updates | |
| - r = next((x for x in updates if type(x) is tgty.UpdateMessageReactions), None) | |
| + r = next( | |
| + (x for x in updates if type(x) is tgty.UpdateMessageReactions), None | |
| + ) | |
| return r.reactions.recent_reactions if r else None | |
| react = None | |
| if msg.reactions is None: | |
| - case = 'edition' | |
| + case = "edition" | |
| elif (reactions := await get_reactions(msg)) is None: | |
| if msg_edited(msg): | |
| - case = 'edition' | |
| + case = "edition" | |
| else: | |
| - case = 'react-del' | |
| + case = "react-del" | |
| elif react := max(reactions, key=lambda y: y.date): | |
| - case = 'react-add' | |
| + case = "react-add" | |
| else: | |
| if msg_edited(msg): | |
| - case = 'edition' | |
| + case = "edition" | |
| else: | |
| - case = 'react-del' | |
| + case = "react-del" | |
| react = None | |
| return case, react | |
| def to_cache(self, id, mid, message, proc_message, user, chan, media): | |
| self.limit_cache(self.cache) | |
| self.cache[id] = { | |
| - 'mid': mid, | |
| - 'text': message, | |
| - 'rendered_text': proc_message, | |
| - 'user': user, | |
| - 'channel': chan, | |
| - 'media': media, | |
| - } | |
| + "mid": mid, | |
| + "text": message, | |
| + "rendered_text": proc_message, | |
| + "user": user, | |
| + "channel": chan, | |
| + "media": media, | |
| + } | |
| def to_volatile_cache(self, prev_id, id, ev, user, chan, date): | |
| if chan in prev_id: | |
| prid = prev_id[chan] if chan else prev_id[user] | |
| self.limit_cache(self.volatile_cache) | |
| elem = { | |
| - 'id': id, | |
| - 'rendered_event': ev, | |
| - 'user': user, | |
| - 'channel': chan, | |
| - 'date': date, | |
| - } | |
| + "id": id, | |
| + "rendered_event": ev, | |
| + "user": user, | |
| + "channel": chan, | |
| + "date": date, | |
| + } | |
| if prid not in self.volatile_cache: | |
| self.volatile_cache[prid] = [elem] | |
| else: | |
| @@ -457,10 +507,11 @@ class TelegramHandler(object): | |
| if len(cache) >= 10000: | |
| cache.popitem(last=False) | |
| - def replace_mentions(self, text, me_nick='', received=True): | |
| + def replace_mentions(self, text, me_nick="", received=True): | |
| # For received replace @mention to ~mention~ | |
| # For sent replace mention: to @mention | |
| rargs = {} | |
| + | |
| def repl_mentioned(text, me_nick, received, mark, repl_pref, repl_suff): | |
| new_text = text | |
| @@ -476,7 +527,7 @@ class TelegramHandler(object): | |
| if received: | |
| mention = mark + user | |
| mention_case = mark + username | |
| - else: # sent | |
| + else: # sent | |
| mention = user + mark | |
| mention_case = username + mark | |
| replcmnt = repl_pref + username + repl_suff | |
| @@ -487,21 +538,23 @@ class TelegramHandler(object): | |
| new_text = new_text.replace(ment, replcmnt, 1) | |
| # Next words (with space as separator) | |
| - mention = ' ' + mention | |
| - mention_case = ' ' + mention_case | |
| - replcmnt = ' ' + replcmnt | |
| - new_text = new_text.replace(mention, replcmnt).replace(mention_case, replcmnt) | |
| + mention = " " + mention | |
| + mention_case = " " + mention_case | |
| + replcmnt = " " + replcmnt | |
| + new_text = new_text.replace(mention, replcmnt).replace( | |
| + mention_case, replcmnt | |
| + ) | |
| return new_text | |
| if received: | |
| - mark = '@' | |
| - rargs['repl_pref'] = '~' | |
| - rargs['repl_suff'] = '~' | |
| - else: # sent | |
| - mark = ':' | |
| - rargs['repl_pref'] = '@' | |
| - rargs['repl_suff'] = '' | |
| + mark = "@" | |
| + rargs["repl_pref"] = "~" | |
| + rargs["repl_suff"] = "~" | |
| + else: # sent | |
| + mark = ":" | |
| + rargs["repl_pref"] = "@" | |
| + rargs["repl_suff"] = "" | |
| if text.find(mark) != -1: | |
| text_replaced = repl_mentioned(text, me_nick, received, mark, **rargs) | |
| @@ -521,38 +574,40 @@ class TelegramHandler(object): | |
| def format_reaction(self, msg, message_rendered, edition_case, reaction): | |
| react_quote_len = self.quote_len * 2 | |
| if len(message_rendered) > react_quote_len: | |
| - text_old = '{}...'.format(message_rendered[:react_quote_len]) | |
| + text_old = "{}...".format(message_rendered[:react_quote_len]) | |
| text_old = fix_braces(text_old) | |
| else: | |
| text_old = message_rendered | |
| - if edition_case == 'react-add': | |
| + if edition_case == "react-add": | |
| user = self.get_irc_user_from_telegram(reaction.peer_id.user_id) | |
| emoji = reaction.reaction.emoticon | |
| - react_action = '+' | |
| + react_action = "+" | |
| react_icon = e.emo[emoji] if emoji in e.emo else emoji | |
| - elif edition_case == 'react-del': | |
| + elif edition_case == "react-del": | |
| user = self.get_irc_user_from_telegram(msg.sender_id) | |
| - react_action = '-' | |
| - react_icon = '' | |
| - return text_old, '{}{}'.format(react_action, react_icon), user | |
| + react_action = "-" | |
| + react_icon = "" | |
| + return text_old, "{}{}".format(react_action, react_icon), user | |
| async def handle_telegram_edited(self, event): | |
| - self.logger.debug('Handling Telegram Message Edited: %s', pretty(event)) | |
| + self.logger.debug("Handling Telegram Message Edited: %s", pretty(event)) | |
| id = event.message.id | |
| mid = self.mid.num_to_id_offset(event.message.peer_id, id) | |
| - fmid = '[{}]'.format(mid) | |
| + fmid = "[{}]".format(mid) | |
| message = self.filters(event.message.message) | |
| - message_rendered = await self.render_text(event.message, mid, upd_to_webpend=None) | |
| + message_rendered = await self.render_text( | |
| + event.message, mid, upd_to_webpend=None | |
| + ) | |
| edition_case, reaction = await self.edition_case(event.message) | |
| - if edition_case == 'edition': | |
| - action = 'Edited' | |
| + if edition_case == "edition": | |
| + action = "Edited" | |
| user = self.get_irc_user_from_telegram(event.sender_id) | |
| if id in self.cache: | |
| - t = self.filters(self.cache[id]['text']) | |
| - rt = self.cache[id]['rendered_text'] | |
| + t = self.filters(self.cache[id]["text"]) | |
| + rt = self.cache[id]["rendered_text"] | |
| ht, is_ht = get_highlighted(t, message) | |
| else: | |
| @@ -574,18 +629,24 @@ class TelegramHandler(object): | |
| if self.last_reaction == reaction.date: | |
| return | |
| self.last_reaction = reaction.date | |
| - action = 'React' | |
| - text_old, edition_react, user = self.format_reaction(event.message, message_rendered, edition_case, reaction) | |
| + action = "React" | |
| + text_old, edition_react, user = self.format_reaction( | |
| + event.message, message_rendered, edition_case, reaction | |
| + ) | |
| - text = '|{} {}| {}'.format(action, text_old, edition_react) | |
| + text = "|{} {}| {}".format(action, text_old, edition_react) | |
| chan = await self.relay_telegram_message(event, user, text) | |
| - self.to_cache(id, mid, message, message_rendered, user, chan, event.message.media) | |
| + self.to_cache( | |
| + id, mid, message, message_rendered, user, chan, event.message.media | |
| + ) | |
| self.to_volatile_cache(self.prev_id, id, text, user, chan, current_date()) | |
| async def handle_next_reaction(self, event): | |
| - self.logger.debug('Handling Telegram Next Reaction (2nd, 3rd, ...): %s', pretty(event)) | |
| + self.logger.debug( | |
| + "Handling Telegram Next Reaction (2nd, 3rd, ...): %s", pretty(event) | |
| + ) | |
| reactions = event.reactions.recent_reactions | |
| react = max(reactions, key=lambda y: y.date) if reactions else None | |
| @@ -598,9 +659,11 @@ class TelegramHandler(object): | |
| message = self.filters(msg.message) | |
| message_rendered = await self.render_text(msg, mid, upd_to_webpend=None) | |
| - text_old, edition_react, user = self.format_reaction(msg, message_rendered, edition_case='react-add', reaction=react) | |
| + text_old, edition_react, user = self.format_reaction( | |
| + msg, message_rendered, edition_case="react-add", reaction=react | |
| + ) | |
| - text = '|React {}| {}'.format(text_old, edition_react) | |
| + text = "|React {}| {}".format(text_old, edition_react) | |
| chan = await self.relay_telegram_message(msg, user, text) | |
| @@ -608,33 +671,43 @@ class TelegramHandler(object): | |
| self.to_volatile_cache(self.prev_id, id, text, user, chan, current_date()) | |
| async def handle_telegram_deleted(self, event): | |
| - self.logger.debug('Handling Telegram Message Deleted: %s', pretty(event)) | |
| + self.logger.debug("Handling Telegram Message Deleted: %s", pretty(event)) | |
| for deleted_id in event.original_update.messages: | |
| if deleted_id in self.cache: | |
| - recovered_text = self.cache[deleted_id]['rendered_text'] | |
| - text = '|Deleted| {}'.format(recovered_text) | |
| - user = self.cache[deleted_id]['user'] | |
| - chan = self.cache[deleted_id]['channel'] | |
| - await self.relay_telegram_message(message=None, user=user, text=text, channel=chan) | |
| - self.to_volatile_cache(self.prev_id, deleted_id, text, user, chan, current_date()) | |
| + recovered_text = self.cache[deleted_id]["rendered_text"] | |
| + text = "|Deleted| {}".format(recovered_text) | |
| + user = self.cache[deleted_id]["user"] | |
| + chan = self.cache[deleted_id]["channel"] | |
| + await self.relay_telegram_message( | |
| + message=None, user=user, text=text, channel=chan | |
| + ) | |
| + self.to_volatile_cache( | |
| + self.prev_id, deleted_id, text, user, chan, current_date() | |
| + ) | |
| else: | |
| - text = 'Message id {} deleted not in cache'.format(deleted_id) | |
| + text = "Message id {} deleted not in cache".format(deleted_id) | |
| await self.relay_telegram_private_message(self.irc.service_user, text) | |
| async def handle_raw(self, update): | |
| - self.logger.debug('Handling Telegram Raw Event: %s', pretty(update)) | |
| + self.logger.debug("Handling Telegram Raw Event: %s", pretty(update)) | |
| - if isinstance(update, tgty.UpdateWebPage) and isinstance(update.webpage, tgty.WebPage): | |
| + if isinstance(update, tgty.UpdateWebPage) and isinstance( | |
| + update.webpage, tgty.WebPage | |
| + ): | |
| message = self.webpending.pop(update.webpage.id, None) | |
| if message: | |
| - await self.handle_telegram_message(event=None, message=message, upd_to_webpend=update.webpage) | |
| + await self.handle_telegram_message( | |
| + event=None, message=message, upd_to_webpend=update.webpage | |
| + ) | |
| elif isinstance(update, tgty.UpdateMessageReactions): | |
| await self.handle_next_reaction(update) | |
| - async def handle_telegram_message(self, event, message=None, upd_to_webpend=None, history=False): | |
| - self.logger.debug('Handling Telegram Message: %s', pretty(event or message)) | |
| + async def handle_telegram_message( | |
| + self, event, message=None, upd_to_webpend=None, history=False | |
| + ): | |
| + self.logger.debug("Handling Telegram Message: %s", pretty(event or message)) | |
| msg = event.message if event else message | |
| @@ -667,11 +740,11 @@ class TelegramHandler(object): | |
| elif message.forward: | |
| refwd_text = await self.handle_telegram_forward(message) | |
| else: | |
| - refwd_text = '' | |
| + refwd_text = "" | |
| target_mine = self.handle_target_mine(message.peer_id, user) | |
| - final_text = '[{}] {}{}{}'.format(mid, target_mine, refwd_text, text) | |
| + final_text = "[{}] {}{}{}".format(mid, target_mine, refwd_text, text) | |
| final_text = self.filters(final_text) | |
| return final_text | |
| @@ -679,9 +752,9 @@ class TelegramHandler(object): | |
| if history and self.hist_fmt: | |
| timestamp = format_timestamp(self.hist_fmt, self.timezone, date) | |
| if action: | |
| - res = '{} {}'.format(text, timestamp) | |
| + res = "{} {}".format(text, timestamp) | |
| else: | |
| - res = '{} {}'.format(timestamp, text) | |
| + res = "{} {}".format(timestamp, text) | |
| else: | |
| res = text | |
| return res | |
| @@ -690,39 +763,45 @@ class TelegramHandler(object): | |
| if history: | |
| if id in self.volatile_cache: | |
| for item in self.volatile_cache[id]: | |
| - user = item['user'] | |
| - text = item['rendered_event'] | |
| - chan = item['channel'] | |
| - date = item['date'] | |
| - text_send = self.set_history_timestamp(text, history=True, date=date, action=False) | |
| + user = item["user"] | |
| + text = item["rendered_event"] | |
| + chan = item["channel"] | |
| + date = item["date"] | |
| + text_send = self.set_history_timestamp( | |
| + text, history=True, date=date, action=False | |
| + ) | |
| await self.relay_telegram_message(None, user, text_send, chan) | |
| async def relay_telegram_message(self, message, user, text, channel=None): | |
| private = (message and message.is_private) or (not message and not channel) | |
| - action = (message and message.action) | |
| + action = message and message.action | |
| if private: | |
| await self.relay_telegram_private_message(user, text, action) | |
| chan = None | |
| else: | |
| - chan = await self.relay_telegram_channel_message(message, user, text, channel, action) | |
| + chan = await self.relay_telegram_channel_message( | |
| + message, user, text, channel, action | |
| + ) | |
| return chan | |
| async def relay_telegram_private_message(self, user, message, action=None): | |
| - self.logger.debug('Relaying Telegram Private Message: %s, %s', user, message) | |
| + self.logger.debug("Relaying Telegram Private Message: %s, %s", user, message) | |
| if action: | |
| await self.irc.send_action(user, None, message) | |
| else: | |
| await self.irc.send_msg(user, None, message) | |
| - async def relay_telegram_channel_message(self, message, user, text, channel, action): | |
| + async def relay_telegram_channel_message( | |
| + self, message, user, text, channel, action | |
| + ): | |
| if message: | |
| entity = await message.get_chat() | |
| chan = await self.get_irc_channel_from_telegram_id(message.chat_id, entity) | |
| else: | |
| chan = channel | |
| - self.logger.debug('Relaying Telegram Channel Message: %s, %s', chan, text) | |
| + self.logger.debug("Relaying Telegram Channel Message: %s, %s", chan, text) | |
| if action: | |
| await self.irc.send_action(user, chan, text) | |
| @@ -732,7 +811,7 @@ class TelegramHandler(object): | |
| return chan | |
| async def handle_telegram_chat_action(self, event): | |
| - self.logger.debug('Handling Telegram Chat Action: %s', pretty(event)) | |
| + self.logger.debug("Handling Telegram Chat Action: %s", pretty(event)) | |
| try: | |
| tid = event.action_message.to_id.channel_id | |
| @@ -742,13 +821,19 @@ class TelegramHandler(object): | |
| irc_channel = await self.get_irc_channel_from_telegram_id(tid) | |
| await self.get_telegram_channel_participants(tid) | |
| - try: # Join Chats | |
| - irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.users[0]) | |
| + try: # Join Chats | |
| + irc_nick = await self.get_irc_nick_from_telegram_id( | |
| + event.action_message.action.users[0] | |
| + ) | |
| except (IndexError, AttributeError): | |
| - try: # Kick | |
| - irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.user_id) | |
| - except (IndexError, AttributeError): # Join Channels | |
| - irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.sender_id) | |
| + try: # Kick | |
| + irc_nick = await self.get_irc_nick_from_telegram_id( | |
| + event.action_message.action.user_id | |
| + ) | |
| + except (IndexError, AttributeError): # Join Channels | |
| + irc_nick = await self.get_irc_nick_from_telegram_id( | |
| + event.action_message.sender_id | |
| + ) | |
| if event.user_added or event.user_joined: | |
| await self.irc.join_irc_channel(irc_nick, irc_channel, full_join=False) | |
| @@ -762,24 +847,26 @@ class TelegramHandler(object): | |
| channel = self.get_telegram_channel(chat) | |
| self.tid_to_iid[chat.id] = channel | |
| self.irc.iid_to_tid[channel] = chat.id | |
| - await self.irc.join_irc_channel(self.irc.irc_nick, channel, full_join=True) | |
| + await self.irc.join_irc_channel( | |
| + self.irc.irc_nick, channel, full_join=True | |
| + ) | |
| async def handle_telegram_action(self, message, mid): | |
| if isinstance(message.action, tgty.MessageActionPinMessage): | |
| replied = await message.get_reply_message() | |
| cid = self.mid.num_to_id_offset(replied.peer_id, replied.id) | |
| - action_text = 'has pinned message [{}]'.format(cid) | |
| + action_text = "has pinned message [{}]".format(cid) | |
| elif isinstance(message.action, tgty.MessageActionChatEditPhoto): | |
| _, media_type = self.scan_photo_attributes(message.action.photo) | |
| photo_url = await self.download_telegram_media(message, mid) | |
| - action_text = 'has changed chat [{}] {}'.format(media_type, photo_url) | |
| + action_text = "has changed chat [{}] {}".format(media_type, photo_url) | |
| else: | |
| - action_text = '' | |
| + action_text = "" | |
| return action_text | |
| async def handle_telegram_reply(self, message): | |
| - space = ' ' | |
| - trunc = '' | |
| + space = " " | |
| + trunc = "" | |
| replied = await message.get_reply_message() | |
| if replied: | |
| replied_msg = replied.message | |
| @@ -789,51 +876,66 @@ class TelegramHandler(object): | |
| replied_id = message.reply_to.reply_to_msg_id | |
| cid = self.mid.num_to_id_offset(message.peer_id, replied_id) | |
| if replied_id in self.cache: | |
| - text = self.cache[replied_id]['text'] | |
| - replied_user = self.cache[replied_id]['user'] | |
| - sp = ' ' | |
| + text = self.cache[replied_id]["text"] | |
| + replied_user = self.cache[replied_id]["user"] | |
| + sp = " " | |
| else: | |
| - text = '' | |
| - replied_user = '' | |
| - sp = '' | |
| - replied_msg = '|Deleted|{}{}'.format(sp, text) | |
| + text = "" | |
| + replied_user = "" | |
| + sp = "" | |
| + replied_msg = "|Deleted|{}{}".format(sp, text) | |
| if not replied_msg: | |
| - replied_msg = '' | |
| - space = '' | |
| + replied_msg = "" | |
| + space = "" | |
| elif len(replied_msg) > self.quote_len: | |
| - replied_msg = replied_msg[:self.quote_len] | |
| - trunc = '...' | |
| + replied_msg = replied_msg[: self.quote_len] | |
| + trunc = "..." | |
| if replied_user is None: | |
| - replied_nick = '{}' | |
| + replied_nick = "{}" | |
| self.refwd_me = True | |
| - elif replied_user == '': | |
| - replied_nick = '' | |
| + elif replied_user == "": | |
| + replied_nick = "" | |
| else: | |
| replied_nick = replied_user.irc_nick | |
| - return '|Re {}: [{}]{}{}{}| '.format(replied_nick, cid, space, replied_msg, trunc) | |
| + return "|Re {}: [{}]{}{}{}| ".format( | |
| + replied_nick, cid, space, replied_msg, trunc | |
| + ) | |
| async def handle_telegram_forward(self, message): | |
| - space = space2 = ' ' | |
| - if not (forwarded_peer_name := await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=False)): | |
| - space = '' | |
| - saved_peer_name = await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=True) | |
| + space = space2 = " " | |
| + if not ( | |
| + forwarded_peer_name := await self.get_irc_name_from_telegram_forward( | |
| + message.fwd_from, saved=False | |
| + ) | |
| + ): | |
| + space = "" | |
| + saved_peer_name = await self.get_irc_name_from_telegram_forward( | |
| + message.fwd_from, saved=True | |
| + ) | |
| if saved_peer_name and saved_peer_name != forwarded_peer_name: | |
| secondary_name = saved_peer_name | |
| else: | |
| # if it's from me I want to know who was the destination of a message (user) | |
| - if self.refwd_me and (saved_from_peer := message.fwd_from.saved_from_peer) is not None: | |
| - secondary_name = self.get_irc_user_from_telegram(saved_from_peer.user_id).irc_nick | |
| + if ( | |
| + self.refwd_me | |
| + and (saved_from_peer := message.fwd_from.saved_from_peer) is not None | |
| + ): | |
| + secondary_name = self.get_irc_user_from_telegram( | |
| + saved_from_peer.user_id | |
| + ).irc_nick | |
| else: | |
| - secondary_name = '' | |
| - space2 = '' | |
| + secondary_name = "" | |
| + space2 = "" | |
| - return '|Fwd{}{}{}{}| '.format(space, forwarded_peer_name, space2, secondary_name) | |
| + return "|Fwd{}{}{}{}| ".format( | |
| + space, forwarded_peer_name, space2, secondary_name | |
| + ) | |
| async def handle_telegram_media(self, message, user, mid): | |
| - caption = ' | {}'.format(message.message) if message.message else '' | |
| + caption = " | {}".format(message.message) if message.message else "" | |
| to_download = True | |
| - media_url_or_data = '' | |
| + media_url_or_data = "" | |
| size = 0 | |
| filename = None | |
| @@ -842,7 +944,9 @@ class TelegramHandler(object): | |
| size = document.size | |
| h_size = get_human_size(size) | |
| for x in document.attributes: | |
| - if isinstance(x, tgty.DocumentAttributeVideo) or isinstance(x, tgty.DocumentAttributeAudio): | |
| + if isinstance(x, tgty.DocumentAttributeVideo) or isinstance( | |
| + x, tgty.DocumentAttributeAudio | |
| + ): | |
| attrib_av = x | |
| if isinstance(x, tgty.DocumentAttributeFilename): | |
| attrib_file = x | |
| @@ -856,154 +960,169 @@ class TelegramHandler(object): | |
| # web | |
| return await self.handle_webpage(message.media.webpage, message, mid) | |
| elif isinstance(message.media.webpage, tgty.WebPagePending): | |
| - media_type = 'webpending' | |
| + media_type = "webpending" | |
| media_url_or_data = message.message | |
| - caption = '' | |
| + caption = "" | |
| self.webpending[message.media.webpage.id] = message | |
| else: | |
| - media_type = 'webunknown' | |
| + media_type = "webunknown" | |
| media_url_or_data = message.message | |
| - caption = '' | |
| + caption = "" | |
| elif message.photo: | |
| size, media_type = self.scan_photo_attributes(message.media.photo) | |
| elif message.audio: | |
| - size, h_size, attrib_audio, filename = scan_doc_attributes(message.media.document) | |
| - dur = get_human_duration(attrib_audio.duration) if attrib_audio else '' | |
| - per = attrib_audio.performer or '' | |
| - tit = attrib_audio.title or '' | |
| - theme = ',{}/{}'.format(per, tit) if per or tit else '' | |
| - media_type = 'audio:{},{}{}'.format(h_size, dur, theme) | |
| + size, h_size, attrib_audio, filename = scan_doc_attributes( | |
| + message.media.document | |
| + ) | |
| + dur = get_human_duration(attrib_audio.duration) if attrib_audio else "" | |
| + per = attrib_audio.performer or "" | |
| + tit = attrib_audio.title or "" | |
| + theme = ",{}/{}".format(per, tit) if per or tit else "" | |
| + media_type = "audio:{},{}{}".format(h_size, dur, theme) | |
| elif message.voice: | |
| - size, _, attrib_audio, filename = scan_doc_attributes(message.media.document) | |
| - dur = get_human_duration(attrib_audio.duration) if attrib_audio else '' | |
| - media_type = 'rec:{}'.format(dur) | |
| + size, _, attrib_audio, filename = scan_doc_attributes( | |
| + message.media.document | |
| + ) | |
| + dur = get_human_duration(attrib_audio.duration) if attrib_audio else "" | |
| + media_type = "rec:{}".format(dur) | |
| elif message.video: | |
| - size, h_size, attrib_video, filename = scan_doc_attributes(message.media.document) | |
| - dur = get_human_duration(attrib_video.duration) if attrib_video else '' | |
| - media_type = 'video:{},{}'.format(h_size, dur) | |
| - elif message.video_note: media_type = 'videorec' | |
| - elif message.gif: media_type = 'anim' | |
| - elif message.sticker: media_type = 'sticker' | |
| + size, h_size, attrib_video, filename = scan_doc_attributes( | |
| + message.media.document | |
| + ) | |
| + dur = get_human_duration(attrib_video.duration) if attrib_video else "" | |
| + media_type = "video:{},{}".format(h_size, dur) | |
| + elif message.video_note: | |
| + media_type = "videorec" | |
| + elif message.gif: | |
| + media_type = "anim" | |
| + elif message.sticker: | |
| + media_type = "sticker" | |
| elif message.document: | |
| size, h_size, _, filename = scan_doc_attributes(message.media.document) | |
| - media_type = 'file:{}'.format(h_size) | |
| + media_type = "file:{}".format(h_size) | |
| elif message.contact: | |
| - media_type = 'contact' | |
| - caption = '' | |
| + media_type = "contact" | |
| + caption = "" | |
| to_download = False | |
| if message.media.first_name: | |
| - media_url_or_data += message.media.first_name + ' ' | |
| + media_url_or_data += message.media.first_name + " " | |
| if message.media.last_name: | |
| - media_url_or_data += message.media.last_name + ' ' | |
| + media_url_or_data += message.media.last_name + " " | |
| if message.media.phone_number: | |
| media_url_or_data += message.media.phone_number | |
| elif message.game: | |
| - media_type = 'game' | |
| - caption = '' | |
| + media_type = "game" | |
| + caption = "" | |
| to_download = False | |
| if message.media.game.title: | |
| media_url_or_data = message.media.game.title | |
| elif message.geo: | |
| - media_type = 'geo' | |
| - caption = '' | |
| + media_type = "geo" | |
| + caption = "" | |
| to_download = False | |
| if self.geo_url: | |
| - geo_url = ' | ' + self.geo_url | |
| + geo_url = " | " + self.geo_url | |
| else: | |
| - geo_url = '' | |
| - lat_long_template = 'lat: {lat}, long: {long}' + geo_url | |
| - media_url_or_data = lat_long_template.format(lat=message.media.geo.lat, long=message.media.geo.long) | |
| + geo_url = "" | |
| + lat_long_template = "lat: {lat}, long: {long}" + geo_url | |
| + media_url_or_data = lat_long_template.format( | |
| + lat=message.media.geo.lat, long=message.media.geo.long | |
| + ) | |
| elif message.invoice: | |
| - media_type = 'invoice' | |
| - caption = '' | |
| + media_type = "invoice" | |
| + caption = "" | |
| to_download = False | |
| - media_url_or_data = '' | |
| + media_url_or_data = "" | |
| elif message.poll: | |
| - media_type = 'poll' | |
| - caption = '' | |
| + media_type = "poll" | |
| + caption = "" | |
| to_download = False | |
| media_url_or_data = self.handle_poll(message.media.poll) | |
| elif message.venue: | |
| - media_type = 'venue' | |
| - caption = '' | |
| + media_type = "venue" | |
| + caption = "" | |
| to_download = False | |
| - media_url_or_data = '' | |
| + media_url_or_data = "" | |
| else: | |
| - media_type = 'unknown' | |
| - caption = '' | |
| + media_type = "unknown" | |
| + caption = "" | |
| to_download = False | |
| media_url_or_data = message.message | |
| if to_download: | |
| relay_attr = (message, user, mid, media_type) | |
| - media_url_or_data = await self.download_telegram_media(message, mid, filename, size, relay_attr) | |
| + media_url_or_data = await self.download_telegram_media( | |
| + message, mid, filename, size, relay_attr | |
| + ) | |
| return self.format_media(media_type, media_url_or_data, caption) | |
| def handle_poll(self, poll): | |
| text = poll.question | |
| for ans in poll.answers: | |
| - text += '\n* ' + ans.text | |
| + text += "\n* " + ans.text | |
| return text | |
| def handle_target_mine(self, target, user): | |
| # Add the target of messages sent by self user (me) | |
| # received in other clients | |
| target_id, target_type = self.get_peer_id_and_type(target) | |
| - if user is None and target_type == 'user' and target_id != self.id: | |
| - # self user^ | |
| - # as sender | |
| + if user is None and target_type == "user" and target_id != self.id: | |
| + # self user^ | |
| + # as sender | |
| irc_id = self.get_irc_name_from_telegram_id(target_id) | |
| - target_mine = '[T: {}] '.format(irc_id) | |
| + target_mine = "[T: {}] ".format(irc_id) | |
| else: | |
| - target_mine = '' | |
| + target_mine = "" | |
| return target_mine | |
| async def handle_webpage(self, webpage, message, mid): | |
| - media_type = 'web' | |
| + media_type = "web" | |
| logo = await self.download_telegram_media(message, mid) | |
| if is_url_equiv(webpage.url, webpage.display_url): | |
| url_data = webpage.url | |
| else: | |
| - url_data = '{} | {}'.format(webpage.url, webpage.display_url) | |
| + url_data = "{} | {}".format(webpage.url, webpage.display_url) | |
| if message: | |
| # sometimes the 1st line of message contains the title, don't repeat it | |
| message_line = message.message.splitlines()[0] | |
| if message_line != webpage.title: | |
| title = webpage.title | |
| else: | |
| - title = '' | |
| + title = "" | |
| # extract the URL in the message, don't repeat it | |
| message_url = extract_url(message.message) | |
| if is_url_equiv(message_url, webpage.url): | |
| if is_url_equiv(message_url, webpage.display_url): | |
| media_url_or_data = message.message | |
| else: | |
| - media_url_or_data = '{} | {}'.format(message.message, webpage.display_url) | |
| + media_url_or_data = "{} | {}".format( | |
| + message.message, webpage.display_url | |
| + ) | |
| else: | |
| - media_url_or_data = '{} | {}'.format(message.message, url_data) | |
| + media_url_or_data = "{} | {}".format(message.message, url_data) | |
| else: | |
| title = webpage.title | |
| media_url_or_data = url_data | |
| if title and logo: | |
| - caption = ' | {} | {}'.format(title, logo) | |
| + caption = " | {} | {}".format(title, logo) | |
| elif title: | |
| - caption = ' | {}'.format(title) | |
| + caption = " | {}".format(title) | |
| elif logo: | |
| - caption = ' | {}'.format(logo) | |
| + caption = " | {}".format(logo) | |
| else: | |
| - caption = '' | |
| + caption = "" | |
| return self.format_media(media_type, media_url_or_data, caption) | |
| def format_media(self, media_type, media_url_or_data, caption): | |
| - return '[{}] {}{}'.format(media_type, media_url_or_data, caption) | |
| + return "[{}] {}{}".format(media_type, media_url_or_data, caption) | |
| def scan_photo_attributes(self, photo): | |
| size = 0 | |
| @@ -1017,16 +1136,18 @@ class TelegramHandler(object): | |
| if x.size > size: | |
| size = x.size | |
| ph_size = x | |
| - if hasattr(ph_size, 'w') and hasattr(ph_size, 'h'): | |
| - media_type = 'photo:{}x{}'.format(ph_size.w, ph_size.h) | |
| + if hasattr(ph_size, "w") and hasattr(ph_size, "h"): | |
| + media_type = "photo:{}x{}".format(ph_size.w, ph_size.h) | |
| else: | |
| - media_type = 'photo' | |
| + media_type = "photo" | |
| return size, media_type | |
| - async def download_telegram_media(self, message, mid, filename=None, size=0, relay_attr=None): | |
| + async def download_telegram_media( | |
| + self, message, mid, filename=None, size=0, relay_attr=None | |
| + ): | |
| if not self.download: | |
| - return '' | |
| + return "" | |
| if filename: | |
| idd_file = add_filename(filename, mid) | |
| new_file = sanitize_filename(idd_file) | |
| @@ -1036,11 +1157,13 @@ class TelegramHandler(object): | |
| else: | |
| await self.notice_downloading(size, relay_attr) | |
| local_path = await message.download_media(new_path) | |
| - if not local_path: return '' | |
| + if not local_path: | |
| + return "" | |
| else: | |
| await self.notice_downloading(size, relay_attr) | |
| local_path = await message.download_media(self.telegram_media_dir) | |
| - if not local_path: return '' | |
| + if not local_path: | |
| + return "" | |
| filetype = os.path.splitext(local_path)[1] | |
| gen_file = str(self.media_cn) + filetype | |
| idd_file = add_filename(gen_file, mid) | |
| @@ -1050,24 +1173,28 @@ class TelegramHandler(object): | |
| if local_path != new_path: | |
| os.replace(local_path, new_path) | |
| - if self.media_url[-1:] != '/': | |
| - self.media_url += '/' | |
| + if self.media_url[-1:] != "/": | |
| + self.media_url += "/" | |
| return self.media_url + new_file | |
| async def notice_downloading(self, size, relay_attr): | |
| if relay_attr and size > self.notice_size: | |
| message, user, mid, media_type = relay_attr | |
| - await self.relay_telegram_message(message, user, '[{}] [{}] [Downloading]'.format(mid, media_type)) | |
| + await self.relay_telegram_message( | |
| + message, user, "[{}] [{}] [Downloading]".format(mid, media_type) | |
| + ) | |
| + | |
| class mesg_id: | |
| def __init__(self, alpha): | |
| self.alpha = alpha | |
| self.base = len(alpha) | |
| - self.alphaval = { i:v for v, i in enumerate(alpha) } | |
| + self.alphaval = {i: v for v, i in enumerate(alpha)} | |
| self.mesg_base = {} | |
| - def num_to_id(self, num, neg=''): | |
| - if num < 0: return self.num_to_id(-num, '-') | |
| + def num_to_id(self, num, neg=""): | |
| + if num < 0: | |
| + return self.num_to_id(-num, "-") | |
| (high, low) = divmod(num, self.base) | |
| if high >= self.base: | |
| aux = self.num_to_id(high) | |
| @@ -1083,7 +1210,8 @@ class mesg_id: | |
| def id_to_num(self, id, n=1): | |
| if id: | |
| - if id[0] == '-': return self.id_to_num(id[1:], -1) | |
| + if id[0] == "-": | |
| + return self.id_to_num(id[1:], -1) | |
| aux = self.alphaval[id[-1:]] * n | |
| sum = self.id_to_num(id[:-1], n * self.base) | |
| return sum + aux |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment