Created
October 19, 2018 15:10
-
-
Save LifeMoroz/07bed691a3a62daa9740383f7000d607 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import copy | |
import json | |
import logging | |
import uuid | |
from decimal import Decimal | |
from django.conf import settings | |
from django.http import HttpRequest | |
from django.utils.encoding import force_bytes | |
from pypo.monitoring import monitoring | |
from pypo.utils.encoders import JSONEncoder | |
from scribe import scribe | |
from thrift.protocol import TBinaryProtocol | |
from thrift.transport import TSocket, TTransport | |
from pharma.cart.serializers.models import CartSerializer | |
from pharma.cart.services import CartService | |
from pharma.core.utils.common import is_production_site | |
from pharma.pharma_scribe.constants import BACKEND_EVENTS, FRONTEND_EVENTS | |
from pharma.pharma_scribe.serializers import RequestSerializer | |
logger = logging.getLogger(__name__) | |
class DWHPacket: | |
def __init__(self, type: str, request: HttpRequest, **kwargs): | |
self.type = type | |
self.request = request | |
self.options = {} | |
self.extra = kwargs | |
self.forced = {} | |
self.is_server_side = False | |
@classmethod | |
def get_packet(cls, type, request, event_dict): | |
event_options = event_dict.get(type) | |
if type is None or event_options is None: | |
return | |
packet = cls(type, request) | |
cart_service = CartService.get_by_cookie(request.COOKIES.get('current_cart'), request.rb_info) | |
packet.set_cart(cart_service) | |
return packet | |
@classmethod | |
def get_frontend_event(cls, type, request: HttpRequest): | |
return cls.get_packet(type, request, FRONTEND_EVENTS) | |
@classmethod | |
def get_backend_event(cls, type, request: HttpRequest): | |
return cls.get_packet(type, request, BACKEND_EVENTS) | |
def set(self, key, data): # INFO: be careful with mutable types | |
self.forced[key] = data | |
def _set(self, key, obj, serializer): | |
self.options[key] = { | |
'object': obj, | |
'serializer': lambda o: serializer(o).data | |
} | |
def set_frontend_data(self, data): | |
self.options['from_frontend'] = { | |
'object': data, | |
'serializer': lambda o: o | |
} | |
def set_cart(self, cart: CartService, serializer=None): | |
self._set('cart', cart, serializer or CartSerializer) | |
def _iter(self, value): | |
if isinstance(value, uuid.UUID): | |
value = str(value) | |
if isinstance(value, (str, bytes)): | |
return base64.b64encode(force_bytes(value)).decode() | |
if isinstance(value, dict): | |
for k, v in value.items(): | |
value[k] = self._iter(v) | |
return value | |
if isinstance(value, (list, tuple)): | |
return [self._iter(i) for i in value] | |
if isinstance(value, (int, float, Decimal)) or value is None: | |
return value | |
logger.warning(f'Unexpected type {type(value)}') | |
return value | |
def pack(self) -> dict: | |
packed = { | |
'type': self.type, | |
'request': RequestSerializer(self.request).data, | |
'is_server_side': self.is_server_side, | |
**{key: item['serializer'](item['object']) for key, item in self.options.items()}, | |
**self.extra, | |
**self.forced | |
} | |
return self._iter(copy.deepcopy(packed)) | |
class ScribeService(object): | |
_instance = None | |
def __init__(self, host, port, category): | |
self.category = category | |
self.host = host | |
self.port = port | |
self.transport, self.client = self._get_connection() | |
def _get_connection(self): | |
socket = TSocket.TSocket(host=self.host, port=self.port) | |
socket.setTimeout(settings.DWH_TIMEOUT_MS) | |
transport = TTransport.TFramedTransport(socket) | |
protocol = TBinaryProtocol.TBinaryProtocol(trans=transport, strictRead=False, strictWrite=False) | |
client = scribe.Client(iprot=protocol, oprot=protocol) | |
return transport, client | |
def _get_log_entry(self, message): | |
return scribe.LogEntry(self.category, message) | |
def _send(self, log_entry): | |
if not self.transport.isOpen(): | |
with monitoring.monitor('scribe.opening'): | |
self.transport.open() | |
with monitoring.monitor('scribe.sending'): | |
return self.client.Log(messages=[log_entry]) | |
def __del__(self): | |
if self.transport.isOpen(): | |
self.transport.close() | |
def _log(self, packet: DWHPacket) -> bool: | |
dumped = json.dumps(packet.pack(), cls=JSONEncoder) | |
logger.debug("SCRIBE SEND DATA to `%s:%r`: %s", self.host, self.port, dumped) | |
log_entry = self._get_log_entry(dumped) | |
return self._send(log_entry) | |
@classmethod | |
def get_instance(cls) -> 'ScribeService': | |
if not cls._instance: | |
cls._instance = cls(host=settings.DWH_IP, port=settings.DWH_PORT, category=settings.DWH_CATEGORY) | |
return cls._instance | |
@classmethod | |
def log(cls, packet: DWHPacket) -> bool: | |
if not is_production_site(packet.request): | |
return True | |
try: | |
return cls.get_instance()._log(packet) | |
except Exception: | |
logger.warning("Exception during sending packet to DWH:", exc_info=True) | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment