Last active June 24, 2021 07:23
Gist: brianrodri/771e599a64653f7d0007585a7031140d

Oppia and Apache Beam

Apache Beam is used by Oppia to perform large-scale operations on the datastore. There are two types of operations:

  • Batch: Operations that are designed to be executed once on the "current state" of the datastore. For example:
    • Count the number of models in the datastore
    • Update the StringProperty values of all models of a specific kind
    • Validate the relationships between models
  • Continuous: Operations that are designed to run indefinitely by reacting to updates in the datastore. For example:
    • Update the top N answers to a lesson
    • Generate notifications for the events that users have subscribed to

If you're already familiar with Apache Beam, or are eager to start writing a new job, then follow the Quick Start.

Otherwise, if you're looking for supplemental material, you can read the entire guide and fall back to the Apache Beam Programming Guide for more details.

Apache Beam Job Architecture

Jobs are composed of the following components:

  • Pipelines
  • PValues
  • PTransforms
  • Runners

Pipelines

Pipelines manage a "DAG" (directed acyclic graph) of PValues and the PTransforms that compute them. Conceptually, PValues are the DAG's nodes and PTransforms are the edges.

For example:

.------------. io.ReadFromText(fname) .-------. FlatMap(str.split)
| Input File | ---------------------> | Lines | -----------------.
'------------'                        '-------'                  |
   .----------------. combiners.Count.PerElement() .-------.     |
.- | (word, count)s | <--------------------------- | Words | <---'
|  '----------------'                              '-------'
| MapTuple(lambda word, count: '%s: %d' % (word, count)) .------------.
'------------------------------------------------------> | "word: #"s |
                         .-------------. io.WriteToText(ofname) |
                         | Output File | <----------------------'

Or, the equivalent Oppia code:

import apache_beam as beam

from jobs import base_jobs

class WordCountJob(base_jobs.JobBase):
    def run(self, fname, ofname):
        return (
            self.pipeline
            | 'Generate Lines' >> beam.io.ReadFromText(fname)
            | 'Generate Words' >> beam.FlatMap(str.split)
            | 'Generate (word, count)s' >> beam.combiners.Count.PerElement()
            | 'Generate "word: #"s' >> (
                beam.MapTuple(lambda word, count: '%s: %d' % (word, count)))
            | 'Write to Output File' >> beam.io.WriteToText(ofname)
        )
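
For readers new to Beam, the same computation can be traced in plain Python. This is only a sketch of what each PTransform computes, not of how Beam executes a pipeline: it works on an in-memory list of lines instead of files, and the function name `word_count_lines` is hypothetical.

```python
import collections
from typing import Iterable, List


def word_count_lines(lines: Iterable[str]) -> List[str]:
    """Mirrors the WordCountJob DAG step by step, without Beam (hypothetical)."""
    # FlatMap(str.split): every line becomes zero or more words.
    words = [word for line in lines for word in line.split()]
    # combiners.Count.PerElement(): (word, count) tuples.
    word_counts = collections.Counter(words).items()
    # MapTuple(...): format each tuple as a "word: #" string.
    return ['%s: %d' % (word, count) for word, count in word_counts]


print(word_count_lines(['the cat', 'the hat']))  # → ['the: 2', 'cat: 1', 'hat: 1']
```

One difference worth keeping in mind: a Python list has a fixed order, whereas a PCollection has no guaranteed element order, so tests of real Beam jobs should not depend on it.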

PValues

PCollections are the primary input and output PValues used by PTransforms. They are a kind of PValue that represents a dataset of (virtually) any size, including unbounded (continuous) datasets.

PBegin and PEnd are "terminal" values: no operation can produce a PBegin, and no operation can act upon a PEnd.

A Pipeline object is the best example of a PBegin, and the output of a write operation is the best example of a PEnd.

PTransforms

ParDo and DoFn

DoFns are the most basic unit of processing. They are invoked on each element of a PCollection using beam.ParDo, which is analogous to the following code:

do_fn = DoFn()
new_pcoll = []
for value in pcoll:
    # process() may yield zero, one, or many values for each element, and all of
    # them end up in the output PCollection. A DoFn can also hold onto state
    # between calls in more advanced implementations.
    new_pcoll.extend(do_fn.process(value))
return new_pcoll
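
A runnable version of that analogy uses a toy DoFn-like class. `SplitWords` and `par_do` are hypothetical stand-ins: in Beam you would subclass `beam.DoFn` and apply it with `beam.ParDo`. The point it illustrates is that `process()` can yield any number of outputs per input element, and all of them are collected.

```python
from typing import Iterable, Iterator, List


class SplitWords:
    """Hypothetical stand-in for a beam.DoFn subclass."""

    def __init__(self) -> None:
        # DoFns may keep state between calls; here we count processed elements.
        self.num_processed = 0

    def process(self, line: str) -> Iterator[str]:
        self.num_processed += 1
        # Yielding is how a DoFn emits outputs: zero, one, or many per input.
        for word in line.split():
            yield word


def par_do(do_fn: SplitWords, pcoll: Iterable[str]) -> List[str]:
    """Hypothetical stand-in for `pcoll | beam.ParDo(do_fn)`."""
    output: List[str] = []
    for value in pcoll:
        output.extend(do_fn.process(value))
    return output


do_fn = SplitWords()
print(par_do(do_fn, ['a b', '', 'c']))  # → ['a', 'b', 'c']
print(do_fn.num_processed)  # → 3
```

Keep in mind that Beam may serialize a DoFn and run copies of it on many workers, so any state it holds is local to each copy.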

Map and FlatMap

beam.Map is an operation that transforms each item in a PCollection into a new value using a plain old function. It is analogous to the following code:

new_pcoll = []
for value in pcoll:
    new_pcoll.append(fn(value))
return new_pcoll

beam.FlatMap is similar, except that fn returns an iterable of zero or more values for each element, and FlatMap flattens all of those iterables into a single output PCollection. It is analogous to the following code:

new_pcoll = []
for value in pcoll:
    for sub_value in fn(value):
        new_pcoll.append(sub_value)
return new_pcoll
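
The difference between the two is easiest to see side by side. The sketch below wraps both analogies in hypothetical helpers (`beam_map`, `beam_flat_map`) and applies `str.split` to the same input:

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar('T')
U = TypeVar('U')


def beam_map(fn: Callable[[T], U], pcoll: Iterable[T]) -> List[U]:
    """Plain-Python analogue of `pcoll | beam.Map(fn)`: one output per input."""
    return [fn(value) for value in pcoll]


def beam_flat_map(
        fn: Callable[[T], Iterable[U]], pcoll: Iterable[T]) -> List[U]:
    """Plain-Python analogue of `pcoll | beam.FlatMap(fn)`: outputs flattened."""
    return [sub_value for value in pcoll for sub_value in fn(value)]


lines = ['to be', 'or not']
print(beam_map(str.split, lines))       # → [['to', 'be'], ['or', 'not']]
print(beam_flat_map(str.split, lines))  # → ['to', 'be', 'or', 'not']
```

Map keeps the nesting (one list per line), while FlatMap produces one element per word.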

Filter

beam.Filter keeps only the elements of a PCollection for which a specified function returns True. It is analogous to the following code:

new_pcoll = []
for value in pcoll:
    if fn(value):
        new_pcoll.append(value)
return new_pcoll
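
As a runnable sketch, here is the same analogy in a hypothetical `beam_filter` helper, used to drop blank lines. Note that the surviving elements are passed through unchanged:

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar('T')


def beam_filter(fn: Callable[[T], bool], pcoll: Iterable[T]) -> List[T]:
    """Plain-Python analogue of `pcoll | beam.Filter(fn)` (hypothetical)."""
    # Elements pass through unchanged; only those where fn returns True remain.
    return [value for value in pcoll if fn(value)]


lines = ['alpha', '', '   ', 'beta']
print(beam_filter(lambda line: bool(line.strip()), lines))  # → ['alpha', 'beta']
```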

CoGroupByKey

beam.CoGroupByKey is useful when you need to perform an operation on elements from several PCollections that share a common key. It is analogous to the following code:

groups = collections.defaultdict(lambda: collections.defaultdict(list))
for i, pcoll in enumerate(pcolls_to_group):
    # NOTE: Each PCollection must have (key, value) pairs as elements.
    for key, value in pcoll:
        # Items from each PCollection are grouped under the same key, and
        # bucketed into their corresponding index.
        groups[key][i].append(value)
return groups
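
A runnable version of that analogy is sketched below. The helper name `co_group_by_key` is hypothetical, and it returns a plain dict of `{key: (values_from_input_0, values_from_input_1, ...)}`, which mirrors the `(key, (list, list, ...))` shape that grouping a tuple of PCollections produces:

```python
import collections
from typing import Any, Dict, Hashable, Iterable, List, Tuple


def co_group_by_key(
        *pcolls: Iterable[Tuple[Hashable, Any]]
) -> Dict[Hashable, Tuple[List[Any], ...]]:
    """Plain-Python analogue of `(pcoll_a, pcoll_b, ...) | beam.CoGroupByKey()`."""
    # Every key gets one fresh list per input, even if an input lacks that key.
    groups = collections.defaultdict(lambda: tuple([] for _ in pcolls))
    for i, pcoll in enumerate(pcolls):
        for key, value in pcoll:
            # The i-th list of each group only holds values from the i-th input.
            groups[key][i].append(value)
    return dict(groups)


grouped = co_group_by_key([('a', 1), ('b', 2)], [('a', 'x'), ('c', 'y')])
print(grouped)  # → {'a': ([1], ['x']), 'b': ([2], []), 'c': ([], ['y'])}
```

The empty lists are what make this useful: a key that is absent from one input still shows up, with an empty bucket for that input.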

For example, in our validation jobs we compute two PCollections:

# Tuples of (ModelKey, True) for each model in the datastore that exists.
existing_models_pcoll = ...
# Tuples of (ModelKey, str) for each error message that should be reported when
# the corresponding model instance does not exist.
errors_if_missing_pcoll = ...

To generate a report, we use CoGroupByKey to pair the error messages with the existing models.

After this step, we can filter out the pairs where a model existed and report the errors that are left over.

error_pcoll = (
    (
        # A PCollection of Tuple[ModelKey, bool]. A ModelKey identifies an
        # individual model in the datastore.
        existing_models_pcoll,
        # A PCollection of Tuple[ModelKey, str]. Each item corresponds to an
        # error that should be reported when the corresponding instance does not
        # exist.
        errors_if_missing_pcoll,
    )
    # Returns a PCollection of Tuple[ModelKey, Tuple[List[bool], List[str]]].
    | beam.CoGroupByKey()
    # Discards ModelKey from the PCollection.
    | beam.Values()
    # Only keep groupings that indicate that the model is missing.
    | beam.Filter(lambda bools_and_errors: not any(bools_and_errors[0]))
    # Discard the bools and flatten the results into a PCollection of strings.
    | beam.FlatMap(lambda bools_and_errors: bools_and_errors[1])
)
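
To check the logic of the pipeline above without Beam, here is a plain-Python walk-through of the same steps on small in-memory inputs. The `(kind, id)` shape of `ModelKey` and the function name `report_missing_models` are assumptions made for illustration only:

```python
import collections
from typing import Iterable, List, Tuple

ModelKey = Tuple[str, str]  # Hypothetical (kind, id) key, for illustration only.


def report_missing_models(
        existing_models: Iterable[Tuple[ModelKey, bool]],
        errors_if_missing: Iterable[Tuple[ModelKey, str]]) -> List[str]:
    """Plain-Python walk-through of the error-report pipeline (hypothetical)."""
    # Group both inputs by ModelKey: key -> ([bools], [error messages]).
    groups = collections.defaultdict(lambda: ([], []))
    for key, exists in existing_models:
        groups[key][0].append(exists)
    for key, error in errors_if_missing:
        groups[key][1].append(error)
    errors: List[str] = []
    # Values(): drop the keys.
    for exist_bools, error_messages in groups.values():
        # Filter: keep only the groups whose model is missing.
        if not any(exist_bools):
            # FlatMap: emit each error message individually.
            errors.extend(error_messages)
    return errors


existing = [(('ExplorationModel', 'e1'), True)]
errors = [
    (('ExplorationModel', 'e1'), 'e1 is missing'),
    (('ExplorationModel', 'e2'), 'e2 is missing'),
]
print(report_missing_models(existing, errors))  # → ['e2 is missing']
```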

Runners

Runners provide the run() method used to visit every node (PValue) in the pipeline's DAG by executing the edges (PTransforms) that compute their values. At Oppia, we use DataflowRunner to have our Pipelines run on the Google Cloud Dataflow service.
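
As a mental model only, a toy runner can be sketched as a topological walk over the DAG, where each node is computed once all of its inputs are available. Real runners such as DataflowRunner are far more involved (they parallelize and distribute the work), and every name below (`Edge`, `run_dag`) is hypothetical:

```python
import collections
from typing import Callable, Dict, List, Tuple

# Hypothetical edge format: (input node names, output node name, function).
Edge = Tuple[List[str], str, Callable[..., list]]


def run_dag(sources: Dict[str, list], edges: List[Edge]) -> Dict[str, list]:
    """Computes every node by executing each edge once its inputs exist."""
    values = dict(sources)
    pending = list(edges)
    while pending:
        # Edges whose input nodes have all been computed can run now.
        ready = [edge for edge in pending if all(n in values for n in edge[0])]
        if not ready:
            raise ValueError('The graph has a cycle or a missing input.')
        for edge in ready:
            inputs, output, fn = edge
            values[output] = fn(*(values[name] for name in inputs))
            pending.remove(edge)
    return values


# Edges are listed out of order on purpose; run_dag works out the order.
result = run_dag(
    {'lines': ['a b', 'b']},
    [
        (['words'], 'counts',
         lambda words: sorted(collections.Counter(words).items())),
        (['lines'], 'words',
         lambda lines: [w for line in lines for w in line.split()]),
    ])
print(result['counts'])  # → [('a', 1), ('b', 2)]
```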

High-level Guidelines

  • TL;DR: Inherit from base_jobs.JobBase and override the run() method.
  • The run() method must return a PCollection of JobRunResult instances.
    • In plain terms, the job must report something about what it did during its execution, such as the errors it discovered or the number of operations it completed successfully.
    • Regardless of your needs, jobs must report something; empty results are forbidden!
      • If you don't think your job has any results worth reporting, then output a "Success" metric with the number of models it processed.
    • JobRunResult outputs should answer the following questions:
      • Did the job run without any problems? How and why do I know?
      • How much work did the job manage to do?
      • If the job encountered a problem, what caused it?
  • When writing new jobs, prefer splitting boilerplate into new, small, and simple PTransform subclasses. Then, after unit testing them, combine them liberally in your job's run() method.
    • Keep the job class and the PTransforms it uses in the same file, unless you plan on reusing the PTransforms in future jobs. If you do, then ask your reviewer for guidance on how to organize them.
  • Never modify input values. If you need to make changes to an input value, then clone it first.
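
The last guideline exists because an input element may also be consumed by other transforms, so an in-place change could leak into them. A plain-Python sketch of the safe pattern follows, with `copy.deepcopy` standing in for a cloning helper; the `Model` class and `with_title` function are hypothetical:

```python
import copy
import dataclasses


@dataclasses.dataclass
class Model:
    """Hypothetical stand-in for a datastore model."""
    id: str
    title: str


def with_title(input_model: Model, new_title: str) -> Model:
    """Returns an updated copy, leaving the input untouched."""
    model = copy.deepcopy(input_model)  # Clone first...
    model.title = new_title             # ...then modify only the clone.
    return model


original = Model(id='m1', title='Old')
updated = with_title(original, 'New')
print(original.title, updated.title)  # → Old New
```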

Quick Start

The quick start is split into case studies of increasing complexity. Study the one that best suits your needs.

If none of them help you implement your job, you may request a new one by adding a comment to #13190 with answers to the following questions:

  • Why do I want a new case study?
  • Why are the current case studies insufficient?
  • What answers would the "perfect" case study provide?

Then we'll start writing a new case study to help you, and future contributors, as soon as we can (brianrodri@ will always notify you of how long it'll take).

Case study: CountAllModelsJob

Difficulty: Trivial

Key Concepts:

  • Fetching NDB models
  • Counting elements in a PCollection
  • Creating JobRunResult values
  • Job registration

We'll start by writing a boilerplate PTransform that accepts models as input and returns (kind, count) tuples, where kind is the name of the model's class as a string.

from jobs import job_utils
from jobs.types import job_run_result

import apache_beam as beam

class CountModels(beam.PTransform):
    """Returns the number of models after grouping them by their "kind".

    Kind is a unique identifier given to all models. In practice, the following
    always holds:

        job_utils.get_model_kind(FooModel) == 'FooModel'
    """

    def expand(self, model_pcoll):
        """Method PTransform subclasses must implement.

        Args:
            model_pcoll: PCollection[base_models.BaseModel]. The collection of
                models to count.

        Returns:
            PCollection[Tuple[str, int]]. The (kind, count) tuples corresponding
            to the input PCollection.
        """
        return (
            model_pcoll
            # "Map" every model to its kind. Analogous to the code:
            # [job_utils.get_model_kind(model) for model in model_pcoll]
            | beam.Map(job_utils.get_model_kind)
            # Built-in PTransform that reduces a collection of values into
            # (value, # discovered) tuples.
            | beam.combiners.Count.PerElement()
        )
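
Because the docstring states that a model's kind is its class name, the counting logic can be checked in plain Python with `collections.Counter`. This is a sketch under that assumption: `get_model_kind` below is a local stand-in, not Oppia's `job_utils.get_model_kind`, and `FooModel`/`BarModel` are hypothetical classes.

```python
import collections
from typing import Iterable, List, Tuple


def get_model_kind(model: object) -> str:
    """Stand-in for job_utils.get_model_kind: the model's class name."""
    # Accept either a model class or a model instance.
    model_class = model if isinstance(model, type) else type(model)
    return model_class.__name__


def count_models(models: Iterable[object]) -> List[Tuple[str, int]]:
    """Plain-Python analogue of the CountModels PTransform."""
    # Map(get_model_kind) followed by Count.PerElement().
    return list(collections.Counter(map(get_model_kind, models)).items())


class FooModel:
    pass


class BarModel:
    pass


print(count_models([FooModel(), BarModel(), FooModel()]))
# → [('FooModel', 2), ('BarModel', 1)]
```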

Next, we'll write the job which applies the PTransform to every model in the datastore. We can keep both of their implementations in the same file, since they are so tightly coupled. Unit tests can focus on one or the other.

from core.platform import models
from jobs import base_jobs
from import ndb_io

datastore_services = models.Registry.import_datastore_services()

class CountAllModelsJob(base_jobs.JobBase):
    """Counts every model in the datastore."""

    def run(self):
        query_everything = datastore_services.query_everything()
        all_models = self.pipeline | ndb_io.GetModels(query_everything)
        return (
            all_models
            | CountModels()
            # We'll convert the tuples into `JobRunResult` instances, where the
            # stdout field is used to store the tuple's value.
            | beam.Map(job_run_result.JobRunResult.as_stdout)
        )

Finally, we'll import this job into the registry file. Let's assume the name of the file was jobs/

  # file: jobs/

  from jobs import base_jobs
  from jobs import base_validation_jobs
+ from jobs import count_all_models_jobs

Case Study: SchemaMigrationJob

Difficulty: Medium

Key Concepts:

  • Getting and Putting NDB models
  • Partitioning one PCollection into many PCollections.
  • Returning variable outputs from a DoFn

Let's start by listing the specification of a schema migration job:

  • The schema version of a model is in the closed range [1, N], where N is the latest version.
  • All migration functions are implemented in terms of taking n to n + 1.
  • Models should only be put into storage after successfully migrating to vN.
  • Models that were already at vN should be reported separately.

A recursive function seems like an intuitive fit for this type of operation, so let's work out what a diagram would look like in terms of migrate_to_next_version.

NOTE: In practice, an iterative approach would be more efficient. However, this guide is concerned with teaching you how to use advanced Apache Beam constructs, so we'll go with the more complicated approach against our better judgment.
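
For comparison, the iterative approach mentioned in the note could look like this in plain Python: each model is migrated one version at a time until it reaches vN, and models already at vN are returned separately. The dict-based models, the migration functions, and `MIGRATIONS` are all hypothetical; real migrations depend on the model's schema.

```python
import copy
from typing import Any, Callable, Dict, List, Tuple

Model = Dict[str, Any]  # Hypothetical: models are dicts with 'schema_version'.


def _add_tags(model: Model) -> None:
    model['tags'] = []  # Hypothetical v1 -> v2 change.


def _rename_title(model: Model) -> None:
    model['name'] = model.pop('title')  # Hypothetical v2 -> v3 change.


# MIGRATIONS[n] takes a model from vn to vn+1, so the latest version is v3.
MIGRATIONS: Dict[int, Callable[[Model], None]] = {
    1: _add_tags,
    2: _rename_title,
}
LATEST_VERSION = len(MIGRATIONS) + 1


def migrate_to_latest(input_model: Model) -> Model:
    """Applies one migration at a time until the model reaches vN."""
    model = copy.deepcopy(input_model)  # Never modify the input value.
    while model['schema_version'] < LATEST_VERSION:
        MIGRATIONS[model['schema_version']](model)
        model['schema_version'] += 1
    return model


def migrate_all(models: List[Model]) -> Tuple[List[Model], List[Model]]:
    """Returns (migrated models, models that were already at vN)."""
    migrated = [migrate_to_latest(model) for model in models
                if model['schema_version'] < LATEST_VERSION]
    already_latest = [model for model in models
                      if model['schema_version'] == LATEST_VERSION]
    return migrated, already_latest


print(migrate_to_latest({'schema_version': 1, 'title': 'T'}))
# → {'schema_version': 3, 'tags': [], 'name': 'T'}
```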

In practice, finding the best patterns on your own will become easier as you gain experience writing jobs and working with "Functional Programming" in general.

TIP: Often, when jobs are relatively complicated, it's helpful to begin by sketching a diagram of what you want the job to do. I recommend using pen and paper or a whiteboard, but in this Wiki Page we use ASCII art to keep the document self-contained.

.--------------. Partition(lambda model: model.schema_version)
| Input Models | ---------------------------------------------.
'--------------'                                              |
                                             .-----------.    |
                    .----------------------- | Model @v1 | <--|
                    |                        '-----------'    |
                    |                                         |
                    | ParDo(MigrateToNextVersion())           |
                     >-----------------------------.          |
                    |                              |          |
                    |                              v          |
                    |                        .-----------.    |
                    '----------------------- | Model ... | <--'
                                             | Model @vN |
                 .-----------.  ndb_io.PutModels() |
                 | Datastore | <-------------------'

TIP: You don't need to know the names of the PTransforms (edges) while drawing a diagram. It's easy to look up the "perfect match" afterwards.

In this example, we could have easily replaced the edges with "plain-text" sentences without losing any generality.

There's a lot of complexity at the outset, so let's split the work across plenty of small PTransforms. We'll start with the most interesting one: the loop that migrates models to the next version.

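import apache_beam as beam

from jobs import job_utils

class MigrateToNextVersion(beam.DoFn):
    """Migrates each model to the next schema version."""

    def process(self, input_model):
        if input_model.schema_version < ExplorationModel.LATEST_SCHEMA_VERSION:
            # Never modify input values, so migrate a clone instead.
            model = job_utils.clone_model(input_model)
            # Hypothetical helper: applies the vn -> vn+1 migration in place,
            # including bumping model.schema_version.
            migrate_to_next_version(model)
            # A tagged output, accessed by callers as `.updated_models`.
            yield beam.pvalue.TaggedOutput('updated_models', model)

class MigrateToLatestVersion(beam.PTransform):
    """Migrates every input model to the latest schema version.

    .--------------. Partition(lambda model: model.schema_version)
    | Input Models | ---------------------------------------------.
    '--------------'                                              |
                                                 .-----------.    |
                        .----------------------- | Model @v1 | <--|
                        |                        '-----------'    |
                        |                                         |
                        | ParDo(MigrateToNextVersion())           |
                         >-----------------------------.          |
                        |                              |          |
                        |                              v          |
                        |                        .-----------.    |
                        '----------------------- | Model ... | <--'
                                                 | Model @vN |
    """

    def expand(self, exp_model_pcoll):
        num_versions = ExplorationModel.LATEST_SCHEMA_VERSION
        partitions = (
            exp_model_pcoll
            | 'Partition by version' >> beam.Partition(
                lambda model, _: model.schema_version - 1, num_versions))
        # Partition outputs are indexed by int; index i holds v(i + 1) models.
        models_by_schema_version = [partitions[i] for i in range(num_versions)]

        do_fn = MigrateToNextVersion()
        results = [
            models_by_schema_version[0]
            | 'Migrate v1' >> beam.ParDo(do_fn).with_outputs('updated_models')
        ]

        # Beam requires unique labels, so each step is named after its version.
        for version, models_at_ith_version in enumerate(
                models_by_schema_version[1:-1], start=2):
            models_to_migrate = (
                (results[-1].updated_models, models_at_ith_version)
                | 'Flatten v%d' % version >> beam.Flatten())
            results.append(
                models_to_migrate
                | 'Migrate v%d' % version >> beam.ParDo(do_fn).with_outputs(
                    'updated_models'))

        # All of these models are now at vN. Models that started at vN
        # (models_by_schema_version[-1]) are left out to be reported separately.
        return results[-1].updated_models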

NOTE: This implementation won't work as-is; it's merely a scaffold of the key components we need to build out our diagram.
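
To see the data flow of MigrateToLatestVersion without Beam, here is a plain-Python simulation of the same partition, flatten, and migrate waves over dict models. As before, the dict models, `LATEST_VERSION`, and the `migrate_one_version` helper are hypothetical; the migration here only bumps the version number.

```python
import copy
from typing import Any, Dict, List, Tuple

Model = Dict[str, Any]  # Hypothetical: models are dicts with 'schema_version'.
LATEST_VERSION = 3


def migrate_one_version(input_model: Model) -> Model:
    """Hypothetical vn -> vn+1 migration: clones, then bumps the version."""
    model = copy.deepcopy(input_model)
    model['schema_version'] += 1
    return model


def migrate_in_waves(models: List[Model]) -> Tuple[List[Model], List[Model]]:
    """Returns (models migrated to vN, models that were already at vN)."""
    # Partition: index i holds the models at v(i + 1).
    partitions: List[List[Model]] = [[] for _ in range(LATEST_VERSION)]
    for model in models:
        partitions[model['schema_version'] - 1].append(model)

    # Migrate the v1 models, then repeatedly Flatten the previous wave's output
    # with the models that started at the next version, and migrate again.
    updated = [migrate_one_version(m) for m in partitions[0]]
    for models_at_ith_version in partitions[1:-1]:
        to_migrate = updated + models_at_ith_version  # Flatten.
        updated = [migrate_one_version(m) for m in to_migrate]

    return updated, partitions[-1]


migrated, already_latest = migrate_in_waves([
    {'id': 'a', 'schema_version': 1},
    {'id': 'b', 'schema_version': 2},
    {'id': 'c', 'schema_version': 3},
])
print(migrated)  # → [{'id': 'a', 'schema_version': 3}, {'id': 'b', 'schema_version': 3}]
print(already_latest)  # → [{'id': 'c', 'schema_version': 3}]
```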
