Last active
December 20, 2021 08:55
-
-
Save khaerulumam42/e08f3617f047dda20c8cff4fa70a6f85 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| from json import loads | |
| from kafka import KafkaConsumer | |
| logging.basicConfig(level=logging.WARNING) | |
| consumer = KafkaConsumer( | |
| "tutorial_topic", | |
| bootstrap_servers=["kafka:9092"], | |
| auto_offset_reset="earliest", | |
| enable_auto_commit=True, | |
| group_id="my-group", | |
| value_deserializer=lambda x: loads(x.decode("utf-8"))) | |
| for message in consumer: | |
| message = message.value | |
| logging.warning(f"get message {message}") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from fastapi import HTTPException, FastAPI, Request | |
| from kafka import KafkaProducer | |
| import logging | |
| import asyncio | |
| import uvicorn | |
| import traceback | |
| import json | |
| producer = KafkaProducer(bootstrap_servers=["kafka:9092"], | |
| value_serializer=lambda x: | |
| json.dumps(x).encode("utf-8")) | |
| logging.basicConfig(level=logging.WARNING) | |
| app = FastAPI() | |
| @app.post("/v1/webhook") | |
| def webhook(request: Request): | |
| try: | |
| payload = asyncio.run(request.json()) | |
| event_id = payload["event_id"] | |
| topic = "tutorial_topic" | |
| producer.send(topic, payload) | |
| logging.warning(f"payload {payload} sent to topic {topic}") | |
| return {"event_id": event_id, "payload": payload, "topic": topic, "status": "OK"} | |
| except Exception as e: | |
| error_message = f"error on main function {e}" | |
| detail = {"status": "error", "message": error_message} | |
| logging.warning(traceback.print_exc()) | |
| return HTTPException(status_code=500, detail=detail) | |
| if __name__ == '__main__': | |
| uvicorn.run(app, debug=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment