Created
September 4, 2020 16:47
-
-
Save om2c0de/d2bbc7f41875b2e7e68c7b330f24f1e9 to your computer and use it in GitHub Desktop.
Patched controller vega
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
from http import HTTPStatus | |
from typing import Awaitable, Optional, Union | |
from auth.exceptions import UserNotFound | |
from auth.jwt_utils import jwt_decode | |
from services.projects import Projects | |
from services.users import User, Users | |
from utils.constants import BaseErrorCodes | |
from utils.error_response import Error | |
from tornado.escape import json_decode | |
from .kerberos import KerberosAuthHandler | |
auth_logger = logging.getLogger('auth') | |
class Controller(KerberosAuthHandler):
    """Base controller: CORS headers, a response helper and current-user lookup.

    Extends ``KerberosAuthHandler`` and resolves the requesting user either from
    a JWT ``Bearer`` token or, for the ``Negotiate`` (Kerberos) scheme, from the
    ``user_id`` secure cookie.
    """

    current_user = None  # So that pyrestful does not call get_current_user() on every request

    def set_default_headers(self):
        """Set the permissive CORS response headers."""
        self.set_header("Access-Control-Allow-Origin", "*")
        # set_header() replaces any earlier value for the same header name, so all
        # allowed request headers must be listed in a single call; previously the
        # "x-requested-with" entry was overwritten by the second call.
        self.set_header(
            "Access-Control-Allow-Headers",
            "x-requested-with,access-control-allow-origin,authorization,content-type",
        )
        self.set_header('Access-Control-Allow-Methods', '*')
        # NOTE(review): browsers do not honour wildcard CORS values on credentialed
        # requests - confirm whether the concrete origin/methods should be sent instead.
        self.set_header('Access-Control-Allow-Credentials', 'true')
        self.set_header('Access-Control-Expose-Headers', 'Content-Disposition')

    def options(self):
        """Answer an OPTIONS request with 204 No Content and an empty body."""
        self.set_status(HTTPStatus.NO_CONTENT)
        self.finish()

    def send_response(self, status_code: int, body: Union[str, bytes, dict]) -> None:
        """
        Send an HTTP response with the given status code and body.

        In controllers it should be used like this:
            return self.send_response(...)
        It is designed this way so that the ``return`` statement is not forgotten
        after calling the method.
        """
        self.set_status(status_code)
        self.finish(body)
        return None

    def _get_user_id_from_auth_header(self) -> Optional[Union[str, bytes]]:
        """Extract the user id according to the scheme of the Authorization header.

        Returns:
            The ``user_id`` claim of the decoded JWT for ``Bearer``, the value of
            the ``user_id`` secure cookie for ``Negotiate``, or ``None`` when the
            header is missing, malformed or uses another scheme.
        """
        auth_header = self._get_auth_header()
        if not auth_header:
            return None

        # Split into "<scheme> <credentials>" tolerating repeated whitespace; the
        # scheme is matched exactly so that it cannot be confused with text that
        # merely appears inside the credentials.
        parts = str(auth_header).split(None, 1)
        scheme = parts[0] if parts else ''

        if scheme == 'Bearer':  # JWT authentication
            if len(parts) != 2:
                auth_logger.debug('Bearer auth header does not contain a token')
                return None
            token_decoded = jwt_decode(parts[1].strip())
            return token_decoded['user_id']
        if scheme == 'Negotiate':  # Kerberos authentication
            # NOTE(review): the secure cookie value may be bytes rather than str -
            # confirm that the user lookup accepts it as is.
            return self.get_secure_cookie('user_id')
        return None

    @staticmethod
    def _search_for_user_by_id(user_id):
        """Return the user with the given id.

        Raises:
            UserNotFound: if the lookup returns ``None``.
        """
        user = Users().read_by_filter({'user_id': user_id})
        if user is None:
            raise UserNotFound()
        return user

    def get_current_user(self) -> Optional[User]:
        """Return the user identified by the Authorization header, or ``None``.

        Raises:
            UserNotFound: if a user id was extracted but no such user exists.
        """
        user_id = self._get_user_id_from_auth_header()
        if not user_id:
            return None
        user = self._search_for_user_by_id(user_id)
        auth_logger.debug(f'Got current user: {user}')
        return user

    def check_project_exists(self, project_id: str, logger_handler) -> Optional[bool]:
        """Check that the project exists, sending a 400 response if it does not.

        Relevant only for the controllers: calculation, imports, results.

        Arguments:
            project_id {str}
            logger_handler {Logger}

        Returns:
            ``True`` when the project exists; ``None`` after the error response
            has been sent.
        """
        if Projects().exists(object_id=project_id):
            return True

        message = f"Project({project_id}) was not found."
        logger_handler.info(message)
        error = Error(code=BaseErrorCodes.BAD_REQUEST, message=message)
        return self.send_response(  # type: ignore
            status_code=HTTPStatus.BAD_REQUEST, body=error.get_full_error_dict()
        )

    def _get_auth_header(self) -> Optional[str]:
        """Return the Authorization request header.

        When the header is absent or empty, ``authenticate_redirect()`` is called
        and the (falsy) header value is still returned to the caller.
        """
        auth_header = self.request.headers.get('Authorization')
        if not auth_header:
            auth_logger.debug(f'Auth header not found. Headers: {self.request.headers}')
            self.authenticate_redirect()
        return auth_header

    def get_resource_id_from_body(self, resource_id_alias: str) -> Optional[str]:
        """Read ``resource_id_alias`` from the JSON object in the request body.

        Returns:
            The value stored under ``resource_id_alias``, or ``None`` when the
            body is not valid JSON, is not a JSON object, or lacks that key.
        """
        try:
            body_dict = json_decode(self.request.body)
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError (non UTF-8 body) are both
            # ValueError subclasses.
            auth_logger.debug(
                f'Not found resource id ({resource_id_alias}) in body: invalid json in body: {self.request.body}'
            )
            return None  # not json

        if not isinstance(body_dict, dict):
            # Valid JSON that is not an object (list, number, ...) has no keys to look up.
            auth_logger.debug(
                f'Not found resource id ({resource_id_alias}) in body: body is not a json object: {self.request.body}'
            )
            return None  # not a json object

        try:
            resource_id = body_dict[resource_id_alias]
        except KeyError:
            auth_logger.debug(f'Not found resource id ({resource_id_alias}) in body: {self.request.body}')
            return None  # no resource id
        auth_logger.debug(f'Found resource id ({resource_id_alias}) in body: {resource_id}')
        return resource_id
class ControllerException(Exception):
    """Controller error carrying an error code and optional details.

    Attributes are stored exactly as passed: ``code``, ``data``, ``source`` and
    ``reason``. ``str(exc)`` is a JSON object with the stringified code and the
    data.
    """

    def __init__(
        self, code: str, data: Optional[str] = None, source: Optional[str] = None, reason: Optional[str] = None
    ):
        # Forward the arguments to Exception so that ``args`` is populated; without
        # it the exception cannot be re-created from ``args`` (as pickle/copy do)
        # and its repr carries no information.
        super().__init__(code, data, source, reason)
        self.code = code
        self.data = data
        self.source = source
        self.reason = reason

    @property
    def extra(self):
        """Alias for ``data``."""
        return self.data

    @property
    def error_code(self):
        """Alias for ``code``."""
        return self.code

    def __str__(self):
        """Return ``{"code": ..., "data": ...}`` as JSON, or a fallback message.

        The broad ``except`` is deliberate: ``__str__`` must not raise, so any
        serialization failure (e.g. non-serializable ``data``) yields a
        descriptive fallback string instead.
        """
        json_data = {'code': str(self.code), 'data': self.data}
        try:
            return json.dumps(json_data)
        except Exception as ex:
            return f'ControllerException (Internal error) -> {ex}'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment