@mfenniak
Created March 10, 2017 18:03
Log output & sequence from Kafka Streams CommitFailedException
App starts up. The consumer config is logged, showing max.poll.interval.ms = 1800000 (30 minutes):
{
"timestamp":"2017-03-08T22:13:51.139+00:00",
"message":"ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.20.4.219:9092, 10.20.6.54:9092]
check.crcs = true
client.id = timesheet-list-3d89bd2e-941f-4272-9916-aad4e6f82e36-StreamThread-1-consumer
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = timesheet-list
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 1800000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 1801000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
",
"logger_name":"org.apache.kafka.clients.consumer.ConsumerConfig",
"thread_name":"main",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
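For context, a minimal sketch of how a Streams app of this vintage (0.10.2-era clients, judging by the date and the stack trace below) would produce the overrides visible above. The application id and bootstrap servers come straight from the log; the class and method names are illustrative only:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class TimesheetListProps {
        public static Properties build() {
            Properties props = new Properties();
            // application.id doubles as the consumer group.id ("timesheet-list" above).
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timesheet-list");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.4.219:9092,10.20.6.54:9092");
            // Consumer keys are passed through to the embedded consumer; that is
            // how max.poll.interval.ms ends up at 30 minutes in the dump above.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1800000);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
            return props;
        }
    }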
Everything is "OK"; committing about every 30 seconds:
{
"timestamp":"2017-03-08T22:22:40.366+00:00",
"message":"stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed",
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
... snip ...
{
"timestamp":"2017-03-08T22:22:44.028+00:00",
"message":"stream-thread [StreamThread-1] Committing task StreamTask 18_7",
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
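The 30000ms here is commit.interval.ms, a Streams-level setting distinct from the consumer's auto-commit (enable.auto.commit is false above; Streams commits offsets itself). 30000 is also the default in this version, so the app may simply be running the default; set explicitly, continuing the illustrative sketch above:

    // Streams commits all tasks' offsets on this cadence.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);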
About commit.interval.ms (30s) later, when I'd expect the next commit to start:
{
"timestamp":"2017-03-08T22:23:10.845+00:00",
"message":"Marking the coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) dead for group timesheet-list",
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"thread_name":"kafka-coordinator-heartbeat-thread | timesheet-list",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
Similar message about 80s later:
{
"timestamp":"2017-03-08T22:33:39.232+00:00",
"message":"Marking the coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) dead for group timesheet-list",
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
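Note the thread names: the first marking comes from the background heartbeat thread, the second from the stream thread itself. The client marks the coordinator dead when a request to it fails or times out; meanwhile the broker will evict a member that stays silent past session.timeout.ms, triggering the rebalance seen later. The liveness settings involved, as pass-through consumer keys in the same illustrative sketch:

    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);   // broker evicts after this much silence
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // background heartbeat cadence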
Another 70s later, we've recovered?
{
"timestamp":"2017-03-08T22:24:57.696+00:00",
"message":"stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed",
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
{
"timestamp":"2017-03-08T22:24:58.149+00:00",
"message":"stream-thread [StreamThread-1] Committing task StreamTask 0_0",
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
{
"timestamp":"2017-03-08T22:25:10.469+00:00",
"message":"Discovered coordinator 10.20.4.219:9092 (id: 2147483644 rack: null) for group timesheet-list.",
"logger_name":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"thread_name":"StreamThread-1",
"level":"INFO",
"level_value":20000,
"app":"timesheet-list",
"build":"1.1.0.423"
}
Oh no, disaster:
{
"timestamp":"2017-03-08T22:25:45.296+00:00",
"message":"stream-thread [StreamThread-1] Failed to commit StreamTask 0_0 state: ",
"logger_name":"org.apache.kafka.streams.processor.internals.StreamThread",
"thread_name":"StreamThread-1",
"level":"WARN",
"level_value":30000,
"stack_trace":"org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
\tat org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
\tat org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
\tat org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
\tat org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
\tat org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
\tat org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764)
\tat org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
",
"app":"timesheet-list",
"build":"1.1.0.423"
}
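The exception's own advice is to raise the poll interval or shrink max.poll.records, which is presumably why the 30-minute max.poll.interval.ms is called out at the top: that limit should have been nowhere near exhausted. For completeness, a hedged sketch of the suggested knob plus an uncaught-exception handler so a thread-killing failure is at least visible (class name and handler body are illustrative; the topology is omitted):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class StartStreams {
        public static KafkaStreams start(Properties props, KStreamBuilder builder) {
            // Smaller batches shorten the gap between poll() calls, which is
            // the interval that max.poll.interval.ms measures.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // down from 1000
            KafkaStreams streams = new KafkaStreams(builder, props);
            // Here the CommitFailedException was only logged at WARN, but a handler
            // makes any exception that does kill a StreamThread visible.
            streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
            streams.start();
            return streams;
        }
    }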