Skip to content

Instantly share code, notes, and snippets.

@negz
Last active December 13, 2025 00:12
Show Gist options
  • Select an option

  • Save negz/394bd39d1dda27e6ab9948e949be794b to your computer and use it in GitHub Desktop.

Select an option

Save negz/394bd39d1dda27e6ab9948e949be794b to your computer and use it in GitHub Desktop.
Crossplane Python SDK Design Spec - Early Sketch

Crossplane Python SDK Design Spec

Overview

This document specifies a new user experience for Crossplane composition functions in Python. The goal is to provide a declarative, type-safe API that feels similar to Pulumi/Terraform while hiding protobuf complexity entirely.

Desired User Experience

Basic Example

import crossplane.function import composition
from .model.com.example.platform.storagebucket import v1alpha1 as storagebucketv1alpha1
from .model.io.upbound.gcp.storage.bucket import v1beta1 as bucketv1beta1
from .model.io.upbound.gcp.storage.bucketacl import v1beta1 as bucketaclv1beta1

@composition.function
def compose(ctx: composition.Context):
    # XR follows same pattern as composed resources
    xr = ctx.composite(storagebucketv1alpha1.XStorageBucket)
    
    # Read from observed XR (user's input parameters)
    location = xr.observed.spec.parameters.location
    versioning = xr.observed.spec.parameters.versioning
    acl_param = xr.observed.spec.parameters.acl
    
    # Compose a Bucket - full IDE autocomplete on kwargs
    bucket = ctx.resource("bucket", bucketv1beta1.Bucket(
        spec=bucketv1beta1.BucketSpec(
            forProvider=bucketv1beta1.ForProvider(
                location=location,
                versioning=[bucketv1beta1.VersioningItem(enabled=versioning)],
            )
        )
    ))
    
    # Compose an ACL that depends on the Bucket's external name
    # If Bucket isn't observed yet, bucket.observed.external_name returns Observable
    acl = ctx.resource("acl", bucketaclv1beta1.BucketACL(
        spec=bucketaclv1beta1.BucketACLSpec(
            forProvider=bucketaclv1beta1.ACLForProvider(
                bucket=bucket.observed.external_name,  # Observable if not ready
                predefinedAcl=acl_param,
            )
        )
    ))
    
    # Update XR status with computed values
    xr.status.bucketName = bucket.observed.external_name

Key UX Principles

  1. Unified resource model: XR and composed resources follow the same pattern
  2. Full IDE/LSP support: Autocomplete, type hints, go-to-definition all work
  3. No protobuf exposure: Users never see RunFunctionRequest, Struct, etc.
  4. Declarative dependencies: Reference observed fields, SDK handles ordering
  5. SSA-safe: Only fields explicitly set are emitted to desired state

The @composition.function Decorator

The decorator handles all protobuf translation and response building:

def function(func: Callable[[Context], None]) -> Callable[[fnv1.RunFunctionRequest, fnv1.RunFunctionResponse], None]:
    """Transform a typed composition function into a gRPC handler.
    
    Handles:
    1. Parse request → Context with typed models
    2. Call user function
    3. Build response:
       - Detect dependency cycles → fatal result
       - Walk resources for Observables → skip those with unmet deps
       - Serialize clean resources → rsp.desired
       - Add DAG info → rsp.results (informational)
    """
    @functools.wraps(func)
    def wrapper(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse) -> None:
        ctx = Context(req, rsp)
        func(ctx)
        ctx._build_response()
    
    return wrapper

Users write typed Python; the decorator bridges to/from protobuf.

Resource Access Model

The .observed Property

Every resource (XR or composed) has a .observed property that returns the same model type, providing access to the observed state from the cluster.

Access Pattern Reads From Use Case
resource.spec.X Desired What you're setting
resource.observed.spec.X Observed Read-back actual spec
resource.observed.status.X Observed Provider outputs
resource.observed.metadata.X Observed Annotations, name, uid

Type Consistency

The key insight enabling IDE support: .observed returns the same type as the resource itself.

bucket              → Bucket
bucket.observed     → Bucket (same type!)
bucket.observed.spec.forProvider.location → str | Observable | None

This means autocomplete works uniformly through the entire access chain.

Convenience Properties

Common access patterns have shortcuts:

# These are equivalent
bucket.observed.metadata.annotations["crossplane.io/external-name"]
bucket.observed.external_name  # Convenience property

# .external_name follows the same pattern as other fields:
bucket.external_name           # Reads desired (what you set)
bucket.observed.external_name  # Reads observed (what exists)

Generated Model Changes

Current Models (datamodel-code-generator)

class ForProvider(BaseModel):
    location: Optional[str] = None
    bucket: Optional[str] = None
    versioning: Optional[List[VersioningItem]] = None

Required Changes

from crossplane.function import Observable
from pydantic import BaseModel, ConfigDict, PrivateAttr
from typing import Self

class ForProvider(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    
    # Every field type gains | Observable
    location: str | Observable | None = None
    bucket: str | Observable | None = None
    versioning: List[VersioningItem] | Observable | None = None

Resource Base Class

Top-level resources (Bucket, XR, etc.) need additional SDK integration:

class Bucket(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    
    # apiVersion and kind are fixed literals - no Observable needed
    apiVersion: Literal["storage.gcp.upbound.io/v1beta1"] = "storage.gcp.upbound.io/v1beta1"
    kind: Literal["Bucket"] = "Bucket"
    metadata: ObjectMeta | Observable | None = None
    spec: BucketSpec | Observable | None = None
    status: BucketStatus | Observable | None = None
    
    # SDK integration (could be in base class or generated)
    _observed: Self | None = PrivateAttr(default=None)
    _name: str | None = PrivateAttr(default=None)
    
    @property
    def observed(self) -> Self:
        """Access observed state. Returns Observable-filled instance if not ready."""
        if self._observed is None:
            return _make_observable_instance(type(self), self._name)
        return self._observed
    
    @property
    def external_name(self) -> str | Observable | None:
        """Convenience: external-name annotation from this resource's metadata."""
        # Reads from self (desired or observed depending on which instance this is)
        # Use bucket.observed.external_name to read from observed state
        if self.metadata is None or self.metadata.annotations is None:
            return None
        return self.metadata.annotations.get("crossplane.io/external-name")

Template Changes Summary

  1. Add | Observable to every field type union (except apiVersion and kind, which are fixed literals)
  2. Add model_config = ConfigDict(arbitrary_types_allowed=True) to each model
  3. Add _observed: Self | None private attribute to top-level resource models
  4. Add _name: str | None private attribute to top-level resource models
  5. Add observed property to top-level resource models
  6. Add convenience properties (external_name, etc.) to top-level resource models
  7. Import Observable from the SDK

The Observable Pattern

What is Observable?

Observable is a forward reference - a sentinel value representing "get this value from that source when it becomes available."

class Observable:
    """Forward reference to a value that may not exist yet."""
    
    def __init__(self, source_path: str):
        self.source_path = source_path  # e.g., "bucket.status.atProvider.id"
    
    def __repr__(self) -> str:
        return f"Observable({self.source_path!r})"
    
    def __bool__(self) -> bool:
        return False  # Falsy for conditionals

How Observable Values Arise

When accessing .observed on a resource that doesn't exist in observed state:

def _make_observable_instance(model_type: Type[T], resource_name: str) -> T:
    """Create instance where all field accesses return Observable."""
    # Returns a model instance where every field is Observable
    # with the appropriate source path
    ...

Example flow:

# If "bucket" doesn't exist in observed resources yet:
bucket.observed.external_name
# Returns: Observable("bucket.external_name")

bucket.observed.status.atProvider.selfLink
# Returns: Observable("bucket.status.atProvider.selfLink")

Observable as Dependency Declaration

When you assign an Observable to a field, you're declaring a dependency:

acl = ctx.resource("acl", BucketACL(
    spec=BucketACLSpec(
        forProvider=ACLForProvider(
            bucket=bucket.observed.external_name,  # Observable("bucket.external_name")
        )
    )
))

This is valid because field types include | Observable:

class ACLForProvider(BaseModel):
    bucket: str | Observable | None = None  # Accepts Observable

Dependency Resolution at Emit Time

When the SDK builds the RunFunctionResponse, it walks each resource's model tree:

def should_emit(resource: BaseModel) -> bool:
    """Check if resource has any unresolved Observable values."""
    for field_value in walk_fields(resource):
        if isinstance(field_value, Observable):
            return False  # Has unmet dependency
    return True

def build_response(ctx: Context) -> RunFunctionResponse:
    response = RunFunctionResponse()
    
    for name, resource in ctx._resources.items():
        if should_emit(resource):
            # Convert to protobuf Struct, add to desired
            response.desired.resources[name].resource.update(
                resource.model_dump(exclude_unset=True, mode='json')
            )
        # else: skip - dependencies not ready, will be called again
    
    return response

The Dependency DAG is Implicit

Unlike explicit sequencing, the dependency graph emerges naturally from Observable references:

# This code implicitly creates: vpc → subnet → security_group

vpc = ctx.resource("vpc", VPC(
    spec=VPCSpec(forProvider=VPCForProvider(cidrBlock="10.0.0.0/16"))
))

subnet = ctx.resource("subnet", Subnet(
    spec=SubnetSpec(forProvider=SubnetForProvider(
        vpcId=vpc.observed.status.atProvider.vpcId,  # Observable if VPC not ready
    ))
))

sg = ctx.resource("security-group", SecurityGroup(
    spec=SGSpec(forProvider=SGForProvider(
        vpcId=vpc.observed.status.atProvider.vpcId,      # Depends on VPC
        subnetId=subnet.observed.status.atProvider.id,   # Depends on Subnet
    ))
))

Reconciliation flow:

  1. First call: Only VPC emitted (subnet/sg have Observable dependencies)
  2. Second call (VPC observed): VPC + Subnet emitted (sg still has Observable)
  3. Third call (VPC + Subnet observed): All three emitted

Context Object

The Context object is the entry point for composition:

class Context:
    def composite(self, model_type: Type[T]) -> T:
        """Get the XR with .observed linked.
        
        Initializes from req.desired.composite (previous pipeline state)
        and links req.observed.composite to .observed property.
        """
        instance = self._parse_desired_composite(model_type)
        instance._observed = self._parse_observed_composite(model_type)
        return instance
    
    def resource(self, name: str, instance: T) -> T:
        """Register a composed resource.
        
        Merges the provided instance with req.desired.resources[name] 
        (previous pipeline state) and links req.observed.resources[name]
        to .observed property.
        """
        # Merge with previous desired if exists
        previous = self._get_desired_resource(name, type(instance))
        if previous:
            instance = self._merge_models(previous, instance)
        
        instance._observed = self._get_observed_resource(name, type(instance))
        instance._name = name
        self._resources[name] = instance
        return instance
    
    # Request/response access
    required_resources: Dict[str, List[BaseModel]]  # Resources requested from Crossplane
    credentials: Dict[str, Credentials]              # Function credentials
    input: BaseModel | None                          # Function input from Composition
    context: Dict[str, Any]                          # Cross-function context
    
    # Response building
    requirements: Requirements                        # Request resources for next call
    results: Results                                  # Add warnings/fatals
    ttl: timedelta                                    # Response TTL

Pipeline Semantics

Crossplane composition pipelines are cumulative. Each function builds on previous functions' work:

Function 1          Function 2          Function 3          Crossplane
    │                   │                   │                   │
    │ Add resource A    │                   │                   │
    │──────────────────►│                   │                   │
    │                   │ Add resource B    │                   │
    │                   │──────────────────►│                   │
    │                   │                   │ Modify resource A │
    │                   │                   │──────────────────►│
    │                   │                   │                   │ SSA all resources

How It Works

  1. Request contains previous state: req.desired.resources has resources from previous functions
  2. Function builds on that: ctx.resource() merges user's model with previous desired
  3. Response contains full state: rsp.desired.resources contains the merged result
  4. Crossplane SSAs the final result: All functions' contributions are applied

Example

# Function 1 set: bucket.spec.forProvider.location = "US"
# This function (Function 2) runs:

bucket = ctx.resource("bucket", Bucket(
    spec=BucketSpec(
        forProvider=ForProvider(
            project="my-project",  # Add project field
        )
    )
))

# bucket.spec.forProvider.location → "US" (preserved from Function 1)
# bucket.spec.forProvider.project → "my-project" (added by this function)

# Can read and modify previous function's work:
if bucket.spec.forProvider.location == "US":
    bucket.spec.forProvider.storageClass = "STANDARD"

# At emit: all fields (location, project, storageClass) go to rsp.desired
# Crossplane SSAs the combined result

Access Model with Pipeline

Access Source Purpose
bucket.spec.X Previous desired + your changes Cumulative desired state
bucket.observed.X req.observed.resources Cluster state

The distinction between "previous desired" and "your changes" is not exposed—they're merged into a single model. This keeps the API simple while supporting the pipeline model.

SSA Safety

Crossplane uses Server-Side Apply. Only fields explicitly set should appear in desired state—otherwise we'd "own" fields we didn't intend to.

How Fields Get Set

In the pipeline model, fields can be set by:

  1. Previous functions: Inherited via ctx.resource() merge
  2. Current function: Via kwargs or direct assignment

Both are intentional—previous functions made deliberate choices, and so did you.

Serialization

resource.model_dump(exclude_unset=True, mode='json')

This excludes fields that are pure schema defaults (never set by any function), but includes fields set by previous functions or the current function.

What NOT to Do

Don't copy entire observed resources to desired:

# ❌ BAD - owns all fields including metadata.uid, resourceVersion, etc.
bucket = ctx.resource("bucket", observed_bucket)

# ✅ GOOD - only set the fields you care about
bucket = ctx.resource("bucket", Bucket(
    spec=BucketSpec(
        forProvider=ForProvider(
            location=observed_bucket.observed.spec.forProvider.location,
        )
    )
))

The Observable pattern prevents accidental copying—if you reference an observed field, you get either the value (for intentional use) or Observable (for dependency tracking), never an accidental wholesale copy.

Required Resources

Required resources (extra resources requested from Crossplane) are read-only references to existing cluster resources. They follow the Observable pattern but cannot be modified.

@composition.function
def compose(ctx: composition.Context):
    xr = ctx.composite(XStorageBucket)
    
    # Request existing VPCs with specific labels
    ctx.requirements.resources["existing-vpcs"] = ResourceSelector(
        api_version="ec2.aws.upbound.io/v1beta1",
        kind="VPC",
        match_labels={"env": "prod"},
    )
    
    # Access required resources (available on next invocation)
    for vpc in ctx.required_resources.get("existing-vpcs", VPC):
        if vpc.observed.status.atProvider.isDefault:
            # Use this VPC
            bucket = ctx.resource("bucket", Bucket(
                spec=BucketSpec(
                    forProvider=ForProvider(
                        project=vpc.observed.spec.forProvider.project,
                    )
                )
            ))
            break

Characteristics

  • Read-only: Changes to required resources are ignored by Crossplane
  • Observable support: Fields may return Observable if not yet available
  • Two-phase: Request via ctx.requirements, access via ctx.required_resources on next invocation

Open Questions

  1. Error handling: How to surface errors when Observable never resolves?
  2. Base class vs generation: Should observed property be in a base class or generated?

Design Decisions

  1. Cycle detection: Detect dependency cycles and produce a fatal result. The SDK should build the DAG and check for cycles before emitting.

  2. Debugging: Add an informational result to RunFunctionResponse showing the dependency DAG. Users can inspect this to understand why dependent resources aren't created yet.

  3. Required resources: Support Observable pattern (fields may not be set yet), but required resources are fundamentally read-only. Changes to required resources are ignored by Crossplane—they're for reading external state only.

  4. Merge semantics: Deep merge is required. When ctx.resource() merges previous desired with user's kwargs, nested models must be recursively merged, not shallow-replaced.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment