Skip to content

Instantly share code, notes, and snippets.

@polleyg
Last active March 23, 2020 08:58
Show Gist options
  • Save polleyg/32af5177c964303c1dff750e6a26a19b to your computer and use it in GitHub Desktop.
Save polleyg/32af5177c964303c1dff750e6a26a19b to your computer and use it in GitHub Desktop.
all the java
/**
 * Batch Apache Beam pipeline that reads CSV page-view records from GCS, totals the views per
 * language with Beam SQL, and writes the result back to GCS as a single CSV file.
 *
 * <p>Input lines follow {@link #HEADER}:
 * {@code year,month,day,wikimedia_project,language,title,views}. Only the {@code language}
 * (index 4) and {@code views} (index 6) columns are used.
 */
public class BeamSQLMagic {

    /** Header line of the input CSV files; lines equal to it (ignoring case) are skipped. */
    public static final String HEADER = "year,month,day,wikimedia_project,language,title,views";

    /** Schema of the rows fed into the SQL transform: language code and per-record view count. */
    public static final Schema SCHEMA = Schema.builder()
            .addStringField("lang")
            .addInt32Field("views")
            .build();

    /**
     * Splits a line on commas that are outside double-quoted sections (a comma only separates
     * fields when it is followed by an even number of quote characters). Compiled once here;
     * {@link String#split(String)} would recompile this non-trivial regex for every element.
     */
    private static final java.util.regex.Pattern CSV_SPLIT =
            java.util.regex.Pattern.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");

    /** Column index of the language code in an input line. */
    private static final int LANG_INDEX = 4;

    /** Column index of the view count in an input line. */
    private static final int VIEWS_INDEX = 6;

    /**
     * Per-language total of views. {@code views} is cast to BIGINT before summing because SUM
     * over an INT32 column produces an INT32 result, which can overflow for large inputs.
     */
    private static final String QUERY =
            "SELECT lang, SUM(CAST(views AS BIGINT)) as sum_views "
                    + "FROM PCOLLECTION GROUP BY lang";

    /**
     * Builds and launches the pipeline: read CSV lines from
     * {@code gs://batch-pipeline-sql/input/*}, convert them to {@link Row}s, aggregate with
     * {@link #QUERY}, and write {@code lang,sum_views} lines to a single unsharded file
     * {@code gs://batch-pipeline-sql/output/output.csv}.
     *
     * @param args pipeline options parsed as {@link DataflowPipelineOptions} (validated)
     */
    public static void main(String[] args) {
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("read_from_gcs", TextIO.read().from("gs://batch-pipeline-sql/input/*"))
                .apply("transform_to_row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
                .apply("transform_sql", SqlTransform.query(QUERY))
                .apply("transform_to_string", ParDo.of(new RowToString()))
                .apply("write_to_gcs",
                        TextIO.write().to("gs://batch-pipeline-sql/output/output.csv").withoutSharding());
        // Submits the job; does not block waiting for it to finish.
        pipeline.run();
    }

    /**
     * Converts one CSV input line into a {@link Row} of {@link #SCHEMA}. The header line and
     * blank lines produce no output.
     */
    public static class RowParDo extends DoFn<String, Row> {
        /**
         * Parses the element and emits a {@code (lang, views)} row.
         *
         * @throws IllegalArgumentException if the line has fewer than seven fields, or if the
         *     views field is not an integer ({@link NumberFormatException})
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String line = c.element();
            // Skip the header and blank lines (e.g. a trailing empty line in an input file).
            if (line.equalsIgnoreCase(HEADER) || line.trim().isEmpty()) {
                return;
            }
            String[] vals = CSV_SPLIT.split(line);
            if (vals.length <= VIEWS_INDEX) {
                throw new IllegalArgumentException("Expected at least " + (VIEWS_INDEX + 1)
                        + " CSV fields but got " + vals.length + ": " + line);
            }
            // trim() tolerates stray whitespace or a carriage return around the number.
            Row appRow = Row
                    .withSchema(SCHEMA)
                    .addValues(vals[LANG_INDEX], Integer.valueOf(vals[VIEWS_INDEX].trim()))
                    .build();
            c.output(appRow);
        }
    }

    /** Formats a SQL result {@link Row} as a comma-joined line of its field values. */
    public static class RowToString extends DoFn<Row, String> {
        /** Joins the row's values with commas; a null value is rendered as {@code "null"}. */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String line = c.element().getValues()
                    .stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(","));
            c.output(line);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment