Skip to content

Instantly share code, notes, and snippets.

@gAmUssA
Last active May 6, 2025 16:06
Show Gist options
  • Save gAmUssA/d8a220fb0e35a1e59be4ff592b9674c9 to your computer and use it in GitHub Desktop.
Save gAmUssA/d8a220fb0e35a1e59be4ff592b9674c9 to your computer and use it in GitHub Desktop.
{
"namespace": "dev.gamov.flightdemo.avro",
"type": "record",
"name": "FlightData",
"doc": "Schema for flight data",
"fields": [
{
"name": "flightId",
"type": "string",
"doc": "Unique identifier for the flight"
},
{
"name": "callsign",
"type": "string",
"doc": "Flight callsign"
},
{
"name": "latitude",
"type": "double",
"doc": "Current latitude in degrees"
},
{
"name": "longitude",
"type": "double",
"doc": "Current longitude in degrees"
},
{
"name": "altitude",
"type": "int",
"doc": "Current altitude in feet"
},
{
"name": "heading",
"type": "double",
"doc": "Current heading in degrees (0-360)"
},
{
"name": "speed",
"type": "double",
"doc": "Current ground speed in knots"
},
{
"name": "verticalSpeed",
"type": "double",
"doc": "Current vertical speed in feet per minute"
},
{
"name": "origin",
"type": "string",
"doc": "Origin airport code"
},
{
"name": "destination",
"type": "string",
"doc": "Destination airport code"
},
{
"name": "status",
"type": {
"type": "enum",
"name": "FlightStatus",
"symbols": ["ON_TIME", "DELAYED", "CANCELLED", "DIVERTED"]
},
"doc": "Current flight status"
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Timestamp of the data point in milliseconds since epoch"
}
]
}
11:47:36 WARN [Source: flights[1] -> Calc[2] -> processed_flights[3]: Writer -> processed_flights[3]: Committer (5/10)#2] o.a.flink.runtime.taskmanager.Task - Source: flights[1] -> Calc[2] -> processed_flights[3]: Writer -> processed_flights[3]: Committer (5/10)#2 (827f2a330b549f6ded8efa8d373f0016_cbc357ccb763df2852fee8c4fc7d55f2_4_2) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = flights, partition = 7, leaderEpoch = 4, offset = 927, CreateTime = 1746542310202, serialized key size = 4, serialized value size = 74, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@23a2009a, value = [B@2918d9c7).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 common frames omitted
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:143)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:47)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
... 15 common frames omitted
Caused by: org.apache.avro.AvroTypeException: Found dev.gamov.flightdemo.avro.FlightStatus, expecting union[null, string]
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:109)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:140)
... 19 common frames omitted
^Cmake: *** [run-stateless] Error 130
// Define schema for flights table
val flightsSchema = Schema.newBuilder()
.column("flightId", DataTypes.STRING())
.column("callsign", DataTypes.STRING())
.column("latitude", DataTypes.DOUBLE())
.column("longitude", DataTypes.DOUBLE())
.column("altitude", DataTypes.INT())
.column("heading", DataTypes.DOUBLE())
.column("speed", DataTypes.DOUBLE())
.column("verticalSpeed", DataTypes.DOUBLE())
.column("origin", DataTypes.STRING())
.column("destination", DataTypes.STRING())
.column("status", DataTypes.STRING()) // Flink will handle the Avro enum as a string
.column("timestamp", DataTypes.BIGINT()) // Match Avro's long type with timestamp-millis logical type
// Remove watermark for now to see if that's the issue
.build()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment