Templated Dataflow pipeline for reading Wikipedia page views from a file
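The pipeline expects a CSV input whose header row matches the HEADER constant in the code below (year,month,day,wikimedia_project,language,title,views). A hypothetical data line would look like:

2018,9,6,wikipedia,en,Main_Page,12345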
package org.polleyg;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

import java.util.ArrayList;
import java.util.List;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
/**
 * Templated Dataflow pipeline that reads Wikipedia page-view rows from a CSV
 * file on GCS and appends them to a BigQuery table.
 */
public class TemplatePipeline {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(TemplateOptions.class);
        TemplateOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        // Read the raw CSV lines, convert each one to a TableRow, and append them to BigQuery.
        pipeline.apply("READ", TextIO.read().from(options.getInputFile()))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dotc_2018.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_APPEND)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }
    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        return new TableSchema().setFields(fields);
    }
    public interface TemplateOptions extends DataflowPipelineOptions {
        @Description("GCS path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> value);
    }
    public static class WikiParDo extends DoFn<String, TableRow> {
        public static final String HEADER = "year,month,day,wikimedia_project,language,title,views";

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            if (c.element().equalsIgnoreCase(HEADER)) return; // skip the header row
            String[] split = c.element().split(",");
            // Expect exactly 7 columns; drop anything else (e.g. titles containing commas).
            if (split.length != 7) return;
            TableRow row = new TableRow();
            List<TableFieldSchema> fields = getTableSchema().getFields();
            for (int i = 0; i < split.length; i++) {
                row.set(fields.get(i).getName(), split[i]);
            }
            c.output(row);
        }
    }
}
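To use this as an actual Dataflow template, the pipeline is staged to GCS once and then launched on demand. Assuming the standard Maven exec plugin and the gcloud CLI (the project, bucket, and job names below are placeholders, not part of the gist), staging and launching would look roughly like this:

mvn compile exec:java \
    -Dexec.mainClass=org.polleyg.TemplatePipeline \
    -Dexec.args="--runner=DataflowRunner --project=<YOUR_PROJECT> --tempLocation=gs://<YOUR_BUCKET>/tmp --templateLocation=gs://<YOUR_BUCKET>/templates/wiki_demo"

gcloud dataflow jobs run wiki-demo \
    --gcs-location=gs://<YOUR_BUCKET>/templates/wiki_demo \
    --parameters inputFile=gs://<YOUR_BUCKET>/pageviews.csv

Note that inputFile can be supplied at launch time precisely because the option is declared as a ValueProvider<String> rather than a plain String; runtime parameters are what make the pipeline a reusable template.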