Last active
May 6, 2025 16:06
-
-
Save gAmUssA/d8a220fb0e35a1e59be4ff592b9674c9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "namespace": "dev.gamov.flightdemo.avro",
  "type": "record",
  "name": "FlightData",
  "doc": "Schema for flight data",
  "fields": [
    {
      "name": "flightId",
      "type": "string",
      "doc": "Unique identifier for the flight"
    },
    {
      "name": "callsign",
      "type": "string",
      "doc": "Flight callsign"
    },
    {
      "name": "latitude",
      "type": "double",
      "doc": "Current latitude in degrees"
    },
    {
      "name": "longitude",
      "type": "double",
      "doc": "Current longitude in degrees"
    },
    {
      "name": "altitude",
      "type": "int",
      "doc": "Current altitude in feet"
    },
    {
      "name": "heading",
      "type": "double",
      "doc": "Current heading in degrees (0-360)"
    },
    {
      "name": "speed",
      "type": "double",
      "doc": "Current ground speed in knots"
    },
    {
      "name": "verticalSpeed",
      "type": "double",
      "doc": "Current vertical speed in feet per minute"
    },
    {
      "name": "origin",
      "type": "string",
      "doc": "Origin airport code"
    },
    {
      "name": "destination",
      "type": "string",
      "doc": "Destination airport code"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "FlightStatus",
        "symbols": ["ON_TIME", "DELAYED", "CANCELLED", "DIVERTED"]
      },
      "doc": "Current flight status"
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "Timestamp of the data point in milliseconds since epoch"
    }
  ]
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11:47:36 WARN [Source: flights[1] -> Calc[2] -> processed_flights[3]: Writer -> processed_flights[3]: Committer (5/10)#2] o.a.flink.runtime.taskmanager.Task - Source: flights[1] -> Calc[2] -> processed_flights[3]: Writer -> processed_flights[3]: Committer (5/10)#2 (827f2a330b549f6ded8efa8d373f0016_cbc357ccb763df2852fee8c4fc7d55f2_4_2) switched from RUNNING to FAILED with failure cause: | |
java.io.IOException: Failed to deserialize consumer record due to | |
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) | |
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) | |
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) | |
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) | |
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) | |
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) | |
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) | |
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) | |
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) | |
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) | |
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) | |
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) | |
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) | |
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) | |
at java.base/java.lang.Thread.run(Thread.java:1583) | |
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = flights, partition = 7, leaderEpoch = 4, offset = 927, CreateTime = 1746542310202, serialized key size = 4, serialized value size = 74, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@23a2009a, value = [B@2918d9c7). | |
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59) | |
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) | |
... 14 common frames omitted | |
Caused by: java.io.IOException: Failed to deserialize Avro record. | |
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:143) | |
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:47) | |
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) | |
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115) | |
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) | |
... 15 common frames omitted | |
Caused by: org.apache.avro.AvroTypeException: Found dev.gamov.flightdemo.avro.FlightStatus, expecting union[null, string] | |
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) | |
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) | |
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275) | |
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) | |
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) | |
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) | |
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) | |
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) | |
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) | |
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) | |
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:109) | |
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:140) | |
... 19 common frames omitted | |
^Cmake: *** [run-stateless] Error 130 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Table schema for the `flights` source. Column names and their order mirror
// the fields of the FlightData Avro record (namespace dev.gamov.flightdemo.avro).
val flightsSchema = Schema.newBuilder()
    .column("flightId", DataTypes.STRING())
    .column("callsign", DataTypes.STRING())
    .column("latitude", DataTypes.DOUBLE())
    .column("longitude", DataTypes.DOUBLE())
    .column("altitude", DataTypes.INT())
    .column("heading", DataTypes.DOUBLE())
    .column("speed", DataTypes.DOUBLE())
    .column("verticalSpeed", DataTypes.DOUBLE())
    .column("origin", DataTypes.STRING())
    .column("destination", DataTypes.STRING())
    // The Avro field behind this column is the enum FlightStatus. Declaring it
    // as STRING is NOT transparently handled: reading the topic fails with
    // "AvroTypeException: Found dev.gamov.flightdemo.avro.FlightStatus,
    // expecting union[null, string]".
    // NOTE(review): the likely remedies are outside this builder -- either pass
    // the writer's Avro schema explicitly through the format's schema option,
    // or publish `status` as an Avro string. Confirm against the Flink version
    // in use before changing anything.
    .column("status", DataTypes.STRING())
    // The Avro field is a long with the timestamp-millis logical type; it is
    // declared here as a plain BIGINT.
    .column("timestamp", DataTypes.BIGINT())
    // No watermark is declared for now, to rule it out as the cause of the failure.
    .build()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment