Skip to content

Instantly share code, notes, and snippets.

@atuchak
Created December 13, 2023 15:34
Show Gist options
  • Save atuchak/334158570fd59da896da8e623f699a7f to your computer and use it in GitHub Desktop.
Save atuchak/334158570fd59da896da8e623f699a7f to your computer and use it in GitHub Desktop.
CloudPayments Webhook Facade
from __future__ import annotations
from abc import abstractmethod, ABC
from typing import Callable
from django.conf import settings
from django.http import Http404
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.product_configuration.logic.interactors.basket.cloud_payments import (
cloud_payments__register_webhook_call,
)
from common.api.serializers.common import CloudPaymentResponseOnNotificationSerializer
from common.dto.payments import CloudPaymentResponseOnNotificationDto
class IntegrationBase(ABC):
    """Base class for a per-slug CloudPayments webhook integration.

    Subclasses must implement ``process_pay``. ``validate_hmac`` also
    delegates to ``self._validate_hmac(private_key, payload, hmac_data)``,
    which this class does not define, so it has to be supplied by the
    concrete integration as well.
    """

    def load_settings(self, slug):
        """Return the private key configured in Django settings for *slug*.

        The slug is upper-cased before it is interpolated into the setting
        name: Django only loads setting names that are entirely upper-case,
        so a lower-case slug could never match a configured value.

        Raises:
            AttributeError: if the setting does not exist.
        """
        # NOTE(review): "PAYMENST" looks like a typo of "PAYMENT". It is kept
        # as-is because the real setting name is not visible here -- confirm.
        return getattr(settings, f'CLOUD_PAYMENST_{slug.upper()}_PRIVATE_KEY')

    def validate_hmac(self, slug, payload, hmac_data):
        """Check *hmac_data* against *payload* using the key stored for *slug*.

        Returns whatever ``self._validate_hmac`` returns; the meaning of that
        value is defined by the concrete integration.
        """
        # Named ``private_key`` so the module-level ``settings`` import is
        # not shadowed inside this method.
        private_key = self.load_settings(slug)
        return self._validate_hmac(private_key, payload, hmac_data)

    def process(self, event_type=''):
        """Dispatch a webhook event to its handler.

        Args:
            event_type: Name of the event. Only ``'PAY'`` is handled; the
                default ``''`` keeps the previous no-argument behaviour.

        Returns:
            The result of ``process_pay()`` for ``'PAY'``, otherwise ``None``.
        """
        if event_type == 'PAY':
            return self.process_pay()
        return None

    @abstractmethod
    def process_pay(self):
        """Handle a ``'PAY'`` event; implemented by each integration."""
class Gazprom(IntegrationBase):
    """Integration registered under the ``gazprom`` slug.

    The callable passed to the constructor is stored and invoked, with no
    arguments, each time a pay event is processed.
    """

    def __init__(self, call_cloud_cancel: Callable):
        """Store the injected *call_cloud_cancel* callable for later use."""
        self._call_cloud_cancel = call_cloud_cancel

    def process_pay(self):
        """Handle a pay event by invoking the injected callable.

        Returns ``None``; the callable's own result is not propagated.
        """
        # Placeholder from the draft: the call into the old logic belongs
        # here and is not implemented yet.
        cancel = self._call_cloud_cancel
        cancel()
# def call_cloud_cancel():
# requests.post('https://cloud.ru/sadsd/')
class WebhookFascide:
    """Facade that routes a webhook call to the integration for its slug."""

    def __init__(self, integrations):
        """Store the slug-to-integration mapping.

        Args:
            integrations: Mapping from slug to an integration object that
                exposes ``validate_hmac(slug, payload, hmac_data)`` and
                ``process()``.
        """
        self._map = integrations

    def run(self, slug, payload, hmac_data) -> CloudPaymentResponseOnNotificationDto:
        """Validate the HMAC for *slug* and run the matching integration.

        Args:
            slug: Integration identifier taken from the request.
            payload: Webhook payload passed to HMAC validation.
            hmac_data: HMAC value received with the request.

        Returns:
            Whatever the integration's ``process()`` returns.

        Raises:
            Http404: if no integration is registered for *slug*, or if HMAC
                validation returns a non-``None`` result.
        """
        # Look the slug up in the injected mapping; only a missing key maps
        # to 404, any other error propagates unchanged.
        try:
            integration = self._map[slug]
        except KeyError:
            raise Http404 from None

        # NOTE(review): the "non-None means invalid" convention is kept from
        # the draft -- confirm it against the integrations' validate_hmac.
        res = integration.validate_hmac(slug, payload, hmac_data)
        if res is not None:
            # TODO: do this properly (carried over from the draft).
            raise Http404('Hmac is not valid.')

        # NOTE(review): process() is called without the payload because the
        # integration API visible in this file accepts no arguments; forward
        # the payload once process() takes it.
        return integration.process()
class CloudPaymentsWebhookAPIView(APIView):
    """Endpoint that receives CloudPayments webhook calls for a given slug."""

    def post(self, request: Request, slug: str) -> Response:
        """Register the incoming call, run the facade, and answer HTTP 200.

        Args:
            request: Incoming DRF request; its headers and parsed data are
                recorded and passed to the facade.
            slug: Integration identifier from the URL.

        Returns:
            A 200 response carrying a serialized
            ``CloudPaymentResponseOnNotificationDto``.
        """
        headers = dict(request.headers.items())

        # 1. Record the raw webhook call before any business logic runs.
        cloud_payments__register_webhook_call(slug=slug, headers=headers, payload=request.data)

        # 2. Business logic.
        # NOTE(review): hmac_data and integrations are still empty
        # placeholders from the draft; the real HMAC value and the
        # integration registry have to be supplied here.
        hmac_data = b''
        integrations = {}
        # The keyword must be ``hmac_data`` to match WebhookFascide.run();
        # the draft's ``hamc_data`` raised TypeError on every request.
        WebhookFascide(integrations=integrations).run(slug, payload=request.data, hmac_data=hmac_data)

        # NOTE(review): the facade's result is discarded and a default DTO is
        # returned, as in the draft -- confirm whether that is intended.
        response_dto = CloudPaymentResponseOnNotificationDto()
        response_serializer = CloudPaymentResponseOnNotificationSerializer(instance=response_dto)
        return Response(status=status.HTTP_200_OK, data=response_serializer.data)
@atuchak
Copy link
Author

atuchak commented Dec 13, 2023

CLOUD_PAYMENT_BI_API_PASSWORD = Value(None, environ_name='CLOUD_PAYMENT_BI_API_PASSWORD')
CLOUD_PAYMENT__slug__API_PASSWORD = Value(None, environ_name='CLOUD_PAYMENT_BI_API_PASSWORD')
CLOUD_PAYMENT__SUPER_SLUG2__API_PASSWORD = Value(None)
CLOUD_PAYMENT__SUPER_SLUG3__API_PASSWORD = Value(None)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment