Created
March 10, 2017 18:03
-
-
Save mfenniak/46113b4e3cbe35cc54ee103cb0515f34 to your computer and use it in GitHub Desktop.
Log output & sequence from Kafka Streams CommitFailedException
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
App starts up. Consumer config is logged showing max.poll.interval.ms = 1800000 (30 minutes) | |
{ | |
"timestamp":"2017-03-08T22:13:51.139+00:00", | |
"message":"ConsumerConfig values: | |
auto.commit.interval.ms = 5000 | |
auto.offset.reset = earliest | |
bootstrap.servers = [10.20.4.219:9092, 10.20.6.54:9092] | |
check.crcs = true | |
client.id = timesheet-list-3d89bd2e-941f-4272-9916-aad4e6f82e36-StreamThread-1-consumer | |
connections.max.idle.ms = 540000 | |
enable.auto.commit = false | |
exclude.internal.topics = true | |
fetch.max.bytes = 52428800 | |
fetch.max.wait.ms = 500 | |
fetch.min.bytes = 1 | |
group.id = timesheet-list | |
heartbeat.interval.ms = 3000 | |
interceptor.classes = null | |
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
max.partition.fetch.bytes = 1048576 | |
max.poll.interval.ms = 1800000 | |
max.poll.records = 1000 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor] | |
receive.buffer.bytes = 65536 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 1801000 | |
retry.backoff.ms = 100 | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
session.timeout.ms = 10000 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = null | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
", | |
"logger_name":"org.apache.kafka.clients.consumer.ConsumerConfig", | |
"thread_name":"main", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
Everything is "OK"; committing about every 30 seconds: | |
{ | |
"timestamp":"2017-03-08T22:22:40.366+00:00", | |
"message":"stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed", | |
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
... snip ... | |
{ | |
"timestamp":"2017-03-08T22:22:44.028+00:00", | |
"message":"stream-thread [StreamThread-1] Committing task StreamTask 18_7", | |
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
About commit.interval.ms/30s later when I'd expect the next commit to start: | |
{ | |
"timestamp":"2017-03-08T22:23:10.845+00:00", | |
"message":"Marking the coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) dead for group timesheet-list", | |
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator", | |
"thread_name":"kafka-coordinator-heartbeat-thread | timesheet-list", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
Similar message about 80s later: | |
{ | |
"timestamp":"2017-03-08T22:33:39.232+00:00", | |
"message":"Marking the coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) dead for group timesheet-list", | |
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
Another 70s later, we've recovered?
{ | |
"timestamp":"2017-03-08T22:24:57.696+00:00", | |
"message":"stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed", | |
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
{ | |
"timestamp":"2017-03-08T22:24:58.149+00:00", | |
"message":"stream-thread [StreamThread-1] Committing task StreamTask 0_0", | |
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
{ | |
"timestamp":"2017-03-08T22:25:10.469+00:00", | |
"message":"Discovered coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) for group timesheet-list.", | |
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator", | |
"thread_name":"StreamThread-1", | |
"level":"INFO", | |
"level_value":20000, | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} | |
Oh no, disaster: | |
{ | |
"timestamp":"2017-03-08T22:25:45.296+00:00", | |
"message":"stream-thread [StreamThread-1] Failed to commit StreamTask 0_0 state: ", | |
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread", | |
"thread_name":"StreamThread-1", | |
"level":"WARN", | |
"level_value":30000, | |
"stack_trace":"org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. | |
\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) | |
\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) | |
\tat org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125) | |
\tat org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) | |
\tat org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) | |
\tat org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) | |
\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) | |
\tat org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777) | |
\tat org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764) | |
\tat org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739) | |
\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661) | |
\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) | |
", | |
"app":"timesheet-list", | |
"build":"1.1.0.423" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment