Created
August 31, 2018 11:08
-
-
Save kohlerm/649f92195e71697a1931f81def176ec9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.beam.examples; | |
import java.io.Serializable; | |
import org.apache.beam.examples.common.ExampleUtils; | |
import org.apache.beam.runners.spark.SparkRunner; | |
import org.apache.beam.sdk.Pipeline; | |
import org.apache.beam.sdk.coders.StringUtf8Coder; | |
import org.apache.beam.sdk.io.TextIO; | |
import org.apache.beam.sdk.metrics.Counter; | |
import org.apache.beam.sdk.metrics.Distribution; | |
import org.apache.beam.sdk.metrics.Metrics; | |
import org.apache.beam.sdk.options.Default; | |
import org.apache.beam.sdk.options.Description; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
import org.apache.beam.sdk.options.Validation.Required; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.options.PipelineOptions; | |
import org.apache.spark.api.java.JavaSparkContext; | |
import spark.jobserver.api.JobEnvironment; | |
import spark.jobserver.japi.JSparkJob; | |
import org.apache.beam.runners.spark.SparkContextOptions; | |
import com.typesafe.config.Config; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
// | |
// | |
// | |
/** | |
* An example that counts words in Shakespeare and includes Beam best practices. | |
* | |
* <p>This class, {@link WordCount}, is the second in a series of four successively more detailed | |
* 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. | |
* After you've looked at this example, then see the {@link DebuggingWordCount} | |
* pipeline, for introduction of additional concepts. | |
* | |
* <p>For a detailed walkthrough of this example, see | |
* <a href="https://beam.apache.org/get-started/wordcount-example/"> | |
* https://beam.apache.org/get-started/wordcount-example/ | |
* </a> | |
* | |
* <p>Basic concepts, also in the MinimalWordCount example: | |
* Reading text files; counting a PCollection; writing to text files | |
* | |
* <p>New Concepts: | |
* <pre> | |
* 1. Executing a Pipeline both locally and using the selected runner | |
* 2. Using ParDo with static DoFns defined out-of-line | |
* 3. Building a composite transform | |
* 4. Defining your own pipeline options | |
* </pre> | |
* | |
* <p>Concept #1: you can execute this pipeline either locally or using by selecting another runner. | |
* These are now command-line options and not hard-coded as they were in the MinimalWordCount | |
* example. | |
* | |
* <p>To change the runner, specify: | |
* <pre>{@code | |
* --runner=YOUR_SELECTED_RUNNER | |
* } | |
* </pre> | |
* | |
* <p>To execute this pipeline, specify a local output file (if using the | |
* {@code DirectRunner}) or output prefix on a supported distributed file system. | |
* <pre>{@code | |
* --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX] | |
* }</pre> | |
* | |
* <p>The input file defaults to a public data set containing the text of of King Lear, | |
* by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. | |
*/ | |
public class WordCountJS implements JSparkJob<String>, Serializable { | |
/** | |
* Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns | |
* statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it | |
* to a ParDo in the pipeline. | |
*/ | |
private static final Logger LOG = LoggerFactory.getLogger(WordCountJS.class); | |
static List<String> lines = new ArrayList<String>(Arrays.asList( | |
"It does not matter", | |
"what temperature the room is", | |
"is always room temperature")); | |
public String run(JavaSparkContext context, JobEnvironment runtime, Config config) { | |
// WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() | |
// .as(WordCountOptions.class); | |
WordCountOptions options = PipelineOptionsFactory.create().as(WordCountOptions.class); | |
String text = null; | |
try { | |
// get jobserver config data | |
text = config.getString("input.string"); | |
LOG.info("config input.text: " + text); | |
} | |
catch (com.typesafe.config.ConfigException.Missing ex) { | |
LOG.info("could not read config"); | |
} | |
if(text != null) { | |
try { | |
lines.add(text); | |
} | |
catch(java.lang.Exception ex) { | |
LOG.info("could not add text from config: " + ex.getMessage()); | |
} | |
} | |
// add the sample text to options | |
options.setText(lines); | |
// pass in jobserver context | |
options.setProvidedSparkContext(context); | |
// tell sparkrunner to use provided context from jobserver | |
options.setUsesProvidedSparkContext(true); | |
runWordCount(options); | |
return null; | |
} | |
public Config verify(JavaSparkContext context, JobEnvironment runtime, Config config) { | |
// config is optional - no need to validate | |
return config; | |
} | |
static class ExtractWordsFn extends DoFn<String, String> { | |
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines"); | |
private final Distribution lineLenDist = Metrics.distribution( | |
ExtractWordsFn.class, "lineLenDistro"); | |
@ProcessElement | |
public void processElement(@Element String element, OutputReceiver<String> receiver) { | |
lineLenDist.update(element.length()); | |
if (element.trim().isEmpty()) { | |
emptyLines.inc(); | |
} | |
// Split the line into words. | |
String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1); | |
// Output each word encountered into the output PCollection. | |
for (String word : words) { | |
if (!word.isEmpty()) { | |
receiver.output(word); | |
} | |
} | |
} | |
} | |
/** A SimpleFunction that converts a Word and Count into a printable string. */ | |
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { | |
@Override | |
public String apply(KV<String, Long> input) { | |
return input.getKey() + ": " + input.getValue(); | |
} | |
} | |
/** A SimpleFunction that simply logs the string value without any other transformation */ | |
public static class LogCounts extends SimpleFunction<String, String> { | |
@Override | |
public String apply(String input) { | |
// simply log count result | |
LOG.info("Count result: " + input); | |
return input; | |
} | |
} | |
/** | |
* A PTransform that converts a PCollection containing lines of text into a PCollection of | |
* formatted word counts. | |
* | |
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and | |
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, | |
* modular testing, and an improved monitoring experience. | |
*/ | |
public static class CountWords extends PTransform<PCollection<String>, | |
PCollection<KV<String, Long>>> { | |
@Override | |
public PCollection<KV<String, Long>> expand(PCollection<String> lines) { | |
// Convert lines of text into individual words. | |
PCollection<String> words = lines.apply( | |
ParDo.of(new ExtractWordsFn())); | |
// Count the number of times each word occurs. | |
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement()); | |
return wordCounts; | |
} | |
} | |
/** | |
* Options supported by {@link WordCount}. | |
* | |
* <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments | |
* to be processed by the command-line parser, and specify default values for them. You can then | |
* access the options values in your pipeline code. | |
* | |
* <p>Inherits standard configuration options. | |
*/ | |
public interface WordCountOptions extends SparkContextOptions { | |
/** | |
* One property to pass in static text for the word count | |
*/ | |
@Description("Sample text") | |
@Required | |
List<String> getText(); | |
void setText(List<String> value); | |
} | |
static void runWordCount(WordCountOptions options) { | |
options.setRunner(SparkRunner.class); | |
Pipeline p = Pipeline.create(options); | |
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the | |
// static FormatAsTextFn() to the ParDo transform. | |
p.apply("ReadLines", Create.of(lines)).setCoder(StringUtf8Coder.of()) | |
.apply(new CountWords()) | |
.apply(MapElements.via(new FormatAsTextFn())) | |
.apply(MapElements.via(new LogCounts())); | |
p.run().waitUntilFinish(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running (Java) Beam Jobs with spark-jobserver
run it in context created like this:
curl -d "" 'localhost:8090/contexts/jcontext?context-factory=spark.jobserver.context.JavaSparkContextFactory'