This document specifies a new user experience for Crossplane composition functions in Python. The goal is to provide a declarative, type-safe API that feels similar to Pulumi/Terraform while hiding protobuf complexity entirely.

    from crossplane.function import composition

    from .model.com.example.platform.storagebucket import v1alpha1 as storagebucketv1alpha1
    from .model.io.upbound.gcp.storage.bucket import v1beta1 as bucketv1beta1
    from .model.io.upbound.gcp.storage.bucketacl import v1beta1 as bucketaclv1beta1


    @composition.function
    def compose(ctx: composition.Context):
        # XR follows same pattern as composed resources
        xr = ctx.composite(storagebucketv1alpha1.XStorageBucket)

        # Read from observed XR (user's input parameters)
        location = xr.observed.spec.parameters.location
        versioning = xr.observed.spec.parameters.versioning
        acl_param = xr.observed.spec.parameters.acl

        # Compose a Bucket - full IDE autocomplete on kwargs
        bucket = ctx.resource("bucket", bucketv1beta1.Bucket(
            spec=bucketv1beta1.BucketSpec(
                forProvider=bucketv1beta1.ForProvider(
                    location=location,
                    versioning=[bucketv1beta1.VersioningItem(enabled=versioning)],
                )
            )
        ))

        # Compose an ACL that depends on the Bucket's external name
        # If Bucket isn't observed yet, bucket.observed.external_name returns Observable
        acl = ctx.resource("acl", bucketaclv1beta1.BucketACL(
            spec=bucketaclv1beta1.BucketACLSpec(
                forProvider=bucketaclv1beta1.ACLForProvider(
                    bucket=bucket.observed.external_name,  # Observable if not ready
                    predefinedAcl=acl_param,
                )
            )
        ))

        # Update XR status with computed values
        xr.status.bucketName = bucket.observed.external_name

- Unified resource model: XR and composed resources follow the same pattern
- Full IDE/LSP support: Autocomplete, type hints, go-to-definition all work
- No protobuf exposure: Users never see `RunFunctionRequest`, `Struct`, etc.
- Declarative dependencies: Reference observed fields, SDK handles ordering
- SSA-safe: Only fields explicitly set are emitted to desired state

The decorator handles all protobuf translation and response building:

    import functools
    from typing import Callable

    from crossplane.function.proto.v1 import run_function_pb2 as fnv1


    def function(
        func: Callable[[Context], None],
    ) -> Callable[[fnv1.RunFunctionRequest, fnv1.RunFunctionResponse], None]:
        """Transform a typed composition function into a gRPC handler.

        Handles:
        1. Parse request → Context with typed models
        2. Call user function
        3. Build response:
           - Detect dependency cycles → fatal result
           - Walk resources for Observables → skip those with unmet deps
           - Serialize clean resources → rsp.desired
           - Add DAG info → rsp.results (informational)
        """

        @functools.wraps(func)
        def wrapper(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse) -> None:
            ctx = Context(req, rsp)
            func(ctx)
            ctx._build_response()

        return wrapper

Users write typed Python; the decorator bridges to/from protobuf.

Every resource (XR or composed) has a .observed property that returns the same
model type, providing access to the observed state from the cluster.

| Access Pattern | Reads From | Use Case |
|---|---|---|
| `resource.spec.X` | Desired | What you're setting |
| `resource.observed.spec.X` | Observed | Read-back actual spec |
| `resource.observed.status.X` | Observed | Provider outputs |
| `resource.observed.metadata.X` | Observed | Annotations, name, uid |

The key insight enabling IDE support: .observed returns the same type as
the resource itself.

    bucket                                     → Bucket
    bucket.observed                            → Bucket (same type!)
    bucket.observed.spec.forProvider.location  → str | Observable | None

This means autocomplete works uniformly through the entire access chain.
Common access patterns have shortcuts:

    # These are equivalent
    bucket.observed.metadata.annotations["crossplane.io/external-name"]
    bucket.observed.external_name  # Convenience property

    # .external_name follows the same pattern as other fields:
    bucket.external_name           # Reads desired (what you set)
    bucket.observed.external_name  # Reads observed (what exists)

Without this proposal, a generated model declares plain optional fields:

    class ForProvider(BaseModel):
        location: Optional[str] = None
        bucket: Optional[str] = None
        versioning: Optional[List[VersioningItem]] = None

With this proposal, the same generated model accepts Observable on every field:

    from typing import List, Literal, Self

    from crossplane.function import Observable
    from pydantic import BaseModel, ConfigDict, PrivateAttr


    class ForProvider(BaseModel):
        model_config = ConfigDict(arbitrary_types_allowed=True)

        # Every field type gains | Observable
        location: str | Observable | None = None
        bucket: str | Observable | None = None
        versioning: List[VersioningItem] | Observable | None = None

Top-level resources (Bucket, XR, etc.) need additional SDK integration:

    class Bucket(BaseModel):
        model_config = ConfigDict(arbitrary_types_allowed=True)

        # apiVersion and kind are fixed literals - no Observable needed
        apiVersion: Literal["storage.gcp.upbound.io/v1beta1"] = "storage.gcp.upbound.io/v1beta1"
        kind: Literal["Bucket"] = "Bucket"
        metadata: ObjectMeta | Observable | None = None
        spec: BucketSpec | Observable | None = None
        status: BucketStatus | Observable | None = None

        # SDK integration (could be in base class or generated)
        _observed: Self | None = PrivateAttr(default=None)
        _name: str | None = PrivateAttr(default=None)

        @property
        def observed(self) -> Self:
            """Access observed state. Returns Observable-filled instance if not ready."""
            if self._observed is None:
                return _make_observable_instance(type(self), self._name)
            return self._observed

        @property
        def external_name(self) -> str | Observable | None:
            """Convenience: external-name annotation from this resource's metadata."""
            # Reads from self (desired or observed depending on which instance this is)
            # Use bucket.observed.external_name to read from observed state
            if self.metadata is None or self.metadata.annotations is None:
                return None
            return self.metadata.annotations.get("crossplane.io/external-name")

- Add `| Observable` to every field type union (except `apiVersion` and `kind`, which are fixed literals)
- Add `model_config = ConfigDict(arbitrary_types_allowed=True)` to each model
- Add `_observed: Self | None` private attribute to top-level resource models
- Add `_name: str | None` private attribute to top-level resource models
- Add `observed` property to top-level resource models
- Add convenience properties (`external_name`, etc.) to top-level resource models
- Import `Observable` from the SDK

Observable is a forward reference - a sentinel value representing "get
this value from that source when it becomes available."

    class Observable:
        """Forward reference to a value that may not exist yet."""

        def __init__(self, source_path: str):
            self.source_path = source_path  # e.g., "bucket.status.atProvider.id"

        def __repr__(self) -> str:
            return f"Observable({self.source_path!r})"

        def __bool__(self) -> bool:
            return False  # Falsy for conditionals

When accessing .observed on a resource that doesn't exist in observed state:

    from typing import Type, TypeVar

    T = TypeVar("T", bound=BaseModel)


    def _make_observable_instance(model_type: Type[T], resource_name: str) -> T:
        """Create instance where all field accesses return Observable."""
        # Returns a model instance where every field is Observable
        # with the appropriate source path
        ...

Example flow:

    # If "bucket" doesn't exist in observed resources yet:
    bucket.observed.external_name
    # Returns: Observable("bucket.external_name")

    bucket.observed.status.atProvider.selfLink
    # Returns: Observable("bucket.status.atProvider.selfLink")

When you assign an Observable to a field, you're declaring a dependency:

    acl = ctx.resource("acl", BucketACL(
        spec=BucketACLSpec(
            forProvider=ACLForProvider(
                bucket=bucket.observed.external_name,  # Observable("bucket.external_name")
            )
        )
    ))

This is valid because field types include | Observable:

    class ACLForProvider(BaseModel):
        bucket: str | Observable | None = None  # Accepts Observable

When the SDK builds the RunFunctionResponse, it walks each resource's model tree:

    def should_emit(resource: BaseModel) -> bool:
        """Return True only if the resource has no unresolved Observable values."""
        for field_value in walk_fields(resource):
            if isinstance(field_value, Observable):
                return False  # Has unmet dependency
        return True


    def build_response(ctx: Context) -> RunFunctionResponse:
        response = RunFunctionResponse()
        for name, resource in ctx._resources.items():
            if should_emit(resource):
                # Convert to protobuf Struct, add to desired
                response.desired.resources[name].resource.update(
                    resource.model_dump(exclude_unset=True, mode='json')
                )
            # else: skip - dependencies not ready, will be called again
        return response

Unlike explicit sequencing, the dependency graph emerges naturally from Observable references:

    # This code implicitly creates: vpc → subnet → security_group
    vpc = ctx.resource("vpc", VPC(
        spec=VPCSpec(forProvider=VPCForProvider(cidrBlock="10.0.0.0/16"))
    ))

    subnet = ctx.resource("subnet", Subnet(
        spec=SubnetSpec(forProvider=SubnetForProvider(
            vpcId=vpc.observed.status.atProvider.vpcId,  # Observable if VPC not ready
        ))
    ))

    sg = ctx.resource("security-group", SecurityGroup(
        spec=SGSpec(forProvider=SGForProvider(
            vpcId=vpc.observed.status.atProvider.vpcId,  # Depends on VPC
            subnetId=subnet.observed.status.atProvider.id,  # Depends on Subnet
        ))
    ))

Reconciliation flow:
- First call: Only VPC emitted (subnet/sg have Observable dependencies)
- Second call (VPC observed): VPC + Subnet emitted (sg still has Observable)
- Third call (VPC + Subnet observed): All three emitted
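
The reconciliation flow above can be simulated with plain dictionaries. This is a minimal sketch rather than the SDK's implementation: `walk_fields` is one possible shape for the helper that `should_emit` relies on, while `observed_field`, `compose`, and `emitted` are hypothetical names introduced only for this illustration, and nested dicts stand in for the typed models.

```python
from typing import Any, Iterator


class Observable:
    """Stand-in for the SDK's Observable sentinel."""

    def __init__(self, source_path: str):
        self.source_path = source_path

    def __bool__(self) -> bool:
        return False


def walk_fields(value: Any) -> Iterator[Any]:
    """Yield every leaf value found in nested dicts and lists."""
    if isinstance(value, dict):
        for item in value.values():
            yield from walk_fields(item)
    elif isinstance(value, (list, tuple)):
        for item in value:
            yield from walk_fields(item)
    else:
        yield value


def should_emit(resource: dict) -> bool:
    """True only when the resource holds no unresolved Observable."""
    return not any(isinstance(v, Observable) for v in walk_fields(resource))


def observed_field(observed: dict, name: str, path: str) -> Any:
    """Return the observed value at a dotted path, or an Observable if missing."""
    node = observed.get(name)
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return Observable(f"{name}.{path}")
        node = node[key]
    return node


def compose(observed: dict) -> dict:
    """Hypothetical function body: vpc, then subnet, then security-group."""
    vpc_id = observed_field(observed, "vpc", "status.atProvider.vpcId")
    subnet_id = observed_field(observed, "subnet", "status.atProvider.id")
    return {
        "vpc": {"spec": {"forProvider": {"cidrBlock": "10.0.0.0/16"}}},
        "subnet": {"spec": {"forProvider": {"vpcId": vpc_id}}},
        "security-group": {
            "spec": {"forProvider": {"vpcId": vpc_id, "subnetId": subnet_id}}
        },
    }


def emitted(observed: dict) -> list[str]:
    """Names of the resources that would reach rsp.desired for this call."""
    return [name for name, res in compose(observed).items() if should_emit(res)]


# Observed state as seen by three successive invocations.
rounds = [
    {},
    {"vpc": {"status": {"atProvider": {"vpcId": "vpc-123"}}}},
    {
        "vpc": {"status": {"atProvider": {"vpcId": "vpc-123"}}},
        "subnet": {"status": {"atProvider": {"id": "subnet-456"}}},
    },
]
for observed in rounds:
    print(emitted(observed))
```

Each round emits one more resource than the previous one, without the function body ever stating an order explicitly.
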
The Context object is the entry point for composition:

    class Context:
        def composite(self, model_type: Type[T]) -> T:
            """Get the XR with .observed linked.

            Initializes from req.desired.composite (previous pipeline state)
            and links req.observed.composite to .observed property.
            """
            instance = self._parse_desired_composite(model_type)
            instance._observed = self._parse_observed_composite(model_type)
            return instance

        def resource(self, name: str, instance: T) -> T:
            """Register a composed resource.

            Merges the provided instance with req.desired.resources[name]
            (previous pipeline state) and links req.observed.resources[name]
            to .observed property.
            """
            # Merge with previous desired if exists
            previous = self._get_desired_resource(name, type(instance))
            if previous:
                instance = self._merge_models(previous, instance)
            instance._observed = self._get_observed_resource(name, type(instance))
            instance._name = name
            self._resources[name] = instance
            return instance

        # Request/response access
        required_resources: Dict[str, List[BaseModel]]  # Resources requested from Crossplane
        credentials: Dict[str, Credentials]  # Function credentials
        input: BaseModel | None  # Function input from Composition
        context: Dict[str, Any]  # Cross-function context

        # Response building
        requirements: Requirements  # Request resources for next call
        results: Results  # Add warnings/fatals
        ttl: timedelta  # Response TTL

Crossplane composition pipelines are cumulative. Each function builds on previous functions' work:

    Function 1          Function 2          Function 3          Crossplane
    │                   │                   │                   │
    │ Add resource A    │                   │                   │
    │──────────────────►│                   │                   │
    │                   │ Add resource B    │                   │
    │                   │──────────────────►│                   │
    │                   │                   │ Modify resource A │
    │                   │                   │──────────────────►│
    │                   │                   │                   │ SSA all resources

- Request contains previous state: `req.desired.resources` has resources from previous functions
- Function builds on that: `ctx.resource()` merges user's model with previous desired
- Response contains full state: `rsp.desired.resources` contains the merged result
- Crossplane SSAs the final result: All functions' contributions are applied

    # Function 1 set: bucket.spec.forProvider.location = "US"

    # This function (Function 2) runs:
    bucket = ctx.resource("bucket", Bucket(
        spec=BucketSpec(
            forProvider=ForProvider(
                project="my-project",  # Add project field
            )
        )
    ))

    # bucket.spec.forProvider.location → "US" (preserved from Function 1)
    # bucket.spec.forProvider.project → "my-project" (added by this function)

    # Can read and modify previous function's work:
    if bucket.spec.forProvider.location == "US":
        bucket.spec.forProvider.storageClass = "STANDARD"

    # At emit: all fields (location, project, storageClass) go to rsp.desired
    # Crossplane SSAs the combined result

| Access | Source | Purpose |
|---|---|---|
| `bucket.spec.X` | Previous desired + your changes | Cumulative desired state |
| `bucket.observed.X` | `req.observed.resources` | Cluster state |

The distinction between "previous desired" and "your changes" is not exposed—they're merged into a single model. This keeps the API simple while supporting the pipeline model.
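
The merge of previous desired state with the current function's changes can be illustrated on plain dictionaries. This is a hedged sketch, not the SDK's `_merge_models`: `deep_merge` is a hypothetical helper, dicts stand in for typed models, and it assumes that lists and scalars from the current function replace the previous value wholesale.

```python
from copy import deepcopy
from typing import Any


def deep_merge(previous: dict[str, Any], current: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge `current` over `previous` without mutating either."""
    merged = deepcopy(previous)
    for key, value in current.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are nested objects: merge them field by field.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars and lists (assumption): the current function's value wins.
            merged[key] = deepcopy(value)
    return merged


# Function 1 set the location; Function 2 adds the project.
previous = {"spec": {"forProvider": {"location": "US"}}}
current = {"spec": {"forProvider": {"project": "my-project"}}}

merged = deep_merge(previous, current)
print(merged)
# {'spec': {'forProvider': {'location': 'US', 'project': 'my-project'}}}
```

A shallow `dict.update` would have replaced the whole `spec` and dropped `location`, which is why the merge has to recurse.
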
Crossplane uses Server-Side Apply. Only fields explicitly set should appear in desired state—otherwise we'd "own" fields we didn't intend to.
In the pipeline model, fields can be set by:
- Previous functions: Inherited via `ctx.resource()` merge
- Current function: Via kwargs or direct assignment

Both are intentional: previous functions made deliberate choices, and so did you.

    resource.model_dump(exclude_unset=True, mode='json')

This excludes fields that are pure schema defaults (never set by any function), but includes fields set by previous functions or the current function.

Don't copy entire observed resources to desired:

    # ❌ BAD - owns all fields including metadata.uid, resourceVersion, etc.
    bucket = ctx.resource("bucket", observed_bucket)

    # ✅ GOOD - only set the fields you care about
    bucket = ctx.resource("bucket", Bucket(
        spec=BucketSpec(
            forProvider=ForProvider(
                location=observed_bucket.observed.spec.forProvider.location,
            )
        )
    ))

The Observable pattern prevents accidental copying: if you reference an observed field, you get either the value (for intentional use) or Observable (for dependency tracking), never an accidental wholesale copy.
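
One way the "value or Observable" behavior could work for a resource with no observed state yet is a placeholder that records the attribute path as it is traversed. The sketch below is an assumption about a possible design, not the SDK's `_make_observable_instance`: `PathObservable` and `make_placeholder` are hypothetical names, and the placeholder is untyped, whereas the proposal calls for a typed model instance.

```python
class Observable:
    """Stand-in for the SDK's Observable sentinel."""

    def __init__(self, source_path: str):
        self.source_path = source_path

    def __repr__(self) -> str:
        return f"Observable({self.source_path!r})"

    def __bool__(self) -> bool:
        return False


class PathObservable(Observable):
    """Hypothetical Observable that extends its path on attribute access."""

    def __getattr__(self, name: str) -> "PathObservable":
        # __getattr__ only runs for attributes that are not found normally,
        # so source_path itself is unaffected. Private and dunder lookups
        # are refused so copy/pickle machinery does not recurse.
        if name.startswith("_"):
            raise AttributeError(name)
        return PathObservable(f"{self.source_path}.{name}")


def make_placeholder(resource_name: str) -> PathObservable:
    """Placeholder for the observed state of a not-yet-observed resource."""
    return PathObservable(resource_name)


observed = make_placeholder("bucket")
ref = observed.status.atProvider.selfLink

print(ref)        # Observable('bucket.status.atProvider.selfLink')
print(bool(ref))  # False
```

Because every step returns a new placeholder, no real field values can leak into desired state through this path.
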
Required resources (extra resources requested from Crossplane) are read-only references to existing cluster resources. They follow the Observable pattern but cannot be modified.

    @composition.function
    def compose(ctx: composition.Context):
        xr = ctx.composite(XStorageBucket)

        # Request existing VPCs with specific labels
        ctx.requirements.resources["existing-vpcs"] = ResourceSelector(
            api_version="ec2.aws.upbound.io/v1beta1",
            kind="VPC",
            match_labels={"env": "prod"},
        )

        # Access required resources (available on next invocation)
        for vpc in ctx.required_resources.get("existing-vpcs", VPC):
            if vpc.observed.status.atProvider.isDefault:
                # Use this VPC
                bucket = ctx.resource("bucket", Bucket(
                    spec=BucketSpec(
                        forProvider=ForProvider(
                            project=vpc.observed.spec.forProvider.project,
                        )
                    )
                ))
                break

- Read-only: Changes to required resources are ignored by Crossplane
- Observable support: Fields may return Observable if not yet available
- Two-phase: Request via `ctx.requirements`, access via `ctx.required_resources` on next invocation

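
The two-phase behavior can be illustrated by calling the same function body twice with different inputs. This is a rough sketch under stated assumptions: `FakeContext` is a hypothetical stand-in that models only the three attributes used here as plain dictionaries, and it is not the SDK's `Context`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeContext:
    """Hypothetical stand-in for the SDK Context; only what this sketch needs."""

    required_resources: dict[str, list[dict[str, Any]]] = field(default_factory=dict)
    requirements: dict[str, dict[str, Any]] = field(default_factory=dict)
    desired: dict[str, dict[str, Any]] = field(default_factory=dict)


def compose(ctx: FakeContext) -> None:
    # Phase 1: state which resources this function wants to read.
    ctx.requirements["existing-vpcs"] = {"kind": "VPC", "match_labels": {"env": "prod"}}

    # Phase 2: empty on the first invocation, populated on a later one.
    for vpc in ctx.required_resources.get("existing-vpcs", []):
        if vpc["status"]["atProvider"].get("isDefault"):
            project = vpc["spec"]["forProvider"]["project"]
            ctx.desired["bucket"] = {"spec": {"forProvider": {"project": project}}}
            break


# First invocation: nothing has been supplied yet, so only the request is made.
first = FakeContext()
compose(first)

# Later invocation: the requested resources are now available to read.
second = FakeContext(required_resources={"existing-vpcs": [
    {
        "spec": {"forProvider": {"project": "my-project"}},
        "status": {"atProvider": {"isDefault": True}},
    },
]})
compose(second)

print(first.desired)
print(second.desired)
```

The function body is identical in both calls; only the contents of `required_resources` decide whether the bucket is composed.
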
- Error handling: How to surface errors when Observable never resolves?
- Base class vs generation: Should `observed` property be in a base class or generated?

- Cycle detection: Detect dependency cycles and produce a fatal result. The SDK should build the DAG and check for cycles before emitting.
- Debugging: Add an informational result to `RunFunctionResponse` showing the dependency DAG. Users can inspect this to understand why dependent resources aren't created yet.
- Required resources: Support Observable pattern (fields may not be set yet), but required resources are fundamentally read-only. Changes to required resources are ignored by Crossplane; they're for reading external state only.
- Merge semantics: Deep merge is required. When `ctx.resource()` merges previous desired with user's kwargs, nested models must be recursively merged, not shallow-replaced.
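
The cycle detection and DAG reporting decisions could be sketched with the standard library's `graphlib` (Python 3.9+). This is a minimal sketch, not the SDK's implementation: `deps_from_paths` and `dependency_order` are hypothetical helper names, and it assumes the first segment of an Observable's `source_path` names the resource it depends on.

```python
from graphlib import CycleError, TopologicalSorter


def deps_from_paths(paths_by_resource: dict[str, list[str]]) -> dict[str, set[str]]:
    """Map each resource to the resources its Observable source paths point at."""
    return {
        name: {path.split(".", 1)[0] for path in paths}
        for name, paths in paths_by_resource.items()
    }


def dependency_order(deps: dict[str, set[str]]) -> list[str]:
    """Return resources in dependency order, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as err:
        # graphlib reports the nodes forming the cycle as the second argument.
        cycle = " -> ".join(err.args[1])
        raise ValueError(f"dependency cycle: {cycle}") from err


# Source paths of the Observables found in each composed resource.
paths = {
    "vpc": [],
    "subnet": ["vpc.status.atProvider.vpcId"],
    "security-group": ["vpc.status.atProvider.vpcId", "subnet.status.atProvider.id"],
}
deps = deps_from_paths(paths)
print(dependency_order(deps))  # ['vpc', 'subnet', 'security-group']

# Two resources that reference each other can never both be emitted.
cyclic = deps_from_paths({"a": ["b.status.id"], "b": ["a.status.id"]})
try:
    dependency_order(cyclic)
except ValueError as err:
    print(err)
```

The `deps` mapping doubles as the DAG information for the informational result, and the `ValueError` marks the point where the SDK would produce a fatal result instead.
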