import logging

from ._common import (
    DEFAULT_BQ_PROJECT,
    DeleteFromDatastore,
    ReadFromBigQuery,
    ReadFromDatastore,
    WriteToDatastore,
    WriteToText,
    beam,
    gcs_bucket_path,
    is_runner_local,
    output_path,
    prefix_slug,
    suffix_parens,
    suffix_slug,
)
from ._functions import (
    Counter,
    build_bq_schema,
    entity_to_dict,
    serialize_dict_values,
    serialize_to_json,
)

logger = logging.getLogger(__package__)

ENTITIES = "entities"
KEYS = "keys"
FOUND = "found"
ERRORS = "errors"
SKIPPED = "skipped"
DELETED = "deleted"

JSONL = ".jsonl"
CSV = ".csv"


class QueryAndCountEntities(beam.PTransform):
    """A composite transform that accepts a (yet-to-be-run) pipeline (`p`),
    queries Datastore with the provided `query` via `ReadFromDatastore()`,
    counts the number of entities returned (via `Counter`), and returns
    them as a `PCollection`.

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    foo_query = Query(
        kind='Foo',
        namespace=options.datastore_namespace,
        project=options.datastore_project,
        **opt_args,
    )

    return p | QueryAndCountEntities(foo_query)
    ```

    Args:
        query (apache_beam.io.gcp.datastore.v1new.types.Query): Google Datastore
            query used to fetch the entities.
        num_splits (int, optional): Number of splits for the query. Defaults to 0.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity)): A
            `PCollection` of the Google Datastore entities.
    """

    COUNTER_LABEL = "Entities found in Datastore"

    def __init__(self, query, num_splits=0):
        super().__init__()

        self.query = query
        self.num_splits = num_splits

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = f"Count {query.kind} entities"
        self.READ_ENTITIES = f"Read {query.kind} entities from Datastore"

    def expand(self, p):
        return (
            p
            | self.READ_ENTITIES >> ReadFromDatastore(self.query, self.num_splits)
            | self.COUNT_ENTITIES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
        )


class UpdateEntities(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entities, calls the provided `entity_updater` on each entity, and then
    writes the updated entities to the Datastore project specified in the
    provided `options`.

    The `entity_updater` is assumed to provide three tagged outputs (see
    https://beam.apache.org/documentation/programming-guide/#additional-outputs)
    and have the output tags stored as class properties (`OUTPUT_TAG_UPDATES`,
    `OUTPUT_TAG_ERRORS`, `OUTPUT_TAG_SKIPPED`).

    The entities in the 'errors' tagged output are sent to the `DumpEntities`
    transform, and the entities in the 'skipped' tagged output are sent to the
    `DumpKeys` transform.

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    class UpdateFooToBar(beam.DoFn):
        OUTPUT_TAG_UPDATES = "updates"
        OUTPUT_TAG_SKIPPED = "skipped"
        OUTPUT_TAG_ERRORS = "errors"

        def process(self, entity):
            # ...
            return [entity]

    # ...
    | QueryAndCountEntities(foo_query)
    | UpdateEntities(options, UpdateFooToBar())
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        entity_updater (TaggedOutputDoFn): A `beam.DoFn` instance that handles
            updating the entity or directs it to the applicable tagged output.
            Note this function must return a list of one item because it's
            executed via `beam.ParDo()`.
        dry_run (bool, optional): Skips writing the updated entities to
            Datastore if True. Defaults to False.
        step_slug (str, optional): Suffix slug appended to transform labels, in
            case the step is used multiple times. Defaults to None.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
    """

    COUNTER_LABEL = "Entities updated in Datastore"

    def __init__(self, options, entity_updater, dry_run=False, step_slug=None):
        super().__init__()

        self.options = options
        self.entity_updater = entity_updater
        self.dry_run = dry_run
        self.step_slug = step_slug

        self.datastore_project = options.datastore_project
        self.throttle_rampup = not is_runner_local(options)

        self.errors_slug = prefix_slug(ERRORS, step_slug)
        self.skipped_slug = prefix_slug(SKIPPED, step_slug)

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
        self.UPDATE_ENTITIES = suffix_parens("Update entities", step_slug)
        self.WRITE_ENTITIES = suffix_parens(
            "Write updated entities to Datastore", step_slug
        )

    def expand(self, entities):
        (
            updated_entities,
            error_entities,
            skipped_entities,
        ) = entities | self.UPDATE_ENTITIES >> beam.ParDo(
            self.entity_updater
        ).with_outputs(
            self.entity_updater.OUTPUT_TAG_ERRORS,
            self.entity_updater.OUTPUT_TAG_SKIPPED,
            main=self.entity_updater.OUTPUT_TAG_UPDATES,
        )

        error_entities | DumpEntities(self.options, step_slug=self.errors_slug)
        skipped_entities | DumpKeys(self.options, step_slug=self.skipped_slug)

        updated_entities | self.COUNT_ENTITIES >> beam.ParDo(
            Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL)
        )

        if self.dry_run:
            logger.info("This is a DRY RUN; skipping write to Datastore.")
            return updated_entities

        updated_entities | self.WRITE_ENTITIES >> WriteToDatastore(
            self.datastore_project, self.throttle_rampup
        )

        return updated_entities


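# A minimal illustrative sketch of an `entity_updater` DoFn for UpdateEntities,
# assuming a hypothetical rename of a 'foo' property to 'bar'. Entities that
# cannot be handled are routed to the tagged outputs via beam.pvalue.TaggedOutput;
# untagged yields go to the main ('updates') output.
#
#   class RenameFooToBar(beam.DoFn):
#       OUTPUT_TAG_UPDATES = "updates"
#       OUTPUT_TAG_SKIPPED = "skipped"
#       OUTPUT_TAG_ERRORS = "errors"
#
#       def process(self, entity):
#           try:
#               if "foo" not in entity.properties:
#                   yield beam.pvalue.TaggedOutput(self.OUTPUT_TAG_SKIPPED, entity)
#                   return
#               entity.properties["bar"] = entity.properties.pop("foo")
#               yield entity
#           except Exception:
#               yield beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, entity)
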
class WriteEntityDictsToBigQuery(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entity dicts, conforms them to the BQ schema, and writes them to a new
    BigQuery table in the BQ project specified in the provided `options`.

    https://cloud.google.com/dataflow/docs/guides/write-to-bigquery

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    | QueryAndCountEntities(foo_query)
    | WriteEntityDictsToBigQuery(options)
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        bq_schema_fields (list(dict), optional): A list of field dicts that
            describes a custom schema. Defaults to None, which means the
            default (`DEFAULT_BQ_FIELDS`) is used.
        step_slug (str, optional): Suffix slug appended to transform labels (and
            BQ table names), in case the step is used multiple times. Defaults
            to None.
    """

    COUNTER_LABEL = "Entities written to BigQuery"

    DEFAULT_BQ_FIELDS = [
        {
            "name": "key",
            "type": "json",
            "mode": "nullable",
        },
        {
            "name": "properties",
            "type": "json",
            "mode": "nullable",
        },
    ]

    def __init__(self, options, bq_schema_fields=None, step_slug=None):
        super().__init__()

        self.options = options
        self.step_slug = step_slug

        self.bq_project = options.bq_project
        self.bq_dataset = options.bq_dataset
        self.bq_table = suffix_slug(options.bq_table_name, step_slug)
        self.bq_schema = build_bq_schema(
            fields=bq_schema_fields or self.DEFAULT_BQ_FIELDS
        )

        self.bq_path = f"{self.bq_project}:{self.bq_dataset}/{self.bq_table}"

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
        self.PREPARE_ENTITIES = suffix_parens(
            "Prepare entity dicts for BQ schema", step_slug
        )
        self.WRITE_ENTITIES = suffix_parens(
            f"Write entity dicts to BigQuery in {self.bq_path}", step_slug
        )

    def expand(self, entity_dicts):
        bq_args = dict(
            table=self.bq_table,
            dataset=self.bq_dataset,
            project=self.bq_project,
            schema=self.bq_schema,
            # CLEANUP: change to STORAGE_WRITE_API; STREAMING_INSERTS is deprecated
            method="STREAMING_INSERTS",
            # create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            # write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
        )

        (
            entity_dicts
            | self.COUNT_ENTITIES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
            | self.PREPARE_ENTITIES >> beam.Map(serialize_dict_values)
            | self.WRITE_ENTITIES >> beam.io.WriteToBigQuery(**bq_args)
        )


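# An illustrative sketch of passing a custom schema to WriteEntityDictsToBigQuery;
# the field list below is hypothetical and replaces DEFAULT_BQ_FIELDS via
# build_bq_schema(fields=...).
#
#   custom_fields = [
#       {"name": "key", "type": "json", "mode": "nullable"},
#       {"name": "kind", "type": "string", "mode": "nullable"},
#   ]
#
#   # ... in DataflowJob.expand()
#   # | WriteEntityDictsToBigQuery(options, bq_schema_fields=custom_fields)
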
class DumpKeys(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entities, gets the entities' keys, serializes them to JSON, and writes them
    to a `keys.jsonl` file (local or GCS).

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    | QueryAndCountEntities(foo_query)
    | DumpKeys(options)
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        step_slug (str, optional): Suffix slug appended to transform labels, in
            case the step is used multiple times. Defaults to None.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Key))
    """

    COUNTER_LABEL = "Keys dumped to file"

    def __init__(self, options, step_slug=None):
        super().__init__()

        self.options = options
        self.step_slug = step_slug

        path = output_path(options)
        self.keys_file = suffix_slug(KEYS, self.step_slug)
        self.keys_path = f"{path}/{self.keys_file}"

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = suffix_parens("Count Datastore entity keys", step_slug)
        self.GET_KEYS = suffix_parens("Get keys of entities", step_slug)
        self.WRITE_KEYS = suffix_parens(
            f"Write keys to file {self.keys_file}", step_slug
        )

    def expand(self, entities):
        keys = (
            entities
            | self.COUNT_ENTITIES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
            | self.GET_KEYS >> beam.Map(lambda ent: ent.key)
        )
        keys | self.WRITE_KEYS >> WriteToText(self.keys_path, file_name_suffix=JSONL)

        return keys


class DumpEntities(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entities, converts each into a `dict` and then:

    * ...serializes it to JSON, and writes all entities to an `entities.jsonl`
      file (local or in the GCS project specified in the provided `options`)

    * ...conforms it to the BQ schema and writes all entities to a new
      BigQuery table in the BQ project specified in the provided `options`

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    | QueryAndCountEntities(foo_query)
    | DumpEntities(options)
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        bq_schema_fields (list(dict), optional): A list of field dicts that
            describes a custom schema. Defaults to None, which means the
            default (`DEFAULT_BQ_FIELDS`) is used.
        props_formatter (callable, optional): Callable that accepts an entity's
            `properties` and returns a `dict`. Defaults to None, which means
            the `properties` attribute is used as-is.
        step_slug (str, optional): Suffix slug appended to transform labels, in
            case the step is used multiple times. Defaults to None.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
    """

    COUNTER_LABEL = "Entities dumped to file"

    def __init__(
        self,
        options,
        bq_schema_fields=None,
        props_formatter=None,
        step_slug=None,
    ):
        super().__init__()

        self.options = options
        self.props_formatter = props_formatter
        self.bq_schema_fields = bq_schema_fields
        self.step_slug = step_slug

        path = output_path(options)
        self.file_name = suffix_slug(ENTITIES, self.step_slug)
        self.file_path = f"{path}/{self.file_name}"

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
        self.CONVERT_ENTITIES = suffix_parens(
            "Convert Datastore entities to dicts", step_slug
        )
        self.SERIALIZE_ENTITIES = suffix_parens(
            "Serialize entity dicts to JSON", step_slug
        )
        self.WRITE_FILE = suffix_parens(
            "Write serialized entity dicts to file", step_slug
        )
        self.WRITE_ENTITIES = suffix_parens(
            "Write serialized entity dicts to BigQuery", step_slug
        )

    def expand(self, entities):
        entity_dicts = (
            entities
            | self.COUNT_ENTITIES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
            | self.CONVERT_ENTITIES
            >> beam.Map(lambda ent: entity_to_dict(ent, self.props_formatter))
        )

        (
            entity_dicts
            | self.SERIALIZE_ENTITIES >> beam.Map(serialize_to_json)
            | self.WRITE_FILE >> WriteToText(self.file_path, file_name_suffix=JSONL)
        )

        if self.options.bq_output:
            entity_dicts | self.WRITE_ENTITIES >> WriteEntityDictsToBigQuery(
                self.options, self.bq_schema_fields, self.step_slug
            )

        return entities


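# An illustrative sketch of a `props_formatter` for DumpEntities; the property
# names are hypothetical. The callable receives an entity's `properties` and
# must return a dict.
#
#   def drop_sensitive_props(properties):
#       return {k: v for k, v in properties.items() if k not in ("password", "token")}
#
#   # ... in DataflowJob.expand()
#   # | DumpEntities(options, props_formatter=drop_sensitive_props)
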
class DeleteEntities(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entities, gets the entities' keys, saves them to a file (local or GCS), and
    deletes the entities by key in the Datastore project specified in the
    provided `options`.

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    | QueryAndCountEntities(foo_query)
    | DeleteEntities(options)
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        dry_run (bool, optional): Skips deleting the entities from Datastore if
            True. Defaults to False.
        step_slug (str, optional): Suffix slug appended to transform labels, in
            case the step is used multiple times. Defaults to None.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
    """

    COUNTER_LABEL = "Entities deleted from Datastore"

    def __init__(self, options, dry_run=False, step_slug=None):
        super().__init__()

        self.options = options
        self.dry_run = dry_run
        self.step_slug = step_slug

        self.datastore_project = options.datastore_project
        self.throttle_rampup = not is_runner_local(options)

        path = output_path(options)
        self.file_name = suffix_slug(DELETED, self.step_slug)
        self.file_path = f"{path}/{self.file_name}"

        self.delete_slug = prefix_slug(DELETED, self.step_slug)

        self.COUNTER_NAMESPACE = self.__class__.__name__
        self.COUNT_ENTITIES = suffix_parens("Count Datastore entities", step_slug)
        self.DELETE_ENTITIES = suffix_parens(
            "Delete entities from Datastore", step_slug
        )

    def expand(self, entities):
        keys = (
            entities
            | self.COUNT_ENTITIES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
            | DumpKeys(self.options, step_slug=self.delete_slug)
        )

        if self.dry_run:
            logger.info("This is a DRY RUN; skipping delete from Datastore.")
            # still return the input entities so downstream steps can chain
            return entities

        keys | self.DELETE_ENTITIES >> DeleteFromDatastore(
            self.datastore_project,
            self.throttle_rampup,
        )

        return entities


class ExecuteFnForEntities(beam.PTransform):
    """A composite transform that accepts a `PCollection` of Google Datastore
    entities, calls the provided `entity_handler` on each entity and returns
    the provided entities.

    The `entity_handler` is assumed to provide three tagged outputs (see
    https://beam.apache.org/documentation/programming-guide/#additional-outputs)
    and have the output tags stored as class properties (`OUTPUT_TAG_SUCCESS`,
    `OUTPUT_TAG_ERRORS`, `OUTPUT_TAG_SKIPPED`).

    The entities in the 'errors' tagged output are sent to the `DumpEntities`
    transform, and the entities in the 'skipped' tagged output are sent to the
    `DumpKeys` transform.

    Usage:
    ```py
    # in DataflowJob.expand()
    # ...
    class FooWithEntity(TaggedOutputDoFn):
        def process(self, entity):
            # ...
            return [entity]

    class BarWithEntity(TaggedOutputDoFn):
        def process(self, entity):
            # ...
            return [entity]

    # ...
    | QueryAndCountEntities(foo_query)
    | ExecuteFnForEntities(options, entity_handler=FooWithEntity(), step_slug="foo")
    | ExecuteFnForEntities(options, entity_handler=BarWithEntity(), step_slug="bar")
    ```

    Args:
        options (PipelineOptions): This pipeline's options.
        entity_handler (TaggedOutputDoFn): A `beam.DoFn` instance that does
            something with the entity and directs it to the applicable tagged
            output. Note this function must return a list of one item because
            it's executed via `beam.ParDo()`.
        step_slug (str, optional): Suffix slug appended to transform labels, in
            case the step is used multiple times. Defaults to None.

    Returns:
        (PCollection(apache_beam.io.gcp.datastore.v1new.types.Entity))
    """

    def __init__(self, options, entity_handler, step_slug=None, **kwargs):
        super().__init__()

        self.options = options
        self.entity_handler = entity_handler
        self.step_slug = step_slug
        self.kwargs = kwargs

        self.throttle_rampup = not is_runner_local(options)

        self.errors_slug = prefix_slug(ERRORS, step_slug)
        self.skipped_slug = prefix_slug(SKIPPED, step_slug)

        self.EXEC_FN = suffix_parens("ExecuteFn", step_slug)

    def expand(self, entities):
        (
            success_entities,
            error_entities,
            skipped_entities,
        ) = entities | self.EXEC_FN >> beam.ParDo(
            # forward any extra keyword arguments to the handler's process()
            self.entity_handler, **self.kwargs
        ).with_outputs(
            self.entity_handler.OUTPUT_TAG_ERRORS,
            self.entity_handler.OUTPUT_TAG_SKIPPED,
            main=self.entity_handler.OUTPUT_TAG_SUCCESS,
        )

        error_entities | DumpEntities(self.options, step_slug=self.errors_slug)
        skipped_entities | DumpKeys(self.options, step_slug=self.skipped_slug)

        return success_entities


class RunQueryInBq(beam.PTransform):
    """A thin wrapper around the `apache_beam.io.gcp.bigquery.ReadFromBigQuery`
    PTransform to make using it easier. Any additional keyword arguments after
    `query` are passed directly to `ReadFromBigQuery()`.

    Helpful things we do:
    * Append `LIMIT n` to the provided `query` if `options.limit` is set and log
      a warning to make it visible.
    * Log the SQL query.
    * Set the `project` arg to `options.bq_project` (falling back to
      `DEFAULT_BQ_PROJECT`).
    * Set the `gcs_location` arg to the 'bqtemp' folder in the Dataflow job's
      output folder.
    * Set `use_standard_sql=True` (pass `use_standard_sql=False` to override).
    """

    COUNTER_LABEL = "Rows returned from BigQuery"

    def __init__(self, options, query, step_slug=None, **kwargs):
        super().__init__()

        self.options = options

        self.bq_project = options.bq_project or DEFAULT_BQ_PROJECT

        query_lines = [query]
        # courtesy of DataflowOptions
        if options.limit:
            query_lines.append(f"LIMIT {options.limit}")
            logger.warning("Appending 'LIMIT %s' to SQL query", options.limit)

        self.query = "\n".join(query_lines)
        logger.info("Using SQL query: %s", self.query)

        self.gcs_location = gcs_bucket_path(
            self.bq_project, options.folder_name, "bqtemp"
        )

        self.RUN_QUERY = suffix_parens("Run Query in BQ", step_slug)
        self.COUNT_ROWS = "Count rows"
        self.COUNTER_NAMESPACE = self.__class__.__name__

        if "use_standard_sql" not in kwargs:
            kwargs["use_standard_sql"] = True

        self.read_from_bq_kwargs = kwargs

    def expand(self, p):
        return (
            p
            | self.RUN_QUERY
            >> ReadFromBigQuery(
                query=self.query,
                project=self.bq_project,
                gcs_location=self.gcs_location,
                **self.read_from_bq_kwargs,
            )
            | self.COUNT_ROWS
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
        )


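# An illustrative usage sketch for RunQueryInBq; the SQL and table name are
# hypothetical. The returned PCollection contains one dict per result row.
#
#   # in DataflowJob.expand()
#   rows = p | RunQueryInBq(
#       options,
#       "SELECT id, email FROM `my-project.my_dataset.users`",
#   )
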
class WriteTextToFile(beam.PTransform):
    """A convenience wrapper around `beam.io.WriteToText` that takes care of the
    following:

    - accepts a filename (`output_file`) and sets the `file_path_prefix` and
      `file_name_suffix` args accordingly; when no extension is given, ".txt"
      is used
    - writes the file to the Dataflow job's GCS bucket (or local dir)
    - counts lines written to file
    """

    COUNTER_LABEL = "Lines written to file"

    def __init__(self, options, output_file, step_slug=None, **kwargs):
        super().__init__()

        self.options = options

        try:
            # split on the last "." so names like "report.2024.csv" keep their extension
            (filename, suffix) = output_file.rsplit(".", 1)
        except ValueError:
            filename = output_file
            suffix = "txt"

        self.file_path_prefix = output_path(options, filename)
        self.file_name_suffix = f".{suffix}"

        self.WRITE_FILE = suffix_parens("Write file", step_slug)
        self.COUNT_LINES = "Count lines"
        self.COUNTER_NAMESPACE = self.__class__.__name__

        self.write_to_text_kwargs = kwargs

    def expand(self, p):
        return (
            p
            | self.COUNT_LINES
            >> beam.ParDo(Counter(self.COUNTER_NAMESPACE, self.COUNTER_LABEL))
            | self.WRITE_FILE
            >> beam.io.WriteToText(
                file_path_prefix=self.file_path_prefix,
                file_name_suffix=self.file_name_suffix,
                **self.write_to_text_kwargs,
            )
        )


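# An illustrative usage sketch for WriteTextToFile; the file name is
# hypothetical. Elements are counted and written one per line, with ".txt"
# appended because no extension is given.
#
#   # ... in DataflowJob.expand()
#   # | beam.Map(str)
#   # | WriteTextToFile(options, "report")
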
class WriteCsvToFile(beam.PTransform):
    """A convenience wrapper around the `WriteTextToFile` transform, configured
    for writing CSV files, that handles the following:

    - appends ".csv" to `filename` if not already present
    - passes the `header` arg (for `beam.io.WriteToText`) along
    - accepts a `row_formatter` (callable) that accepts a "row" and is expected
      to return a `str`
    """

    def __init__(self, options, filename, header, row_formatter, step_slug=None):
        super().__init__()

        self.options = options
        self.filename = filename if filename.endswith(CSV) else filename + CSV
        self.header = header
        self.row_formatter = row_formatter

        self.file_slug = prefix_slug("written", step_slug)

        self.FORMAT_DATA = "Format data as CSV"
        self.WRITE_CSV = "Write CSV to file"

    def expand(self, p):
        return (
            p
            | self.FORMAT_DATA >> beam.Map(self.row_formatter)
            | self.WRITE_CSV
            >> WriteTextToFile(
                self.options,
                self.filename,
                header=self.header,
                step_slug=self.file_slug,
            )
        )
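

# An illustrative usage sketch for WriteCsvToFile; the header, row shape, and
# formatter are hypothetical. `row_formatter` receives one element of the
# input PCollection and must return a single CSV line (str).
#
#   def format_row(row):
#       return f'{row["id"]},{row["email"]}'
#
#   # ... in DataflowJob.expand()
#   # | WriteCsvToFile(options, "users", header="id,email", row_formatter=format_row)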