Created
October 23, 2015 05:29
-
-
Save ankurcha/56f33837c10bed31093a to your computer and use it in GitHub Desktop.
Helper class to convert an Apache Avro schema to a BigQuery table schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mallo64.dataflow;

import static org.apache.avro.Schema.*;

import com.google.api.client.json.GenericJson;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
public class AvroToBigQuery<TRecord extends SpecificRecord> extends DoFn<TRecord, TableRow> { | |
@Override | |
public void processElement(ProcessContext processContext) throws Exception { | |
processContext.output(getTableRow(processContext.element())); | |
} | |
static TableRow getTableRow(SpecificRecord record) { | |
TableRow row = new TableRow(); | |
encode(record, row); | |
return row; | |
} | |
static TableCell getTableCell(SpecificRecord record) { | |
TableCell cell = new TableCell(); | |
encode(record, cell); | |
return cell; | |
} | |
private static void encode(SpecificRecord record, GenericJson row) { | |
Schema schema = record.getSchema(); | |
schema.getFields().forEach(field -> { | |
Type type = field.schema().getType(); | |
switch (type) { | |
case RECORD: | |
row.set(field.name(), getTableCell((SpecificRecord) record.get(field.pos()))); | |
break; | |
case INT: | |
case LONG: | |
row.set(field.name(), ((Number)record.get(field.pos())).longValue()); | |
break; | |
case BOOLEAN: | |
row.set(field.name(), record.get(field.pos())); | |
break; | |
case FLOAT: | |
case DOUBLE: | |
row.set(field.name(), ((Number)record.get(field.pos())).doubleValue()); | |
break; | |
default: | |
row.set(field.name(), String.valueOf(record.get(field.pos()))); | |
} | |
}); | |
} | |
public static TableSchema getTableSchemaRecord(Schema schema) { | |
return new TableSchema().setFields(getFieldsSchema(schema.getFields())); | |
} | |
static List<TableFieldSchema> getFieldsSchema(List<Schema.Field> fields) { | |
return fields.stream().map(field -> { | |
TableFieldSchema column = new TableFieldSchema().setName(field.name()); | |
Type type = field.schema().getType(); | |
switch (type) { | |
case RECORD: | |
column.setType("RECORD"); | |
column.setFields(getFieldsSchema(fields)); | |
break; | |
case INT: | |
case LONG: | |
column.setType("INTEGER"); | |
break; | |
case BOOLEAN: | |
column.setType("BOOLEAN"); | |
break; | |
case FLOAT: | |
case DOUBLE: | |
column.setType("FLOAT"); | |
break; | |
default: | |
column.setType("STRING"); | |
} | |
return column; | |
}).collect(Collectors.toList()); | |
} | |
} |
Hello everyone. Can you help me figure out how to use the TableSchema to create an empty BigQuery table? The new BigQuery Java SDK version pushes me to use the new class that represents the BigQuery schema, com.google.cloud.bigquery.Schema. It is not as flexible for building a BigQuery schema dynamically as the example above.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Did you ever get the reverse?