Skip to content

Instantly share code, notes, and snippets.

@epsi95
Created November 6, 2020 18:37
Show Gist options
  • Select an option

  • Save epsi95/30ddf8805fdcaed1d6f7c1adda88e674 to your computer and use it in GitHub Desktop.

Select an option

Save epsi95/30ddf8805fdcaed1d6f7c1adda88e674 to your computer and use it in GitHub Desktop.
from flask import Flask, request
from confluent_kafka import Producer
import json
import logging
logging.basicConfig(format='%(process)d >> %(asctime)s - %(message)s', level=logging.INFO)
app = Flask(__name__)
def acknowledgement(err, msg):
# function to handle acknowledgement from kafka server
if err is not None:
logging.info("Failed to deliver message: {0}: {1}"
.format(msg.value(), err.str()))
else:
logging.info("Message produced: {0}".format(msg.value()))
p = Producer({'bootstrap.servers': 'localhost:9092'})
@app.route("/data", methods=["POST"])
def json_example():
req = request.get_json()
print(req)
p.produce(topic="car_data", key=req["id"], value=json.dumps(req), callback=acknowledgement)
# Wait for all messages in the Producer queue to be delivered
p.flush(30)
return "OK", 200
if __name__ == "__main__":
app.run(port=9000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment