Skip to content

Instantly share code, notes, and snippets.

@outtoin
Created March 21, 2019 12:39
Show Gist options
  • Save outtoin/12d1334b1345403c57372fcc6653c11b to your computer and use it in GitHub Desktop.
Save outtoin/12d1334b1345403c57372fcc6653c11b to your computer and use it in GitHub Desktop.
from kafka import KafkaProducer
from confluent_kafka import Producer
import multiprocessing
class KProducer(object):
    """Publish to one Kafka topic through two different client libraries.

    The instance holds a kafka-python ``KafkaProducer`` (used by ``sendk``)
    and a confluent-kafka ``Producer`` (used by ``sendc``), both bound to the
    same ``topic``.

    NOTE(review): both clients own background I/O threads and open sockets.
    Those are not expected to carry over into another process, so build the
    ``KProducer`` inside the process that actually sends -- confirm against
    the client documentation for your versions.
    """

    def __init__(self, bootstrap_servers, cbootstrap_servers, topic):
        """Create both underlying producers.

        Args:
            bootstrap_servers: Broker address(es) handed to kafka-python's
                ``KafkaProducer(bootstrap_servers=...)``.
            cbootstrap_servers: Broker address string stored under the
                ``'bootstrap.servers'`` key of the confluent-kafka config.
            topic: Topic name every ``sendk`` / ``sendc`` call publishes to.
        """
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.cproducer = Producer({'bootstrap.servers': cbootstrap_servers})
        self.topic = topic

    def delivery_report(self, err, msg):
        """Print the delivery result of one confluent-kafka message.

        Called once for each message produced; triggered by ``poll()`` or
        ``flush()`` on the confluent-kafka producer.

        Args:
            err: Delivery error, or ``None`` when the message was delivered.
            msg: The message object; its ``topic()`` and ``partition()`` are
                reported on success.
        """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    def sendk(self, data):
        """Queue ``data`` on the topic through kafka-python.

        The send is asynchronous; call ``flush()`` before the process exits
        so queued messages are not lost.
        """
        self.producer.send(self.topic, data)

    def sendc(self, data):
        """Queue ``data`` on the topic through confluent-kafka.

        The send is asynchronous; ``delivery_report`` fires later, from a
        ``poll()`` or ``flush()`` call.
        """
        # Serve delivery callbacks left over from earlier produce() calls.
        self.cproducer.poll(0)
        # confluent-kafka's Producer has no send(); produce() is the
        # enqueue call and accepts the delivery callback.
        self.cproducer.produce(self.topic, data, callback=self.delivery_report)

    def flush(self):
        """Block until both clients have sent everything they have queued.

        Flushing after every message defeats batching, so this is kept as a
        separate call to make once before shutdown.
        """
        self.producer.flush()
        self.cproducer.flush()
def _produce_in_child(bootstrap_servers, cbootstrap_servers, topic, data):
    """Worker entry point: build a KProducer in this process and send ``data``.

    The producer is created here rather than in the parent because the
    clients' background threads and sockets belong to the process that
    created them; only plain, picklable arguments cross the process boundary.
    """
    p = KProducer(bootstrap_servers, cbootstrap_servers, topic)
    p.sendk(data)
    # Sends are asynchronous: drain both clients before the worker exits,
    # otherwise queued messages are dropped with the process.
    p.producer.flush()
    p.cproducer.flush()


if __name__ == "__main__":
    # Placeholder connection settings -- replace with real brokers/topic.
    bootstrap_servers = ["some_server:9092"]
    cbootstrap_servers = "some_server:9092"
    topic = "some_topic"
    some_data = b"some_data"

    process = multiprocessing.Process(
        target=_produce_in_child,
        # args must be a tuple; ``(some_data)`` without a trailing comma is
        # just ``some_data``.
        args=(bootstrap_servers, cbootstrap_servers, topic, some_data),
    )
    process.daemon = True
    process.start()
    # A daemonic child is terminated when the parent exits, so wait for it
    # to finish sending.
    process.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment