Skip to content

Instantly share code, notes, and snippets.

@bomzheg
Last active May 4, 2021 18:16
Show Gist options
  • Save bomzheg/ebe1625ade8e780d2a016e65ab85fc37 to your computer and use it in GitHub Desktop.
Save bomzheg/ebe1625ade8e780d2a016e65ab85fc37 to your computer and use it in GitHub Desktop.
import asyncio
import typing
import pyrogram
from aiogram.types import User
from loguru import logger
from pyrogram import Client
from pyrogram.errors import RPCError, UsernameNotOccupied, FloodWait
from app import config
# from https://gist.github.com/jorektheglitch/5ea4972d9cb87c2ec682604e53d1ff94 by @entressi
from app.services.restrict_call import RestrictCall
SLEEP_TIME = 100
class UserGetter:
def __init__(self):
self._client_api_bot = Client("karma_bot", bot_token=config.token, api_id=config.API_ID,
api_hash=config.API_HASH, no_updates=True)
async def get_user(self, username: str = None, fullname: str = None, chat_id: int = None) -> User:
async def try_by_name() -> typing.Optional[User]:
try:
return await self.get_user_by_fullname(chat_id, fullname)
except (IndexError, RPCError):
return None
if username is not None:
try:
user_tg = await self.get_user_by_username(username)
except RPCError:
user_tg = await try_by_name()
else:
user_tg = await try_by_name()
return user_tg
@RestrictCall(SLEEP_TIME)
async def get_user_by_username(self, username: str) -> User:
try:
logger.info("get user of username {username}", username=username)
user = await self._client_api_bot.get_users(username)
logger.info("found user {user}", user=self.get_user_dict_for_log(user))
except UsernameNotOccupied:
logger.info("Username not found {username}", username=username)
raise
except FloodWait as e:
logger.error("Flood Wait {e}", e=e)
await asyncio.sleep(e.x)
raise IndexError
return self.get_aiogram_user_by_pyrogram(user)
@RestrictCall(SLEEP_TIME)
async def get_user_by_fullname(self, chat_id: int, fullname: str) -> User:
try:
logger.info("get user of name {name}", name=fullname)
chat_members = await self._client_api_bot.get_chat_members(chat_id=chat_id, query=fullname)
logger.info(
"found: {users}",
users=[self.get_user_dict_for_log(chat_member.user) for chat_member in chat_members]
)
user = chat_members[0].user
except IndexError:
logger.info("name not found {name}", name=fullname)
raise
except FloodWait as e:
logger.error("Flood Wait {e}", e=e)
await asyncio.sleep(e.x)
raise IndexError
return self.get_aiogram_user_by_pyrogram(user)
@staticmethod
def get_aiogram_user_by_pyrogram(user: pyrogram.User) -> User:
return User(
id=user.id,
is_bot=user.is_bot,
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
language_code=user.language_code,
)
@staticmethod
def get_user_dict_for_log(user: pyrogram.User) -> dict:
return dict(
id=user.id,
is_bot=user.is_bot,
first_name=user.first_name,
last_name=user.last_name,
username=user.username,
)
async def start(self):
if not self._client_api_bot.is_connected:
await self._client_api_bot.start()
async def stop(self):
if self._client_api_bot.is_connected:
await self._client_api_bot.stop()
async def __aenter__(self) -> "UserGetter":
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment