Created March 30, 2017 15:48
BigQuery example: load an external archive into date-partitioned tables
/*
 * Copyright (C) 2016 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.dataflow.examples.partition;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.io.CountingInput;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Partition;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.common.collect.ImmutableList;

import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.concurrent.ThreadLocalRandom;
/**
 * A Dataflow pipeline that demonstrates the idiom of writing to date-sharded BigQuery tables.
 *
 * <p>NOTE: Follow BEAM-437 (https://issues.apache.org/jira/browse/BEAM-437); once that issue
 * is resolved, this code should become completely unnecessary.
 */
@SuppressWarnings("serial")
public class DateShardedBigQueryTables {
  /**
   * A partition function that assigns an incoming element to a partition based on how many
   * days it falls after a sentinel start time.
   *
   * <p>The first {@code numPartitions - 1} partitions hold elements whose times fall between
   * 0 (inclusive) and {@code numPartitions - 1} (exclusive) whole days after the start time.
   * The last partition is used for any elements that are outside the expected range.
   */
  private static class DateShardingPartitionFunction<T> implements Partition.PartitionFn<T> {
    private final SimpleFunction<T, Instant> dateFn;
    private final Instant startDate;

    public DateShardingPartitionFunction(SimpleFunction<T, Instant> dateFn, Instant startDate) {
      this.dateFn = dateFn;
      this.startDate = startDate;
    }

    @Override
    public int partitionFor(T elem, int numPartitions) {
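      // Convert the element's timestamp into a whole-day offset from startDate; offsets in
      // [0, numPartitions - 1) map to their own partition, and everything else (including
      // dates before startDate) falls through to the final "leftovers" partition.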
      Instant elementDate = dateFn.apply(elem);
      long dateOffset = new Duration(startDate, elementDate).getStandardDays();
      if (dateOffset >= 0 && dateOffset < (numPartitions - 1)) {
        return (int) dateOffset;
      }
      return numPartitions - 1;
    }
  }
  /**
   * Uses a {@link GroupByKey} to break fusion in the pipeline, so that the stage that includes
   * partitioning (which processes every element) is isolated from each individual partition's
   * write.
   *
   * <p>Works by adding a random key to each incoming element, grouping on that random key, and
   * then removing the keys from all elements after the {@link GroupByKey}.
   */
  private static class Reshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {
    private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
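        // Pair each element with a random integer key; the GroupByKey that follows acts as a
        // materialization barrier, redistributing elements across workers.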
        c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
      }
    }

    private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        for (T s : c.element().getValue()) {
          c.output(s);
        }
      }
    }

    @Override
    public PCollection<T> apply(PCollection<T> input) {
      return input
          .apply(ParDo.of(new AddArbitraryKey<T>()))
          .apply(GroupByKey.<Integer, T>create())
          .apply(ParDo.of(new RemoveArbitraryKey<T>()));
    }
  }
  /**
   * The function that actually builds the pipeline. It uses the input functions and constants
   * to create {@code numDays} different partitions, each corresponding to a day in the range
   * {@code [0, numDays)} days after the {@code startDate}. It also creates an extra partition
   * for any elements that fall outside that date range.
   *
   * @param input the values to be partitioned and written out
   * @param startDate the initial time reference -- aka, day 0.
   * @param numDays the number of consecutive days, beginning at the {@code startDate}, over
   *     which records will be partitioned.
   * @param dateFn a function that assigns an element to its timestamp.
   * @param tableNameFn a function that generates a BigQuery table name from a date.
   * @param tableRowFn a function that turns a value of type {@code T} into a {@link TableRow}
   *     ready for insertion into BigQuery. Must match {@code writeSchema}.
   * @param writeSchema the schema of the BigQuery tables into which the {@link TableRow}
   *     records will be written.
   * @param leftoverTableName the name of the table where all the records that fall outside the
   *     range {@code [startDate, startDate + numDays days)} will be inserted.
   */
  public static <T> void shardByDate(
      PCollection<T> input,
      Instant startDate,
      int numDays,
      SimpleFunction<T, Instant> dateFn,
      SimpleFunction<Instant, String> tableNameFn,
      SimpleFunction<T, TableRow> tableRowFn,
      TableSchema writeSchema,
      String leftoverTableName) {
    Partition.PartitionFn<T> partFn = new DateShardingPartitionFunction<>(dateFn, startDate);
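    // numDays single-day partitions, plus one trailing partition for out-of-range elements.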
    PCollectionList<T> partitions = input.apply(Partition.of(numDays + 1, partFn));
    for (int i = 0; i < numDays; ++i) {
      Instant partitionDate = startDate.plus(Duration.standardDays(i));
      String partitionTableName = tableNameFn.apply(partitionDate);
      partitions
          .get(i)
          .apply(
              "Write_" + i, new ReshuffleAndWrite<>(partitionTableName, tableRowFn, writeSchema));
    }
    partitions
        .get(numDays)
        .apply(
            "Write_leftovers", new ReshuffleAndWrite<>(leftoverTableName, tableRowFn, writeSchema));
  }
  private static class ReshuffleAndWrite<T> extends PTransform<PCollection<T>, PDone> {
    private final transient String tableName;
    private final transient TableSchema writeSchema;
    private final transient SimpleFunction<T, TableRow> tableRowFn;

    public ReshuffleAndWrite(
        String tableName, SimpleFunction<T, TableRow> tableRowFn, TableSchema writeSchema) {
      this.tableName = tableName;
      this.tableRowFn = tableRowFn;
      this.writeSchema = writeSchema;
    }

    @Override
    public PDone apply(PCollection<T> in) {
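      // Reshuffle to break fusion with the upstream partitioning stage, convert each element
      // to a TableRow, and write to the target table, truncating any existing contents.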
      return in.apply("Reshuffle", new Reshuffle<T>())
          .apply("TableRow", MapElements.via(tableRowFn))
          .apply(
              "Write",
              BigQueryIO.Write.to(tableName)
                  .withSchema(writeSchema)
                  .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
    }
  }

  /**
   * A Joda-Time formatter that prints a date in a format like {@code "20160101"}. Thread-safe.
   */
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
  interface Options extends PipelineOptions {
    @Description("The start date of the sharded tables in ISO YYYY-MM-DD form, e.g., 2016-01-31")
    @Default.String("2016-01-01")
    String getStartDate();

    void setStartDate(String startDate);

    @Description("The number of days for which to produce output tables")
    @Default.Integer(100)
    Integer getNumOutputDays();

    void setNumOutputDays(Integer numOutputDays);

    @Description("The number of days over which to produce random data. If more than"
        + " --numOutputDays, there will be data in the leftovers table. If fewer than"
        + " --numOutputDays, there will be empty output tables.")
    @Default.Integer(105)
    Integer getNumDataDays();

    void setNumDataDays(Integer numDataDays);

    @Description("The number of records produced per day of data (set by --numDataDays).")
    @Default.Integer(5000)
    Integer getNumRecordsPerDay();

    void setNumRecordsPerDay(Integer numRecordsPerDay);

    @Description("The BigQuery partition table name. When writing, a per-day partition suffix"
        + " $YYYYMMDD will be appended. E.g., 'project:dataset.sharding_test$20161213'.")
    @Default.String("stress.sharding_test")
    String getOutputTable();

    void setOutputTable(String outputTable);
  }
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    TableFieldSchema outputField = new TableFieldSchema().setName("value").setType("INTEGER");
    TableSchema outputSchema = new TableSchema().setFields(ImmutableList.of(outputField));

    final Instant startDate = Instant.parse(options.getStartDate());
    final int numDataDays = options.getNumDataDays();
    final int totalNumRecords = numDataDays * options.getNumRecordsPerDay();
    final String baseTableName = options.getOutputTable();
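    // Assigns each generated value a uniformly random day in [0, numDataDays) after startDate,
    // simulating the event times an archive of real records would carry.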
    SimpleFunction<Long, Instant> dateFn = new SimpleFunction<Long, Instant>() {
      @Override
      public Instant apply(Long value) {
        return startDate.plus(Duration.standardDays(
            ThreadLocalRandom.current().nextInt(numDataDays)));
      }
    };
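    // Appends BigQuery's per-day partition decorator ($YYYYMMDD) to the base table name.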
    SimpleFunction<Instant, String> tableNameFn = new SimpleFunction<Instant, String>() {
      @Override
      public String apply(Instant instant) {
        return String.format("%s$%s", baseTableName, FORMATTER.print(instant));
      }
    };

    SimpleFunction<Long, TableRow> tableRowFn = new SimpleFunction<Long, TableRow>() {
      @Override
      public TableRow apply(Long value) {
        return new TableRow().set("value", value);
      }
    };

    // Creates records and assigns their timestamps.
    PCollection<Long> inputData =
        p.apply("GenerateData", CountingInput.upTo(totalNumRecords));
    shardByDate(
        inputData,
        startDate,
        options.getNumOutputDays(),
        dateFn,
        tableNameFn,
        tableRowFn,
        outputSchema,
        baseTableName + "_leftoverValues");
    p.run();
  }
}
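To load a real archive instead of generated data, the same shardByDate call applies once dateFn reads each record's own event time rather than picking a random day. A minimal sketch, assuming a hypothetical LogRecord type with getTimestampMillis() and getPayload() accessors (these names are illustrative, not part of the gist):

SimpleFunction<LogRecord, Instant> recordDateFn = new SimpleFunction<LogRecord, Instant>() {
  @Override
  public Instant apply(LogRecord record) {
    // Use the record's embedded event time, in epoch milliseconds (hypothetical accessor).
    return new Instant(record.getTimestampMillis());
  }
};
SimpleFunction<LogRecord, TableRow> recordRowFn = new SimpleFunction<LogRecord, TableRow>() {
  @Override
  public TableRow apply(LogRecord record) {
    // Field names must match the TableSchema passed to shardByDate.
    return new TableRow().set("payload", record.getPayload());
  }
};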
Can you please help me: how do I move data from a non-partitioned table into date-partitioned tables? I'm looking for the cheapest and least time-consuming solution. It's kind of urgent.