Skip to content

Instantly share code, notes, and snippets.

Created July 6, 2019 16:12
Show Gist options
  • Save micimize/866996217933b42020b3396629cc08cd to your computer and use it in GitHub Desktop.
Save micimize/866996217933b42020b3396629cc08cd to your computer and use it in GitHub Desktop.
dataflow for partitioning a resource based on a validator
In dataflows, the API for operating on an entire package goes as follows:
* we define a higher-order function that takes in config and context
* in that function, we define and return a function that will operate on various resources.
In this instance, that function operates on the entire datapackage. To do so it yields, in order:
* the package definition, with changes (adds the new resource)
* a generator for each resource, in the order they are defined
import typing as t
from copy import deepcopy
from dataflows import PackageWrapper, ResourceWrapper
from datapackage import Resource
from .ldjson_cache import LDJsonTemporaryFile as Cache
# A single record: a dict with string keys and arbitrary values.
Row = t.Dict[str, t.Any]
# Any iterable of rows.
Rows = t.Iterable[Row]
# Verdict for the original stream: a bool (truthy keeps the row, falsy drops
# it) or a replacement row that is yielded instead of the original.
Keep = t.Union[bool, Row]
# What a partitioner returns: the keep verdict plus optional rows to copy
# into the cache (None means nothing is copied).
PartitionResult = t.Tuple[Keep, t.Optional[Rows]]
# Called as partitioner(row_number, row); row_number is 1-based.
Partitioner = t.Callable[[int, Row], PartitionResult]
# Factory that builds a partitioner from the resource being partitioned.
BuildPartitioner = t.Callable[[ResourceWrapper], Partitioner]
def partition_into_cache(rows: Rows, cache: Cache, partitioner: Partitioner) -> Rows:
    """Copy rows selected by ``partitioner`` into ``cache`` while streaming ``rows``.

    For every row, ``partitioner(row_number, row)`` is called with a 1-based
    row number and is expected to return ``(keep, copies)``.  A falsy return
    value is treated as ``(True, None)``.

    * ``copies`` -- optional iterable of (potentially modified) rows that are
      appended to ``cache``.  There can be multiple copies for a given row,
      for expansion (i.e. inner join on some other resource).  Nothing is
      cached when ``copies`` is ``None`` or contains only falsy items.
    * ``keep`` -- a falsy value removes the row from the stream, a ``dict``
      is yielded in place of the row, and any other truthy value yields the
      row unchanged.

    This is useful for "partial" partitions, such as logging warnings and
    removing rows on errors.

    Args:
        rows: the rows to stream through.
        cache: any object with an ``append(row)`` method.
        partitioner: decides, per row, what to keep and what to copy.

    Yields:
        The kept (or replaced) rows, in their original order.
    """
    for row_number, row in enumerate(rows, start=1):
        keep, copies = partitioner(row_number, row) or (True, None)
        if copies is not None:
            # Materialise first: ``any()`` on a one-shot iterator would
            # consume its leading items before they could be cached.
            copies = list(copies)
            if any(copies):
                for copied_row in copies:
                    cache.append(copied_row)
        if keep:
            yield keep if isinstance(keep, dict) else row
def read_cache(cache: Cache) -> Rows:
    """Read selected values back out of the cache.

    This is a generator, so ``cache`` is not iterated until the result is
    consumed; every item produced by iterating ``cache`` is yielded in order.
    """
    yield from cache
def partition(
    source_name: str,
    get_target: t.Callable[[Resource], Resource],
    partitioner: t.Optional[Partitioner] = None,
    get_partitioner: t.Optional[BuildPartitioner] = None,
):
    """Partition a resource into a new resource, optionally expanding each row.

    Partition the given resource into a target resource based on a
    partitioner of the form

        ``partitioner(row_number, row) -> keep, copied_rows``

    where ``keep`` is either a bool for keeping or discarding the row from
    the original stream, or a modified version of the row to yield into the
    stream, and ``copied_rows`` are the rows written to the target resource.

    This is useful for "partial" partitions, such as logging warnings and
    removing rows on errors.

    Args:
        source_name: name of the resource to partition.
        get_target: function for deriving a new target resource from the
            source resource.
        partitioner: function to use for partitioning.  Takes precedence
            over ``get_partitioner`` when both are given.
        get_partitioner: alternative to ``partitioner``; called with the
            source resource to build the partitioner dynamically.

    Returns:
        A function that takes the package wrapper and yields the updated
        package followed by one row iterator per resource.

    Raises:
        AssertionError: if neither ``partitioner`` nor ``get_partitioner``
            is given.

    Partition example::

        # full type signatures:
        Keep = t.Union[bool, Row]
        Rows = t.Iterable[Row]
        PartitionResult = t.Tuple[Keep, t.Optional[Rows]]
        Partitioner = t.Callable[[int, Row], PartitionResult]
        BuildPartitioner = t.Callable[[ResourceWrapper], Partitioner]

        def partitioner(row_number, row) -> PartitionResult:
            "logging warnings, remove errors"
            error_code, errors = validate(row)
            return error_code != 'CRITICAL_ERROR', inner_join(row, errors)
    """
    assert (partitioner is not None) or (get_partitioner is not None), (
        "Cannot partition %s without a partitioner or get_partitioner" % source_name
    )

    def add_target_resource(resources):
        """Yield every resource descriptor, inserting the target after the source.

        NOTE: an existing resource with the target's name is neither detected
        nor replaced here.  We don't expect any rows to be present in the
        target resource, and if there are there might be unexpected behavior.
        """
        for res in resources:
            yield res
            if res["name"] == source_name:
                yield get_target(Resource(res)).descriptor

    def select_into_target(package: PackageWrapper):
        """Select records from all matching resources into the target resource."""
        # First yield the package definition, with the target resource added.
        descriptor = package.pkg.descriptor
        descriptor["resources"] = list(add_target_resource(descriptor["resources"]))
        yield package.pkg

        # Then yield one row iterator per resource, in descriptor order.
        for resource in package:
            # NOTE(review): the left operand of this comparison was missing
            # from the reviewed copy; ``resource.res.name`` is presumed --
            # confirm against dataflows' ResourceWrapper.
            if resource.res.name == source_name:
                # The cache must stay open until the target rows have been
                # read back, hence both yields live inside the ``with``.
                with Cache() as cache:
                    active_partitioner = (
                        partitioner
                        if partitioner is not None
                        else get_partitioner(resource)
                    )
                    # Source stream: filtered rows; fills the cache as it runs.
                    yield partition_into_cache(resource, cache, active_partitioner)
                    # Target stream: whatever was copied into the cache.
                    yield read_cache(cache)
            else:
                yield resource

    return select_into_target
import os
import tempfile
from decimal import Decimal
import jsonpickle as json
class LDJsonTemporaryFile(object):
    """Line-delimited temporary-file cache for a stream of JSON-serializable data.

    Use a line delimited tempfile for caching and reading back an arbitrary
    stream of json-serializable data.

    * objects are written one per line with ``json.dumps`` and parsed back
      with ``json.loads`` (this module binds ``json`` to ``jsonpickle``)
    * will automatically delete the temporary file on ``close`` and ``__exit__``
    * Used by `partition`
    """

    def __init__(self):
        self.file = tempfile.TemporaryFile(mode="r+")

    def append(self, obj):
        """Serialize ``obj`` and write it to the file as a single line."""
        self.file.write(json.dumps(obj) + "\n")

    def __iter__(self):
        """Yield every cached object, in the order it was appended."""
        # Rewind first: after appending, the file position sits at the end
        # and iterating from there would produce nothing.
        self.file.seek(0)
        for line in self.file:
            # we don't need parse_float=Decimal
            # because jsonpickle is lossless
            yield json.loads(line)

    def __enter__(self):
        return self

    def close(self):
        """Close (and thereby delete) the temporary file; safe to call twice."""
        file = getattr(self, "file", None)
        if file is not None:
            # Close explicitly instead of waiting for garbage collection.
            file.close()
            del self.file

    def __exit__(self, *exc_info):
        # Returns None, so an exception raised inside the ``with`` propagates.
        self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment