@geoHeil
Last active July 23, 2020 08:44
failing
// setup in bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.11.1/flink-connector-kafka_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.11.1/flink-connector-kafka-base_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.11.1/flink-avro-confluent-registry-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.1/flink-avro-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.11.1/force-shading-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.1/jackson-core-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.11.1/jackson-databind-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.11.1/jackson-annotations-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.0/avro-1.10.0.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar -P lib/
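The downloads above place two Avro versions (1.8.2 and 1.10.0) in lib/. Whether that contributes to the failure is not established here, but duplicate artifacts on one classpath are easy to overlook. A minimal sketch for listing them; `duplicate_artifacts` is a hypothetical helper name, and the version-stripping pattern assumes Maven-style `name-version.jar` file names:

```shell
# Hypothetical helper: print artifact names that occur in more than one
# version in a directory of Maven-style "name-version.jar" files.
duplicate_artifacts() {
  local dir="${1:-lib}"
  local jar
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue   # no jars present: the glob stays literal, skip it
    # Strip the directory, the .jar suffix, and the trailing "-<version>".
    basename "$jar" .jar | sed -E 's/-[0-9][0-9A-Za-z.]*$//'
  done | sort | uniq -d
}

# Usage: duplicate_artifacts lib
```

Among the jars downloaded above, avro is the one name that appears twice.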
// upload the schema to the registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"nifiRecord\",\"namespace\":\"org.apache.nifi\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"text\",\"type\":[\"null\",\"string\"]},{\"name\":\"source\",\"type\":[\"null\",\"string\"]},{\"name\":\"geo\",\"type\":[\"null\",\"string\"]},{\"name\":\"place\",\"type\":[\"null\",\"string\"]},{\"name\":\"lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"]},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"]},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}]}]}"}' http://localhost:8081/subjects/tweets-raw-value/versions
curl --silent -X GET http://localhost:8081/subjects/ | jq .
// start the shell
export TERM=xterm-color
./bin/start-scala-shell.sh local
// try to execute
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties
senv.enableCheckpointing(5000)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
// **************************************************************
// have avro hugger generate the class
// class is defined below (comments)
// **************************************************************
val deserializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
val stream = senv.addSource(
  new FlinkKafkaConsumer[Tweet](
    "tweets-raw",
    deserializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)
stream.print()
senv.execute("Kafka Consumer Test")

geoHeil commented Jul 10, 2020

The remaining exception in the TaskManager logs:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
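
The trace shows `CaseClassSerializer.copy` receiving a `GenericData$Record`, so the source emitted a generic Avro record where the stream expected a case class. One thing that may be worth checking (an assumption, not a confirmed cause) is whether the record name held by the registry matches the class passed to `forSpecific`. A minimal sketch, assuming `jq` is installed and the registry from the setup is reachable; `record_full_name` is a hypothetical helper:

```shell
# Hypothetical helper: read a Schema Registry response on stdin (JSON whose
# "schema" field is an escaped Avro schema string) and print the record's
# full name, i.e. "<namespace>.<name>", or just "<name>" without a namespace.
record_full_name() {
  jq -r '.schema | fromjson | if .namespace then "\(.namespace).\(.name)" else .name end'
}

# Usage against the registry from the setup (subject taken from the upload step):
# curl --silent http://localhost:8081/subjects/tweets-raw-value/versions/latest | record_full_name
```

For the schema uploaded in the setup this prints `org.apache.nifi.nifiRecord`, which can then be compared with the fully qualified name of the generated `Tweet` class.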
