Skip to content

Instantly share code, notes, and snippets.

@Attumm
Created September 16, 2017 11:40
Show Gist options
  • Save Attumm/7d07102aece79619cd11720a175d118d to your computer and use it in GitHub Desktop.
Save Attumm/7d07102aece79619cd11720a175d118d to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError
import time
# Consumer benchmark: subscribe to the test topic, collect every decoded
# message value, echo the last space-separated token of each one plus a
# running count, and stop on the first error that is not end-of-partition.
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'first',
              'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['test_once_ok_ok_ok'])
start = time.time()
running = True
items = []
count = 0
try:
    while running:
        msg = c.poll()
        if msg is None:
            # poll() is documented to return None when it times out; there
            # is nothing to inspect in that case, so just poll again.
            continue
        err = msg.error()
        if not err:
            items.append(msg.value().decode('utf-8'))
            print(items[-1].split(' ')[-1])
            count += 1
            # '\r' keeps the running counter on a single terminal line.
            print(count, end='\r')
        elif err.code() != KafkaError._PARTITION_EOF:
            # Reaching the end of a partition is not a failure; any other
            # error is printed and ends the loop.
            print(err)
            running = False
finally:
    # Close the consumer even if decoding or printing raised.
    c.close()
# Report how many messages were collected and the (positive) elapsed time.
print(f'{len(items)}: items took: {time.time() - start}')
from confluent_kafka import Producer
import time
# Producer benchmark: publish `times` sequentially numbered messages to the
# test topic and print how long the whole run took.
producer_config = {'bootstrap.servers': 'localhost:9092'}
topic = 'test_once_ok_ok_ok'

p = Producer(producer_config)
start = time.time()
times = 1_000_000_000
for i in range(times):
    p.produce(topic, key='hello', value=f'{i}')
    # Non-blocking poll after every produce call.
    p.poll(0)
    if not i % 1000:
        # Flush on every 1000th index (including i == 0).
        p.flush()

# Final flush so nothing is left queued before the timing is reported.
p.flush()
print(f'{times}:items took: {time.time() - start}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment