-
-
Save MrTheFirst/2c8207dd7d23888800b9a048a49b190e to your computer and use it in GitHub Desktop.
key.rt.ru
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import logging | |
import time | |
from pyppeteer import launch | |
from urllib.parse import urlparse, parse_qs | |
from bs4 import BeautifulSoup | |
from enum import Enum | |
from typing import Optional, Tuple, Literal | |
from rostelecom_key.session import LOGGER, RtKeySession | |
_LOGGER = logging.getLogger(__name__) | |
class AuthenticationStep1Result(Enum): | |
AUTHENTICATED_BY_COOKIES = 1 | |
NEED_TO_FOLLOW_NEXT_STEPS = 2 | |
class AuthenticationStep2Result(Enum): | |
NEED_TO_FOLLOW_NEXT_STEPS = 1 | |
class AuthenticationSendSmsResult(Enum): | |
SMS_WAS_SENT = 1 | |
class AuthenticationDoubleRedirect(Enum): | |
SUCCESS = 1 | |
class AuthenticationDoubleRedirectResponse(): | |
def __init__(self, code: str, t: str, state: str): | |
self.code = code | |
self.t = t | |
self.state = state | |
class AuthenticationGetTokenResponse(): | |
def __init__(self, bearer_token: str, auth_token: str): | |
self.bearer_token = bearer_token | |
self.auth_token = auth_token | |
class ElementNotFound(Exception): | |
pass | |
class Authentication: | |
BASE_AUTHENTICATION_ENDPOINT = 'https://b2c.passport.rt.ru/auth/realms/b2c/protocol/openid-connect/auth' | |
BASE_AUTHENTICATION_REDIRECT_URI = 'https://sso.key.rt.ru/api/v1/oauth/b2c/callback' | |
BASE_SIGNIN_WITH_TIME_TEMPLATE = 'https://key.rt.ru/main/signin?t={}' | |
HOUSEHOLD_GET_TOKEN_URI = 'https://household.key.rt.ru/api/v3/app/sso/oauth2/token' | |
CLIENT_ID = None | |
def __init__(self, session: RtKeySession, client_id = 'lk_dmh'): | |
self.session = session | |
self.CLIENT_ID = client_id | |
async def get_browser(self): | |
if self.browser is None: | |
self.browser = await launch({ | |
'headless': True | |
}) | |
self.page = await self.browser.newPage() | |
await self.page.setRequestInterception(True) | |
return self.browser | |
async def authenticate_via_sms(self, phone: str) -> Tuple[Literal[AuthenticationSendSmsResult.SMS_WAS_SENT], str]: | |
(step1_result, step1_followup) = await self.authentication_step1() | |
if step1_result is AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES: | |
# Goes directly to Get Token? | |
pass | |
elif step1_result is AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS: | |
(step2_result, step2_followup) = await self.authentication_step2(str(step1_followup)) | |
if step2_result is AuthenticationStep2Result.NEED_TO_FOLLOW_NEXT_STEPS: | |
return await self.send_sms_with_code(step2_followup, phone) | |
raise Exception | |
async def send_sms_with_code(self, uri: str, phone: str) -> Tuple[Literal[AuthenticationSendSmsResult.SMS_WAS_SENT], str]: | |
r = await self.session.post(uri, data={ 'phone': phone, 'otp_get_code': 'Получить+код' }) | |
html_text = await r.text() | |
soup = BeautifulSoup(html_text, 'html.parser') | |
collection = soup.select('#kc-form-login .page-right-container-header:nth-of-type(1)') | |
if len(collection) != 1: | |
raise Exception | |
if 'Код подтверждения отправлен' not in collection[0].text: | |
raise Exception | |
element = soup.find('form', id='kc-form-login') | |
if element is None: | |
raise ElementNotFound | |
return (AuthenticationSendSmsResult.SMS_WAS_SENT, str(element.get('action'))) # type: ignore | |
async def authenticate_with_code_from_sms(self, uri: str, phone: str, code: str): | |
r = await self.session.post(uri, data={ | |
'username': phone, | |
'password': code, | |
'otp_login': 'Войти', | |
'otp_login': 'Войти' | |
}) | |
html_text = await r.text() | |
soup = BeautifulSoup(html_text, 'html.parser') | |
errors = soup.select('.error-container') | |
# Assert there are no errors | |
assert(len(errors) == 0) | |
(_, data) = await self.authentication_double_redirect(r) | |
return await self.get_token(data) | |
async def get_token(self, data: AuthenticationDoubleRedirectResponse, grant_type: str = 'authorization_code'): | |
r = await self.session.post(self.HOUSEHOLD_GET_TOKEN_URI, data={ | |
'code': data.code, | |
't': data.t, | |
'state': data.state, | |
'grant_type': grant_type | |
}) | |
response_json = await r.json() | |
# Assert there are no errors | |
assert(response_json['error'] is not None) | |
# Two type of tokens, one for one group of methods, and second for another ones | |
self.session.set_auth_token(response_json['data']['vc']['access_token']) | |
self.session.set_bearer_token(response_json['data']['key']['access_token']) | |
return AuthenticationGetTokenResponse(response_json['data']['key']['access_token'], response_json['data']['vc']['access_token']) | |
async def authentication_step2(self, uri: str) -> Tuple[Literal[AuthenticationStep2Result.NEED_TO_FOLLOW_NEXT_STEPS], str]: | |
r = await self.session.get(uri) | |
# LOGGER.debug(uri) | |
if r.status != 302: | |
raise Exception(r.status) | |
location = str(r.headers.get('Location')) | |
async with self.session.get(location) as response: | |
html_text = await response.text() | |
soup = BeautifulSoup(html_text, 'html.parser') | |
element = soup.find('form', id='kc-form-login') | |
if element is None: | |
raise ElementNotFound | |
return (AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS, str(element.get('action'))) # type: ignore | |
async def authentication_double_redirect(self, request) -> Tuple[Literal[AuthenticationDoubleRedirect.SUCCESS], AuthenticationDoubleRedirectResponse]: | |
# That means we already authenticated with AUTHENTICATED_BY_COOKIES | |
# We should follow redirects and then catch params from Location header | |
location = str(request.headers.get('Location')) | |
async with self.session.get(location) as response: | |
if response.status == 302: | |
location = response.headers.get('Location') | |
async with self.session.get(str(location)) as response: | |
parsed_url = urlparse(location) | |
params = parse_qs(str(parsed_url.query)) | |
return (AuthenticationDoubleRedirect.SUCCESS, AuthenticationDoubleRedirectResponse(params['code'][0], params['t'][0], params['state'][0])) | |
raise Exception | |
async def authentication_step1(self) -> Tuple[Literal[AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES, AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS], Optional[str]]: | |
r = await self.session.get(self.BASE_AUTHENTICATION_ENDPOINT, params={ | |
'client_id': self.CLIENT_ID, | |
'redirect_uri': self.BASE_AUTHENTICATION_REDIRECT_URI, | |
'response_type': 'code', | |
'state': self.BASE_SIGNIN_WITH_TIME_TEMPLATE.format(round(time.time() * 1000)) | |
}, allow_redirects=False) | |
if r.status == 302: | |
result = await self.authentication_double_redirect(r) | |
if result == AuthenticationDoubleRedirect.SUCCESS: | |
return (AuthenticationStep1Result.AUTHENTICATED_BY_COOKIES, None) | |
else: | |
html_text = await r.text() | |
soup = BeautifulSoup(html_text, 'html.parser') | |
element = soup.find('form', id='kc-form-login') | |
if element is None: | |
raise ElementNotFound | |
return (AuthenticationStep1Result.NEED_TO_FOLLOW_NEXT_STEPS, str(element.get('action'))) # type: ignore | |
raise Exception |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment