@simbo1905
Last active April 17, 2026 09:39
Typed functional composition in Python for contract-driven pipelines

Emit Functions, Not Frameworks

A truer account of typed functional composition for contract-driven Python

Abstract

The clean way to build Python pipelines defined by Open Data Contracts is not to start from a framework, a registry, or a mutable runtime context. It is to start from the actual computation.

In order to read the code below it may be helpful to get a recap on Partial Functions in Python.

For each contract, define:

  • typed immutable records
  • typed parsed configuration
  • full-arity kernels that state all dependencies explicitly
  • binding functions that fix those dependencies once
  • stage-shaped callables that can be composed into a lazy pipeline

The runtime should not improvise. It should not rediscover business rules from scattered configuration, hidden imports, or shared helpers. It should construct real clients, bind the stable dependencies, create the per-run context, and run the already-defined pipeline.

That matters because many real runtimes have a shared operational requirement for every op, even when the business operation is "about" something else. If every op must see the same db connection for lazy op logging, that does not invalidate the design. It simply means the honest kernel has one more explicit argument, and binding happens in one more explicit phase.

This document is the cleaned-up version of that discovery.


1. The real problem

Python pipelines usually become confusing for the same reasons:

  • too much code is public
  • helpers become shared by accident
  • runtime code starts reconstructing contract logic
  • dict[str, Any] becomes the transport format between stages
  • clients and config leak through ambient imports instead of explicit arguments
  • "generic pipeline machinery" erases the local shape of the real pipeline

That confusion is often misdiagnosed as a tooling problem:

  • maybe the call graph is missing
  • maybe the code intelligence is weak
  • maybe the LSP is not clever enough

But the more basic problem is usually architectural:

the code no longer looks like the thing it does

The remedy is not a bigger abstraction layer. The remedy is to restore the shape of the computation.


2. The shape does not change

Most ingestion or transformation flows reduce to the same pattern:

source candidate
  -> materialised artifact(s)
  -> extracted row(s)
  -> loaded rows

Examples:

email
  -> attachment(s)
  -> jsonl row(s)
file
  -> parsed row(s)
  -> reporting row(s)
staging row
  -> enriched row(s)
  -> final row(s)

The nouns change. The shape does not.

So the right abstraction is not "pipeline engine". The right abstraction is:

  • one typed stage boundary at a time
  • one contract-local pipeline at a time

3. The mathematical core

3.1 A stage is just a function

A pipeline stage is:

S : X -> Y*

Read this as:

  • input type X
  • output stream of Y

The * means zero or more values. In Python, that is naturally an Iterator[Y].

Examples:

sensor                 : Unit -> Email*
materialise_attachment : Email -> Workbook*
extract_rows           : Workbook -> Row*
load_rows              : Row* -> int

The key point is not the notation. The key point is that the stage boundary is typed and explicit.
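In Python that shape is simply a generator function. A minimal illustrative stage (the example itself is a stand-in, not part of the contract):

```python
from collections.abc import Iterator


def split_words(line: str) -> Iterator[str]:
    # S : X -> Y*  one input, zero or more outputs, produced lazily
    yield from line.split()


print(list(split_words("a b c")))  # ['a', 'b', 'c']
```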

3.2 The honest form is the full-arity kernel

The actual implementation usually needs more than just the stage input. It also needs config, clients, matchers, writers, clocks, and operational context.

So the true implementation should first be written in its honest form:

K : (A1, A2, ..., An, R, X) -> Y*

Where:

  • A1..An are stable assembly-time dependencies
  • R is the per-run context
  • X is the stage input
  • Y* is the output stream

This is the full-arity kernel.

It matters because the signature tells the whole truth. Nothing is smuggled in through globals, runtime lookups, hidden helpers, or ambient state.

Parameter order is not cosmetic. In a left-to-right currying model, parameter order states binding order:

  • stable contract and client dependencies first
  • per-run context next
  • business input last

If a new operational concern is discovered, the right repair is usually not to spray more ambient state through the runtime. It is to extend the correct assembly-time or run-time record and keep the kernel honest.

In Python:

def _fetch_impl(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
    run_ctx: RunContext,
    email: Email,
) -> Iterator[Workbook]:
    ...


def _extract_impl(
    cfg: ExtractConfig,
    object_store: ObjectStore,
    run_ctx: RunContext,
    workbook: Workbook,
) -> Iterator[Row]:
    ...

These kernels are the real computation.

3.3 The most important equation

The central idea of the whole design is:

Kernel : (AssemblyDeps, RunContext, Input) -> Output*

And binding then follows the actual phases:

Assembly : AssemblyDeps -> ((RunContext, Input) -> Output*)
Runtime  : RunContext -> (Input -> Output*)

This is better than thinking in terms of "client factories" alone, because a client factory only constructs one capability. The real architecture must say when a capability is bound and when a run-scoped requirement is bound.

The shortest truthful slogan is:

Contract -> Assembly -> RunContext -> Input

The contract defines the shape. Assembly binds the stable world. Runtime supplies the run. Execution consumes the input.
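Those three lines can be sketched directly with functools.partial; every name below (Deps, RunCtx, kernel) is illustrative:

```python
from collections.abc import Iterator
from dataclasses import dataclass
from functools import partial


@dataclass(frozen=True)
class Deps:  # illustrative stand-in for AssemblyDeps
    prefix: str


@dataclass(frozen=True)
class RunCtx:  # illustrative stand-in for RunContext
    run_id: str


def kernel(deps: Deps, run_ctx: RunCtx, x: int) -> Iterator[str]:
    # Kernel : (AssemblyDeps, RunContext, Input) -> Output*
    yield f"{deps.prefix}:{run_ctx.run_id}:{x}"


# Assembly : AssemblyDeps -> ((RunContext, Input) -> Output*)
assembled = partial(kernel, Deps(prefix="demo"))

# Runtime : RunContext -> (Input -> Output*)
stage = partial(assembled, RunCtx(run_id="r1"))

# Execution consumes the input.
print(list(stage(42)))  # ['demo:r1:42']
```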


3.4 Binding turns kernels into stages

Once the honest kernel exists, binding should happen in the same order as the truth of the system.

# when we first know the application configuration, bind it
bind_A(K) = A
# when we have the detail to set up a client for an S3 bucket, bind that
bind_R(A) = S

Meaning:

  • take kernel K
  • bind stable assembly-time dependencies A
  • obtain an assembled stage that still expects a run context
  • bind run context R
  • obtain the final stage function S

To load email attachments into a bucket and get a handle to each workbook in the bucket for further processing:

# The complete solution needs all of this
K_fetch : (PipelineConfig, MailClient, ObjectStore, RunContext, Email) -> Workbook*

We will get into the Python later to see how functools.partial lets us supply the leftmost parameter and get back a function with one fewer parameter. So bind only the parts of the configuration that we need by currying in that parameter at the top of our Python script:

# Whether it's the dev, test, or prod bucket, it lives in the PipelineConfig, which is set once at startup
Configured_fetcher: (PipelineConfig) -> (MailClient, ObjectStore, RunContext, Email) -> Workbook*

What is going on is that the PipelineConfig, which is a frozen dataclass, is baked into the returned function.

We can then set up an email client and bucket client once but bind them into a dozen pipelines. This looks like:

# Once the email client and bucket client have been set up we use a factory that curries those in
Fetcher_factory: (MailClient, ObjectStore) -> (RunContext, Email) -> Workbook*

That was the crucial transition. The assembly layer binds what is stable across many runs. The runtime binds what is shared across the lifetime of the Python script, which can process any number of emails.

When we find a dozen unread emails to load into buckets, we make, for each one, a fetcher of the workbooks in its attachments:

assembled_fetch : (RunContext, Email) -> Workbook*
fetch           : Email -> Workbook*

The runtime should see only stage-shaped callables. Because they are stage-shaped, they compose: for each email we can chain on a stage that extracts JSONL from the attachments and a stage that loads that JSONL into a database.
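The currying described above can be sketched with functools.partial; the config record is illustrative, and plain strings stand in for the real clients and run context:

```python
from collections.abc import Iterator
from dataclasses import dataclass
from functools import partial


@dataclass(frozen=True)
class PipelineConfig:  # illustrative; set once at startup
    bucket: str


def k_fetch(
    cfg: PipelineConfig,
    mail_client: object,   # plain objects stand in for real clients here
    object_store: object,
    run_ctx: object,
    email: str,
) -> Iterator[str]:
    # K_fetch : (PipelineConfig, MailClient, ObjectStore, RunContext, Email) -> Workbook*
    yield f"s3://{cfg.bucket}/{email}.xlsx"


# Bind the config once at startup...
configured_fetcher = partial(k_fetch, PipelineConfig(bucket="dev-bucket"))

# ...bind the clients once, reusable across many runs...
fetcher = partial(configured_fetcher, "mail-client", "object-store")

# ...then per run bind the run context, and call per email.
fetch = partial(fetcher, "run-ctx")
print(list(fetch("invoice-42")))  # ['s3://dev-bucket/invoice-42.xlsx']
```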

4. The real four-term shape

The design only stays clean if the phases are kept distinct.

4.1 Contract layer

This layer owns:

  • the contract YAML
  • parsing of that YAML
  • typed config records
  • typed domain records
  • contract-local invariants
  • full-arity kernels
  • the local pipeline runner

This is the functional core.

It may be expressed as a namespace class with @staticmethod methods or as module-level functions. The important thing is not style. The important thing is that this layer remains contract-local and dependency-explicit.

4.2 Assembly layer

This layer owns:

  • stable dependency records
  • binding of kernels into run-aware stage callables
  • contract-specific pipeline assembly
  • no hidden runtime lookups

This is where:

(A, R, X) -> Y*

becomes:

(R, X) -> Y*

4.3 Runtime layer

This layer owns:

  • process entrypoints
  • environment loading
  • concrete clients
  • transactions
  • retries
  • tracing
  • logging
  • resource management
  • construction of the per-run context

This is where the runtime provides the shared operational facts for one run.

For example:

  • one db connection used to lazy-log ops
  • one run id
  • one clock
  • one audit sink

That per-run context is not an accidental cross-cutting concern. It is part of the honest arity of every op in that run, whether the op is otherwise about email, S3, or something else.

This is where:

(R, X) -> Y*

becomes:

X -> Y*

4.4 Execution

The runtime should not reinterpret the contract. It should not reconstruct business rules. It should not rediscover stage structure.

It should build dependencies, create the run context, bind the assembled stages, and execute.


5. What Python is actually good at here

Python is good enough for this design if the code is disciplined.

5.1 Frozen records

Use frozen dataclasses for:

  • domain values
  • config values
  • dependency records

Example:

from collections.abc import Callable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class Email:
    message_id: str
    subject: str
    sender_email: str


@dataclass(frozen=True)
class MailClient:
    list_messages: Callable[[], Iterator[RawMessage]]
    list_attachments: Callable[[str], Iterator[RawAttachment]]
This gives a narrow, truthful surface instead of an opaque "client object" with dozens of methods the pipeline never needed.
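Building such a narrow record usually means closing over the wide client. A sketch, with a fake session standing in for a real mail SDK (all names here are hypothetical):

```python
from collections.abc import Callable, Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class RawMessage:
    uid: str


@dataclass(frozen=True)
class RawAttachment:
    filename: str
    payload: bytes


@dataclass(frozen=True)
class MailClient:
    list_messages: Callable[[], Iterator[RawMessage]]
    list_attachments: Callable[[str], Iterator[RawAttachment]]


class FakeImapSession:
    """Stand-in for a real mail SDK with a wide surface; only two calls matter here."""

    def search_unread(self) -> list[str]:
        return ["u1", "u2"]

    def attachments_for(self, uid: str) -> list[tuple[str, bytes]]:
        return [(f"{uid}.xlsx", b"bytes")]


def make_mail_client(session: FakeImapSession) -> MailClient:
    # Close over the wide client; expose only the narrow, truthful surface.
    def list_messages() -> Iterator[RawMessage]:
        for uid in session.search_unread():
            yield RawMessage(uid=uid)

    def list_attachments(uid: str) -> Iterator[RawAttachment]:
        for name, payload in session.attachments_for(uid):
            yield RawAttachment(filename=name, payload=payload)

    return MailClient(list_messages=list_messages, list_attachments=list_attachments)


client = make_mail_client(FakeImapSession())
print([m.uid for m in client.list_messages()])  # ['u1', 'u2']
```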

5.2 Named stage aliases

Use named type aliases for the stage shapes.

type AssembledContractSensor = Callable[[RunContext], Iterator[Email]]
type AssembledContractFetch = Callable[[RunContext, Email], Iterator[Workbook]]
type AssembledContractExtract = Callable[[RunContext, Workbook], Iterator[Row]]
type AssembledContractLoad = Callable[[RunContext, Iterable[Row]], int]

type ContractSensor = Callable[[], Iterator[Email]]
type ContractFetch = Callable[[Email], Iterator[Workbook]]
type ContractExtract = Callable[[Workbook], Iterator[Row]]
type ContractLoad = Callable[[Iterable[Row]], int]

These names prevent the code from dissolving into anonymous Callable[..., ...] soup.

5.3 Partial application

Binding dependencies with functools.partial is not cleverness. It is simply the correct operator for turning a full-arity kernel into a run-aware stage, and then a run-aware stage into a final stage function.

from functools import partial


def build_fetch(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
) -> AssembledContractFetch:
    return partial(_fetch_impl, cfg, mail_client, object_store)


def bind_run_fetch(
    run_ctx: RunContext,
    fetch: AssembledContractFetch,
) -> ContractFetch:
    return partial(fetch, run_ctx)

Assembly binds once for many runs. Runtime binds once for one run. The stage then runs many times inside that run.

If something is truly deferred, model that truthfully as a zero-argument callable in a dependency or run-context record. Do not introduce generic hooks or supplier-style indirection as a substitute for explicit arguments.
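For example, a clock modeled as a zero-argument callable in a simplified run-context record (RunCtx and make_run_ctx are illustrative names):

```python
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class RunCtx:
    run_id: str
    now: Callable[[], datetime]  # truly deferred: the clock is read at call time


def make_run_ctx(run_id: str) -> RunCtx:
    # The frozen record holds the capability, not a captured timestamp.
    return RunCtx(run_id=run_id, now=lambda: datetime.now(timezone.utc))


ctx = make_run_ctx("r1")
print(ctx.now().tzinfo)  # UTC
```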

5.4 Generators as lazy streams

The pipeline itself is just flat-map composition:

def rows() -> Iterator[Row]:
    for email in sensor():
        for workbook in fetch(email):
            yield from extract(workbook)

Nothing more exotic is needed.


6. A better worked model

6.1 Domain and config

from collections.abc import Callable, Iterable, Iterator, Mapping
from dataclasses import dataclass
from typing import Any
import re


@dataclass(frozen=True)
class Email:
    message_id: str
    subject: str
    sender_email: str


@dataclass(frozen=True)
class Workbook:
    bucket: str
    key: str
    filename: str
    content_type: str
    message_id: str


@dataclass(frozen=True)
class Row:
    values: dict[str, object]


@dataclass(frozen=True)
class SensorConfig:
    sender_match: re.Pattern[str]
    subject_match: re.Pattern[str]


@dataclass(frozen=True)
class FetchConfig:
    pass


@dataclass(frozen=True)
class ExtractConfig:
    sheet_name: str
    filename_match: re.Pattern[str]
    content_type_match: re.Pattern[str]


@dataclass(frozen=True)
class LoadConfig:
    pass

6.2 Dependency records

@dataclass(frozen=True)
class MailClient:
    list_messages: Callable[[], Iterator[RawMessage]]
    list_attachments: Callable[[str], Iterator[RawAttachment]]


@dataclass(frozen=True)
class ObjectStore:
    put_bytes: Callable[[str, bytes, str], None]
    get_bytes: Callable[[str], bytes]


@dataclass(frozen=True)
class RowWriter:
    write_row: Callable[[Row], None]


@dataclass(frozen=True)
class OpsLogDb:
    execute: Callable[[str, tuple[object, ...]], None]


@dataclass(frozen=True)
class RunContext:
    ops_db: OpsLogDb
    run_id: str
    log_op: Callable[[str, Mapping[str, object]], None]
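
The log_op field can be built by closing over the shared connection and run id, so every op in the run logs through the same channel. A sketch, with a recording stub in place of a real database connection and an illustrative SQL statement:

```python
from collections.abc import Callable, Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class OpsLogDb:
    execute: Callable[[str, tuple[object, ...]], None]


@dataclass(frozen=True)
class RunContext:
    ops_db: OpsLogDb
    run_id: str
    log_op: Callable[[str, Mapping[str, object]], None]


def make_run_context(ops_db: OpsLogDb, run_id: str) -> RunContext:
    # log_op closes over the one shared connection and the one run id.
    def log_op(op_name: str, detail: Mapping[str, object]) -> None:
        ops_db.execute(
            "insert into ops_log (run_id, op, detail) values (%s, %s, %s)",
            (run_id, op_name, dict(detail)),
        )

    return RunContext(ops_db=ops_db, run_id=run_id, log_op=log_op)


# Usage with a recording stub in place of a real db connection:
calls: list[tuple[str, tuple[object, ...]]] = []
ctx = make_run_context(OpsLogDb(execute=lambda sql, params: calls.append((sql, params))), "r1")
ctx.log_op("fetch", {"email": "m-1"})
print(calls[0][1])  # ('r1', 'fetch', {'email': 'm-1'})
```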

6.3 Stage aliases

type AssembledContractSensor = Callable[[RunContext], Iterator[Email]]
type AssembledContractFetch = Callable[[RunContext, Email], Iterator[Workbook]]
type AssembledContractExtract = Callable[[RunContext, Workbook], Iterator[Row]]
type AssembledContractLoad = Callable[[RunContext, Iterable[Row]], int]

type ContractSensor = Callable[[], Iterator[Email]]
type ContractFetch = Callable[[Email], Iterator[Workbook]]
type ContractExtract = Callable[[Workbook], Iterator[Row]]
type ContractLoad = Callable[[Iterable[Row]], int]

6.4 Full-arity kernels

def _sensor_impl(
    cfg: SensorConfig,
    mail_client: MailClient,
    run_ctx: RunContext,
) -> Iterator[Email]:
    ...


def _fetch_impl(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
    run_ctx: RunContext,
    email: Email,
) -> Iterator[Workbook]:
    ...


def _extract_impl(
    cfg: ExtractConfig,
    object_store: ObjectStore,
    run_ctx: RunContext,
    workbook: Workbook,
) -> Iterator[Row]:
    ...


def _load_impl(
    cfg: LoadConfig,
    row_writer: RowWriter,
    run_ctx: RunContext,
    rows: Iterable[Row],
) -> int:
    ...

This is the point of maximum truthfulness.

6.5 Binding

def build_sensor(
    cfg: SensorConfig,
    mail_client: MailClient,
) -> AssembledContractSensor:
    ...


def build_fetch(
    cfg: FetchConfig,
    mail_client: MailClient,
    object_store: ObjectStore,
) -> AssembledContractFetch:
    ...


def build_extract(
    cfg: ExtractConfig,
    object_store: ObjectStore,
) -> AssembledContractExtract:
    ...


def build_load(
    cfg: LoadConfig,
    row_writer: RowWriter,
) -> AssembledContractLoad:
    ...


def bind_run_sensor(
    run_ctx: RunContext,
    sensor: AssembledContractSensor,
) -> ContractSensor:
    ...


def bind_run_fetch(
    run_ctx: RunContext,
    fetch: AssembledContractFetch,
) -> ContractFetch:
    ...


def bind_run_extract(
    run_ctx: RunContext,
    extract: AssembledContractExtract,
) -> ContractExtract:
    ...


def bind_run_load(
    run_ctx: RunContext,
    load: AssembledContractLoad,
) -> ContractLoad:
    ...

6.6 Composition

@dataclass(frozen=True)
class AssembledPipeline:
    sensor: AssembledContractSensor
    fetch: AssembledContractFetch
    extract: AssembledContractExtract
    load: AssembledContractLoad


@dataclass(frozen=True)
class ContractPipeline:
    sensor: ContractSensor
    fetch: ContractFetch
    extract: ContractExtract
    load: ContractLoad


def bind_run(
    pipeline: AssembledPipeline,
    run_ctx: RunContext,
) -> ContractPipeline:
    return ContractPipeline(
        sensor=partial(pipeline.sensor, run_ctx),
        fetch=partial(pipeline.fetch, run_ctx),
        extract=partial(pipeline.extract, run_ctx),
        load=partial(pipeline.load, run_ctx),
    )


def run_pipeline(
    pipeline: ContractPipeline,
) -> int:
    def rows() -> Iterator[Row]:
        for email in pipeline.sensor():
            for workbook in pipeline.fetch(email):
                yield from pipeline.extract(workbook)

    return pipeline.load(rows())

That is the whole shape:

Contract -> Assembly -> RunContext -> Input
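The whole shape can be exercised end to end with stub stages; here str stands in for Email, Workbook, Row, and RunContext, and the stage bodies are illustrative:

```python
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from functools import partial


@dataclass(frozen=True)
class ContractPipeline:
    sensor: Callable[[], Iterator[str]]
    fetch: Callable[[str], Iterator[str]]
    extract: Callable[[str], Iterator[str]]
    load: Callable[[Iterable[str]], int]


def _sensor_impl(run_ctx: str) -> Iterator[str]:
    yield from ("email-1", "email-2")  # stub: two unread emails


def _fetch_impl(run_ctx: str, email: str) -> Iterator[str]:
    yield f"{email}/wb"  # stub: one workbook per email


def _extract_impl(run_ctx: str, workbook: str) -> Iterator[str]:
    yield f"{workbook}#row"  # stub: one row per workbook


def _load_impl(run_ctx: str, rows: Iterable[str]) -> int:
    return sum(1 for _ in rows)  # stub: count loaded rows


def bind_run(run_ctx: str) -> ContractPipeline:
    return ContractPipeline(
        sensor=partial(_sensor_impl, run_ctx),
        fetch=partial(_fetch_impl, run_ctx),
        extract=partial(_extract_impl, run_ctx),
        load=partial(_load_impl, run_ctx),
    )


def run_pipeline(pipeline: ContractPipeline) -> int:
    def rows() -> Iterator[str]:
        for email in pipeline.sensor():
            for workbook in pipeline.fetch(email):
                yield from pipeline.extract(workbook)

    return pipeline.load(rows())


print(run_pipeline(bind_run("run-1")))  # 2
```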

7. What should never be shared

The codebase stays comprehensible only if the local shape of each contract is preserved.

That means:

  • one contract should look like one contract
  • one pipeline should look like one pipeline
  • function factories should remain contract-local unless the abstraction is genuinely universal

The desire to "reuse" often destroys the pattern.

If two contracts happen to:

  • watch the same mailbox
  • read the same workbook
  • differ only by sheet name

that still does not imply they should share their pipeline logic.

They may be structurally parallel while remaining operationally separate.

The rule is:

preserve the visible pattern before chasing reuse

Shared code should be extremely small and extremely explicit.


8. What may be shared

The discussion that led to this design recovered one useful exception:

some generated common model artifacts may legitimately sit behind a hard seam.

If a repository has distinct subsystems such as:

  • generate
  • load
  • serve

then a small shared package containing generated dataclasses or validators can be used as the one narrow common surface.

That shared surface should be:

  • small
  • explicit
  • re-export controlled
  • generated deterministically

The important thing is not sharing as such. The important thing is that sharing is narrow enough not to dissolve the subsystem boundaries.


9. Deterministic generation is part of the architecture

Generated artifacts are not just a build detail. They are part of the boundary discipline.

If generation is deterministic:

  • generate in CI
  • compare generated output against committed state
  • fail on drift

This is a useful invariant because it prevents silent divergence between:

  • contract inputs
  • generator logic
  • emitted dataclasses or validators

So the architectural principle is:

generated outputs may be committed, but they must be reproducible and checked

That is the same general philosophy as the full-arity kernel: make the truth visible and testable.
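A minimal drift check along these lines, assuming a hypothetical deterministic generate() function and using only the standard library:

```python
import filecmp
import tempfile
from pathlib import Path


def generate(out_dir: Path) -> None:
    # Hypothetical deterministic generator: a real one would emit
    # dataclasses or validators from the contract YAML.
    (out_dir / "model.py").write_text("# generated\n")


def check_no_drift(committed_dir: Path) -> bool:
    # Regenerate into a temp dir and compare byte-for-byte with the committed state.
    with tempfile.TemporaryDirectory() as tmp:
        fresh = Path(tmp)
        generate(fresh)
        match, mismatch, errors = filecmp.cmpfiles(
            fresh, committed_dir, [p.name for p in fresh.iterdir()], shallow=False
        )
        return not mismatch and not errors


# Usage sketch: CI fails when regeneration does not reproduce the committed files.
with tempfile.TemporaryDirectory() as committed:
    generate(Path(committed))  # pretend this is the committed state
    ok = check_no_drift(Path(committed))
print(ok)  # True
```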


10. The anti-patterns to reject

These are the things that most reliably create Python sprawl:

  • registries
  • dependency injection containers
  • mutable payload maps
  • hidden late binding
  • generic supplier or hook points that exist only to smuggle control flow
  • generic configurator sludge
  • runtime logic that repeatedly consults raw config
  • shared helpers that erase contract identity
  • ambient clients imported from elsewhere

These are the things that keep the code local:

  • immutable records
  • parsed config records
  • full-arity kernels
  • binding once
  • stage-shaped callables
  • contract-local assembly
  • runtime as execution, not interpretation

11. Linting and static rules that reinforce the design

Tooling matters because Python will not protect these boundaries by default.

11.1 Ruff

Use ruff both for baseline code quality and architectural enforcement.

[tool.ruff]
line-length = 100
exclude = ["*_validator.py", "_generated"]
force-exclude = true

[tool.ruff.lint]
select = [
  "E",
  "F",
  "I",
  "B",
  "UP",
  "TID252",
]

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["E401", "E402", "E501", "E702", "I001"]

[tool.ruff.lint.flake8-tidy-imports.banned-api]
"build" = { msg = "Strict architectural boundary: contracts cannot depend on build tools" }
"backend" = { msg = "Strict architectural boundary: contracts cannot depend on backend" }
"generator" = { msg = "Strict architectural boundary: contracts cannot depend on generator" }
"frontend" = { msg = "Strict architectural boundary: contracts cannot depend on frontend" }
"contracts._generated" = { msg = "contracts functional core must not import generated artifacts" }

This combination matters because it does two jobs:

  • keeps source mechanically clean
  • stops cross-layer import drift

11.2 Mypy

Use strict typing where the contracts and stages live.

[tool.mypy]
strict = true
warn_unused_ignores = true
warn_redundant_casts = true
disallow_any_generics = true
no_implicit_optional = true

Static typing is especially valuable for:

  • stage aliases
  • config parsing
  • dependency records
  • catching accidental cross-stage misuse

11.3 Import boundaries

Architectural boundaries should be enforced, not merely described.

Whether the enforcement is done with import-linter, Ruff banned APIs, poison pills, or package layout, the principle is the same:

the build must reject code that crosses the declared seams


12. Practical layout

For one contract:

contracts/
  hello_world_dataset.yaml

src/contracts/
  hello_world_dataset.py
  hello_world_dataset_bind.py

bin/
  hello_world_dataset.py

For a larger system with hard subsystem separation:

repo/
  common/
  generate/
  load/
  serve/

Where:

  • common/ is tiny and explicit
  • generate/ owns code emission
  • load/ owns the data pipelines
  • serve/ owns API/runtime serving concerns

The point is not fashion. The point is to stop Python's default openness from collapsing everything into one blurry module graph.


13. The real conclusion

Typed functional composition in Python is not about dressing up simple code in category-theory language.

It is about preserving the visible shape of the computation.

The most useful distilled model is this:

Kernel   : (AssemblyDeps, RunContext, Input) -> Output*
Assembly : AssemblyDeps -> ((RunContext, Input) -> Output*)
Runtime  : RunContext -> (Input -> Output*)
Pipeline = composition of stage-shaped functions

And the most important practical corollaries are:

  • write the full-arity kernel first
  • order parameters by binding phase
  • bind stable dependencies in assembly
  • bind run context in runtime
  • keep contracts local
  • share almost nothing
  • let configuration drive the construction early, not the runtime late

If the code still reads like the pipeline, the architecture is probably healthy.

If the code reads like a bag of helpers, registries, and contexts, the shape has already been lost.

The goal is to keep the shape.
