dataflow tooling
import os
import sys

PROJECT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

# add the project path so we can access any code that we need there. always
# check if it's there before we add because we don't want to mess up the
# delicate path order.
if PROJECT_PATH not in sys.path:
    sys.path.append(PROJECT_PATH)

from ._cli import main

main()

About the dataflow package and tooling

The dataflow package and tooling (./scripts/dataflow) serves a few purposes:

  • makes it easier (and possible!) to write, test, and run Dataflow (Apache Beam) jobs locally and on GCP;
  • formalizes some patterns and provides some structure to jobs;
  • ensures any data modifications are logged.

Our typical use cases for Dataflow jobs include:

  • query Datastore entities for bulk modification/deletion
  • query Datastore entities and use props as input for making bulk API calls
  • query Datastore entities and produce a report (CSV, etc.)
  • read an input file (CSV) and use the records as input for making bulk API calls
  • (permutations of the above)
  • etc.

Features

  • DataflowOptions, a custom PipelineOptions subclass, is defined.
    • This collects all of the options relevant to our use cases in one place and makes them visible via --help.
  • Pipeline options are automatically configured to helpful defaults (see configure_options()).
    • Effective values for relevant options are logged before running the job.
    • Each run of a job is automatically given a "run id" based on the current datetime. A custom run id can be specified manually, if desired.
    • The job's module name and the run id are combined to form a "run name" (e.g. some-job__20240107_1612).
    • For runs affecting GCP data, the service name is prepended to the run name (e.g. service_name__some-job__20240107_1612).
  • You can quickly build jobs by composing some functions and transforms (see the example job sketch after this list).
    • Currently available functions (beam.DoFn, TaggedOutputFn):
      • Counter()
      • DuplicateEntity(key_transformer, props_formatter)
      • SkipEntityIf(options, test_condition)
      • CallApiWithEntity(options, compute_url, prepare_data)
      • CreateTaskWithEntity(options, queue_name, service, version, path, prepare_data, headers)
      • GetEntityFromRow(options)
    • Currently available transforms (beam.PTransform):
      • QueryAndCountEntities(query)
      • ExecuteFnForEntities(options, entity_handler=beam.DoFn)
      • UpdateEntities(options, entity_updater=beam.DoFn)
      • WriteEntityDictsToBigQuery(options)
      • DumpKeys(options)
      • DumpEntities(options)
      • DeleteEntities(options)
      • RunQueryInBq(options)
      • WriteTextToFile(options)
      • WriteCsvToFile(options)
  • The transforms are configured to log actions and serialize entities to files (local/GCS) and BigQuery.
    • GCS folders are named with the run name, and are stored in the GCS dataflow_job_runs folder of the associated Dataflow project.
    • BigQuery result tables are named with the run name, and are stored in the dataflow_job_runs dataset of the associated Dataflow project.
  • Google Cloud logging is automatically configured to log to the dataflow log (projects/GCP_PROJECT/logs/dataflow) with the appropriate resource labels (job_name, job_id) along with the service this job is associated with (labels.service).
  • After the run has completed, metrics and relevant URLs to GCP Console are logged for convenience.
  • Uses Dataflow-specific env vars (dataflow/.env), if present.
  • icecream is enabled when running locally.
  • The dataflow module can be used on its own (./.venv3/bin/python3 -m dataflow), or via ./scripts/dataflow, which makes common usage much easier.
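
For example, a job module is just a file in the dataflow package that defines a DataflowJob subclass of PTransform (see _import_job_module()) and composes the functions and transforms above. Here is a minimal sketch, assuming the transforms live in a _transforms module and module_main() lives in _cli; the dump_foo module name and the Foo kind are illustrative:

```py
# dataflow/dump_foo.py (illustrative module name)
from ._cli import module_main
from ._common import BaseDataflowJob, Query
from ._transforms import DumpEntities, QueryAndCountEntities


class DataflowJob(BaseDataflowJob):
    """Query all Foo entities and dump their keys/properties to files (and BQ)."""

    def expand(self, p):
        # build a Datastore query scoped to the project/namespace from the CLI options
        foo_query = Query(
            kind="Foo",
            project=self.options.datastore_project,
            namespace=self.options.datastore_namespace,
        )
        return (
            p
            | QueryAndCountEntities(foo_query)
            | DumpEntities(self.options)
        )


if __name__ == "__main__":
    module_main(__spec__)
```

A job like this would then be run as described below, e.g. ./scripts/dataflow -d localdev dump_foo.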

Usage

This assumes you have already created a local development environment via ./scripts/venv create. See the main README.md for more info.

In this documentation, "local" means a POSIX shell environment, i.e. your personal machine or your personal Google Cloud Shell.

You can get help via ./scripts/dataflow -h. And you can see the underlying Beam/Dataflow options via ./scripts/dataflow -o (which just calls ./.venv3/bin/python -m dataflow --help).

Run locally against the local Datastore Emulator

This option is super-useful for developing and testing Dataflow jobs because it runs the Dataflow job locally without affecting GCP. You should always do this first!

Assumptions

  • Datastore Emulator is started via ./scripts/dsemulator (or gcloud beta emulators datastore start).
    • i.e. run ./scripts/dsemulator in one terminal session
  • Datastore backup has been restored into the emulator via ./scripts/dsbackup.
    • i.e. run ./scripts/dsbackup dl -f && ./scripts/dsbackup import in another terminal session

What happens

  • runs via DirectRunner locally, so the job starts immediately and we are not charged
  • queries/modifies the local data inside Datastore Emulator
  • logs to console
  • outputs results to a local directory (dataflow_job_runs)

Command variations

  • Run via ./scripts/dataflow (recommended)

    If you are using all the defaults, this command implicitly sets DATASTORE_EMULATOR_HOST to its default value:

    ./scripts/dataflow -d localdev JOB

    Alternatively, if you specify DATASTORE_EMULATOR_HOST yourself, this command implicitly uses localdev as the Datastore Emulator project name:

    DATASTORE_EMULATOR_HOST=localhost:9999 \
    ./scripts/dataflow JOB

    Or, if you need to specify a custom DATASTORE_EMULATOR_HOST and a custom project name in Datastore Emulator, then you can use this command:

    DATASTORE_EMULATOR_HOST=localhost:10101 \
    ./scripts/dataflow -d my-custom-project JOB
  • Run via the dataflow module, directly (advanced)

    This is useful for dataflow module development and should rarely be used.

    GOOGLE_CLOUD_DISABLE_GRPC=1 \
    DATASTORE_EMULATOR_HOST=localhost:9999 \
    ./.venv3/bin/python3 -m dataflow JOB_NAME \
      --datastore_project=localdev

Run locally against GCP Datastore

This option is useful for small- to medium-size Dataflow jobs. We still run the Dataflow job locally, which is much faster than using Google Cloud Dataflow, but we are modifying the data in Datastore on a specified GCP project.

Since we are modifying remote (possibly production) data, the results are logged to Google Cloud Storage and BigQuery in the associated GCP project.

Assumptions

  • You have thoroughly tested the job locally via the previous option.
  • The targeted entities in the GCP project's Datastore have been recently backed up.

What happens

  • runs via DirectRunner locally, so the job starts immediately and we are not charged
  • queries/modifies Datastore in the specified GCP project (GCP_PROJECT)
  • logs locally and on GCP_PROJECT
  • outputs results to GCP_PROJECT's GCS and BQ

Command variations

  • Run via ./scripts/dataflow (recommended)

    ./scripts/dataflow -d GCP_PROJECT JOB
  • Run via the dataflow module, directly (advanced)

    This is useful for dataflow module development and should rarely be used.

    ./.venv3/bin/python3 -m dataflow JOB_NAME \
      --datastore_project=GCP_PROJECT

Run on Google Cloud Dataflow against GCP Datastore

This option is recommended for larger Dataflow jobs. We start a Google Cloud Dataflow job in the specified GCP project, and modify Datastore in the same GCP project.

Since we are modifying remote (possibly production) data, the results are logged to Google Cloud Storage and BigQuery in the associated GCP project.

Assumptions

  • You have thoroughly tested the job locally via the previous option.
  • The targeted entities in the GCP project's Datastore have been recently backed up.

What happens

  • runs via DataflowRunner on GCP (in the appropriate region), so it takes a few minutes to create and start the worker VMs, and we are charged for usage
  • queries/modifies Datastore in the specified GCP project (GCP_PROJECT)
  • logs locally and on GCP_PROJECT
  • outputs results to GCP_PROJECT's GCS and BQ

Command variations

  • Run via ./scripts/dataflow (recommended)

    ./scripts/dataflow -p GCP_PROJECT JOB

    If the job is particularly large, we can optimize worker VM startup by using a custom container image:

    ./scripts/dataflow -p GCP_PROJECT -i JOB
  • Run via the dataflow module, directly (advanced)

    This is useful for dataflow module development and should rarely be used.

    ./.venv3/bin/python3 -m dataflow JOB_NAME \
      --project=GCP_PROJECT \
      --region=GCP_REGION

    Same as above re: a particularly large job:

    ./.venv3/bin/python3 -m dataflow JOB_NAME \
      --project=GCP_PROJECT \
      --region=GCP_REGION \
      --prebuild_sdk_container_engine=cloud_build \
      --docker_registry_push_url=us-docker.pkg.dev/GCP_PROJECT/dataflow-images \
      --sdk_location=container

Caveats

  • If the Dataflow job modifies Datastore entities, App Engine Memcache should be purged after the run is complete.
  • We do not have access to our application code inside the scope of a Dataflow job, nor do we have the ability to use NDB. That means any modifications to Datastore entities must be done by using the apache_beam.io.gcp.datastore.v1new.types module or google.cloud.datastore.entity.Entity.
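
To illustrate the last caveat, an entity_updater passed to UpdateEntities works directly with the Beam-level entity type (apache_beam.io.gcp.datastore.v1new.types.Entity) and its properties dict, not an NDB model. A minimal sketch; the class name and the status property are illustrative:

```py
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.types import Entity


class MarkFooMigrated(beam.DoFn):
    # tagged-output names expected by the UpdateEntities transform
    OUTPUT_TAG_UPDATES = "updates"
    OUTPUT_TAG_SKIPPED = "skipped"
    OUTPUT_TAG_ERRORS = "errors"

    def process(self, entity):
        # copy the v1new Entity's properties dict, tweak it, and write it back
        # onto a fresh Entity with the same key (no application code or NDB).
        props = dict(entity.properties)
        props["status"] = "migrated"  # illustrative property/value
        updated = Entity(key=entity.key)
        updated.set_properties(props)
        return [updated]
```

This is the shape that UpdateEntities(options, entity_updater=MarkFooMigrated()) expects.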

TODO

  • Convert key jobs into Dataflow Templates, which can be started via gcloud or Google Cloud Console.
import importlib
import logging
import os
import sys
from argparse import ArgumentError
from datetime import datetime, timedelta
from dotenv import load_dotenv
from ._common import (
FOLDER_PREFIX,
PROJECT_PATH,
beam,
configure_logging,
is_totally_local,
output_path,
)
from ._options import DataflowOptions, configure_options
logger = logging.getLogger(__package__)
# name of class in job module that handles running the job.
DF_JOB_CLASS = "DataflowJob"
# added to start/end times to ensure log window contains all entries
LOG_WINDOW_BUFFER = timedelta(seconds=30)
# use a separate .env file in this dir for dataflow jobs
DOTENV_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), ".env"))
def main(argv=None):
load_dotenv(DOTENV_PATH)
# first, parse all the args we passed into a `PipelineOptions` object.
options = DataflowOptions(argv)
try:
# now adjust the options based on our own needs. note that instantiation
# of the `PipelineOptions` is pulled out of `configure_options()` so we
# can get access to `options.parser` when it raises an `ArgumentError`.
options = configure_options(options)
except ArgumentError as err:
# ArgumentParser.error() exits
options.parser.error(str(err))
configure_logging(options)
df_job = _import_job_module(options.module_name)
logger.info(f"Starting Dataflow job {options.job_name}...")
# always log the options so we have a record of how exactly the job was run
opts = [f"- {k}: {repr(v)}" for k, v in options.changed_or_relevant().items()]
logger.info("\n".join(["Using pipeline options:"] + opts))
if options.show_options_only:
return
start_dt = datetime.utcnow()
with beam.Pipeline(options=options) as p:
p | "Run Dataflow Job" >> df_job.DataflowJob(options, dict(**os.environ))
end_dt = datetime.utcnow()
hh, mm, ss = (str(end_dt - start_dt)[:-7]).split(":")
logger.info(f"Completed Dataflow job {options.job_name} in {hh}h {mm}m {ss}s")
_display_metrics(p.result)
_display_links(options, start_dt, end_dt)
def module_main(spec):
"""Convenience helper to handle running the individual job module.
in `job_module.py`:
```py
if __name__ == "__main__":
module_main(__spec__)
```
"""
_parent, _, name = spec.name.rpartition(".")
main([name] + sys.argv[1:])
# ---
def _import_job_module(module_name):
df_job = importlib.import_module(f"{__package__}.{module_name}")
if not hasattr(df_job, DF_JOB_CLASS) or not issubclass(
getattr(df_job, DF_JOB_CLASS), beam.PTransform
):
raise RuntimeError(
f"The `{module_name}.py` file must define a subclass of "
"`PTransform` named `DataflowJob`."
)
return df_job
def _display_metrics(result):
MetricsFilter = beam.metrics.metric.MetricsFilter
MetricResults = beam.metrics.metric.MetricResults
metrics = result.metrics().query(MetricsFilter())
f_metrics = [_fmt_metric(m) for m in metrics[MetricResults.COUNTERS]]
logger.info("Job Metrics (unordered):\n%s", "\n".join(f_metrics))
def _display_links(options, start_dt, end_dt):
if options.gcs_output:
gcs_uri = output_path(options)
gcs_url = (
f"https://console.cloud.google.com/storage/browser"
f"/{options.gcs_project}.appspot.com/{FOLDER_PREFIX}/{options.folder_name}"
f"?project={options.gcs_project}"
)
logger.info(f"Dataflow file GCS URL: {gcs_url} ")
logger.info(f"Dataflow file GCS URI: {gcs_uri}")
else:
local_path = output_path(options)
logger.info(f"Dataflow file output is in {PROJECT_PATH}/{local_path}")
if options.bq_output:
# we can't link directly to the table because there may be multiple
# tables created, each with a step suffix and we don't have that info
# here.
#
# bq_table = options.bq_table_name
bq_dataset = options.bq_dataset
bq_project = options.bq_project
bq_url = (
"https://console.cloud.google.com/bigquery"
f"?project={bq_project}"
# query string works for now, but is completely undocumented.
f"&ws=!1m4!1m3!3m2!1s{bq_project}!2s{bq_dataset}"
)
logger.info(f"Dataflow BQ URL: {bq_url}")
if not is_totally_local(options):
project = options.datastore_project or options.project
logs_url = (
f"https://console.cloud.google.com/logs/query"
f";query=resource.type:%22dataflow%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdiagnostic-log%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fsystem%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fkubelet%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fdocker%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fvm-monitor%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fvm-health%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fagent%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fharness%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fharness-startup%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fresource%22%0A-logName%3D%22projects%2F{project}%2Flogs%2Fdataflow.googleapis.com%252Fworker-startup%22%0A" # noqa E501
f";summaryFields=resource%252Flabels%252Fjob_name,logName,labels%252Fservice:true:28:end;" # noqa E501
f";lfeCustomFields=resource%252Flabels%252Fjob_name"
f";startTime={_fmt_dt(start_dt - LOG_WINDOW_BUFFER)}"
f";endTime={_fmt_dt(end_dt + LOG_WINDOW_BUFFER)}"
f"?project={project}"
)
logger.info(f"Dataflow GCP logs URL: {logs_url}")
# ---
def _fmt_dt(dt):
return dt.isoformat()[:-7] + "Z"
def _fmt_metric(metric):
return f"{_fmt_step(metric.key.step)} > {metric.key.metric.name}: {metric.result}"
def _fmt_step(step):
# Run Dataflow Job/QueryAndCountEntities/Count SupplyItem entities
# Run Dataflow Job/UpdateEntities/Write updated entities to Datastore/Write Batch to Datastore # noqa E501
steps = step.split("/")[1:]
return " > ".join(steps)
import logging
import os
import warnings
from datetime import datetime
import google.cloud.logging
from google.cloud.logging import Resource
from google.cloud.logging_v2.handlers import CloudLoggingHandler
from .app_yaml import APP_YAML, AppYaml
PROJECT_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# ---
# silence some logging and warnings _before_ importing the beam libraries. this
# means we have to add `noqa E402` to the beam imports.
#
for mod, lvl in [
("apache_beam.runners.portability.fn_api_runner.translations", logging.WARNING),
("apache_beam.typehints.native_type_compatibility", logging.WARNING),
("apache_beam.options.pipeline_options", logging.ERROR),
]:
logging.getLogger(mod).setLevel(lvl)
for mod, line in [
("apache_beam.io.gcp.datastore.v1new.datastoreio", 250),
("apache_beam.io.gcp.datastore.v1new.datastoreio", 251),
("google.cloud.datastore.query", 234),
]:
warnings.filterwarnings("ignore", category=UserWarning, module=mod, lineno=line)
import apache_beam as beam # noqa E402
from apache_beam.io import ( # noqa E402
ReadFromText,
ReadFromTextWithFilename,
WriteToText,
)
from apache_beam.io.gcp.bigquery import ( # noqa E402
ReadAllFromBigQuery,
ReadFromBigQuery,
WriteToBigQuery,
)
from apache_beam.io.gcp.datastore.v1new.datastoreio import ( # noqa E402
DeleteFromDatastore,
ReadFromDatastore,
WriteToDatastore,
)
from apache_beam.io.gcp.datastore.v1new.helper import ( # noqa E402
get_client as get_datastore_client,
)
from apache_beam.io.gcp.datastore.v1new.types import Entity, Key, Query # noqa E402
from apache_beam.io.gcp.internal.clients import bigquery # noqa E402
from apache_beam.metrics import Metrics # noqa E402
from apache_beam.metrics.metric import MetricsFilter # noqa E402
from apache_beam.options.pipeline_options import ( # noqa E402
DebugOptions,
DirectOptions,
GoogleCloudOptions,
ProfilingOptions,
SetupOptions,
StandardOptions,
TestDataflowOptions,
TestOptions,
WorkerOptions,
)
from google.cloud.datastore.entity import Entity as DatastoreEntity # noqa E402
from google.cloud.datastore.key import Key as DatastoreKey # noqa E402
from google.cloud.datastore.query import Query as DatastoreQuery # noqa E402
# ---
# configure logging
#
log_level = os.getenv("LOG_LEVEL", "info").upper()
logging.basicConfig(level=logging._checkLevel(log_level))
logger = logging.getLogger(__package__)
# the `logName` used in GCP Logging
LOG_NAME = "dataflow"
def configure_logging(options):
if is_runner_local(options) and is_data_local(options):
return
project = options.project or options.datastore_project
region = options.region or LOCAL_PROJECT_NAME
handler_args = {
"name": LOG_NAME,
"labels": {
"service": SERVICE_NAME,
},
"resource": Resource(
type="dataflow_step",
labels={
"project_id": project,
"region": region,
"job_name": options.job_name,
"job_id": options.run_id,
},
),
}
root_logger = logging.getLogger()
gcl_client = google.cloud.logging.Client(project=project)
if is_runner_local(options):
handler = CloudLoggingHandler(gcl_client, **handler_args)
root_logger.addHandler(handler)
return
gcl_client.setup_logging(
log_level=logging._checkLevel(log_level),
**handler_args,
)
# ---
# a simpler version of icecream for dataflow
#
# using try/except here because we don't have/want icecream in jobs running on
# Google Cloud Dataflow.
try:
import icecream
icecream.install()
icecream.ic.configureOutput(includeContext=True, contextAbsPath=True)
except Exception:
pass
# ---
# constants
#
DATAFLOW_JOB_RUNS = "dataflow_job_runs"
DIRECT_RUNNER = "DirectRunner"
DATAFLOW_RUNNER = "DataflowRunner"
APPSPOT_REGIONS = {
"foo-12345": "ue",
"bar-23456": "uk",
}
DEFAULT_BQ_PROJECT = "baz-34567"
# ---
# defaults
#
# trim off seconds/microseconds and "slugify" the iso datetime for use in folder names, etc.
now_iso = datetime.utcnow().isoformat()[:-10]
DF_RUN_ID = now_iso.replace("-", "").replace(":", "").replace("T", "_")
LOCAL_PROJECT_NAME = os.getenv("LOCAL_PROJECT_NAME", "localdev")
# default value is included in .gitignore
FOLDER_PREFIX = os.getenv("FOLDER_PREFIX", DATAFLOW_JOB_RUNS)
APP_YAML_PATH = os.path.join(os.path.abspath(os.path.dirname(__file__)), APP_YAML)
app_yaml = AppYaml(APP_YAML_PATH)
SERVICE_NAME = app_yaml.get("service")
NAMESPACE = app_yaml.get("env_variables.NAMESPACE")
X_GEN_API_KEY = app_yaml.get("env_variables.X_GEN_API_KEY")
PORT = os.getenv("PORT", "8080")
LOCALDEV_URL = os.getenv("LOCALDEV_URL", "http://localhost:{}".format(PORT))
# ---
# base classes
class BaseDataflowJob(beam.PTransform):
"""Base class for use with our dataflow tooling."""
def __init__(self, options, env):
"""Create a new `BaseDataflowJob` instance.
Args:
options (PipelineOptions): Options object passed to Pipeline
env (dict): Copy of `os.environ`
"""
super().__init__()
self.options = options
self.env = env
# ---
# helpers
def slugify(s):
return s.lower().replace("_", "-")
def prefix_slug(base, prefix=None, sep="_"):
prefix = f"{prefix}{sep}" if prefix else ""
if prefix and base.startswith(prefix):
return base
return prefix + base
def suffix_slug(base, suffix=None, sep="_"):
suffix = f"{sep}{suffix}" if suffix else ""
if suffix and base.endswith(suffix):
return base
return base + suffix
def suffix_parens(base, suffix=None):
suffix = f" ({suffix})" if suffix else ""
if suffix and base.endswith(suffix):
return base
return base + suffix
def prepend_service_name(name):
return f"{SERVICE_NAME}__{name}"
def append_run_id(name):
return f"{name}__{DF_RUN_ID}"
def gcs_bucket_path(project, *paths):
path = "/".join(paths)
return f"gs://{project}.appspot.com/{FOLDER_PREFIX}/{path}"
def is_runner_local(options):
return options.runner == DIRECT_RUNNER
def is_data_local(options):
return options.datastore_project == LOCAL_PROJECT_NAME
def is_totally_local(options):
return is_runner_local(options) and is_data_local(options)
def output_path(options, *paths):
"""Returns a local path or GCS URI based on the provided `options` and any
additional `paths` provided.
A local path is returned for local runners that are modifying local data,
unless a GCS project is specified. Otherwise, a GCS URI is returned.
Args:
options (PipelineOptions): This pipeline's options.
*paths (str): Additional paths.
Returns:
str: local path or GCS URI.
"""
gcs_project = options.gcs_project
folder_name = options.folder_name
path = "/".join([folder_name, *paths])
if is_totally_local(options) and not gcs_project:
return f"{FOLDER_PREFIX}/{path}"
return gcs_bucket_path(gcs_project, path)
def appspot_service_url(project, service, version=None):
if project == LOCAL_PROJECT_NAME:
return LOCALDEV_URL
ver = f"{version}-dot-" if version else ""
try:
region = f"{APPSPOT_REGIONS[project]}.r."
except KeyError:
region = ""
return f"https://{ver}{service}-dot-{project}.{region}appspot.com"
ENABLED_VALUES = ["1", "true", "yes", "on"]
DISABLED_VALUES = ["0", "false", "no", "off"]
def is_enabled(val: str):
return val.lower() in ENABLED_VALUES
import json
import logging
import requests
from google.cloud.tasks_v2 import (
AppEngineHttpRequest,
AppEngineRouting,
CloudTasksClient,
CreateTaskRequest,
HttpMethod,
Task,
)
from ._common import Entity, Key, Metrics, beam, bigquery, get_datastore_client
logger = logging.getLogger(__name__)
ITEMS = "items"
class TaggedOutputDoFn(beam.DoFn):
OUTPUT_TAG_SUCCESS = "success"
OUTPUT_TAG_UPDATES = "updates"
OUTPUT_TAG_SKIPPED = "skipped"
OUTPUT_TAG_ERRORS = "errors"
class Counter(beam.DoFn):
"""A basic `DoFn` to count the number of items in the `PCollection` using
Metrics.
https://beam.apache.org/documentation/programming-guide/#metrics
"""
default_name = ITEMS
def __init__(self, namespace=None, name=None):
super().__init__()
cls = self.__class__
self.counter = Metrics.counter(
namespace or cls,
name or cls.default_name,
)
def process(self, element):
if element:
self.counter.inc()
yield element
class DuplicateEntity(TaggedOutputDoFn):
"""A `beam.DoFn` that accepts a Google Datastore entity and duplicates it by
copying & modifying the key via a provided `key_transformer`. The duplicate
entity's properties may also be changed if a `props_formatter` is provided.
NOTE: The `key_transformer` must add an id/name because the duplicated
entity is written via `WriteToDatastore()`, which cannot handle partial
keys.
Usage:
```py
# in DataflowJob.expand()
# ...
def change_namespace(orig_key):
return Key(
orig_key.kind,
orig_key.id,
project=orig_key.project,
namespace="foo-bar",
)
def transform_data(entity):
return {
"foo": entity["bar"]
}
# ...
| UpdateEntities(DuplicateEntity(
change_namespace,
props_formatter=transform_data
))
```
Args:
key_transformer (callable): Callable that accepts the original/source
key (`google.cloud.datastore.key.Key`) and returns a new key with
project, namespace, kind, and id/name set accordingly.
props_formatter (callable, optional): Callable that accepts an entity's
`properties` and returns a `dict`. Defaults to None, which means we
use `properties` attribute as-is.
Returns:
beam.pvalue.TaggedOutput(apache_beam.io.gcp.datastore.v1new.types.Entity)
"""
def __init__(self, key_transformer, props_formatter=None):
if not key_transformer:
raise RuntimeError("A key_transformer must be supplied!")
super().__init__()
self.key_transformer = key_transformer
self.props_formatter = props_formatter
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.datastore.v1new.types.html#apache_beam.io.gcp.datastore.v1new.types.Entity
def process(self, src_entity):
src_ds_key = src_entity.key.to_client_key()
src_hr_key = to_hr_key(src_ds_key, abs_path=True)
logger.debug(
"Processing %s with properties %s",
src_hr_key,
sorted(src_entity.properties.keys()),
)
source_hr_key = to_hr_key(src_ds_key)
try:
dest_ds_key = self.key_transformer(src_ds_key)
except Exception as err:
logger.error(
"Failed transforming key for %s (%s)",
source_hr_key,
f"{err.__class__.__name__}: {str(err)}",
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, src_entity)]
dest_key = Key.from_client_key(dest_ds_key)
dest_entity = Entity(key=dest_key)
try:
dest_entity_dict = entity_to_dict(src_entity, self.props_formatter)
except Exception as err:
logger.error(
"Failed formatting properties for %s (%s)",
source_hr_key,
f"{err.__class__.__name__}: {str(err)}",
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, src_entity)]
dest_props_dict = dest_entity_dict["properties"]
logger.debug("Destination props/values %s", dest_props_dict)
dest_entity.set_properties(dest_props_dict)
abs_path = (src_ds_key.namespace != dest_ds_key.namespace) or (
src_ds_key.project != dest_ds_key.project
)
logger.info(
"Successfully duplicated entity %s to %s",
to_hr_key(src_ds_key, abs_path),
to_hr_key(dest_ds_key, abs_path),
)
return [dest_entity]
class SkipEntityIf(TaggedOutputDoFn):
"""A `beam.DoFn` that accepts a Google Datastore entity and skips further
processing (sent to the 'skipped' tagged output stream) if the provided
`test_condition` (callable) is `True`.
If an error occurs in `test_condition`, the entity is sent to the `errors`
tagged output stream.
Usage:
```py
# in DataflowJob.expand()
# ...
def should_skip(entity):
return bool(entity["foo"] == "bar")
# ...
| beam.ParDo(SkipEntityIf(options, should_skip))
```
Args:
options (PipelineOptions): This pipeline's options.
test_condition (callable): Callable that accepts a
`google.cloud.datastore.entity.Entity` and returns `True` if the
entity should be skipped.
Returns:
beam.pvalue.TaggedOutput(apache_beam.io.gcp.datastore.v1new.types.Entity)
"""
def __init__(self, options, test_condition):
super().__init__()
self.options = options
self.test_condition = test_condition
def process(self, entity, **kwargs):
entity_ds_key = entity.key.to_client_key()
client_entity = entity.to_client_entity()
entity_hr_key = to_hr_key(entity_ds_key)
try:
skip = self.test_condition(client_entity)
except Exception as err:
logger.error(
"Failed executing %s for %s (%s); Discarding",
self.test_condition.__name__,
entity_hr_key,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, entity)]
if skip:
logging.info(
"Skipping {} per {}".format(entity_hr_key, self.test_condition.__name__)
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
return [entity]
class CallApiWithEntity(TaggedOutputDoFn):
"""A `beam.DoFn` that uses `requests` to make an API call using `requests`
for the passed entity. The API call is a POST, but can be customized by
overriding the `call_api()` method.
Exceptions are handled by yielding to two additional tagged outputs (skipped
and errors) for compatibility with the `ExecuteFnForEntities` transform.
Usage:
```py
# in DataflowJob.expand()
# ...
def compute_url(_entity):
return "https://baz/api/migrate-foo"
def prepare_data(entity):
return {
"foo": entity["foo"],
}
# ...
| beam.ParDo(CallApiWithEntity(options, compute_url, prepare_data))
```
Args:
options (PipelineOptions): This pipeline's options.
compute_url (callable): Callable that accepts a
`google.cloud.datastore.entity.Entity` and returns a URL as a `str`.
prepare_data (callable): Callable that accepts a
`google.cloud.datastore.entity.Entity` and returns a suitable value
for the data used in the API call.
dry_run (bool, optional): Skips the actual API call if True. Defaults to False.
Returns:
beam.pvalue.TaggedOutput(apache_beam.io.gcp.datastore.v1new.types.Entity)
"""
def __init__(self, options, compute_url, prepare_data, headers=None):
super().__init__()
self.options = options
self.compute_url = compute_url
self.prepare_data = prepare_data
self.headers = headers or {}
def process(self, entity, dry_run=False, **kwargs):
entity_ds_key = entity.key.to_client_key()
client_entity = entity.to_client_entity()
entity_hr_key = to_hr_key(entity_ds_key)
logger.debug(
"Processing %s with properties %s",
entity_hr_key,
sorted(entity.properties.keys()),
)
try:
url = self.compute_url(client_entity)
logger.debug("Computed URL %s", url)
except Exception as err:
logger.error(
"Failed computing URL for %s (%s); "
"Skipping API call and further processing for entity.",
entity_hr_key,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
try:
data = self.prepare_data(client_entity)
logger.debug("Prepared data:\n%s", data)
except Exception as err:
logger.error(
"Failed preparing data for %s (%s); "
"Skipping API call and further processing for entity.",
entity_hr_key,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
if dry_run:
logger.warning(
"This is a DRY RUN; "
"Skipping API call and pretending it succeeded by returning entity."
)
return [entity]
# in case call_api() fails entirely
resp_data = None
try:
resp = self.call_api(url, data)
try:
resp_data = resp.json()
except requests.JSONDecodeError:
resp_data = resp.text
logger.debug(
"HTTP %s for %s API response:\n%s",
resp.status_code,
entity_hr_key,
resp_data,
)
if resp.status_code >= 500:
# raises RequestException, handled by outer `try`
resp.raise_for_status()
if resp.status_code >= 400:
logger.error(
"API call failed for %s due to client error (HTTP %s); "
"Skipping entity.\n%s",
entity_hr_key,
resp.status_code,
resp_data,
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
except (
requests.RequestException,
requests.ConnectionError,
requests.ConnectTimeout,
requests.ReadTimeout,
requests.Timeout,
requests.TooManyRedirects,
requests.HTTPError,
) as err:
logger.error(
"API call failed for %s (%s); Skipping entity.\n%s",
entity_hr_key,
format_error(err),
resp_data,
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, entity)]
logger.info("Successfully called API for %s", entity_hr_key)
return [entity]
def call_api(self, url, data):
# override with whatever is required for your API call.
return requests.post(url, json=data, headers=self.headers)
class CreateTaskWithEntity(TaggedOutputDoFn):
"""A `beam.DoFn` that enqueues a Google Cloud Task
(`google.cloud.tasks_v2.Task`) of type `AppEngineHttpRequest` for the passed
entity.
The `gct_project` and `gct_region` options are honored.
Usage:
```py
# in DataflowJob.expand()
# ...
def prepare_data(entity):
return {
"foo": entity["foo"]
}
# ...
| ExecuteFnForEntities(
options,
entity_handler=CreateTaskWithEntity(
options,
queue_name="bar-service-migrate-foo",
service="bar-service",
version="migrate-foo",
path="/api/foo",
prepare_data=prepare_data,
headers={
"x-baz": "quux",
},
),
)
```
Args:
options (PipelineOptions): This pipeline's options.
queue_name (str): Name of Google Cloud Task queue onto which `Task` will
be enqueued.
service (str): Name of App Engine service that `Task` will use.
version (str): Version name of App Engine service that `Task` will use.
path (str): Root-relative path (target) that `Task` will use.
prepare_data (callable): Callable that accepts a
`google.cloud.datastore.entity.Entity` and returns a
JSON-serializable `Task` payload.
headers (dict): HTTP headers used by `Task` when calling service.
dry_run (bool, optional): Skips enqueuing the Task if True. Defaults to False.
Returns:
beam.pvalue.TaggedOutput(apache_beam.io.gcp.datastore.v1new.types.Entity)
"""
def __init__(
self,
options,
queue_name,
service,
version,
path,
prepare_data,
headers=None,
dry_run=False,
):
super().__init__()
self.options = options
self.queue_name = queue_name
self.path = path
self.service = service
self.version = version
self.prepare_data = prepare_data
self.headers = headers or {}
self.dry_run = dry_run
def process(self, entity, *args, **kwargs):
entity_ds_key = entity.key.to_client_key()
client_entity = entity.to_client_entity()
entity_hr_key = to_hr_key(entity_ds_key)
try:
payload = self.prepare_data(client_entity)
logger.debug("Prepared data:\n%s", payload)
except Exception as err:
logger.error(
"Failed preparing data for %s (%s); Entity processing aborted.",
entity_hr_key,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
if self.dry_run:
logger.info(
"[DRY RUN] Enqueued task to %s in project %s for: %s\n%s",
self.queue_name,
self.options.gct_project,
entity_hr_key,
to_json(payload, pretty=True),
)
return [entity]
queue_path = CloudTasksClient.queue_path(
project=self.options.gct_project,
location=self.options.gct_region,
queue=self.queue_name,
)
logger.debug("Using queue_path: %s", queue_path)
try:
req = CreateTaskRequest(parent=queue_path, task=self._create_task(payload))
task = CloudTasksClient().create_task(req)
except Exception as err:
logger.error(
"Failed adding task for %s (%s); Entity processing aborted.",
entity_hr_key,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, entity)]
logger.info(
"Enqueued task:%s to %s for %s\n%s",
task.name,
queue_path,
entity_hr_key,
to_json(payload, pretty=True),
)
return [entity]
def _create_task(self, payload):
return Task(
app_engine_http_request=AppEngineHttpRequest(
app_engine_routing=AppEngineRouting(
service=self.service,
version=self.version,
),
http_method=HttpMethod.POST,
headers=self.headers,
relative_uri=self.path,
body=to_json(payload).encode(),
),
)
class GetEntityFromRow(TaggedOutputDoFn):
"""A `beam.DoFn` that constructs a Datastore key from a column/key from the
provided BQ `row` (or any `dict`-like data), retrieves the `Entity` from
Datastore, and yields it to the next step.
The `__key__` column/key is used by default and we expect its value to have
`kind`, `id`, and `name` keys. If the column/key only contains an `id` or
`name` key, we can instantiate this function with a `kind` arg.
If the column/key contains any other kind of value, we must supply our own
`key_from_row` callable to parse the value and return a
`google.cloud.datastore.key.Key`. Any errors raised by this callable will
send the row to the 'errors' tagged output.
If the entity referenced by the generated key does not exist, the row will
be sent to the 'skipped' tagged output.
Usage:
* `foo` column contains key structure
```py
# ...
# in dataflow job
# ...
GetEntityFromRow(options, key_col="foo")
```
* 'bar' column contains ndb id/name of `Bar` entities
```py
def key_from_row(row, ds_client, col, kind):
return ds_client.key(kind, row[col])
# ...
# in dataflow job
GetEntityFromRow(
options,
key_col="bar",
key_from_row=key_from_row,
kind="Bar"
)
```
Args:
options (PipelineOptions): This pipeline's options.
key_col (str): Name of column/key containing Datastore key in provided
`row` (or dict-like structure).
key_from_row (callable, optional): Callable that accepts `row`,
`ds_client`, `col`, and `kind` args and returns a key
(`google.cloud.datastore.key.Key`).
kind (str, optional): Datastore kind to use when the key column only contains an id/name. Defaults to None.
pair_with_row (bool, Optional): If True, return the input row as the
second field of a tuple. Defaults to False.
Returns:
beam.pvalue.TaggedOutput(apache_beam.io.gcp.datastore.v1new.types.Entity)
"""
DEFAULT_KEY_COL = "__key__"
def __init__(
self,
options,
key_col=None,
key_from_row=None,
kind=None,
pair_with_row=False,
):
super().__init__()
self.options = options
self.project = options.datastore_project
self.namespace = options.datastore_namespace
self.key_col = key_col or self.DEFAULT_KEY_COL
self.key_from_row = key_from_row
self.kind = kind
self.pair_with_row = pair_with_row
def process(self, row):
ds_client = get_datastore_client(project=self.project, namespace=self.namespace)
key_from_row = self.key_from_row or self._key_from_row
try:
entity_ds_key = key_from_row(
row,
ds_client,
col=self.key_col,
kind=self.kind,
)
except Exception as err:
logger.error(
"Failed extracting key from column %s (%s)",
self.key_col,
format_error(err),
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, row)]
entity_hr_key = to_hr_key(entity_ds_key)
ds_entity = ds_client.get(entity_ds_key)
if not ds_entity:
logger.warning(
"Skipping %s; Entity does not exist in Datastore!",
entity_hr_key,
)
return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, row)]
logger.debug(
"Retrieved entity %s with properties %s",
entity_hr_key,
sorted(ds_entity.keys()),
)
entity = Entity.from_client_entity(ds_entity)
if self.pair_with_row:
return [(entity, row)]
return [entity]
@staticmethod
def _key_from_row(row, ds_client, col, kind):
key_struct = row[col]
try:
key_kind = key_struct["kind"]
except KeyError:
key_kind = kind
try:
key_id_or_name = key_struct["id"] or key_struct["name"]
except KeyError:
key_id_or_name = key_struct
if not key_kind:
raise ValueError(f"Must specify `kind` if using simple value in {col}")
logging.debug(
"Extracted key_kind=%s, key_id_or_name=%s (%s) from col=%s",
key_kind,
key_id_or_name,
key_id_or_name.__class__.__name__,
col,
)
return ds_client.key(key_kind, key_id_or_name)
# ---
def build_bq_schema(fields):
# https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
schema = bigquery.TableSchema()
for f in fields:
s = bigquery.TableFieldSchema()
s.name = f["name"]
s.type = f["type"]
s.mode = f["mode"]
schema.fields.append(s)
return schema
def key_to_dict(key):
"""Returns a dict for the provided entity Key.
Args:
key (apache_beam.io.gcp.datastore.v1new.types.Key): Datastore key in
Dataflow step.
Returns:
dict: keys/values of Key
"""
return {
"project": key.project,
"namespace": key.namespace,
"parent": key.parent,
"path_elements": key.path_elements,
}
def entity_to_dict(ent, props_formatter=None):
"""Returns a `dict` that represents the provided Google Datastore entity.
Args:
ent (apache_beam.io.gcp.datastore.v1new.types.Entity): Datastore entity
in Dataflow step.
props_formatter (callable, optional): Callable to format the entity's
properties. Defaults to None, which means we use the `properties`
attribute as-is.
Returns:
dict: A dict with `key` (datastore key as `dict`) and `prop` (entity's
properties, also a `dict`).
"""
return {
"key": key_to_dict(ent.key),
"properties": (
props_formatter(ent.properties) if props_formatter else ent.properties
),
}
def serialize_to_json(obj, default=str, **kwargs):
return json.dumps(obj, default=default, **kwargs)
# from genservice.util; duplicated here to avoid importing genservice because
# doing so has side effects intended only for app engine.
def to_json(data, pretty=False, **kwargs):
if pretty:
kwargs["indent"] = 2
return serialize_to_json(data, **kwargs)
def serialize_dict_values(d):
"""Returns a `dict` with the same keys, but with its values serialized to
JSON.
Used for making a BQ column out of a Google Datastore entity dict, with its
properties as JSON.
Args:
d (dict): The dict to serialize
Returns:
dict: keys with each value serialized to JSON
"""
return {k: serialize_to_json(v) for k, v in d.items()}
def to_hr_key(key, abs_path=False):
"""Returns a so-called 'human-readable' key: `kind:id_or_name`.
The key's project and namespace can be included as a prefix by passing
`abs_path=True`.
Args:
key (google.cloud.datastore.key.Key): The key to format. (Also accepts a
apache_beam.io.gcp.datastore.v1new.types.Key for convenience.)
abs_path (bool): Includes the project and namespace as a prefix if True.
Defaults to False.
Returns:
string: The human-readable version of the passed key.
"""
# also handle apache_beam.io.gcp.datastore.v1new.types.Key too because
# keeping the types straight is painful.
if isinstance(key, Key):
key = key.to_client_key()
hr_key = f"{key.kind}:{key.id_or_name}"
if abs_path:
hr_key = f"{key.project}:{key.namespace}:{hr_key}"
return hr_key
def format_error(error):
return f"{error.__class__.__name__}{str(error)}"
import os
from argparse import ArgumentError
from ._common import (
DATAFLOW_JOB_RUNS,
DATAFLOW_RUNNER,
DF_RUN_ID,
DIRECT_RUNNER,
NAMESPACE,
PROJECT_PATH,
SERVICE_NAME,
DebugOptions,
DirectOptions,
GoogleCloudOptions,
ProfilingOptions,
SetupOptions,
StandardOptions,
TestDataflowOptions,
TestOptions,
WorkerOptions,
append_run_id,
gcs_bucket_path,
is_data_local,
prepend_service_name,
slugify,
)
RELEVANT_OPTIONS = [
# from DataflowOptions
"module_name",
"run_id",
"datastore_project",
"datastore_namespace",
"folder_name",
"bq_output",
"bq_project",
"bq_dataset",
"bq_table_name",
"gcs_output",
"gcs_project",
"gct_project",
"gct_region",
"limit",
#
# from StandardOptions
"runner",
#
# from DebugOptions
"dataflow_job_file",
"experiments",
#
# from GoogleCloudOptions
"job_name",
"project",
"region",
"labels",
"staging_location",
"temp_location",
"dataflow_service_options",
#
# from SetupOptions
"requirements_file",
"requirements_cache",
"requirements_cache_only_sources",
"extra_packages",
"setup_file",
"beam_plugins",
"save_main_session",
]
class DataflowOptions(
GoogleCloudOptions,
StandardOptions,
SetupOptions,
DebugOptions,
TestOptions,
TestDataflowOptions,
DirectOptions,
ProfilingOptions,
WorkerOptions,
):
"""Beam options specific to our needs. We're subclassing a bunch of relevant
`PipelineOptions` subclasses so we can see output when passing `--help`.
"""
@classmethod
def _add_argparse_args(cls, parser):
# retain a reference to the parser so we can compare our options to the
# default options via `changed_or_relevant()`
cls.parser = parser
# specify the `prog` because the CLI is invoked via `__main__.py`
parser.prog = "dataflow"
parser.add_argument(
"module_name",
action="store",
help="The module (job) to run.",
)
parser.add_argument(
"--show_options_only",
default=False,
action="store_true",
help="Prints the options and then quits",
)
parser.add_argument(
"--datastore_project",
required=False,
action="store",
help=(
"The name of the GCP Project containing the Datastore to query "
"and/or update. Use 'localdev' to specify the local datastore "
"emulator on its default port (9999); requires and respects "
"the `DATASTORE_EMULATOR_HOST` env var."
),
)
parser.add_argument(
"--datastore_namespace",
required=False,
action="store",
default=NAMESPACE,
help=(
"Namespace of Google Cloud Datastore on which to query/update/delete."
"If unspecified, defaults to `<job_name>__<current_datetime>`."
),
)
parser.add_argument(
"--run_id",
required=False,
default=DF_RUN_ID,
action="store",
help=(
"Unique value that identifies each Dataflow run. "
"If unspecified, defaults to `<job_name>__<current_datetime>`."
),
)
parser.add_argument(
"--folder_name",
required=False,
action="store",
help=(
"Folder in which to store results and artifacts. "
"If unspecified, defaults to value of `--run_id`."
),
)
parser.add_argument(
"--bq_output",
required=False,
action="store_true",
help="Save data to BigQuery, if applicable, using other --bq_* options.",
)
parser.add_argument(
"--bq_project",
required=False,
action="store",
help=(
"Name of BQ project in which to store results. "
"If unspecified, defaults to value of `--project`."
),
)
parser.add_argument(
"--bq_dataset",
required=False,
default=DATAFLOW_JOB_RUNS,
action="store",
help=(
"Name of BQ dataset in which to store results. "
"If unspecified, defaults to `dataflow_job_runs`."
),
)
parser.add_argument(
"--bq_table_name",
required=False,
action="store",
help=(
"Name of BQ table in which to store results. "
"If unspecified, defaults to value of `--run_id`."
),
)
parser.add_argument(
"--gcs_output",
required=False,
action="store_true",
help=(
"Store artifacts in Google Cloud Storage, if applicable, "
"using other --gcs_* options."
),
)
parser.add_argument(
"--gcs_project",
required=False,
action="store",
help=(
"Name of GCS project in which to store results. "
"If unspecified, defaults to value of `--project`."
),
)
parser.add_argument(
"--gct_project",
required=False,
action="store",
help=(
"Name of Cloud Tasks project with queues. "
"If unspecified, defaults to value of `--project`."
),
)
parser.add_argument(
"--gct_region",
required=False,
action="store",
help=(
"Region name (us-central1, etc.) of the specified of Cloud Tasks "
"project with queues. If unspecified, defaults to value of `--region`."
),
)
parser.add_argument(
"--limit",
default=0, # DataflowRunner requires an int value (instead of None)
required=False,
type=int,
action="store",
help=(
"Amount of Datastore entities to process. "
"If unspecified, processes all."
),
)
def changed_or_relevant(self):
"""Returns a dict of relevant options or options that are not set to
default values, to aid in understanding how Beam is configured.
See `RELEVANT_OPTIONS` below.
Returns:
dict: key-value pairs of beam options
"""
return {
option: getattr(self, option)
for option in self._visible_option_list()
if (
option in RELEVANT_OPTIONS
or self.parser.get_default(option) != getattr(self, option)
)
}
# ---
DEFAULT_LABELS = {
"service": SERVICE_NAME,
}
def configure_options(options):
"""Configures the provided [freshly-parsed] `PipelineOptions` options for
our needs.
A bunch of defaults are set here for required options that have optional
arguments because the default values are based on other options/args.
Args:
options (PipelineOptions): Options to configure.
Raises:
ArgumentError: If there are any missing options.
Returns:
PipelineOptions: Configured options
"""
# DataflowOptions
#
options.job_name = options.job_name or slugify(options.module_name)
options.folder_name = options.folder_name or append_run_id(options.job_name)
options.datastore_project = options.datastore_project or options.project
if not options.datastore_project:
raise ArgumentError(
_get_action(options.parser, "datastore_project"),
"Either --datastore_project or --project must be specified.",
)
options.bq_output = not is_data_local(options)
options.bq_project = (
options.bq_project
or options.project
or (not is_data_local(options) and options.datastore_project)
)
# always prefix the table name with the service name, because even locally we
# may be modifying a remote datastore.
options.bq_table_name = prepend_service_name(
options.bq_table_name or options.folder_name
)
if options.bq_output and not options.bq_project:
raise ArgumentError(
_get_action(options.parser, "bq_project"),
"Either --bq_project or --project must be specified.",
)
options.gcs_output = not is_data_local(options)
options.gcs_project = (
options.gcs_project
or options.project
or (not is_data_local(options) and options.datastore_project)
)
if options.gcs_output:
options.folder_name = prepend_service_name(options.folder_name)
if options.gcs_output and not options.gcs_project:
raise ArgumentError(
_get_action(options.parser, "gcs_project"),
"Either --gcs_project or --project must be specified.",
)
options.gct_project = options.gct_project or options.project
options.gct_region = options.gct_region or options.region
# StandardOptions
#
options.runner = options.runner or DIRECT_RUNNER # also see DirectOptions
# SetupOptions
#
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module
# level).
#
# CLEANUP: maybe move this to ./scripts/dataflow because the default value
# is False, and we might not want to override it in some cases.
options.save_main_session = True
# WorkerOptions
#
# configure logging
sdk_log_level = os.getenv("SDK_LOG_LEVEL", "info").upper()
options.default_sdk_harness_log_level = sdk_log_level
options.sdk_harness_log_level_overrides = {
"apache_beam.runners.dataflow": sdk_log_level,
}
# allows us to deploy multiple files and add dependencies
options.setup_file = os.path.join(PROJECT_PATH, "setup.py")
# DebugOptions
#
# for larger datasets
# options.dataflow_job_file = f"{options.job_name}_job_file.json"
# options.add_experiment("upload_graph")
#
# https://cloud.google.com/dataflow/docs/guides/data-sampling#python
# options.add_experiment("enable_data_sampling")
#
# this seems to be the default now
# https://cloud.google.com/dataflow/docs/runner-v2
# options.add_experiment("use_runner_v2")
# the remaining config is for using Google Cloud Dataflow, so bail now if no
# GCP project is specified.
if not options.project:
return options
# we'll be running on GCP, so use DataflowRunner
options.runner = DATAFLOW_RUNNER
# we need to modify the job name to disambiguate between services.
options.job_name = slugify(prepend_service_name(options.job_name))
# always capture results on GCP when we're running on GCP
options.gcs_output = True
options.bq_output = True
# GoogleCloudOptions
#
# labels help keep track of costs
options.labels = {**DEFAULT_LABELS, **(options.labels or {})}
# keep dataflow files for each run in a consistent spot per project
options.staging_location = options.staging_location or gcs_bucket_path(
options.project,
options.folder_name,
"staging",
)
options.temp_location = options.temp_location or gcs_bucket_path(
options.project,
options.folder_name,
"temp",
)
return options
def _get_action(parser, arg_dest):
for a in parser._actions:
if a.dest == arg_dest:
return a
import logging
from ._common import (
DEFAULT_BQ_PROJECT,
DeleteFromDatastore,
ReadFromBigQuery,
ReadFromDatastore,
WriteToDatastore,
WriteToText,
beam,
gcs_bucket_path,
is_runner_local,
output_path,
prefix_slug,
suffix_parens,
suffix_slug,
)
from ._functions import (
Counter,
build_bq_schema,
entity_to_dict,
serialize_dict_values,
serialize_to_json,
)
logger = logging.getLogger(__package__)
ENTITIES = "entities"
KEYS = "keys"
FOUND = "found"
ERRORS = "errors"
SKIPPED = "skipped"
DELETED = "deleted"
JSONL = ".jsonl"
CSV = ".csv"
class QueryAndCountEntities(beam.PTransform):
"""A composite transform that accepts a (yet-to-be-run) pipeline (`p`),
queries Datastore with the provided `query` via `ReadFromDatastore()`,
counts the number of entities returned (via `Counter`), and returns
them as a `PCollection`.
Usage:
```py
# in DataflowJob.expand()
# ...
foo_query = Query(
kind='Foo',
namespace=options.datastore_namespace,
project=options.datastore_project,
**opt_args,
)
return p | QueryAndCountEntities(foo_query)
```
Args:
query (apache_beam.io.gcp.datastore.v1new.types.Query): Google Datastore
query used to fetch the entities.
num_splits (int, optional): Number of splits for the query. Defaults to 0.
Returns:
(PCollection(list(str))): A `PCollection` of the Google Datastore entities.
"""
COUNTER_LABEL = "Entities found in Datastore"
def __init__(self, query, num_splits=0):
super().__init__()
self.query = query
self.num_splits = num_splits
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = f"Count {query.kind} entities"
self.READ_ENTITIES = f"Read {query.kind} entities from Datastore"
def expand(self, p):
return (
p
| self.READ_ENTITIES >> ReadFromDatastore(self.query, self.num_splits)
| self.COUNT_ENTITIES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
)
class UpdateEntities(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entities, calls the provided `entity_updater` on each entity, and then
writes the updated entities to the Datastore project specified in the
provided `options`.
The `entity_updater` is assumed to provide three tagged outputs (see
https://beam.apache.org/documentation/programming-guide/#additional-outputs)
and have the output tags stored as class properties (`OUTPUT_TAG_UPDATES`,
`OUTPUT_TAG_ERRORS`, `OUTPUT_TAG_SKIPPED`).
The entities in the 'errors' tagged output are sent to the `DumpEntities`
transform, and the entities in the 'skipped' tagged output are sent to the
`DumpKeys` transform.
Usage:
```py
# in DataflowJob.expand()
# ...
class UpdateFooToBar(beam.DoFn):
OUTPUT_TAG_UPDATES = "updates"
OUTPUT_TAG_SKIPPED = "skipped"
OUTPUT_TAG_ERRORS = "errors"
def process(self, entity):
# ...
return entity
# ...
| QueryAndCountEntities(foo_query)
| UpdateEntities(options, UpdateFooToBar())
```
Args:
options (PipelineOptions): This pipeline's options.
entity_updater (TaggedOutputDoFn): A `beam.DoFn` instance that handles
updating the entity or directs it to the applicable tagged output.
Note this function must return a list of one item because it's
executed via `beam.ParDo()`.
dry_run (bool, optional): Skips writing the updated entities to Datastore if
True. Defaults to False.
step_slug (str, optional): Suffix slug appended to transform labels, in
case the step is used multiple times. Defaults to None.
Returns:
(PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
"""
COUNTER_LABEL = "Entities updated in Datastore"
def __init__(self, options, entity_updater, dry_run=False, step_slug=None):
super().__init__()
self.options = options
self.entity_updater = entity_updater
self.dry_run = dry_run
self.step_slug = step_slug
self.datastore_project = options.datastore_project
self.throttle_rampup = not is_runner_local(options)
self.errors_slug = prefix_slug(ERRORS, step_slug)
self.skipped_slug = prefix_slug(SKIPPED, step_slug)
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
self.UPDATE_ENTITIES = suffix_parens("Update entities", step_slug)
self.WRITE_ENTITIES = suffix_parens(
"Write updated entities to Datastore", step_slug
)
def expand(self, entities):
(
updated_entities,
error_entities,
skipped_entities,
) = entities | self.UPDATE_ENTITIES >> beam.ParDo(
self.entity_updater
).with_outputs(
self.entity_updater.OUTPUT_TAG_ERRORS,
self.entity_updater.OUTPUT_TAG_SKIPPED,
main=self.entity_updater.OUTPUT_TAG_UPDATES,
)
error_entities | DumpEntities(self.options, step_slug=self.errors_slug)
skipped_entities | DumpKeys(self.options, step_slug=self.skipped_slug)
updated_entities | self.COUNT_ENTITIES >> beam.ParDo(
Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL)
)
if self.dry_run:
logger.info("This is a DRY RUN; Skipping write to Datastore.")
return updated_entities
updated_entities | self.WRITE_ENTITIES >> WriteToDatastore(
self.datastore_project, self.throttle_rampup
)
return updated_entities
class WriteEntityDictsToBigQuery(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entity dicts, conforms it to the BQ schema and writes it to a new BigQuery
table in the BQ project specified in the provided `options`.
https://cloud.google.com/dataflow/docs/guides/write-to-bigquery
Usage:
```py
# in DataflowJob.expand()
# ...
| QueryAndCountEntities(foo_query)
| WriteEntityDictsToBigQuery(options)
```
Args:
options (PipelineOptions): This pipeline's options.
bq_schema_fields (dict, optional): A dict that describes a custom
schema. Defaults to None, which means it uses the default
(`DEFAULT_BQ_FIELDS`)
step_slug (str, optional): Suffix slug appended to transform labels (and
BQ table names), in case the step is used multiple times. Defaults
to None.
"""
COUNTER_LABEL = "Entities written to BigQuery"
DEFAULT_BQ_FIELDS = [
{
"name": "key",
"type": "json",
"mode": "nullable",
},
{
"name": "properties",
"type": "json",
"mode": "nullable",
},
]
def __init__(self, options, bq_schema_fields=None, step_slug=None):
super().__init__()
self.options = options
self.step_slug = step_slug
self.bq_project = options.bq_project
self.bq_dataset = options.bq_dataset
self.bq_table = suffix_slug(options.bq_table_name, step_slug)
self.bq_schema = build_bq_schema(
fields=bq_schema_fields or self.DEFAULT_BQ_FIELDS
)
self.bq_path = f"{self.bq_project}:{self.bq_dataset}/{self.bq_table}"
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
self.PREPARE_ENTITIES = suffix_parens(
"Prepare entity dicts for BQ schema", step_slug
)
self.WRITE_ENTITIES = suffix_parens(
f"Write entity dicts to BiqQuery in {self.bq_path}", step_slug
)
def expand(self, entity_dicts):
bq_args = dict(
table=self.bq_table,
dataset=self.bq_dataset,
project=self.bq_project,
schema=self.bq_schema,
# CLEANUP: change to STORAGE_WRITE_API; STREAMING_INSERTS is deprecated
method="STREAMING_INSERTS",
# create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
)
(
entity_dicts
| self.COUNT_ENTITIES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
| self.PREPARE_ENTITIES >> beam.Map(serialize_dict_values)
| self.WRITE_ENTITIES >> beam.io.WriteToBigQuery(**bq_args)
)
class DumpKeys(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entities, gets the entities' keys, serializes them to JSON, and writes them
to a `keys.jsonl` file (local or GCS).
Usage:
```py
# in DataflowJob.expand()
# ...
| QueryAndCountEntities(foo_query)
| DumpKeys(options)
```
Args:
options (PipelineOptions): This pipeline's options.
step_slug (str, optional): Suffix slug appended to transform labels, in
case the step is used multiple times. Defaults to None.
Returns:
(PCollection(apache_beam.io.gcp.datastore.v1new.types.Key))
"""
COUNTER_LABEL = "Keys dumped to file"
def __init__(self, options, step_slug=None):
super().__init__()
self.options = options
self.step_slug = step_slug
path = output_path(options)
self.keys_file = suffix_slug(KEYS, self.step_slug)
self.keys_path = f"{path}/{self.keys_file}"
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = suffix_parens("Count Datastore entity keys", step_slug)
self.GET_KEYS = suffix_parens("Get keys of entities", step_slug)
self.WRITE_KEYS = suffix_parens(
f"Write keys to file {self.keys_file}", step_slug
)
def expand(self, entities):
keys = (
entities
| self.COUNT_ENTITIES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
| self.GET_KEYS >> beam.Map(lambda ent: ent.key)
)
keys | self.WRITE_KEYS >> WriteToText(self.keys_path, file_name_suffix=JSONL)
return keys
class DumpEntities(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entities, converts each into a `dict` and then:
* ...serializes it to JSON, and writes all entities to a `entities.jsonl`
file (local or in the GCS project specified in the provided `options`)
* ...conforms it to the BQ schema and writes all entities to a new
BigQuery table in the BQ project specified in the provided `options`
Usage:
```py
# in DataflowJob.expand()
# ...
| QueryAndCountEntities(foo_query)
| DumpEntities(options)
```
Args:
options (PipelineOptions): This pipeline's options.
bq_schema_fields (list, optional): A list of dicts, each describing a
BigQuery schema field. Defaults to None, which means the default
(`DEFAULT_BQ_FIELDS`) is used.
props_formatter (callable, optional): Callable that accepts an entity's
`properties` and returns a `dict` (see the example below). Defaults to
None, which means we use the `properties` attribute as-is.
step_slug (str, optional): Suffix slug appended to transform labels, in
case the step is used multiple times. Defaults to None.
Returns:
(PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
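Example `props_formatter` (a sketch; `only_contact_props` and the
property names are hypothetical):
```py
def only_contact_props(properties):
    return {k: v for k, v in properties.items() if k in ("email", "atid")}
# ...
| DumpEntities(options, props_formatter=only_contact_props)
```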
"""
COUNTER_LABEL = "Entities dumped to file"
def __init__(
self,
options,
bq_schema_fields=None,
props_formatter=None,
step_slug=None,
):
super().__init__()
self.options = options
self.props_formatter = props_formatter
self.bq_schema_fields = bq_schema_fields
self.step_slug = step_slug
path = output_path(options)
self.file_name = suffix_slug(ENTITIES, self.step_slug)
self.file_path = f"{path}/{self.file_name}"
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
self.CONVERT_ENTITIES = suffix_parens(
"Convert Datastore entities to dicts", step_slug
)
self.SERIALIZE_ENTITIES = suffix_parens(
"Serialize entity dicts to JSON", step_slug
)
self.WRITE_FILE = suffix_parens(
"Write serialized entity dicts to file", step_slug
)
self.WRITE_ENTITIES = suffix_parens(
"Write serialized entity dicts to BigQuery", step_slug
)
def expand(self, entities):
entity_dicts = (
entities
| self.COUNT_ENTITIES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
| self.CONVERT_ENTITIES
>> beam.Map(lambda ent: entity_to_dict(ent, self.props_formatter))
)
(
entity_dicts
| self.SERIALIZE_ENTITIES >> beam.Map(serialize_to_json)
| self.WRITE_FILE >> WriteToText(self.file_path, file_name_suffix=JSONL)
)
if self.options.bq_output:
entity_dicts | self.WRITE_ENTITIES >> WriteEntityDictsToBigQuery(
self.options, self.bq_schema_fields, self.step_slug
)
return entities
class DeleteEntities(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entities, gets the entities' keys, saves them to a file (local or GCS), and
deletes Entities by key in the Datastore project specified in the provided
`options`.
Usage:
```py
# in DataflowJob.expand()
# ...
| QueryAndCountEntities(foo_query)
| DeleteEntities(options)
```
Args:
options (PipelineOptions): This pipeline's options.
dry_run (bool, optional): If True, dumps the keys but skips the actual
delete from Datastore (see the example below). Defaults to False.
step_slug (str, optional): Suffix slug appended to transform labels, in
case the step is used multiple times. Defaults to None.
Returns:
(PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
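Example of a dry run (keys are dumped but nothing is deleted; a sketch):
```py
# ...
| DeleteEntities(options, dry_run=True)
```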
"""
COUNTER_LABEL = "Entities deleted from Datastore"
def __init__(self, options, dry_run=False, step_slug=None):
super().__init__()
self.options = options
self.dry_run = dry_run
self.step_slug = step_slug
self.datastore_project = options.datastore_project
self.throttle_rampup = not is_runner_local(options)
path = output_path(options)
self.file_name = suffix_slug(DELETED, self.step_slug)
self.file_path = f"{path}/{self.file_name}"
self.delete_slug = prefix_slug(DELETED, self.step_slug)
self.COUNTER_NAMESPACE = self.__class__.__name__
self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
self.DELETE_ENTITIES = suffix_parens(
"Delete entities from Datastore", step_slug
)
def expand(self, entities):
keys = (
entities
| self.COUNT_ENTITIES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
| DumpKeys(self.options, step_slug=self.delete_slug)
)
if self.dry_run:
logger.info("This is a DRY RUN; Skipping delete from Datastore.")
return
keys | self.DELETE_ENTITIES >> DeleteFromDatastore(
self.datastore_project,
self.throttle_rampup,
)
return entities
class ExecuteFnForEntities(beam.PTransform):
"""A composite transform that accepts a `PCollection` of Google Datastore
entities, calls the provided `entity_handler` on each entity and returns
the provided entities.
The `entity_handler` is assumed to provide three tagged outputs (see
https://beam.apache.org/documentation/programming-guide/#additional-outputs)
and have the output tags stored as class properties (`OUTPUT_TAG_UPDATES`,
`OUTPUT_TAG_ERRORS`, `OUTPUT_TAG_SKIPPED`).
The entities in the 'errors' tagged output are sent to the `DumpEntities`
transform, and the entities in the 'skipped' tagged output are sent to the
`DumpKeys` transform.
Usage:
```py
# in DataflowJob.expand()
# ...
class FooWithEntity(TaggedOutputDoFn):
def process(self, entity):
# ...
return [entity]
class BarWithEntity(TaggedOutputDoFn):
def process(self, entity):
# ...
return [entity]
# ...
| QueryAndCountEntities(foo_query)
| ExecuteFnForEntities(options, entity_handler=FooWithEntity(), step_slug="foo")
| ExecuteFnForEntities(options, entity_handler=BarWithEntity(), step_slug="bar")
```
Args:
options (PipelineOptions): This pipeline's options.
entity_handler (TaggedOutputDoFn): A `beam.DoFn` instance that does
something with the entity and directs it to the applicable tagged
output (see the example below). Note that its `process()` method must
return an iterable (typically a single-item list) because it is
executed via `beam.ParDo()`.
step_slug (str, optional): Suffix slug appended to transform labels, in
case the step is used multiple times. Defaults to None.
Returns:
(PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
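Example handler routing to a tagged output (a sketch; assumes
`TaggedOutputDoFn` exposes the `OUTPUT_TAG_*` constants described above;
`NotifyWithEntity` and the `email` property are hypothetical):
```py
class NotifyWithEntity(TaggedOutputDoFn):
    def process(self, entity):
        if entity.properties.get("email") is None:
            # route entities we cannot handle to the 'skipped' output
            return [beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)]
        # ... do something with the entity ...
        return [entity]
```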
"""
def __init__(self, options, entity_handler, step_slug=None, **kwargs):
super().__init__()
self.options = options
self.entity_handler = entity_handler
self.step_slug = step_slug
self.kwargs = kwargs
self.throttle_rampup = not is_runner_local(options)
self.errors_slug = prefix_slug(ERRORS, step_slug)
self.skipped_slug = prefix_slug(SKIPPED, step_slug)
self.EXEC_FN = suffix_parens("ExecuteFn", step_slug)
def expand(self, entities):
(
success_entities,
error_entities,
skipped_entities,
) = entities | self.EXEC_FN >> beam.ParDo(
self.entity_handler, **self.kwargs
).with_outputs(
self.entity_handler.OUTPUT_TAG_ERRORS,
self.entity_handler.OUTPUT_TAG_SKIPPED,
main=self.entity_handler.OUTPUT_TAG_SUCCESS,
)
error_entities | DumpEntities(self.options, step_slug=self.errors_slug)
skipped_entities | DumpKeys(self.options, step_slug=self.skipped_slug)
return success_entities
class RunQueryInBq(beam.PTransform):
"""A thin wrapper around the `apache_beam.io.gcp.big_query.ReadFromBigQuery`
PTransform to make using it easier. Any additional keyword arguments after
`query` are passed directly to `ReadFromBigQuery()`.
Helpful things we do:
* Append `LIMIT n` to the provided `query` if `options.limit` is set and log
a warning to make it visible.
* Log SQL query
* Set the `project` arg to `options.bq_project` (with fall back to
`DEFAULT_BQ_PROJECT`)
* Set the `gcs_location` arg to the 'bqtemp' folder in the Dataflow job's
output folder.
* Set `use_standard_sql=True` (pass `use_standard_sql=False` to override)
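Usage (a sketch; the SQL string and downstream step are hypothetical):
```py
# in DataflowJob.expand()
# ...
| RunQueryInBq(options, "SELECT email FROM `my_dataset.users`")
| "Get emails" >> beam.Map(lambda row: row["email"])
```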
"""
COUNTER_LABEL = "Rows returned from BigQuery"
def __init__(self, options, query, step_slug=None, **kwargs):
super().__init__()
self.options = options
self.bq_project = options.bq_project or DEFAULT_BQ_PROJECT
query_lines = [query]
# courtesy of DataflowOptions
if options.limit:
query_lines.append(f"LIMIT {options.limit}")
logger.warning("Appending 'LIMIT %s' to SQL query", options.limit)
self.query = "\n".join(query_lines)
logger.info("Using SQL query: %s", self.query)
self.gcs_location = gcs_bucket_path(
self.bq_project, options.folder_name, "bqtemp"
)
self.RUN_QUERY = suffix_parens("Run Query in BQ", step_slug)
self.COUNT_ROWS = "Count rows"
self.COUNTER_NAMESPACE = self.__class__.__name__
if "use_standard_sql" not in kwargs:
kwargs["use_standard_sql"] = True
self.read_from_bq_kwargs = kwargs
def expand(self, p):
return (
p
| self.RUN_QUERY
>> ReadFromBigQuery(
query=self.query,
project=self.bq_project,
gcs_location=self.gcs_location,
**self.read_from_bq_kwargs,
)
| self.COUNT_ROWS
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
)
class WriteTextToFile(beam.PTransform):
"""A convenience wrapper around `beam.io.WriteToText` that takes care of the
following:
- accepts a filename (`output_file`) and sets the `file_path_prefix` and
`file_name_suffix` args accordingly. When no extension is given, ".txt"
is used.
- writes the file to the Dataflow job's GCS bucket (or local dir)
- counts lines written to file
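Usage (a sketch; the step label and formatting step are hypothetical):
```py
# in DataflowJob.expand()
# ...
| "Format lines" >> beam.Map(str)
| WriteTextToFile(options, "report.txt")
```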
"""
COUNTER_LABEL = "Lines written to file"
def __init__(self, options, output_file, step_slug=None, **kwargs):
super().__init__()
self.options = options
try:
(filename, suffix) = output_file.rsplit(".", 1)
except ValueError:
filename = output_file
suffix = "txt"
self.file_path_prefix = output_path(options, filename)
self.file_name_suffix = f".{suffix}"
self.WRITE_FILE = suffix_parens("Write file", step_slug)
self.COUNT_LINES = "Count lines"
self.COUNTER_NAMESPACE = self.__class__.__name__
self.write_to_text_kwargs = kwargs
def expand(self, p):
return (
p
| self.COUNT_LINES
>> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
| self.WRITE_FILE
>> beam.io.WriteToText(
file_path_prefix=self.file_path_prefix,
file_name_suffix=self.file_name_suffix,
**self.write_to_text_kwargs,
)
)
class WriteCsvToFile(beam.PTransform):
"""A convenience wrapper around the `WriteTextToFile` (`PTransform`)
configured for writing CSV files which handles the following:
- appends ".csv" to `filename` if not already present
- passes `header` arg (for `beam.io.WriteToText`) along
- accepts a `row_formatter` (callable) that accepts a "row" and is expected
to return a `str`.
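Usage (a sketch; `as_csv_row`, the header, and the field names are
hypothetical):
```py
def as_csv_row(row):
    return f"{row['email']},{row['atid']}"
# ...
| WriteCsvToFile(options, "users", header="email,atid", row_formatter=as_csv_row)
```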
"""
def __init__(self, options, filename, header, row_formatter, step_slug=None):
super().__init__()
self.options = options
self.filename = filename if filename.endswith(CSV) else filename + CSV
self.header = header
self.row_formatter = row_formatter
self.file_slug = prefix_slug("written", step_slug)
self.FORMAT_DATA = "Format data as CSV"
self.WRITE_CSV = "Write CSV to file"
def expand(self, p):
return (
p
| self.FORMAT_DATA >> beam.Map(self.row_formatter)
| self.WRITE_CSV
>> WriteTextToFile(
self.options,
self.filename,
header=self.header,
step_slug=self.file_slug,
)
)
#!/usr/bin/env python
"""A tiny CLI interface to read a service's `app.yaml` and manipulate it
in-memory only, without persisting changes to disk. See CLI help for more info.
Reading and manipulating `app.yaml` are useful things to do in local dev;
doing so helps us avoid hardcoding values in our scripts/tooling, thereby
reducing errors and generally making our tooling more pleasant to use.
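Example of in-process usage (a sketch; `env_variables.FOO` is a
hypothetical property path):
```py
app_yaml = AppYaml()
app_yaml.import_dotenv(".env")
print(app_yaml.get("env_variables.FOO"))
```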
"""
import argparse
import os
import sys
if __package__ is None and __name__ == "__main__":
usage = [
(
"Error: This script must be used in the context of a Python module. "
"See `python -m scripts.app_yaml -h` for full usage."
),
"Hint: Run `./scripts/app_yaml` instead.",
]
print("\n\n".join(usage), file=sys.stderr)
sys.exit(1)
import six
# yaml is included in the google cloud SDK, so it's only available when this
# module is part of the scripts package (thanks to __init__.py).
#
import yaml
# same deal as yaml re: pkg, except dotenv is a dev dep.
from dotenv import dotenv_values
APP_YAML = "app.yaml"
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
APP_YAML_PATH = os.path.join(PROJECT_ROOT, APP_YAML)
# since we no longer have the `lib.dev` dir, we need a way to import packages
# from our virtualenv in `.venv2` when running locally under `dev_appserver.py`.
#
# by default, paths that begin with '.' are disallowed via the
# `DEFAULT_SKIP_FILES` regexp: ^(.*/)?((#.*#)|(.*~)|(.*\.py[co])|(.*/RCS/.*)|(\..*)|)$
# (see `$GAE_SDK_ROOT/python27/sdk/google/appengine/api/appinfo.py`)
#
# we can override the regexp by setting the `skip_files` in `app.yaml` during
# local development via `cli_gaedev()` below. we don't want to modify the
# checked-in `app.yaml` because that can get deployed directly via `gcloud`
# (bypassing `./scripts/deploy`) and `skip_files` overrides `.gcloudignore`.
#
# this approach is intended for Python 2, but it also works with Python 3 which,
# by default, uses a dynamically created venv on each launch, thereby facing
# similar issues.
#
# NOTE: this regexp is the same as `DEFAULT_SKIP_FILES` minus `(\..*)` -- the
# bit that excludes paths beginning with a "dot".
#
GAEDEV_SKIP_FILES = r"^(.*/)?((#.*#)|(.*~)|(.*\.py[co])|(.*/RCS/.*)|)$"
def _read_yaml_file(file_path):
if not os.path.exists(file_path):
raise FileNotFoundError(file_path)
with open(file_path, "r") as stream:
yml = yaml.safe_load(stream)
return yml
class AppYaml(object):
def __init__(self, file_path=APP_YAML_PATH):
self.config = _read_yaml_file(file_path)
def update(self, section, new_data):
data = self.config.get(section)
data.update(new_data)
self.config.update({section: data})
def import_dotenv(self, filename=None):
dotenv_path = filename or ".env"
env_vars = dotenv_values(dotenv_path=dotenv_path, verbose=True)
self.update("env_variables", env_vars)
def get(self, prop):
# allow for object notation (a.b) arbitrary levels deep. simple, but not
# ideal. leveraging KeyError fall-through here so that we know if a key
# exists or not. we don't want users thinking they got a default value when
# the key doesn't exist.
data = self.config
for p in prop.split("."):
data = data[p]
return data
# ----
def cli_get(parser, args, app_yaml):
if not args.prop:
print(parser.format_help(), file=sys.stderr)
sys.exit(1)
data = app_yaml.get(args.prop)
# CLEANUP: this is just a quick hack for formatting env vars, our
# primary need at the moment.
if isinstance(data, dict):
# CLEANUP: handle quote escaping
lines = ['{}="{}"'.format(k, v) for k, v in six.iteritems(data)]
print("\n".join(sorted(lines)))
else:
print(data)
return
def cli_import_dotenv(_parser, args, app_yaml):
app_yaml.import_dotenv(args.env)
if args.dump:
print(yaml.safe_dump(app_yaml.config), file=sys.stdout)
def cli_gaedev(_parser, args, app_yaml):
app_yaml.import_dotenv(args.env)
app_yaml.config["skip_files"] = GAEDEV_SKIP_FILES
if args.dump:
print(yaml.safe_dump(app_yaml.config), file=sys.stdout)
def cli_missing_method(_parser, args, _app_yaml):
raise NotImplementedError("Missing handler for {}.".format(args.cmd))
# ----
parser = argparse.ArgumentParser(
description="Read and manipulate gcloud app.yaml files."
)
parser.add_argument(
"-f",
"--file",
action="store",
help="App YAML filename/path",
default=APP_YAML_PATH,
)
commands = parser.add_subparsers(help="sub-commands", dest="cmd")
parser_get = commands.add_parser(
"get", help="Prints specified top-level property to stdout."
)
parser_get.add_argument(
"prop",
type=str,
help="Property to retrieve, in dotted notation (e.g.: 'a', 'a.b')",
)
parser_gae = commands.add_parser(
"gaedev",
help="Merge .env file contents into 'env_variables' property and set skip_files",
)
# same args as import_dotenv
parser_gae.add_argument(
"-e",
"--env",
action="store",
nargs="?",
const=None,
default=None,
help="Name of .env file to read from. Defaults to .env",
)
# same args as import_dotenv
parser_gae.add_argument(
"-d",
"--dump",
action="store_true",
help="Dump entire config to output serialized as YAML",
)
parser_imp = commands.add_parser(
"import_dotenv", help="Merge .env file contents into 'env_variables' property."
)
# same args as gaedev
parser_imp.add_argument(
"-e",
"--env",
action="store",
nargs="?",
const=None,
default=None,
help="Name of .env file to read from. Defaults to .env",
)
# same args as gaedev
parser_imp.add_argument(
"-d",
"--dump",
action="store_true",
help="Dump entire config to output serialized as YAML",
)
def cli():
args = parser.parse_args()
app_yaml = AppYaml(args.file)
arg_method_map = {
"get": cli_get,
"import_dotenv": cli_import_dotenv,
"gaedev": cli_gaedev,
}
arg_method_map.get(args.cmd, cli_missing_method)(parser, args, app_yaml)
if __name__ == "__main__":
try:
cli()
except NotImplementedError:
parser.print_usage()
except Exception as e:
print("Error: {} ({})".format(e, e.__class__.__name__), file=sys.stderr)
sys.exit(1)
sys.exit(0)
import logging
from ._cli import module_main
from ._common import (
SERVICE_NAME,
X_GEN_API_KEY,
BaseDataflowJob,
Query,
appspot_service_url,
beam,
is_enabled,
is_runner_local,
)
from ._functions import CreateTaskWithEntity, SkipEntityIf
from ._transforms import ExecuteFnForEntities, QueryAndCountEntities
logger = logging.getLogger(__name__)
# dataflow seems to choose a low value for `num_splits` (~12) when left
# unspecified. the value of `num_splits` effectively limits the max # of
# workers dataflow will scale to. in order to get through our > 3m entities at a
# much faster rate, we're manually specifying a high # of splits.
NUM_SPLITS = 128
QUEUE_NAME = "migrate-user-keys"
VERSION = "migrate-user-keys"
PATH = "/api/migrate/user-key"
class DataflowJob(BaseDataflowJob):
def expand(self, p):
options = self.options
if not options.gct_region:
raise RuntimeError("--gct_region is required for Cloud Tasks API")
opt_args = {}
# courtesy of DataflowOptions
if options.limit:
opt_args["limit"] = options.limit
logger.warning("Datastore query using limit = %s", options.limit)
migration_query = Query(
kind="User",
namespace=options.datastore_namespace,
project=options.datastore_project,
filters=[
# (property_name, operator, value)
],
# WARNING: using the `order` arg requires a specific index for the
# ordered property. it also causes query to return entities much
# slower because access is no longer random, thereby significantly
# slowing down the whole job. as such, this is typically used in
# combination with the `limit` arg only when re-running a migration
# to process entities added/updated during the migration.
#
# order=["-property"],
#
# projection=['property_1', 'property_2']
**opt_args,
)
def prepare_data(entity):
"""Returns JSON_serializable data for current entity.
Args:
entity (google.cloud.datastore.entity.Entity): entity
Returns:
dict: POST data
"""
return {
"atid": entity["atid"],
"email": entity["email"],
"ndb_id": entity.key.id_or_name,
}
def is_already_processed(entity):
# If atid/email don't exist, this will throw a KeyError, which is handled
# as appropriate by the caller.
return entity["atid"] == entity.key.id_or_name
service_url = appspot_service_url(
options.datastore_project, # already "or'd" with options.project
SERVICE_NAME,
VERSION,
)
url = f"{service_url}{PATH}"
logger.info("Using API URL: %s", url)
headers = {
"x-gen-api-key": X_GEN_API_KEY,
}
dry_run = is_enabled(self.env.get("DRY_RUN", ""))
query_opts = {} if is_runner_local(options) else {"num_splits": NUM_SPLITS}
return (
p
| QueryAndCountEntities(migration_query, **query_opts)
| "Skip Processed Entities"
>> beam.ParDo(SkipEntityIf(options, is_already_processed))
| "Migrate Entities"
>> ExecuteFnForEntities(
options,
entity_handler=CreateTaskWithEntity(
options,
queue_name=QUEUE_NAME,
service=SERVICE_NAME,
version=VERSION,
path=PATH,
prepare_data=prepare_data,
headers=headers,
dry_run=dry_run,
),
)
)
# ---
# allow running this module directly via `python3 -m dataflow.foo`
if __name__ == "__main__":
module_main(__spec__)