@emmettbutler
Last active February 5, 2016 23:44
Message too large
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 0: emmett-debian:9092
DEBUG:pykafka.connection:Connecting to emmett-debian:9092
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9092
DEBUG:pykafka.cluster:Discovered broker id 1: emmett-debian:9093
DEBUG:pykafka.connection:Connecting to emmett-debian:9093
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9093
DEBUG:pykafka.cluster:Discovered broker id 2: emmett-debian:9094
DEBUG:pykafka.connection:Connecting to emmett-debian:9094
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9094
INFO:pykafka.cluster:Discovered 1 topics
DEBUG:pykafka.cluster:Discovered topic 'testtopic_replicated'
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 10 partitions
DEBUG:pykafka.topic:Adding partition testtopic_replicated/0
DEBUG:pykafka.topic:Adding partition testtopic_replicated/1
DEBUG:pykafka.topic:Adding partition testtopic_replicated/2
DEBUG:pykafka.topic:Adding partition testtopic_replicated/3
DEBUG:pykafka.topic:Adding partition testtopic_replicated/4
DEBUG:pykafka.topic:Adding partition testtopic_replicated/5
DEBUG:pykafka.topic:Adding partition testtopic_replicated/6
DEBUG:pykafka.topic:Adding partition testtopic_replicated/7
DEBUG:pykafka.topic:Adding partition testtopic_replicated/8
DEBUG:pykafka.topic:Adding partition testtopic_replicated/9
INFO:pykafka.producer:Starting new produce worker for broker 1
INFO:pykafka.producer:Starting new produce worker for broker 2
INFO:pykafka.producer:Starting new produce worker for broker 0
DEBUG:pykafka.producer:Sending 1 messages to broker 1
WARNING:pykafka.producer:Produce request for testtopic_replicated/6 to emmett-debian:9093 failed with error code 10.
INFO:pykafka.producer:Blocking until all messages are sent
Traceback (most recent call last):
File "../notes/kafka_producer_test.py", line 37, in <module>
raise exc
INFO:pykafka.producer:Worker exited for broker emmett-debian:9092
pykafka.exceptions.MessageSizeTooLarge: Produce request for testtopic_replicated/6 to emmett-debian:9093 failed with error code 10.
INFO:pykafka.producer:Worker exited for broker emmett-debian:9094
DEBUG:pykafka.producer:Finalising <pykafka.producer.Producer at 0x7f81a4d6e810>
INFO:pykafka.producer:Worker exited for broker emmett-debian:9093
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 0: emmett-debian:9092
DEBUG:pykafka.connection:Connecting to emmett-debian:9092
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9092
DEBUG:pykafka.cluster:Discovered broker id 1: emmett-debian:9093
DEBUG:pykafka.connection:Connecting to emmett-debian:9093
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9093
DEBUG:pykafka.cluster:Discovered broker id 2: emmett-debian:9094
DEBUG:pykafka.connection:Connecting to emmett-debian:9094
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9094
INFO:pykafka.cluster:Discovered 1 topics
DEBUG:pykafka.cluster:Discovered topic 'testtopic_replicated'
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 10 partitions
DEBUG:pykafka.topic:Adding partition testtopic_replicated/0
DEBUG:pykafka.topic:Adding partition testtopic_replicated/1
DEBUG:pykafka.topic:Adding partition testtopic_replicated/2
DEBUG:pykafka.topic:Adding partition testtopic_replicated/3
DEBUG:pykafka.topic:Adding partition testtopic_replicated/4
DEBUG:pykafka.topic:Adding partition testtopic_replicated/5
DEBUG:pykafka.topic:Adding partition testtopic_replicated/6
DEBUG:pykafka.topic:Adding partition testtopic_replicated/7
DEBUG:pykafka.topic:Adding partition testtopic_replicated/8
DEBUG:pykafka.topic:Adding partition testtopic_replicated/9
INFO:pykafka.rdkafka.producer:Blocking until all messages are sent
DEBUG:pykafka.rdkafka.producer:Exiting RdKafkaProducer poller thread cleanly.
Traceback (most recent call last):
File "../notes/kafka_producer_test.py", line 37, in <module>
raise exc
pykafka.exceptions.MessageSizeTooLarge: (10, 'Broker: Message size too large')
DEBUG:pykafka.producer:Finalising <pykafka.rdkafka.producer.RdKafkaProducer at 0x7f87b8a4e8d0>
INFO:pykafka.rdkafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
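For context: error code 10 in both tracebacks above is `MessageSizeTooLarge`. The script below produces 2,000,000-byte payloads, which exceed a Kafka broker's default `message.max.bytes` limit (roughly 1 MB; the exact constant here is an assumption, not read from this broker). A minimal, hypothetical pre-flight check — `fits_broker_limit` is not a pykafka API, just a sketch:

```python
# Assumed broker-side limit; Kafka's default message.max.bytes is ~1 MB.
DEFAULT_MESSAGE_MAX_BYTES = 1000000

def fits_broker_limit(payload, limit=DEFAULT_MESSAGE_MAX_BYTES):
    """Return True if the raw payload is within the broker's size limit."""
    return len(payload) <= limit

print(fits_broker_limit(b"a" * 2000000))  # False: the gist's 2 MB payload
print(fits_broker_limit(b"a" * 500))      # True
```

Raising the limit would instead mean changing `message.max.bytes` on the broker (and the matching fetch/replica limits), which is outside the scope of this gist.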
import os
import time
import logging as log

from pykafka import KafkaClient
from pykafka.common import CompressionType

log.basicConfig(level=log.DEBUG)


def write_random_lowercase(n):
    """Return n random lowercase ASCII characters as a byte string."""
    min_lc = ord(b'a')
    len_lc = 26
    ba = bytearray(os.urandom(n))
    for i, b in enumerate(ba):
        ba[i] = min_lc + b % len_lc  # map 0..255 onto 97..122 ('a'..'z')
    return bytes(ba)


client = KafkaClient(hosts="localhost:9092,localhost:9093,localhost:9094",
                     socket_timeout_ms=1000)
topic = client.topics['testtopic_replicated']

with topic.get_producer(
        compression=CompressionType.SNAPPY,
        min_queued_messages=1,
        linger_ms=1,
        max_retries=0,
        use_rdkafka=True,
        delivery_reports=True) as producer:
    for i in range(1200):
        # 2,000,000-byte payloads: large enough to trip the broker's
        # message size limit, producing the errors logged above.
        producer.produce(write_random_lowercase(2000000))
        msg, exc = producer.get_delivery_report(block=True)
        if exc is not None:
            raise exc
        time.sleep(1)
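One way to keep payloads like these under the broker limit, rather than raising it, is to split them before producing. A hypothetical sketch — `chunk_payload` and the 1,000,000-byte limit are assumptions, not part of pykafka:

```python
# Hypothetical recovery strategy: chunk oversized payloads before producing,
# so each produce request stays under the assumed ~1 MB broker limit.
def chunk_payload(payload, limit=1000000):
    """Split a payload into pieces no larger than `limit` bytes."""
    return [payload[i:i + limit] for i in range(0, len(payload), limit)]

parts = chunk_payload(b"x" * 2500000)
print(len(parts))                              # 3
print(max(len(p) for p in parts) <= 1000000)   # True
```

Each chunk could then be passed to `producer.produce()` individually; reassembly on the consumer side would need its own framing, which this sketch does not cover.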