Created
December 13, 2023 15:34
-
-
Save atuchak/334158570fd59da896da8e623f699a7f to your computer and use it in GitHub Desktop.
CloudPayments Webhook Facade
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from abc import abstractmethod, ABC | |
from typing import Callable | |
from django.conf import settings | |
from django.http import Http404 | |
from rest_framework import status | |
from rest_framework.request import Request | |
from rest_framework.response import Response | |
from rest_framework.views import APIView | |
from apps.product_configuration.logic.interactors.basket.cloud_payments import ( | |
cloud_payments__register_webhook_call, | |
) | |
from common.api.serializers.common import CloudPaymentResponseOnNotificationSerializer | |
from common.dto.payments import CloudPaymentResponseOnNotificationDto | |
class IntegrationBase(ABC):
    """Abstract base for the per-slug CloudPayments webhook integrations.

    Concrete integrations implement :meth:`process_pay`; the remaining
    methods provide the shared settings lookup, HMAC check and event dispatch.
    """

    def load_settings(self, slug):
        """Return the Django setting that holds the private key for *slug*.

        ``getattr`` is called without a default, so a missing setting
        propagates as an error to the caller.
        """
        # NOTE(review): the prefix is spelled ``CLOUD_PAYMENST_`` -- possibly a
        # typo for ``CLOUD_PAYMENTS_``; confirm against the settings module
        # before changing it.
        setting_name = f'CLOUD_PAYMENST_{slug}_PRIVATE_KEY'
        return getattr(settings, setting_name)

    def validate_hmac(self, slug, payload, hmac_data):
        """Validate *hmac_data* for *payload* with the key configured for *slug*.

        The actual check is delegated to ``self._validate_hmac``; its result
        is returned unchanged.
        """
        # NOTE(review): ``_validate_hmac`` is not defined on this class --
        # presumably subclasses are expected to provide it; confirm.
        private_key = self.load_settings(slug)
        return self._validate_hmac(private_key, payload, hmac_data)

    def process(self):
        """Dispatch to the handler matching the webhook's event type.

        Returns the result of :meth:`process_pay` for a ``'PAY'`` event and
        ``None`` for anything else.
        """
        # NOTE(review): the event type is a hard-coded empty string, so the
        # ``'PAY'`` branch is never taken yet -- this looks like a placeholder.
        event_type = ''
        if event_type != 'PAY':
            return None
        return self.process_pay()

    @abstractmethod
    def process_pay(self):
        """Handle a ``'PAY'`` event; must be implemented by every integration."""
class Gazprom(IntegrationBase):
    """Integration registered under the ``gazprom`` slug.

    The cancel operation is injected as a callable instead of being
    hard-wired into the class.
    """

    def __init__(self, call_cloud_cancel: Callable):
        """Remember the callable that :meth:`process_pay` invokes."""
        self._call_cloud_cancel = call_cloud_cancel

    def process_pay(self):
        """Handle a ``'PAY'`` event by invoking the injected cancel callable.

        The callable is called without arguments and its return value is
        discarded, so this method returns ``None``.
        """
        # TODO: call the legacy logic here (placeholder kept from the original).
        self._call_cloud_cancel()
# def call_cloud_cancel(): | |
# requests.post('https://cloud.ru/sadsd/') | |
class WebhookFascide:
    """Facade that routes a CloudPayments webhook call to its integration.

    The class name keeps its original spelling so existing references
    continue to work.

    ``integrations`` maps a slug to an integration *instance* exposing
    ``validate_hmac(slug, payload, hmac_data)`` and ``process()``. The caller
    is responsible for populating it; a slug that is not in the mapping is
    answered with ``Http404``.
    """

    def __init__(self, integrations):
        """Store the slug -> integration instance registry."""
        self._map = integrations

    def run(self, slug, payload, hmac_data) -> CloudPaymentResponseOnNotificationDto:
        """Validate and process one webhook call.

        Args:
            slug: Key of the integration in the injected registry.
            payload: Request body, forwarded to the HMAC validation.
            hmac_data: Signature received with the request.

        Returns:
            Whatever the integration's ``process()`` returns.

        Raises:
            Http404: If *slug* is not registered or the HMAC is rejected.
        """
        # Use the injected registry (it was previously stored but ignored) and
        # catch only the lookup failure rather than every exception.
        try:
            integration = self._map[slug]
        except KeyError:
            raise Http404 from None

        # Call the bound method with the arguments in the order the
        # integration declares them: (slug, payload, hmac_data).
        # NOTE(review): as in the original check, any non-``None`` result is
        # treated as a validation failure -- confirm the validator's contract.
        error = integration.validate_hmac(slug, payload, hmac_data)
        if error is not None:
            # TODO: replace the 404 with a proper error response.
            raise Http404('Hmac is not valid.')

        # NOTE(review): ``process()`` takes no arguments, so the payload is not
        # forwarded to it yet.
        return integration.process()
class CloudPaymentsWebhookAPIView(APIView):
    """Endpoint that receives CloudPayments webhook calls for a given slug."""

    def post(self, request: Request, slug: str) -> Response:
        """Register the incoming call, run the facade and acknowledge it.

        Args:
            request: The DRF request carrying the webhook headers and body.
            slug: Integration identifier taken from the URL.

        Returns:
            A 200 response whose body is a serialized, default-constructed
            ``CloudPaymentResponseOnNotificationDto``.

        Raises:
            Http404: Propagated from ``WebhookFascide.run`` when the slug is
                unknown or the HMAC is rejected.
        """
        headers = dict(request.headers.items())

        # Step 1: hand the raw call (slug, headers, body) to the registration
        # interactor before any business logic runs.
        cloud_payments__register_webhook_call(slug=slug, headers=headers, payload=request.data)

        # Step 2: business logic.
        # NOTE(review): the signature and the integration registry are still
        # placeholders (empty bytes / empty dict) -- they need real values.
        hmac_data = b''
        integrations = {}
        # The keyword was previously misspelled ``hamc_data``, which made this
        # call fail with TypeError on every request.
        WebhookFascide(integrations=integrations).run(slug, payload=request.data, hmac_data=hmac_data)

        # The facade's result is not used; a default DTO is returned instead.
        response_dto = CloudPaymentResponseOnNotificationDto()
        response_serializer = CloudPaymentResponseOnNotificationSerializer(instance=response_dto)
        return Response(status=status.HTTP_200_OK, data=response_serializer.data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
# Per-integration CloudPayments API password settings, each defaulting to None.
# NOTE(review): ``Value`` is presumably the django-configurations value class;
# its import is not visible here -- confirm.
CLOUD_PAYMENT_BI_API_PASSWORD = Value(None, environ_name='CLOUD_PAYMENT_BI_API_PASSWORD')
# NOTE(review): ``__slug__`` looks like a naming template rather than a real
# setting, and it reuses the environ_name of the BI setting above -- confirm.
CLOUD_PAYMENT__slug__API_PASSWORD = Value(None, environ_name='CLOUD_PAYMENT_BI_API_PASSWORD')
# NOTE(review): these names follow ``CLOUD_PAYMENT__<SLUG>__API_PASSWORD``, which
# differs from the ``CLOUD_PAYMENST_{slug}_PRIVATE_KEY`` pattern read by
# ``IntegrationBase.load_settings`` -- verify which convention is intended.
CLOUD_PAYMENT__SUPER_SLUG2__API_PASSWORD = Value(None)
CLOUD_PAYMENT__SUPER_SLUG3__API_PASSWORD = Value(None)