Skip to content

Instantly share code, notes, and snippets.

@sandboxws
Last active December 15, 2018 04:19
Show Gist options
  • Select an option

  • Save sandboxws/050f66beb7dbf8feb90f0ebd3ce60832 to your computer and use it in GitHub Desktop.

Select an option

Save sandboxws/050f66beb7dbf8feb90f0ebd3ce60832 to your computer and use it in GitHub Desktop.
Beam Batch Pipeline
package com.sandboxws;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
 * Batch pipeline that reads {@code ./reviews.csv}, counts how many times each
 * value of the second comma-separated column (the rating) occurs, and writes
 * one {@code "<rating> <count>"} line per distinct rating to output files
 * prefixed {@code ./ratings_results} with the {@code .csv} suffix.
 */
public class BeamBatchPipeline {

    /** Zero-based index of the rating column within a comma-separated row. */
    private static final int RATING_COLUMN_INDEX = 1;

    /**
     * Builds the pipeline, runs it, and blocks until it finishes.
     *
     * @param args command-line arguments; currently unused
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Step 1 - Read CSV file, one element per line.
        PCollection<String> csvRows = pipeline.apply("Read from CSV",
                TextIO.read().from("./reviews.csv"));

        // Step 2 - Extract ratings and count them.
        // Rows without a rating column yield no element instead of crashing
        // the pipeline with an ArrayIndexOutOfBoundsException.
        // NOTE(review): if the input has a header row, its second column is
        // counted like any other rating - confirm against the input file.
        PCollection<KV<String, Long>> ratingsCounts = csvRows
                .apply("Extract Ratings",
                        FlatMapElements.into(TypeDescriptors.strings())
                                .via(csvRow -> extractRating(csvRow)))
                .apply("Count Ratings", Count.<String>perElement());

        // Step 3 - Write results to CSV.
        // NOTE(review): key and count are joined with a space, not a comma,
        // even though the output suffix is .csv - confirm this is intended.
        ratingsCounts
                .apply("FormatResults", MapElements.into(TypeDescriptors.strings())
                        .via((KV<String, Long> ratingsCount) ->
                                ratingsCount.getKey() + " " + ratingsCount.getValue()))
                .apply(TextIO.write().to("./ratings_results").withSuffix(".csv"));

        // Run the pipeline and wait till it finishes before exiting.
        pipeline.run().waitUntilFinish();
    }

    /**
     * Extracts the rating column from a single comma-separated row.
     *
     * <p>The row is split on every {@code ","}; quoted fields containing
     * commas are not treated specially.
     *
     * @param csvRow one line of the input file
     * @return a single-element iterable holding the rating, or an empty
     *     iterable when the row has no rating column (for example a blank line)
     */
    private static Iterable<String> extractRating(String csvRow) {
        String[] columns = csvRow.split(",");
        if (columns.length <= RATING_COLUMN_INDEX) {
            // Malformed or blank row: emit nothing so it is skipped.
            return Arrays.asList();
        }
        return Arrays.asList(columns[RATING_COLUMN_INDEX]);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment