Last active
December 8, 2017 10:18
-
-
Save HungUnicorn/8a5c40fcf1e25c51cf77dc24a227d6d4 to your computer and use it in GitHub Desktop.
Exception in thread "main" org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class JsonIntoRow implements FlatMapFunction<JsonNode, Row> { | |
private TypeInformation<Row> typeInfo; | |
private static JsonRowDeserializationSchema deserializationSchema; | |
public JsonIntoRow(TypeInformation<Row> typeInfo){ | |
this.typeInfo = typeInfo; | |
} | |
@Override | |
public void flatMap(JsonNode value, Collector<Row> out) throws Exception { | |
ObjectMapper objectMapper = new ObjectMapper(); | |
byte[] serializedJson = objectMapper.writeValueAsBytes(value); | |
deserializationSchema = new JsonRowDeserializationSchema(typeInfo); | |
deserializationSchema.setFailOnMissingField(false); | |
Row deserializedJson = deserializationSchema.deserialize(serializedJson); | |
out.collect(deserializedJson); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Explicit row schema: field names and their types, in matching order.
final TypeInformation<Row> typeInformation = Types.ROW(
        new String[] {"orderNumber", "sales", "country"},
        new TypeInformation<?>[] {Types.STRING(), Types.DOUBLE(), Types.INT()}
);
final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// NOTE(review): assumes FlinkUtil.readCleanStream returns a DataStream, so that flatMap yields
// an operator exposing returns(...) - confirm against FlinkUtil.
final DataStream<Row> stream = FlinkUtil
        .readCleanStream(env, URL, eventTypes, "StreamSQL")
        .flatMap(new JsonIntoRow(typeInformation))
        // Declare the output type explicitly. Without this hint the stream carries
        // GenericTypeInfo<Row>, which the table environment rejects with
        // "An input of GenericTypeInfo<Row> cannot be converted to Table".
        .returns(typeInformation);
// Register through the public API rather than the internal registerDataStreamInternal.
tableEnv.registerDataStream("sampleTable", stream);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment