Created
December 13, 2023 14:30
-
-
Save vsmelov/38d24d07181e381e99095efb24d1c95b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from functools import wraps | |
from quart import Quart, jsonify, request | |
from db import Config | |
import logging | |
import pprint | |
import aiohttp | |
import json | |
from tools.log import init_logger | |
from tools.sentry_tools import init_sentry | |
# Configure logging and Sentry before the application object is created.
# Log directory and format can be overridden through the environment.
init_logger(
    module_name="omni",
    level='DEBUG',
    log_dir=os.getenv('PROGRAM_LOG_DIR', './logs'),
    fmt=os.getenv('PROGRAM_LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
)
logging.info('starting omni_http.py')
init_sentry()

app = Quart(__name__)
config = Config()

# Shared secret that incoming webhook requests must present in the
# ``X-Api-Key`` header. Read with ``os.environ[...]`` so a missing variable
# raises KeyError at import time.
secret_key = os.environ['ZENDESK_SECRET_KEY']
def catch_errors(f):
    """Decorate an async handler so that any exception becomes a JSON 500 reply.

    On success the handler's own return value is passed through untouched.
    On failure the exception is logged with its traceback and the tuple
    ``(jsonify({"error": "<exception type>"}), 500)`` is returned instead.
    """

    @wraps(f)
    async def wrapped(*args, **kwargs):
        try:
            outcome = await f(*args, **kwargs)
        except Exception as e:
            # The local has to stay named ``e``: the ``{e=}`` specifier
            # writes the variable name itself into the log message.
            logging.exception(f'failed in handler {f.__name__} {type(e)=} {e=}')
            error_body = {"error": f'{type(e)}'}
            return jsonify(error_body), 500
        return outcome

    return wrapped
@app.route("/webhook", methods=['POST'])
@catch_errors
async def webhook():
    """Authenticate an incoming webhook POST and dispatch each event in it.

    Responses:
        200 ``{"status": "ok"}``: every event was passed to ``handle_event``.
        400 ``{"error": "wrong json"}``: the body is not valid JSON, or it
            does not contain an ``events`` list.
        403 ``{"error": "wrong secret key"}``: the ``X-Api-Key`` header does
            not match the configured secret.

    Any other exception is converted into a 500 by the ``catch_errors``
    decorator applied above.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    text = await request.get_data()

    # A missing header is treated as an empty key. Both sides are encoded to
    # bytes because ``hmac.compare_digest`` rejects non-ASCII ``str`` values,
    # and the comparison is constant-time so response timing does not reveal
    # how much of a guessed key was correct.
    provided_key = request.headers.get('X-Api-Key') or ''
    if not hmac.compare_digest(provided_key.encode('utf-8'), secret_key.encode('utf-8')):
        # The rejected value is deliberately not logged: it may be a
        # near-miss of the real secret.
        logging.warning('wrong secret key')
        return jsonify({"error": "wrong secret key"}), 403

    try:
        data = json.loads(text)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        logging.info(f'bad json: {text}')
        logging.exception(f'bad json: {e}')
        return jsonify({"error": "wrong json"}), 400

    logging.info(f'webhook:\n{pprint.pformat(data)}')

    # Reject structurally wrong payloads with a 400 instead of letting a
    # KeyError/TypeError surface as a 500.
    events = data.get('events') if isinstance(data, dict) else None
    if not isinstance(events, list):
        logging.warning('webhook payload has no "events" list')
        return jsonify({"error": "wrong json"}), 400

    for event in events:
        await handle_event(event)
    return jsonify({"status": "ok"})
# Zendesk connection settings, used when building the outgoing API request.
# Both are mandatory: a missing variable raises KeyError at import time.
ZENDESK_SUBDOMAIN, ZENDESK_API_TOKEN = (
    os.environ['ZENDESK_SUBDOMAIN'],
    os.environ['ZENDESK_API_TOKEN'],
)
async def handle_event(event):
    """Answer a single webhook event.

    Only ``conversation:message`` events are processed; any other type is
    logged and skipped. A message whose content is not text gets a fixed
    "only text messages are supported" notice. A text message is answered
    with a greeting that echoes the received text.

    Args:
        event: Mapping with a ``type`` key. For message events it must also
            carry ``payload.conversation.id`` and ``payload.message`` with
            ``author.displayName`` and ``content``.

    Raises:
        KeyError: If a message event lacks one of the fields listed above.
    """
    # NOTE(review): the original carried an "ignore same id" comment here,
    # which suggests de-duplication was intended. None is implemented.
    _type = event['type']
    if _type != 'conversation:message':
        logging.info(f'ignoring event {_type}')
        # Stop here: other event types are not guaranteed to have the
        # message fields read below.
        return

    chat_id = event['payload']['conversation']['id']
    message = event['payload']['message']
    author_name = message['author']['displayName']
    content = message['content']

    if content['type'] != 'text':
        await send_zendesk_message(chat_id, 'only text messages are supported')
        # Stop here: non-text content is not expected to have a 'text' field.
        return

    text = content['text']
    reply = f'Hi {author_name}\n--\n\nReceived message:\n{text}'
    await send_zendesk_message(chat_id, reply)
async def send_zendesk_message(chat_id, message):
    """Send *message* to chat *chat_id* on a best-effort basis.

    The work is delegated to ``_send_zendesk_message``. Any exception it
    raises is logged with its traceback and swallowed, so this coroutine
    always returns ``None``.
    """
    try:
        delivery = _send_zendesk_message(chat_id, message)
        await delivery
    except Exception as e:
        # ``e`` must keep its name: ``{e=}`` prints the name into the log line.
        logging.exception(f'failed to send zendesk message {e=}')
    return None
async def _send_zendesk_message(chat_id, message):
    """POST *message* to the Zendesk chat identified by *chat_id*.

    Args:
        chat_id: Identifier inserted into the request URL.
        message: Text sent as the ``message`` field of the JSON body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an HTTP
            status of 400 or higher.
        aiohttp.ClientError: On connection-level failures.
    """
    # The original first built a URL from the undefined names ``subdomain``
    # and ``conversation_id``, which raised NameError on every call before
    # any request was made. That dead assignment is removed.
    # NOTE(review): the endpoint path below is kept as written. Confirm it
    # against the Zendesk API documentation.
    url = f"https://{ZENDESK_SUBDOMAIN}.zendesk.com/api/v2/chats/{chat_id}/messages"
    headers = {
        "Authorization": f"Bearer {ZENDESK_API_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "message": message,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
            # Surface HTTP errors instead of silently parsing an error body.
            # The public wrapper logs whatever is raised here.
            response.raise_for_status()
            return await response.json()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment