Skip to content

Instantly share code, notes, and snippets.

@WesleyBatista
Last active June 16, 2018 00:42
Show Gist options
  • Save WesleyBatista/16fe8efbb6cf98aabe8d5581d2fb34c9 to your computer and use it in GitHub Desktop.
Save WesleyBatista/16fe8efbb6cf98aabe8d5581d2fb34c9 to your computer and use it in GitHub Desktop.
Kafka 0.11.0.0 with kafka-manager running with docker-compose
import os
from kafka import KafkaConsumer
from settings import BOOTSTRAP_SERVERS, TOPIC
# Consume every record from TOPIC and print its coordinates, key and value.
# auto_offset_reset='earliest' is passed so that reading starts from the
# oldest available record when there is no valid stored offset.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
)
print("connected to: {}".format(BOOTSTRAP_SERVERS))

try:
    for message in consumer:
        # message value and key are raw bytes -- decode if necessary!
        # e.g., for unicode: `message.value.decode('utf-8')`
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
finally:
    # The loop only ends on an error or Ctrl-C; close the consumer either way
    # so its broker connections are released.
    consumer.close()
---
# Three-broker Kafka 0.11.0.0 cluster with a single ZooKeeper node and
# kafka-manager as a web UI.
#
# Every broker listens on container port 9092 and is published on a distinct
# host port (9092/9093/9094). KAFKA_ADVERTISED_PORT must match the published
# host port of the same service, and KAFKA_ADVERTISED_HOST_NAME must be an
# address of the Docker host that clients can reach.
version: "3"

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka1:
    image: wurstmeister/kafka:0.11.0.0
    environment:
      # Environment values are quoted so they are always passed as strings.
      KAFKA_BROKER_ID: "901"
      KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ADVERTISED_HOST_NAME: "192.168.39.1"
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_NUM_PARTITIONS: "2"
      # "zk" is the link alias declared under `links` below.
      KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    ports:
      - "9092:9092"
      # NOTE(review): container port 9999 is presumably JMX, but no JMX_PORT
      # is configured in this file -- confirm it is actually exposed.
      - "9992:9999"
    links:
      - zookeeper:zk

  kafka2:
    image: wurstmeister/kafka:0.11.0.0
    environment:
      KAFKA_BROKER_ID: "902"
      KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ADVERTISED_HOST_NAME: "192.168.39.1"
      KAFKA_ADVERTISED_PORT: "9093"
      KAFKA_NUM_PARTITIONS: "2"
      KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    ports:
      - "9093:9092"
      - "9993:9999"
    links:
      - zookeeper:zk

  kafka3:
    image: wurstmeister/kafka:0.11.0.0
    environment:
      KAFKA_BROKER_ID: "903"
      KAFKA_CREATE_TOPICS: "topic_1:1:1:compact,topic_2:1:1:compact"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ADVERTISED_HOST_NAME: "192.168.39.1"
      KAFKA_ADVERTISED_PORT: "9094"
      KAFKA_NUM_PARTITIONS: "2"
      KAFKA_ZOOKEEPER_CONNECT: "zk:2181"
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    ports:
      - "9094:9092"
      - "9994:9999"
    links:
      - zookeeper:zk

  kafka-manager:
    image: sheepkiller/kafka-manager:latest
    ports:
      - "9000:9000"
    links:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
    environment:
      ZK_HOSTS: "zookeeper:2181"
      # Placeholder secret for local use only; override it outside of a
      # throwaway development setup.
      APPLICATION_SECRET: "letmein"
      KM_ARGS: "-Djava.net.preferIPv4Stack=true"
#!/usr/bin/env python
import string
import random
from kafka import KafkaProducer
from settings import BOOTSTRAP_SERVERS, TOPIC
# Publish 100 numbered messages to TOPIC, each with a random 10-character key.
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)

try:
    for i in range(100):
        msg = "message {}".format(i)
        # Key is drawn from uppercase ASCII letters and digits.
        key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
        print(msg)
        # .get() waits on the future returned by send(), so each record is
        # sent synchronously and a failed send raises here instead of being
        # dropped silently.
        producer.send(TOPIC, key=key.encode('utf-8'), value=msg.encode('utf-8')).get()
finally:
    # Release the broker connections whether or not every send succeeded.
    producer.close()
# Broker address handed to KafkaConsumer / KafkaProducer as bootstrap_servers.
BOOTSTRAP_SERVERS = "192.168.39.1:9093"

# Topic shared by the producer and consumer scripts. This must be an
# application topic: "__consumer_offsets" is Kafka's internal offsets topic
# and must not receive arbitrary records from the producer.
TOPIC = "topic_1"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment