Skip to content

Instantly share code, notes, and snippets.

@khaerulumam42
Last active December 20, 2021 08:55
Show Gist options
  • Select an option

  • Save khaerulumam42/e08f3617f047dda20c8cff4fa70a6f85 to your computer and use it in GitHub Desktop.

Select an option

Save khaerulumam42/e08f3617f047dda20c8cff4fa70a6f85 to your computer and use it in GitHub Desktop.
import logging
from json import loads
from kafka import KafkaConsumer
# Configure the root logger at WARNING level before the consumer is built.
logging.basicConfig(level=logging.WARNING)


def _decode_json(raw):
    """Decode UTF-8 bytes and parse the resulting text as JSON."""
    return loads(raw.decode("utf-8"))


consumer = KafkaConsumer(
    "tutorial_topic",
    bootstrap_servers=["kafka:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="my-group",
    value_deserializer=_decode_json,
)

# Iterate over the consumer and log the deserialized value of each record.
for record in consumer:
    message = record.value
    logging.warning(f"get message {message}")
from fastapi import HTTPException, FastAPI, Request
from kafka import KafkaProducer
import logging
import asyncio
import uvicorn
import traceback
import json
def _encode_json(value):
    """Serialize *value* to JSON text and encode it as UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


# Module-level producer used by the webhook handler to publish payloads.
producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=_encode_json,
)

# Configure the root logger at WARNING level.
logging.basicConfig(level=logging.WARNING)

# ASGI application object that the route decorators attach to.
app = FastAPI()
@app.post("/v1/webhook")
async def webhook(request: Request):
    """Publish an incoming webhook payload to the Kafka tutorial topic.

    The JSON request body is parsed, handed unchanged to the module-level
    ``producer`` for ``tutorial_topic``, and echoed back to the caller.

    Args:
        request: Incoming request whose body is JSON containing an
            ``event_id`` key.

    Returns:
        A dict with the ``event_id``, the full ``payload``, the ``topic``
        name and ``"status": "OK"``.

    Raises:
        HTTPException: With status 500 and a ``detail`` dict describing the
            failure when the body cannot be parsed, ``event_id`` is
            missing, or the send call raises.
    """
    topic = "tutorial_topic"
    try:
        # Await the body on the server's own event loop rather than starting
        # a second loop with asyncio.run() from inside the handler.
        payload = await request.json()
        event_id = payload["event_id"]
        # NOTE(review): send() is called directly on the event loop; confirm
        # it does not block for long in this deployment.
        producer.send(topic, payload)
    except Exception as e:
        error_message = f"error on main function {e}"
        detail = {"status": "error", "message": error_message}
        # logging.exception records the active traceback. The previous
        # traceback.print_exc() returns None, so only "None" was logged.
        logging.exception(error_message)
        # Raise (not return) so the client actually receives an HTTP 500.
        raise HTTPException(status_code=500, detail=detail) from e
    logging.warning(f"payload {payload} sent to topic {topic}")
    return {"event_id": event_id, "payload": payload, "topic": topic, "status": "OK"}
if __name__ == "__main__":
    # Start the ASGI server with uvicorn's defaults. The explicit
    # ``debug=False`` was dropped: False was already the default, and newer
    # uvicorn releases no longer accept the ``debug`` keyword.
    uvicorn.run(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment