Skip to content

Instantly share code, notes, and snippets.

@timrobertson100
Created August 27, 2019 13:44
Show Gist options
  • Save timrobertson100/f620d36c41dd6a5efca074b1704bcc8e to your computer and use it in GitHub Desktop.
/**
 * Beam pipeline, run on Spark, that counts records per {@code kingdom} in a Hive table and writes
 * one {@code "<kingdom>: <count>"} text line per kingdom under the configured target directory.
 *
 * <p>The Hive table is resolved through an HCatalog table provider registered under the SQL schema
 * name {@code hive}, configured with the metastore URIs taken from the pipeline options.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class BackbonePreRelease {

  /**
   * Per-kingdom record count; the two {@code %s} placeholders are the Hive database and table
   * names, each inserted inside back-tick quotes. The {@code GROUP BY} is required: selecting the
   * non-aggregated {@code kingdom} column alongside {@code count(*)} is otherwise invalid SQL.
   */
  private static final String SELECT_SQL =
      "SELECT kingdom, count(*) AS c FROM `hive`.`%s`.`%s` GROUP BY kingdom";

  /**
   * Builds the pipeline, runs it, and blocks until it finishes.
   *
   * @param args Beam pipeline arguments, parsed into {@link BackbonePreReleaseOptions}
   * @throws IllegalArgumentException if the database or table option is missing, blank, or
   *     contains a back-tick
   */
  public static void main(String[] args) {
    PipelineOptionsFactory.register(BackbonePreReleaseOptions.class);
    BackbonePreReleaseOptions options =
        PipelineOptionsFactory.fromArgs(args).as(BackbonePreReleaseOptions.class);
    options.setRunner(SparkRunner.class);

    // Validate and build the query before constructing the pipeline so bad options fail fast.
    String query =
        String.format(
            SELECT_SQL,
            requireIdentifier(options.getDatabase(), "database"),
            requireIdentifier(options.getTable(), "table"));

    Pipeline p = Pipeline.create(options);

    TableProvider tableProvider =
        HCatalogTableProvider.create(
            Collections.singletonMap(
                HiveConf.ConfVars.METASTOREURIS.varname, options.getMetastoreUris()));

    PCollection<Row> kingdomCounts =
        p.apply(
            "Execute SQL", SqlTransform.query(query).withTableProvider("hive", tableProvider));

    PCollection<String> lines =
        kingdomCounts.apply(
            "Convert to String",
            ParDo.of(
                new DoFn<Row, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Row row = c.element();
                    String kingdom = row.getString("kingdom");
                    // count(*) is always present, so unboxing the Long is safe.
                    long count = row.getInt64("c");
                    // String concatenation renders a null kingdom as "null".
                    c.output(kingdom + ": " + count);
                  }
                }));

    lines.apply(TextIO.write().to(options.getTargetDir()));

    p.run().waitUntilFinish();
  }

  /**
   * Checks a Hive database/table name before it is spliced into {@link #SELECT_SQL}.
   *
   * @param name the identifier taken from the pipeline options
   * @param option the option's name, used only in the error message
   * @return {@code name}, unchanged
   * @throws IllegalArgumentException if {@code name} is null or blank, or contains a back-tick
   *     (which would terminate the quoted identifier early)
   */
  private static String requireIdentifier(String name, String option) {
    if (name == null || name.trim().isEmpty()) {
      throw new IllegalArgumentException("Missing required option: " + option);
    }
    if (name.indexOf('`') >= 0) {
      throw new IllegalArgumentException(
          "Option " + option + " must not contain a back-tick: " + name);
    }
    return name;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment