The clean way to build Python pipelines defined by Open Data Contracts is not to start from a framework, a registry, or a mutable runtime context. It is to start from the actual computation.
To follow the code below, it may help to review Partial Functions in Python first.
For each contract, define:
- typed immutable records
- typed parsed configuration
- full-arity kernels that state all dependencies explicitly
- binding functions that fix those dependencies once
- stage-shaped callables that can be composed into a lazy pipeline
The runtime should not improvise. It should not rediscover business rules from scattered configuration, hidden imports, or shared helpers. It should construct real clients, bind the stable dependencies, create the per-run context, and run the already-defined pipeline.
That matters because many real runtimes have a shared operational requirement for every op, even when the business operation is "about" something else. If every op must see the same db connection for lazy op logging, that does not invalidate the design. It simply means the honest kernel has one more explicit argument, and binding happens in one more explicit phase.
This document is the cleaned-up version of that discovery.
Python pipelines usually become confusing for the same reasons:
- too much code is public
- helpers become shared by accident
- runtime code starts reconstructing contract logic
- dict[str, Any] becomes the transport format between stages
- clients and config leak through ambient imports instead of explicit arguments
- "generic pipeline machinery" erases the local shape of the real pipeline
That confusion is often misdiagnosed as a tooling problem:
- maybe the call graph is missing
- maybe the code intelligence is weak
- maybe the LSP is not clever enough
But the more basic problem is usually architectural:
the code no longer looks like the thing it does
The remedy is not a bigger abstraction layer. The remedy is to restore the shape of the computation.
Most ingestion or transformation flows reduce to the same pattern:
source candidate
-> materialised artifact(s)
-> extracted row(s)
-> loaded rows
Examples:
email
-> attachment(s)
-> jsonl row(s)
file
-> parsed row(s)
-> reporting row(s)
staging row
-> enriched row(s)
-> final row(s)
The nouns change. The shape does not.
So the right abstraction is not "pipeline engine". The right abstraction is:
- one typed stage boundary at a time
- one contract-local pipeline at a time
A pipeline stage is:
S : X -> Y*
Read this as:
- input type X
- output stream of Y

The * means zero or more values. In Python, that is naturally an Iterator[Y].
Examples:
sensor : Unit -> Email*
materialise_attachment : Email -> Workbook*
extract_rows : Workbook -> Row*
load_rows : Row* -> int
The key point is not the notation. The key point is that the stage boundary is typed and explicit.
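As a minimal sketch of that boundary (the Email and Workbook records here are toy placeholders, not the fuller records defined later), a stage with shape Email -> Workbook* is just a typed generator:

```python
from collections.abc import Iterator
from dataclasses import dataclass

@dataclass(frozen=True)
class Email:
    message_id: str

@dataclass(frozen=True)
class Workbook:
    key: str

def fetch(email: Email) -> Iterator[Workbook]:
    # Yield zero or more workbooks per email: the "*" in Email -> Workbook*.
    yield Workbook(key=f"attachments/{email.message_id}/report.xlsx")
```

The caller sees only the typed boundary: it passes an Email and iterates Workbooks, with no knowledge of how they were produced.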
The actual implementation usually needs more than just the stage input. It also needs config, clients, matchers, writers, clocks, and operational context.
So the true implementation should first be written in its honest form:
K : (A1, A2, ..., An, R, X) -> Y*
Where:
- A1..An are stable assembly-time dependencies
- R is the per-run context
- X is the stage input
- Y* is the output stream
This is the full-arity kernel.
It matters because the signature tells the whole truth. Nothing is smuggled in through globals, runtime lookups, hidden helpers, or ambient state.
Parameter order is not cosmetic. In a left-to-right currying model, parameter order states binding order:
- stable contract and client dependencies first
- per-run context next
- business input last
If a new operational concern is discovered, the right repair is usually not to spray more ambient state through the runtime. It is to extend the correct assembly-time or run-time record and keep the kernel honest.
In Python:
```python
def _fetch_impl(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
    run_ctx: RunContext,
    email: Email,
) -> Iterator[Workbook]:
    ...

def _extract_impl(
    cfg: ExtractConfig,
    object_store: ObjectStore,
    run_ctx: RunContext,
    workbook: Workbook,
) -> Iterator[Row]:
    ...
```

These kernels are the real computation.
The central idea of the whole design is:
Kernel : (AssemblyDeps, RunContext, Input) -> Output*
And binding then follows the actual phases:
Assembly : AssemblyDeps -> ((RunContext, Input) -> Output*)
Runtime : RunContext -> (Input -> Output*)
This is better than thinking in terms of "client factories" alone, because a client factory only constructs one capability. The real architecture must say when a capability is bound and when a run-scoped requirement is bound.
The shortest truthful slogan is:
Contract -> Assembly -> RunContext -> Input
The contract defines the shape. Assembly binds the stable world. Runtime supplies the run. Execution consumes the input.
Once the honest kernel exists, binding should happen in the same order as the truth of the system.
# when the application configuration is first known, bind the stable dependencies
bind_A(K) = A
# when the run-scoped details exist (for example the S3 client), bind those
bind_R(A) = S
Meaning:
- take the kernel K
- bind the stable assembly-time dependencies to obtain an assembled stage A that still expects a run context
- bind the run context R to obtain the final stage function S
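With functools.partial, those two bindings can be sketched on a toy kernel (the names prefix, run-42, and the int input are illustrative):

```python
from collections.abc import Iterator
from functools import partial

def kernel(prefix: str, run_id: str, x: int) -> Iterator[str]:
    # Full-arity kernel: assembly dep first, run context next, business input last.
    yield f"{prefix}:{run_id}:{x}"

# bind_A(K) = A: fix the stable assembly-time dependency.
assembled = partial(kernel, "contract")

# bind_R(A) = S: fix the per-run context.
stage = partial(assembled, "run-42")

out = list(stage(7))  # stage is now just Input -> Output*
```

Each partial application peels one phase off the left of the signature, which is why parameter order states binding order.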
To load email attachments into a bucket and get a handle to each workbook in the bucket for further processing:
# The complete solution needs all of this
K_fetch : (PipelineConfig, MailClient, ObjectStore, RunContext, Email) -> Workbook*
We will get into the Python later to see how functools.partial lets us supply the leftmost parameter and get back a function with one fewer parameter. So bind only the parts of the configuration that we need by currying in that parameter at the top of our Python script:
# Whether it is the dev, test, or prod bucket is held in the PipelineConfig, which is set once at startup
Configured_fetcher: (PipelineConfig) -> (MailClient, ObjectStore, RunContext, Email) -> Workbook*
What is going on is that the PipelineConfig, a frozen dataclass, is baked into the returned function.
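A sketch of that baking, with a hypothetical one-field PipelineConfig; because the dataclass is frozen, the baked-in bucket cannot be silently repointed later:

```python
import dataclasses
from dataclasses import dataclass
from functools import partial

@dataclass(frozen=True)
class PipelineConfig:
    bucket: str  # dev, test, or prod bucket, fixed once at startup

def fetch_key(cfg: PipelineConfig, key: str) -> str:
    # Toy kernel: the real one also takes clients, a run context, and an Email.
    return f"s3://{cfg.bucket}/{key}"

# Bake the frozen config into the returned callable once.
configured = partial(fetch_key, PipelineConfig(bucket="prod-ingest"))

# Frozen means immutable: mutation attempts raise FrozenInstanceError.
try:
    configured.args[0].bucket = "other-bucket"
except dataclasses.FrozenInstanceError:
    pass
```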
We can then set up an email client and a bucket client once, yet bind them into a dozen pipelines:
# Once the email client and bucket client have been set up, a factory curries those in
Fetcher_factory: (MailClient, ObjectStore) -> (RunContext, Email) -> Workbook*
That was the crucial transition. The assembly layer binds what is stable across many runs. The runtime binds what is shared across the lifetime of the Python process, which can handle any number of emails.
When we find a dozen unread emails to load into buckets, we make, for each one, a fetcher of the workbooks in its attachments:
pipeline : (RunContext, Email) -> Workbook*
fetch : Email -> Workbook*
The runtime should see only stage-shaped callables. Because each stage is stage-shaped, they compose: for each email we can chain on stages that extract jsonl from the attachments and then load that jsonl into a database.
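A runnable toy of that chaining, with stand-in sensor, fetch, and extract stages producing jsonl-style dicts:

```python
from collections.abc import Iterator

def sensor() -> Iterator[str]:
    yield from ["m-1", "m-2"]      # unread email ids

def fetch(email: str) -> Iterator[str]:
    yield f"{email}/wb-a"          # one workbook handle per email here

def extract(workbook: str) -> Iterator[dict[str, object]]:
    yield {"source": workbook}     # one jsonl row per workbook

def rows() -> Iterator[dict[str, object]]:
    # Flat-map composition: each stage sees only its typed input.
    for email in sensor():
        for workbook in fetch(email):
            yield from extract(workbook)
```

Everything stays lazy: no email is fetched and no row is built until something consumes rows().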
The design only stays clean if the phases are kept distinct.
The contract layer owns:
- the contract YAML
- parsing of that YAML
- typed config records
- typed domain records
- contract-local invariants
- full-arity kernels
- the local pipeline runner
This is the functional core.
It may be expressed as a namespace class with @staticmethod methods or as
module-level functions. The important thing is not style. The important thing is
that this layer remains contract-local and dependency-explicit.
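As a sketch, assuming a hypothetical hello_world contract, the namespace-class style looks like:

```python
from collections.abc import Iterator
from dataclasses import dataclass

@dataclass(frozen=True)
class Row:
    values: dict[str, object]

class HelloWorldDataset:
    """One contract's functional core: a namespace, never instantiated."""

    @staticmethod
    def parse_sheet_name(raw: dict[str, object]) -> str:
        # Contract-local parsing of raw config into a typed value.
        return str(raw["sheet_name"])

    @staticmethod
    def extract(sheet_name: str, cells: list[dict[str, object]]) -> Iterator[Row]:
        # Contract-local kernel with its dependencies stated explicitly.
        for cell in cells:
            yield Row(values={**cell, "sheet": sheet_name})
```

Module-level functions would be equally valid; the class buys nothing but a named, greppable namespace.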
The assembly layer owns:
- stable dependency records
- binding of kernels into run-aware stage callables
- contract-specific pipeline assembly
- no hidden runtime lookups
This is where:
(A, R, X) -> Y*
becomes:
(R, X) -> Y*
The runtime layer owns:
- process entrypoints
- environment loading
- concrete clients
- transactions
- retries
- tracing
- logging
- resource management
- construction of the per-run context
This is where the runtime provides the shared operational facts for one run.
For example:
- one db connection used to lazy-log ops
- one run id
- one clock
- one audit sink
That per-run context is not an accidental cross-cutting concern. It is part of the honest arity of every op in that run, whether the op is otherwise about email, S3, or something else.
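A sketch of building that per-run context, using sqlite3 as a stand-in ops database (make_run_context and the ops_log schema are assumptions):

```python
import sqlite3
import uuid
from collections.abc import Callable, Mapping
from dataclasses import dataclass

@dataclass(frozen=True)
class RunContext:
    run_id: str
    log_op: Callable[[str, Mapping[str, object]], None]

def make_run_context(conn: sqlite3.Connection) -> RunContext:
    run_id = str(uuid.uuid4())

    def log_op(op: str, detail: Mapping[str, object]) -> None:
        # One shared connection lazily logs every op for this run.
        conn.execute(
            "INSERT INTO ops_log (run_id, op, detail) VALUES (?, ?, ?)",
            (run_id, op, repr(dict(detail))),
        )

    return RunContext(run_id=run_id, log_op=log_op)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ops_log (run_id TEXT, op TEXT, detail TEXT)")
ctx = make_run_context(conn)
ctx.log_op("fetch", {"message_id": "m-1"})
```

Every op in the run receives this one RunContext, so the shared connection and run id appear exactly once in each kernel's signature.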
This is where:
(R, X) -> Y*
becomes:
X -> Y*
The runtime should not reinterpret the contract. It should not reconstruct business rules. It should not rediscover stage structure.
It should build dependencies, create the run context, bind the assembled stages, and execute.
Python is good enough for this design if the code is disciplined.
Use frozen dataclasses for:
- domain values
- config values
- dependency records
Example:
```python
from collections.abc import Callable, Iterator
from dataclasses import dataclass

@dataclass(frozen=True)
class Email:
    message_id: str
    subject: str
    sender_email: str

@dataclass(frozen=True)
class MailClient:
    list_messages: Callable[[], Iterator[RawMessage]]
    list_attachments: Callable[[str], Iterator[RawAttachment]]
```

This gives a narrow, truthful surface instead of an opaque "client object" with dozens of methods the pipeline never needed.
Use named type aliases for the stage shapes.
```python
type AssembledContractSensor = Callable[[RunContext], Iterator[Email]]
type AssembledContractFetch = Callable[[RunContext, Email], Iterator[Workbook]]
type AssembledContractExtract = Callable[[RunContext, Workbook], Iterator[Row]]
type AssembledContractLoad = Callable[[RunContext, Iterable[Row]], int]

type ContractSensor = Callable[[], Iterator[Email]]
type ContractFetch = Callable[[Email], Iterator[Workbook]]
type ContractExtract = Callable[[Workbook], Iterator[Row]]
type ContractLoad = Callable[[Iterable[Row]], int]
```

These names prevent the code from dissolving into anonymous Callable[..., ...] soup.
Binding dependencies with functools.partial is not cleverness. It is simply
the correct operator for turning a full-arity kernel into a run-aware stage,
and then a run-aware stage into a final stage function.
```python
from functools import partial

def build_fetch(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
) -> AssembledContractFetch:
    return partial(_fetch_impl, cfg, mail_client, object_store)

def bind_run_fetch(
    run_ctx: RunContext,
    fetch: AssembledContractFetch,
) -> ContractFetch:
    return partial(fetch, run_ctx)
```

Assembly binds once for many runs. Runtime binds once for one run. The stage then runs many times inside that run.
If something is truly deferred, model that truthfully as a zero-argument callable in a dependency or run-context record. Do not introduce generic hooks or supplier-style indirection as a substitute for explicit arguments.
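A sketch of a truthfully deferred value, with a hypothetical fresh_token field that must be minted at use time:

```python
from collections.abc import Callable
from dataclasses import dataclass

@dataclass(frozen=True)
class StoreDeps:
    # Deferred truthfully: the type says "fetched on demand", nothing more.
    fresh_token: Callable[[], str]

def put_object(deps: StoreDeps, key: str) -> str:
    token = deps.fresh_token()  # minted at use time, not at assembly time
    return f"PUT {key} auth={token}"

deps = StoreDeps(fresh_token=lambda: "tok-123")
```

The deferral is visible in the record's type, so no hook registry or supplier framework is needed to express it.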
The pipeline itself is just flat-map composition:
```python
def rows() -> Iterator[Row]:
    for email in sensor():
        for workbook in fetch(email):
            yield from extract(workbook)
```

Nothing more exotic is needed.
```python
from collections.abc import Callable, Iterable, Iterator, Mapping
from dataclasses import dataclass
from functools import partial
import re

@dataclass(frozen=True)
class Email:
    message_id: str
    subject: str
    sender_email: str

@dataclass(frozen=True)
class Workbook:
    bucket: str
    key: str
    filename: str
    content_type: str
    message_id: str

@dataclass(frozen=True)
class Row:
    values: dict[str, object]

@dataclass(frozen=True)
class SensorConfig:
    sender_match: re.Pattern[str]
    subject_match: re.Pattern[str]

@dataclass(frozen=True)
class FetchConfig:
    pass

@dataclass(frozen=True)
class ExtractConfig:
    sheet_name: str
    filename_match: re.Pattern[str]
    content_type_match: re.Pattern[str]

@dataclass(frozen=True)
class LoadConfig:
    pass

# RawMessage and RawAttachment are minimal assumed provider-level records,
# added here so the client records below are self-contained.
@dataclass(frozen=True)
class RawMessage:
    message_id: str

@dataclass(frozen=True)
class RawAttachment:
    filename: str

@dataclass(frozen=True)
class MailClient:
    list_messages: Callable[[], Iterator[RawMessage]]
    list_attachments: Callable[[str], Iterator[RawAttachment]]

@dataclass(frozen=True)
class ObjectStore:
    put_bytes: Callable[[str, bytes, str], None]
    get_bytes: Callable[[str], bytes]

@dataclass(frozen=True)
class RowWriter:
    write_row: Callable[[Row], None]

@dataclass(frozen=True)
class OpsLogDb:
    execute: Callable[[str, tuple[object, ...]], None]

@dataclass(frozen=True)
class RunContext:
    ops_db: OpsLogDb
    run_id: str
    log_op: Callable[[str, Mapping[str, object]], None]

type AssembledContractSensor = Callable[[RunContext], Iterator[Email]]
type AssembledContractFetch = Callable[[RunContext, Email], Iterator[Workbook]]
type AssembledContractExtract = Callable[[RunContext, Workbook], Iterator[Row]]
type AssembledContractLoad = Callable[[RunContext, Iterable[Row]], int]

type ContractSensor = Callable[[], Iterator[Email]]
type ContractFetch = Callable[[Email], Iterator[Workbook]]
type ContractExtract = Callable[[Workbook], Iterator[Row]]
type ContractLoad = Callable[[Iterable[Row]], int]

def _sensor_impl(
    cfg: SensorConfig,
    mail_client: MailClient,
    run_ctx: RunContext,
) -> Iterator[Email]:
    ...

def _fetch_impl(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
    run_ctx: RunContext,
    email: Email,
) -> Iterator[Workbook]:
    ...

def _extract_impl(
    cfg: ExtractConfig,
    object_store: ObjectStore,
    run_ctx: RunContext,
    workbook: Workbook,
) -> Iterator[Row]:
    ...

def _load_impl(
    cfg: LoadConfig,
    row_writer: RowWriter,
    run_ctx: RunContext,
    rows: Iterable[Row],
) -> int:
    ...
```

This is the point of maximum truthfulness.

```python
def build_sensor(
    cfg: SensorConfig,
    mail_client: MailClient,
) -> AssembledContractSensor:
    ...

def build_fetch(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
) -> AssembledContractFetch:
    ...

def build_extract(
    cfg: ExtractConfig,
    object_store: ObjectStore,
) -> AssembledContractExtract:
    ...

def build_load(
    cfg: LoadConfig,
    row_writer: RowWriter,
) -> AssembledContractLoad:
    ...

def bind_run_sensor(
    run_ctx: RunContext,
    sensor: AssembledContractSensor,
) -> ContractSensor:
    ...

def bind_run_fetch(
    run_ctx: RunContext,
    fetch: AssembledContractFetch,
) -> ContractFetch:
    ...

def bind_run_extract(
    run_ctx: RunContext,
    extract: AssembledContractExtract,
) -> ContractExtract:
    ...

def bind_run_load(
    run_ctx: RunContext,
    load: AssembledContractLoad,
) -> ContractLoad:
    ...

@dataclass(frozen=True)
class AssembledPipeline:
    sensor: AssembledContractSensor
    fetch: AssembledContractFetch
    extract: AssembledContractExtract
    load: AssembledContractLoad

@dataclass(frozen=True)
class ContractPipeline:
    sensor: ContractSensor
    fetch: ContractFetch
    extract: ContractExtract
    load: ContractLoad

def bind_run(
    pipeline: AssembledPipeline,
    run_ctx: RunContext,
) -> ContractPipeline:
    return ContractPipeline(
        sensor=partial(pipeline.sensor, run_ctx),
        fetch=partial(pipeline.fetch, run_ctx),
        extract=partial(pipeline.extract, run_ctx),
        load=partial(pipeline.load, run_ctx),
    )

def run_pipeline(
    pipeline: ContractPipeline,
) -> int:
    def rows() -> Iterator[Row]:
        for email in pipeline.sensor():
            for workbook in pipeline.fetch(email):
                yield from pipeline.extract(workbook)
    return pipeline.load(rows())
```

That is the whole shape:
Contract -> Assembly -> RunContext -> Input
The codebase stays comprehensible only if the local shape of each contract is preserved.
That means:
- one contract should look like one contract
- one pipeline should look like one pipeline
- function factories should remain contract-local unless the abstraction is genuinely universal
The desire to "reuse" often destroys the pattern.
If two contracts happen to:
- watch the same mailbox
- read the same workbook
- differ only by sheet name
that still does not imply they should share their pipeline logic.
They may be structurally parallel while remaining operationally separate.
The rule is:
preserve the visible pattern before chasing reuse
Shared code should be extremely small and extremely explicit.
The discussion that led to this design recovered one useful exception:
some generated common model artifacts may legitimately sit behind a hard seam.
If a repository has distinct subsystems such as:
- generate
- load
- serve
then a small shared package containing generated dataclasses or validators can be used as the one narrow common surface.
That shared surface should be:
- small
- explicit
- re-export controlled
- generated deterministically
The important thing is not sharing as such. The important thing is that sharing is narrow enough not to dissolve the subsystem boundaries.
Generated artifacts are not just a build detail. They are part of the boundary discipline.
If generation is deterministic:
- generate in CI
- compare generated output against committed state
- fail on drift
This is a useful invariant because it prevents silent divergence between:
- contract inputs
- generator logic
- emitted dataclasses or validators
So the architectural principle is:
generated outputs may be committed, but they must be reproducible and checked
That is the same general philosophy as the full-arity kernel: make the truth visible and testable.
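A minimal drift check along those lines, assuming generated .py files are compared between the committed tree and a fresh regeneration (the directory layout is an assumption):

```python
import hashlib
from pathlib import Path

def digest(root: Path) -> dict[str, str]:
    # Content hash of every generated file under root.
    return {
        str(p.relative_to(root)): hashlib.sha256(p.read_bytes()).hexdigest()
        for p in sorted(root.rglob("*.py"))
    }

def check_drift(committed: Path, regenerated: Path) -> list[str]:
    before, after = digest(committed), digest(regenerated)
    # Any added, removed, or changed file is drift; CI fails on a non-empty list.
    return sorted(k for k in before.keys() | after.keys() if before.get(k) != after.get(k))
```

CI regenerates into a scratch directory, calls check_drift against the committed tree, and fails the build if the list is non-empty.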
These are the things that most reliably create Python sprawl:
- registries
- dependency injection containers
- mutable payload maps
- hidden late binding
- generic supplier or hook points that exist only to smuggle control flow
- generic configurator sludge
- runtime logic that repeatedly consults raw config
- shared helpers that erase contract identity
- ambient clients imported from elsewhere
These are the things that keep the code local:
- immutable records
- parsed config records
- full-arity kernels
- binding once
- stage-shaped callables
- contract-local assembly
- runtime as execution, not interpretation
Tooling matters because Python will not protect these boundaries by default.
Use ruff both for baseline code quality and architectural enforcement.
```toml
[tool.ruff]
line-length = 100
exclude = ["*_validator.py", "_generated"]
force-exclude = true

[tool.ruff.lint]
select = [
    "E",
    "F",
    "I",
    "B",
    "UP",
    "TID252",
]

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["E401", "E402", "E501", "E702", "I001"]

[tool.ruff.lint.flake8-tidy-imports.banned-api]
"build" = { msg = "Strict architectural boundary: contracts cannot depend on build tools" }
"backend" = { msg = "Strict architectural boundary: contracts cannot depend on backend" }
"generator" = { msg = "Strict architectural boundary: contracts cannot depend on generator" }
"frontend" = { msg = "Strict architectural boundary: contracts cannot depend on frontend" }
"contracts._generated" = { msg = "contracts functional core must not import generated artifacts" }
```

This combination matters because it does two jobs:
- keeps source mechanically clean
- stops cross-layer import drift
Use strict typing where the contracts and stages live.
```toml
[tool.mypy]
strict = true
warn_unused_ignores = true
warn_redundant_casts = true
disallow_any_generics = true
no_implicit_optional = true
```

Static typing is especially valuable for:
- stage aliases
- config parsing
- dependency records
- catching accidental cross-stage misuse
Architectural boundaries should be enforced, not merely described.
Whether the enforcement is done with import-linter, Ruff banned APIs, poison pills, or package layout, the principle is the same:
the build must reject code that crosses the declared seams
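One self-contained way to enforce that is an ast-based check run in CI; the banned prefixes below are assumptions mirroring the seams named earlier:

```python
import ast
from pathlib import Path

BANNED_PREFIXES = ("backend", "frontend", "generator", "contracts._generated")

def forbidden_imports(source: str) -> list[str]:
    # Collect any import whose module crosses a declared seam.
    hits: list[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module:
            names = [node.module]
        else:
            continue
        hits += [n for n in names if n.startswith(BANNED_PREFIXES)]
    return hits

def check_tree(root: Path) -> dict[str, list[str]]:
    # Map each offending file to its seam-crossing imports; empty dict means clean.
    report = {str(p): forbidden_imports(p.read_text()) for p in root.rglob("*.py")}
    return {f: hits for f, hits in report.items() if hits}
```

The test suite asserts check_tree(src_root) is empty, so a seam-crossing import fails the build instead of quietly shipping.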
For one contract:
```
contracts/
    hello_world_dataset.yaml
src/contracts/
    hello_world_dataset.py
    hello_world_dataset_bind.py
bin/
    hello_world_dataset.py
```
For a larger system with hard subsystem separation:
```
repo/
    common/
    generate/
    load/
    serve/
```
Where:
- common/ is tiny and explicit
- generate/ owns code emission
- load/ owns the data pipelines
- serve/ owns API/runtime serving concerns
The point is not fashion. The point is to stop Python's default openness from collapsing everything into one blurry module graph.
Typed functional composition in Python is not about dressing up simple code in category-theory language.
It is about preserving the visible shape of the computation.
The most useful distilled model is this:
Kernel : (AssemblyDeps, RunContext, Input) -> Output*
Assembly : AssemblyDeps -> ((RunContext, Input) -> Output*)
Runtime : RunContext -> (Input -> Output*)
Pipeline = composition of stage-shaped functions
And the most important practical corollaries are:
- write the full-arity kernel first
- order parameters by binding phase
- bind stable dependencies in assembly
- bind run context in runtime
- keep contracts local
- share almost nothing
- let configuration drive the construction early, not the runtime late
If the code still reads like the pipeline, the architecture is probably healthy.
If the code reads like a bag of helpers, registries, and contexts, the shape has already been lost.
The goal is to keep the shape.