#!/usr/bin/env python
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
|
|
|
# =============================================================================
#
# Produce messages to Confluent Cloud
# Using Confluent Python Client for Apache Kafka
#
# =============================================================================
|
|
|
from confluent_kafka import Producer, KafkaError |
|
import json |
|
import os |
|
|
|
if __name__ == '__main__':

    # Initialization
    # Create the Producer instance. Every connection setting is read from
    # the environment; a missing variable raises KeyError.
    p = Producer({
        'bootstrap.servers': os.environ['BOOTSTRAP_SERVERS'],
        'sasl.mechanisms': os.environ['SASL_MECHANISMS'],
        'security.protocol': os.environ['SECURITY_PROTOCOL'],
        'sasl.username': os.environ['SASL_USERNAME'],
        'sasl.password': os.environ['SASL_PASSWORD'],
    })

    # Name of the topic every record is produced to.
    topic = os.environ['TOPIC']

    # Running count of records whose delivery was confirmed; incremented by
    # acked() below.
    delivered_records = 0

    # Optional per-message on_delivery handler (triggered by poll() or flush())
    # when a message has been successfully delivered or
    # permanently failed delivery (after retries).
    def acked(err, msg):
        """Delivery report handler called on successful or failed delivery.

        Prints the failure when ``err`` is set; otherwise increments the
        module-level ``delivered_records`` counter and prints the topic,
        partition and offset reported by ``msg``.
        """
        # The docstring must come first to be recognised as one, so the
        # global declaration follows it.
        global delivered_records
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}"
                  .format(msg.topic(), msg.partition(), msg.offset()))

    for n in range(10):
        record_key = "alice"
        record_value = json.dumps({'count': n})
        print("Producing record: {}\t{}".format(record_key, record_value))
        p.produce(topic, key=record_key, value=record_value, on_delivery=acked)
        # p.poll() serves delivery reports (on_delivery)
        # from previous produce() calls.
        p.poll(0)

    # Wait up to 10 seconds for outstanding deliveries. flush() returns the
    # number of messages still queued when it gives up, so a non-zero result
    # means some records were never confirmed and must not go unnoticed.
    undelivered = p.flush(10)
    if undelivered:
        print("{} messages were still queued when flush() timed out"
              .format(undelivered))

    print("{} messages were produced to topic {}!".format(delivered_records, topic))