Last active
February 5, 2016 23:44
Message too large
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 0: emmett-debian:9092
DEBUG:pykafka.connection:Connecting to emmett-debian:9092
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9092
DEBUG:pykafka.cluster:Discovered broker id 1: emmett-debian:9093
DEBUG:pykafka.connection:Connecting to emmett-debian:9093
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9093
DEBUG:pykafka.cluster:Discovered broker id 2: emmett-debian:9094
DEBUG:pykafka.connection:Connecting to emmett-debian:9094
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9094
INFO:pykafka.cluster:Discovered 1 topics
DEBUG:pykafka.cluster:Discovered topic 'testtopic_replicated'
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 10 partitions
DEBUG:pykafka.topic:Adding partition testtopic_replicated/0
DEBUG:pykafka.topic:Adding partition testtopic_replicated/1
DEBUG:pykafka.topic:Adding partition testtopic_replicated/2
DEBUG:pykafka.topic:Adding partition testtopic_replicated/3
DEBUG:pykafka.topic:Adding partition testtopic_replicated/4
DEBUG:pykafka.topic:Adding partition testtopic_replicated/5
DEBUG:pykafka.topic:Adding partition testtopic_replicated/6
DEBUG:pykafka.topic:Adding partition testtopic_replicated/7
DEBUG:pykafka.topic:Adding partition testtopic_replicated/8
DEBUG:pykafka.topic:Adding partition testtopic_replicated/9
INFO:pykafka.producer:Starting new produce worker for broker 1
INFO:pykafka.producer:Starting new produce worker for broker 2
INFO:pykafka.producer:Starting new produce worker for broker 0
DEBUG:pykafka.producer:Sending 1 messages to broker 1
WARNING:pykafka.producer:Produce request for testtopic_replicated/6 to emmett-debian:9093 failed with error code 10.
INFO:pykafka.producer:Blocking until all messages are sent
Traceback (most recent call last):
  File "../notes/kafka_producer_test.py", line 37, in <module>
    raise exc
INFO:pykafka.producer:Worker exited for broker emmett-debian:9092
pykafka.exceptions.MessageSizeTooLarge: Produce request for testtopic_replicated/6 to emmett-debian:9093 failed with error code 10.
INFO:pykafka.producer:Worker exited for broker emmett-debian:9094
DEBUG:pykafka.producer:Finalising <pykafka.producer.Producer at 0x7f81a4d6e810>
INFO:pykafka.producer:Worker exited for broker emmett-debian:9093
INFO:pykafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.cluster:Discovered 3 brokers
DEBUG:pykafka.cluster:Discovered broker id 0: emmett-debian:9092
DEBUG:pykafka.connection:Connecting to emmett-debian:9092
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9092
DEBUG:pykafka.cluster:Discovered broker id 1: emmett-debian:9093
DEBUG:pykafka.connection:Connecting to emmett-debian:9093
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9093
DEBUG:pykafka.cluster:Discovered broker id 2: emmett-debian:9094
DEBUG:pykafka.connection:Connecting to emmett-debian:9094
DEBUG:pykafka.connection:Successfully connected to emmett-debian:9094
INFO:pykafka.cluster:Discovered 1 topics
DEBUG:pykafka.cluster:Discovered topic 'testtopic_replicated'
DEBUG:pykafka.connection:Connecting to localhost:9092
DEBUG:pykafka.connection:Successfully connected to localhost:9092
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.topic:Adding 10 partitions
DEBUG:pykafka.topic:Adding partition testtopic_replicated/0
DEBUG:pykafka.topic:Adding partition testtopic_replicated/1
DEBUG:pykafka.topic:Adding partition testtopic_replicated/2
DEBUG:pykafka.topic:Adding partition testtopic_replicated/3
DEBUG:pykafka.topic:Adding partition testtopic_replicated/4
DEBUG:pykafka.topic:Adding partition testtopic_replicated/5
DEBUG:pykafka.topic:Adding partition testtopic_replicated/6
DEBUG:pykafka.topic:Adding partition testtopic_replicated/7
DEBUG:pykafka.topic:Adding partition testtopic_replicated/8
DEBUG:pykafka.topic:Adding partition testtopic_replicated/9
INFO:pykafka.rdkafka.producer:Blocking until all messages are sent
DEBUG:pykafka.rdkafka.producer:Exiting RdKafkaProducer poller thread cleanly.
Traceback (most recent call last):
  File "../notes/kafka_producer_test.py", line 37, in <module>
    raise exc
pykafka.exceptions.MessageSizeTooLarge: (10, 'Broker: Message size too large')
DEBUG:pykafka.producer:Finalising <pykafka.rdkafka.producer.RdKafkaProducer at 0x7f87b8a4e8d0>
INFO:pykafka.rdkafka.producer:Blocking until all messages are sent
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
INFO:pykafka.handlers:RequestHandler.stop: about to flush requests queue
import logging as log
import os
import time

from pykafka import KafkaClient
from pykafka.common import CompressionType

log.basicConfig(level=log.DEBUG)

exc = None


def write_random_lowercase(n):
    """Return n random bytes, each mapped into the range b'a'..b'z'."""
    min_lc = ord(b'a')
    len_lc = 26
    ba = bytearray(os.urandom(n))
    for i, b in enumerate(ba):
        ba[i] = min_lc + b % len_lc  # convert 0..255 to 97..122
    # bytes() yields the raw payload on both Python 2 and 3;
    # str(ba) on Python 3 would yield the repr "bytearray(b'...')".
    return bytes(ba)


client = KafkaClient(hosts="localhost:9092,localhost:9093,localhost:9094", socket_timeout_ms=1000)
topic = client.topics['testtopic_replicated']

with topic.get_producer(
        compression=CompressionType.SNAPPY,
        min_queued_messages=1,
        linger_ms=1,
        max_retries=0,
        use_rdkafka=True,
        delivery_reports=True) as producer:
    for i in range(1200):
        # Each message is 2,000,000 bytes before compression.
        producer.produce(write_random_lowercase(2000000))
        # Wait for the delivery report and re-raise any produce error.
        msg, exc = producer.get_delivery_report(block=True)
        if exc is not None:
            raise exc
        time.sleep(1)
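
Error code 10 in both logs is Kafka's "message too large" error, which the broker returns when a message exceeds its configured size limit. The following is a minimal sketch, not part of the original script, of checking a payload's size before producing it. It assumes the broker's `message.max.bytes` is left near its roughly 1 MB default (check the actual broker configuration); the constant `ASSUMED_MAX_MESSAGE_BYTES` and the helper `fits_broker_limit` are hypothetical names introduced only for illustration. It does not require pykafka or a running broker.

```python
import os

# Assumption: the broker limit (message.max.bytes) is near the ~1 MB default.
# Replace this with the value from the actual broker configuration.
ASSUMED_MAX_MESSAGE_BYTES = 1000000


def random_lowercase(n):
    """Return n random bytes, each mapped into the range b'a'..b'z'."""
    base = ord("a")
    return bytes(bytearray(base + b % 26 for b in bytearray(os.urandom(n))))


def fits_broker_limit(payload, limit=ASSUMED_MAX_MESSAGE_BYTES):
    """Hypothetical helper: True if the uncompressed payload is within limit."""
    return len(payload) <= limit


if __name__ == "__main__":
    payload = random_lowercase(2000000)
    # A 2,000,000-byte payload exceeds the assumed 1,000,000-byte limit.
    print(len(payload), fits_broker_limit(payload))
```

The check uses the uncompressed size, which is conservative. Uniformly random letters are likely to compress poorly, so Snappy compression probably does not bring a 2 MB payload of this kind under a 1 MB limit.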