failing
# setup in bash: download the required connector and format jars into Flink's lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.11.1/flink-connector-kafka_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.11.1/flink-connector-kafka-base_2.11-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.11.1/flink-avro-confluent-registry-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.1/flink-avro-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.11.1/force-shading-1.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.1/jackson-core-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.11.1/jackson-databind-2.11.1.jar -P lib/
wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.11.1/jackson-annotations-2.11.1.jar -P lib/
# note: this fetches a second Avro version next to avro-1.8.2 above; only one should end up on the classpath
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.0/avro-1.10.0.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.20/commons-compress-1.20.jar -P lib/

# upload the schema to the registry and verify the subject exists
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"nifiRecord\",\"namespace\":\"org.apache.nifi\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"]},{\"name\":\"text\",\"type\":[\"null\",\"string\"]},{\"name\":\"source\",\"type\":[\"null\",\"string\"]},{\"name\":\"geo\",\"type\":[\"null\",\"string\"]},{\"name\":\"place\",\"type\":[\"null\",\"string\"]},{\"name\":\"lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"]},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"]},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"]},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"]},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"]},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}]}]}"}' http://localhost:8081/subjects/tweets-raw-value/versions
curl --silent -X GET http://localhost:8081/subjects/ | jq .

# start the Flink Scala shell
export TERM=xterm-color
./bin/start-scala-shell.sh local
// try to execute in the Scala shell
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties

senv.enableCheckpointing(5000)

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"

// **************************************************************
// have avrohugger generate the class;
// the class definition is included below (see the generated class)
// **************************************************************
val deserializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
val stream = senv.addSource(
  new FlinkKafkaConsumer(
    "tweets-raw",
    deserializer,
    properties
  ).setStartFromEarliest() // TODO experiment with different start values
)
stream.print
senv.execute("Kafka Consumer Test")
The generated specific record class:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch
/**
* Twitter tweet record limited to basic information
* @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the user.
* @param text the main text of a tweet
* @param source user agent of tweet submitting device
* @param geo geo if available
* @param place place if available
* @param lang language of the tweet
* @param created_at created at timestamp string formatted
* @param timestamp_ms created at timestamp epoch long formatted
* @param coordinates coordinates if available
* @param user_id System-assigned numeric tweet ID. Cannot be changed by the user.
* @param user_name speaking user name, can be changed
* @param screen_name screen name, can be changed
* @param user_created_at Timestamp of user creation
* @param followers_count follower count
* @param friends_count friends count
* @param user_lang language of user profile
* @param user_location location if available
* @param hashtags hashtags as list of strings if available
*/
final case class Tweet(var tweet_id: Option[String], var text: Option[String], var source: Option[String], var geo: Option[String], var place: Option[String], var lang: Option[String], var created_at: Option[String], var timestamp_ms: Option[String], var coordinates: Option[String], var user_id: Option[Long], var user_name: Option[String], var screen_name: Option[String], var user_created_at: Option[String], var followers_count: Option[Long], var friends_count: Option[Long], var user_lang: Option[String], var user_location: Option[String], var hashtags: Option[Seq[String]]) extends org.apache.avro.specific.SpecificRecordBase {
def this() = this(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None)
def get(field$: Int): AnyRef = {
(field$: @switch) match {
case 0 => {
tweet_id match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 1 => {
text match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 2 => {
source match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 3 => {
geo match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 4 => {
place match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 5 => {
lang match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 6 => {
created_at match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 7 => {
timestamp_ms match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 8 => {
coordinates match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 9 => {
user_id match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 10 => {
user_name match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 11 => {
screen_name match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 12 => {
user_created_at match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 13 => {
followers_count match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 14 => {
friends_count match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 15 => {
user_lang match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 16 => {
user_location match {
case Some(x) => x
case None => null
}
}.asInstanceOf[AnyRef]
case 17 => {
hashtags match {
case Some(x) => scala.collection.JavaConverters.bufferAsJavaListConverter({
x map { x =>
x
}
}.toBuffer).asJava
case None => null
}
}.asInstanceOf[AnyRef]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
}
def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
case 0 => this.tweet_id = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 1 => this.text = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 2 => this.source = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 3 => this.geo = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 4 => this.place = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 5 => this.lang = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 6 => this.created_at = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 7 => this.timestamp_ms = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 8 => this.coordinates = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 9 => this.user_id = {
value match {
case null => None
case _ => Some(value)
}
}.asInstanceOf[Option[Long]]
case 10 => this.user_name = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 11 => this.screen_name = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 12 => this.user_created_at = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 13 => this.followers_count = {
value match {
case null => None
case _ => Some(value)
}
}.asInstanceOf[Option[Long]]
case 14 => this.friends_count = {
value match {
case null => None
case _ => Some(value)
}
}.asInstanceOf[Option[Long]]
case 15 => this.user_lang = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 16 => this.user_location = {
value match {
case null => None
case _ => Some(value.toString)
}
}.asInstanceOf[Option[String]]
case 17 => this.hashtags = {
value match {
case null => None
case _ => Some(value match {
case (array: java.util.List[_]) => {
Seq((scala.collection.JavaConverters.asScalaIteratorConverter(array.iterator).asScala.toSeq map { x =>
x.toString
}: _*))
}
})
}
}.asInstanceOf[Option[Seq[String]]]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
}
def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}
object Tweet {
val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter tweet record limited to basic information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"text\",\"type\":[\"null\",\"string\"],\"doc\":\"the main text of a tweet\"},{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"doc\":\"user agent of tweet submitting device\"},{\"name\":\"geo\",\"type\":[\"null\",\"string\"],\"doc\":\"geo if available\"},{\"name\":\"place\",\"type\":[\"null\",\"string\"],\"doc\":\"place if available\"},{\"name\":\"lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of the tweet\"},{\"name\":\"created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp string formatted\"},{\"name\":\"timestamp_ms\",\"type\":[\"null\",\"string\"],\"doc\":\"created at timestamp epoch long formatted\"},{\"name\":\"coordinates\",\"type\":[\"null\",\"string\"],\"doc\":\"coordinates if available\"},{\"name\":\"user_id\",\"type\":[\"null\",\"long\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"doc\":\"speaking user name, can be changed\"},{\"name\":\"screen_name\",\"type\":[\"null\",\"string\"],\"doc\":\"screen name, can be changed\"},{\"name\":\"user_created_at\",\"type\":[\"null\",\"string\"],\"doc\":\"Timestamp of user creation\"},{\"name\":\"followers_count\",\"type\":[\"null\",\"long\"],\"doc\":\"follower count\"},{\"name\":\"friends_count\",\"type\":[\"null\",\"long\"],\"doc\":\"friends count\"},{\"name\":\"user_lang\",\"type\":[\"null\",\"string\"],\"doc\":\"language of user profile\"},{\"name\":\"user_location\",\"type\":[\"null\",\"string\"],\"doc\":\"location if available\"},{\"name\":\"hashtags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"hashtags as list of strings if available\"}]}")
}
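For reference, a class like the one above can be produced with avrohugger's standalone generator; a minimal sketch, assuming avrohugger-core is on the classpath and the schema is saved locally as tweet.avsc (a hypothetical path):

import avrohugger.Generator
import avrohugger.format.SpecificRecord

// generate Scala case classes extending SpecificRecordBase from the avsc file
val generator = new Generator(SpecificRecord)
generator.fileToFile(new java.io.File("tweet.avsc"), "generated-sources")

The doc attributes in the schema become the scaladoc comments seen on the class.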
The remaining exception in the TaskManager logs:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
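A guess at the cause: Avro's SpecificDatumReader silently falls back to GenericData.Record whenever it cannot load the class named in the schema, and classes defined inside the Scala shell live in REPL-generated packages rather than in com.github.geoheil.streamingreference, so the lookup can come up empty; the generic record then fails the cast inside Flink's CaseClassSerializer as shown above. A quick check from the shell (a sketch using Avro's public SpecificData API):

import org.apache.avro.specific.SpecificData

// prints null when Avro cannot resolve a class for the schema --
// exactly the situation in which the generic-record fallback kicks in
println(SpecificData.get().getClass(Tweet.SCHEMA$))

Note also that the schema registered above is named org.apache.nifi.nifiRecord while the generated class reads with com.github.geoheil.streamingreference.Tweet; Avro schema resolution requires matching record names (or aliases), so that mismatch is a second candidate.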
The Avro schema definition (the same JSON as SCHEMA$ above, pretty-printed):
{
  "type": "record",
  "name": "Tweet",
  "namespace": "com.github.geoheil.streamingreference",
  "doc": "Twitter tweet record limited to basic information",
  "fields": [
    {"name": "tweet_id", "type": ["null", "string"], "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."},
    {"name": "text", "type": ["null", "string"], "doc": "the main text of a tweet"},
    {"name": "source", "type": ["null", "string"], "doc": "user agent of tweet submitting device"},
    {"name": "geo", "type": ["null", "string"], "doc": "geo if available"},
    {"name": "place", "type": ["null", "string"], "doc": "place if available"},
    {"name": "lang", "type": ["null", "string"], "doc": "language of the tweet"},
    {"name": "created_at", "type": ["null", "string"], "doc": "created at timestamp string formatted"},
    {"name": "timestamp_ms", "type": ["null", "string"], "doc": "created at timestamp epoch long formatted"},
    {"name": "coordinates", "type": ["null", "string"], "doc": "coordinates if available"},
    {"name": "user_id", "type": ["null", "long"], "doc": "System-assigned numeric tweet ID. Cannot be changed by the user."},
    {"name": "user_name", "type": ["null", "string"], "doc": "speaking user name, can be changed"},
    {"name": "screen_name", "type": ["null", "string"], "doc": "screen name, can be changed"},
    {"name": "user_created_at", "type": ["null", "string"], "doc": "Timestamp of user creation"},
    {"name": "followers_count", "type": ["null", "long"], "doc": "follower count"},
    {"name": "friends_count", "type": ["null", "long"], "doc": "friends count"},
    {"name": "user_lang", "type": ["null", "string"], "doc": "language of user profile"},
    {"name": "user_location", "type": ["null", "string"], "doc": "location if available"},
    {"name": "hashtags", "type": ["null", {"type": "array", "items": "string"}], "doc": "hashtags as list of strings if available"}
  ]
}