The fine folks at Datamountaineer have developed the stream-reactor, a collection of connectors that makes it easier to integrate data pipelines with Kafka.
As a demo, I'm streaming Twitter data to Kafka, and then from Kafka to RethinkDB.
Configs using Landoop's fast-data-dev environment:
Source: Twitter
name=TwitterSourceConnector
connector.class=com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
tasks.max=1
topic=storm
track.terms=storm
twitter.secret=xxx
twitter.token=xxx
twitter.consumersecret=xxx
twitter.consumerkey=xxx
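If you'd rather script the setup than paste configs into a UI, here's a minimal sketch using the Kafka Connect REST API. The localhost:8083 endpoint is an assumption; fast-data-dev may proxy Connect on a different port.

import requests  # pip install requests

# Register the Twitter source via the Kafka Connect REST API.
# Assumption: the Connect worker's REST endpoint is on localhost:8083.
payload = {
    "name": "TwitterSourceConnector",
    "config": {
        "connector.class": "com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector",
        "tasks.max": "1",
        "topic": "storm",
        "track.terms": "storm",
        "twitter.secret": "xxx",
        "twitter.token": "xxx",
        "twitter.consumersecret": "xxx",
        "twitter.consumerkey": "xxx",
    },
}
resp = requests.post("http://localhost:8083/connectors", json=payload)
resp.raise_for_status()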
Sink: RethinkDB
name=ReThinkSinkConnector
connector.class=com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkConnector
tasks.max=1
topics=storm
connect.rethink.sink.db=twitter_data
connect.rethink.sink.host=172.17.0.4
connect.rethink.sink.kcql=INSERT INTO tweets SELECT * FROM storm
value.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:3030/api/schema-registry
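With both connectors running, a quick smoke test from Python confirms tweets are landing in RethinkDB. A sketch, assuming the rethinkdb Python driver (v2.4+) and RethinkDB's default driver port 28015:

from rethinkdb import RethinkDB  # pip install rethinkdb

# Count documents and peek at one to confirm the sink is writing.
# Host matches the sink config above; 28015 is the default driver port.
r = RethinkDB()
conn = r.connect(host="172.17.0.4", port=28015)
print(r.db("twitter_data").table("tweets").count().run(conn))
print(r.db("twitter_data").table("tweets").sample(1).run(conn))
conn.close()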
Everything hums along fine for several hours until the sink hits what looks like a duplicate tweet; Kafka Connect's at-least-once delivery means the same record can be handed to the sink twice. Full trace below.
Side note: it sounds like Rethink is working on some error-handling improvements around duplicate primary ids.
ReThink Kafka connector docs and relevant bits:
http://docs.datamountaineer.com/en/latest/rethink.html
https://github.com/Landoop/kafka-connectors-tests/tree/master/kafka-connect-rethink
[2017-05-19 16:51:38,883] ERROR Encountered error Write error occurred. Duplicate primary key `id`:
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
} (com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter:62)
java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:146)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.com$datamountaineer$streamreactor$connect$rethink$sink$ReThinkWriter$$writeRecords(ReThinkWriter.scala:100)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.write(ReThinkWriter.scala:75)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask.put(ReThinkSinkTask.scala:53)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-19 16:51:38,884] ERROR Task ReThinkSinkConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
java.lang.RuntimeException: java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:83)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:64)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleTry(ReThinkWriter.scala:54)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:148)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.com$datamountaineer$streamreactor$connect$rethink$sink$ReThinkWriter$$writeRecords(ReThinkWriter.scala:100)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter$$anonfun$write$1.apply(ReThinkWriter.scala:75)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.write(ReThinkWriter.scala:75)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask$$anonfun$put$2.apply(ReThinkSinkTask.scala:53)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask.put(ReThinkSinkTask.scala:53)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Throwable: Write error occurred. Duplicate primary key `id`:
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
{
"created_at": "2017-05-19T16:28:40.000+0000",
"entities": {
"hashtags": [],
"media": [],
"urls": [],
"user_mentions": [
{}
]
},
"id": 865605134142722000.0,
"is_retweet": true,
"lang": "en",
"text": "RT @callmiifexco: Sometimes you just have to remind yourself that storms don't last forever.",
"user": {
"followers_count": 42577,
"friends_count": 42548,
"id": 2300088499,
"location": "St Louis, MO",
"name": "ℹL๑บ∂β๑γ🍃💨",
"screen_name": "iBleedLxyalty",
"statuses_count": 13267,
"verified": false
}
}
at com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkWriter.handleFailure(ReThinkWriter.scala:146)
... 20 more
[2017-05-19 16:51:38,885] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-05-19 16:51:38,885] ERROR Task ReThinkSinkConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-19 16:51:38,886] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-19 16:51:38,886] INFO Stopping Rethink sink. (com.datamountaineer.streamreactor.connect.rethink.sink.ReThinkSinkTask:61)
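As the log says, a killed task stays down until it's manually restarted. The Connect REST API can do that; a sketch, again assuming the worker's REST endpoint is on localhost:8083:

import requests  # pip install requests

# Restart task 0 of the failed sink connector.
# Assumption: the Connect worker's REST endpoint is on localhost:8083.
requests.post(
    "http://localhost:8083/connectors/ReThinkSinkConnector/tasks/0/restart"
).raise_for_status()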
By default, the error policy is to throw the exception:
connect.rethink.sink.error.policy=THROW
connect.rethink.sink.error.policy
Specifies the action to be taken if an error occurs while inserting the data. There are three available options: NOOP (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the Kafka message is redelivered up to a maximum number of times specified by the connect.rethink.sink.max.retries option; the connect.rethink.sink.retry.interval option specifies the interval between retries).
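For example, a retry policy might look like this (the values below are illustrative, not the connector's defaults):
connect.rethink.sink.error.policy=RETRY
connect.rethink.sink.max.retries=10
connect.rethink.sink.retry.interval=60000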
For now, I've set the policy to swallow the error:
connect.rethink.sink.error.policy=NOOP
With that in place, Rethink runs without complaint. We'll keep an eye on the logs to make sure we're not ignoring anything unintended.
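Worth noting: the connector docs also describe an upsert mode in KCQL. I haven't tested it, but if your version supports it, replacing on conflict might be a cleaner fix than swallowing errors:
connect.rethink.sink.kcql=UPSERT INTO tweets SELECT * FROM storm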
Thanks to Andrew at Datamountaineer for the help.