Skip to content

Instantly share code, notes, and snippets.

@pbrumblay
Created May 25, 2018 18:01
Show Gist options
  • Save pbrumblay/b49bf08e0b652319120dfb1c861f0936 to your computer and use it in GitHub Desktop.
Save pbrumblay/b49bf08e0b652319120dfb1c861f0936 to your computer and use it in GitHub Desktop.
Use BigQueryIO.writeTableRows() to replace BigQuery table partitions based on values in TableRow elements
package com.fearlesstg;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
@SuppressWarnings("Duplicates")
public class ReplaceBQPartition {
public static void main(String[] args) {
PipelineOptionsFactory.register(ReplaceBQPartitionOptions.class);
ReplaceBQPartitionOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(ReplaceBQPartitionOptions.class);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("PARTITION_DATE").setType("DATE"));
fields.add(new TableFieldSchema().setName("TEXT_FIELD").setType("STRING"));
fields.add(new TableFieldSchema().setName("INT_FIELD").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("LoadDate").setType("DATE"));
fields.add(new TableFieldSchema().setName("RecordSource").setType("STRING"));
TableSchema tableSchema = new TableSchema().setFields(fields);
Pipeline p = Pipeline.create(options);
TableRow row1 = testRow1();
TableRow row2 = testRow2();
TableRow row3 = testRow3();
List<TableRow> allRows = new ArrayList<>();
allRows.add(row1);
allRows.add(row2);
allRows.add(row3);
PCollection<TableRow> tableRowsToWrite = p.apply(Create.of(allRows));
tableRowsToWrite.apply("Write to raw data tables", BigQueryIO.writeTableRows()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.to(new GetPartitionFromTableRowFn(options.getBqTable()))
.withSchema(tableSchema));
p.run().waitUntilFinish();
}
public interface ReplaceBQPartitionOptions extends PipelineOptions {
String getPartitionValue();
void setPartitionValue(String value);
String getBqTable();
void setBqTable(String value);
}
private static TableRow testRow1() {
TableRow tr = new TableRow();
tr.set("PARTITION_DATE", "2018-01-01");
tr.set("TEXT_FIELD", "This is some data for 1");
tr.set("INT_FIELD", "1111");
tr.set("LoadDate", "2018-01-01");
tr.set("RecordSource", "somesource1");
return tr;
}
private static TableRow testRow2() {
TableRow tr = new TableRow();
tr.set("PARTITION_DATE", "2018-02-02");
tr.set("TEXT_FIELD", "This is some data for 2");
tr.set("INT_FIELD", "2222");
tr.set("LoadDate", "2018-01-01");
tr.set("RecordSource", "somesource2");
return tr;
}
private static TableRow testRow3() {
TableRow tr = new TableRow();
tr.set("PARTITION_DATE", "2018-03-03");
tr.set("TEXT_FIELD", "This is some data for 3");
tr.set("INT_FIELD", "3333");
tr.set("LoadDate", "2018-01-01");
tr.set("RecordSource", "somesource3");
return tr;
}
}
//From: https://shinesolutions.com/2017/12/05/fun-with-serializable-functions-and-dynamic-destinations-in-cloud-dataflow/
class GetPartitionFromTableRowFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
private final String bqDestination;
public GetPartitionFromTableRowFn(String bqDestination) {
this.bqDestination = bqDestination;
}
public TableDestination apply(ValueInSingleWindow<TableRow> element) {
//NOTE that the table decorator MUST be in the format of $YYYYMMDD - NO DASHES OR YOU'LL GET A
//MISLEADING ERROR MESSAGE FROM BQ REST API
TimePartitioning DAY_PARTITION = new TimePartitioning().setType("DAY");
DAY_PARTITION.setField("PARTITION_DATE");
return new TableDestination(this.bqDestination + "$" + element.getValue().get("PARTITION_DATE").toString().replaceAll("-", ""), null, DAY_PARTITION);
}
}
@pbrumblay
Copy link
Author

java -cp <jar> com.fearlesstg.ReplaceBQPartition --runner=DataflowRunner --bqTable=<project:dataset.table> --tempLocation=gs://<some gcs path> --project=<project>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment