-
-
Save kaxil/d57ac68b4f299802f82de1d9608225a4 to your computer and use it in GitHub Desktop.
Write from Cloud Pub/Sub to BigQuery using file loads and save on streaming-insert costs!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ... | |
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
public class ClickLogConsumer { | |
private static final int BATCH_INTERVAL = 15; | |
private static final int NUM_SHARDS = 100; | |
private static final String TOPIC = "projects/pureapp-199411/topics/clicks"; | |
private static final String PROJECT = "pure-app"; | |
private static final String TIMESTAMP_ATTRIBUTE = "timestamp"; | |
private static final String TIME_PARTITIONING_COLUMN = "created_at"; | |
private static final String DATASET_FIELD = "user_id"; | |
private static final String TABLE_FIELD = "campaign_id"; | |
private static final TableSchema SCHEMA = new TableSchema().setFields( | |
ImmutableList.of( | |
new TableFieldSchema().setName(TIME_PARTITIONING_COLUMN).setType("TIMESTAMP"), | |
new TableFieldSchema().setName("exchange").setType("STRING")) | |
); | |
public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() { | |
return new JsonToTableRow(); | |
} | |
private static class JsonToTableRow | |
extends PTransform<PCollection<String>, PCollection<TableRow>> { | |
@Override | |
public PCollection<TableRow> expand(PCollection<String> stringPCollection) { | |
return stringPCollection.apply("JsonToTableRow", MapElements.<String, com.google.api.services.bigquery.model.TableRow>via( | |
new SimpleFunction<String, TableRow>() { | |
@Override | |
public TableRow apply(String json) { | |
try { | |
InputStream inputStream = new ByteArrayInputStream( | |
json.getBytes(StandardCharsets.UTF_8.name())); | |
//OUTER is used here to prevent EOF exception | |
return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER); | |
} catch (IOException e) { | |
throw new RuntimeException("Unable to parse input", e); | |
} | |
} | |
})); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Pipeline pipeline = Pipeline.create(options); | |
pipeline | |
.apply(PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(TOPIC)) | |
.apply(jsonToTableRow()) | |
.apply(BigQueryIO.write() | |
.withTriggeringFrequency(Duration.standardMinutes(BATCH_INTERVAL)) | |
.withMethod(FILE_LOADS) | |
.withNumFileShards(NUM_SHARDS) | |
.withWriteDisposition(WRITE_APPEND) | |
.withCreateDisposition(CREATE_IF_NEEDED) | |
.withSchema(SCHEMA) | |
.to((row) -> { | |
String datasetName = row.getValue().get(DATASET_FIELD).toString(); | |
String tableName = row.getValue().get(TABLE_FIELD).toString(); | |
return new TableDestination(String.format("%s:%s.%s", PROJECT, datasetName, tableName), "Some destination"); | |
}) | |
.withTimePartitioning(new TimePartitioning().setField(TIME_PARTITIONING_COLUMN))); | |
pipeline.run(); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment