dataflow for partitioning a resource based on a validator
""" | |
In dataflows, the API for operating on an entire package goes as follows: | |
* we define a Higher order function to take in config and context | |
* in that function, we define and return a function that will operate on various resources. | |
In this instance, that function operates on the entire datapackage. To do so it yields, in order: | |
* the package definition, with changes (adds the new resource) | |
* a generator for each resource, in the order they are defined | |
""" | |
import typing as t

from dataflows import PackageWrapper, ResourceWrapper
from datapackage import Resource

from .ldjson_cache import LDJsonTemporaryFile as Cache

Row = t.Dict[str, t.Any]
Rows = t.Iterable[Row]
Keep = t.Union[bool, Row]
PartitionResult = t.Tuple[Keep, t.Optional[Rows]]
Partitioner = t.Callable[[int, Row], PartitionResult]
BuildPartitioner = t.Callable[[ResourceWrapper], Partitioner]
def partition_into_cache(rows: Rows, cache: Cache, partitioner: Partitioner) -> Rows:
    """
    Copy potentially modified rows into the cache based on the partitioner,
    optionally removing them from the original stream or yielding a modified version.
    There can be multiple copies of a given row, for expansion (i.e. an inner join on some other resource).
    This is useful for "partial" partitions, such as logging warnings and removing rows on errors.
    """
    for index, row in enumerate(rows):
        keep, copies = partitioner(index + 1, row) or (True, None)
        if copies is not None:
            # iterate directly rather than pre-checking with any(),
            # which would consume the first items of a generator
            for copy in copies:
                cache.append(copy)
        if keep:
            yield keep if isinstance(keep, dict) else row
def read_cache(cache: Cache) -> Rows:
    "Read selected values back out of the cache."
    for row in cache:
        yield row
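# Hypothetical illustration of the two helpers above (the sample rows and
# `split` partitioner are made up; Cache is the import at the top of this file):
#
#     rows = [{"id": 1, "valid": True}, {"id": 2, "valid": False}]
#
#     def split(row_number, row):
#         # keep valid rows in the stream; copy invalid ones into the cache
#         return row["valid"], (None if row["valid"] else [row])
#
#     with Cache() as cache:
#         kept = list(partition_into_cache(iter(rows), cache, split))
#         copied = list(read_cache(cache))
#     # kept   == [{"id": 1, "valid": True}]
#     # copied == [{"id": 2, "valid": False}]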
def partition(
    source_name: str,
    get_target: t.Callable[[Resource], Resource],
    partitioner: t.Optional[Partitioner] = None,
    get_partitioner: t.Optional[BuildPartitioner] = None,
):
    """Partition a resource into a new resource, optionally expanding each row.

    Partition the given resource into a target resource based on a partitioner of the form
    ``partitioner(row_number, row) -> keep, copied_rows``
    where ``keep`` is either a bool for keeping or discarding the row from the original stream,
    or a modified version of the row to yield into the stream.
    This is useful for "partial" partitions, such as logging warnings and removing rows on errors.

    Arguments:
        source_name: name of the resource to partition
        get_target: function for deriving a new target resource from the source
        partitioner: function to use for partitioning
        get_partitioner: alternative that dynamically builds a partitioner from the resource

    Partition example:
    >>> # full type signatures:
    >>> Keep = t.Union[bool, Row]
    >>> Rows = t.Iterable[Row]
    >>> PartitionResult = t.Tuple[Keep, t.Optional[Rows]]
    >>> Partitioner = t.Callable[[int, Row], PartitionResult]
    >>> BuildPartitioner = t.Callable[[ResourceWrapper], Partitioner]
    >>> def partitioner(row_number, row) -> PartitionResult:
    ...     "log warnings, remove rows with critical errors"
    ...     error_code, errors = validate(row)
    ...     return error_code != 'CRITICAL_ERROR', inner_join(row, errors)
    """
    assert (partitioner is not None) or (get_partitioner is not None), (
        "Cannot partition %s without a partitioner or get_partitioner" % source_name
    )
    def add_target_resource(resources):
        """Insert the target resource definition immediately after the source.

        NOTE: we don't expect any rows to be present in the target resource if it
        already exists; if there are, behavior may be unexpected.
        """
        for res in resources:
            yield res
            if res["name"] == source_name:
                yield get_target(Resource(res)).descriptor

    def select_into_target(package: PackageWrapper):
        """
        Select records from all matching resources into the target resource.
        """
        descriptor = package.pkg.descriptor
        descriptor["resources"] = list(add_target_resource(descriptor["resources"]))
        yield package.pkg
        for resource in package:
            if resource.res.name == source_name:
                with Cache() as cache:
                    _partitioner: Partitioner
                    if partitioner is not None:
                        _partitioner = partitioner
                    else:
                        # the assert above guarantees get_partitioner is set here
                        _partitioner = get_partitioner(resource)
                    yield partition_into_cache(resource, cache, _partitioner)
                    yield read_cache(cache)
            else:
                yield resource

    return select_into_target
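# A sketch of wiring `partition` into a flow. This is hypothetical:
# `validate`, the resource names, and the Flow steps are illustrative,
# not part of this gist.
#
#     from dataflows import Flow, load, dump_to_path
#
#     def get_errors_resource(source: Resource) -> Resource:
#         # derive the target descriptor from the source (assumed shape)
#         return Resource(dict(source.descriptor, name="errors", path="errors.csv"))
#
#     def validator(row_number, row):
#         keep, errors = validate(row)  # hypothetical validator
#         return keep, errors
#
#     Flow(
#         load("data/source.csv", name="source"),
#         partition("source", get_errors_resource, partitioner=validator),
#         dump_to_path("out"),
#     ).process()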
ldjson_cache.py
import tempfile

import jsonpickle as json


class LDJsonTemporaryFile(object):
    """
    Use a line-delimited tempfile for caching and reading back an arbitrary stream
    of json-serializable data.
    * automatically deletes the temporary file on close and __exit__
    * used by `partition`
    """

    def __init__(self):
        self.file = tempfile.TemporaryFile(mode="r+")

    def append(self, obj):
        self.file.write(json.dumps(obj) + "\n")

    def __iter__(self):
        self.file.seek(0)
        for line in self.file:
            # we don't need parse_float=Decimal
            # because jsonpickle is lossless
            yield json.loads(line)

    def __enter__(self):
        return self

    def close(self):
        self.file.close()
        del self.file

    def __exit__(self, *exc_info):
        self.close()
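# Hypothetical usage sketch (the sample objects are illustrative):
#
#     from decimal import Decimal
#
#     with LDJsonTemporaryFile() as cache:
#         cache.append({"price": Decimal("9.99")})
#         cache.append({"price": Decimal("0.10")})
#         for row in cache:
#             # jsonpickle round-trips Decimal, so no parse_float is needed
#             assert isinstance(row["price"], Decimal)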