@darkowlzz
Last active March 24, 2023 20:19
controller-patterns (flux)

Controller Patterns

This document describes some controller patterns built on top of the controller-runtime/kubebuilder tooling to help simplify how controller code is written and organized. It does so by introducing abstractions and patterns that reduce the cognitive load of reading and writing controller code. Some of these are inspired by ideas from the Cluster API (CAPI) controllers, modified to fit the needs of flux controllers, whose domain of operations differs. Familiarity with the controller code structure generated by the kubebuilder scaffold is assumed.

NOTE: Some of the recommendations and best practices are highlighted in bold and italic.

Control Loop (Reconciler)

In controller-runtime controllers, the main control loop, Reconcile(), is an event handler. A controller subscribes to events for the objects it manages, and whenever it receives an event, the control loop processes the object to reconcile it to the desired state. The status of the reconciliation is reported by updating the status API of the object. At the end of a reconciliation, the control loop can return a successful result (desired state = actual state), ask for a requeue in order to wait for a certain condition, or, in case of a failure, return an error so that the reconciliation is retried.

An example of a controller's Reconcile() method generated by the kubebuilder scaffold looks like:

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	// TODO(user): your logic here

	return ctrl.Result{}, nil
}

Request contains the details about the event that the reconciler handles. It contains the Namespace/Name of the object to reconcile, which can be used to fetch that object.

Result contains the result of a reconcile invocation: whether the reconciliation succeeded or needs to be retried. It is composed of Result.Requeue and Result.RequeueAfter. Result.Requeue is a boolean used to request an immediate requeue. Result.RequeueAfter is a duration used to request a requeue after a certain period. Only one of Requeue or RequeueAfter should be used; RequeueAfter takes precedence over Requeue when both are set.

The Reconcile() method also returns an error, which signals a reconciliation failure and results in a retry. This is similar to setting Result.Requeue to true. When the error is not nil, the Result value is ignored, so there's no need to set Result alongside a non-nil error; refer to controller-runtime/pkg/internal/controller/controller.go.

Status reporting

While reconciling an object, the reconciler performs actions to bring the state of the world closer to the declared state in the object's spec. The state of the world may or may not be reconciled with the declared state within the lifetime of a reconciliation. It may take a few reconciliations before the desired state is achieved. Since the reconciler is responsible for the object it operates on, it is also responsible for reporting the status of the object in the object status API. The object status tells an observer about the status of an object, whether it's ready, succeeded in what it's supposed to do, not ready, failed, stalled, etc. This can be done by patching the object's status. For patching, the change in the object compared with the object in the K8s API server needs to be calculated. For flux controllers, there are some helpers for this.

In the Reconcile(), a patch helper can be initialized with the initial version of the object, before processing it:

...
import (
	...
	"github.com/fluxcd/pkg/runtime/patch"
	...
)

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Fetch the latest version of the object.
	...

	// Initialize the patch helper with the current version of the object.
	patchHelper, err := patch.NewHelper(obj, r.Client)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Process the object and update the status of the object.
	...

	// Patch the object in the K8s API server using the patch helper.
	if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
		...
	}
	...
}

After processing the object, the updated object status can be saved in the K8s API server.

When using this pattern, it's better to patch the object status once, with all the aggregated updates at the end of a reconciliation. Go's defer statement can be very useful for this purpose. The final Patch() can be called in a deferred block.

Summarizing and Patching

The status of an object can be populated with various information. Depending on the controller, there may be certain expectations from the status. For example, kstatus defines certain standards for the object status. Conforming to such standards makes it easier to integrate with other systems. A client compatible with kstatus can read any object status that conforms to kstatus.

For flux controllers, there are some status condition helpers that can be used to construct a kstatus compatible object status. These are based on similar helpers in the CAPI project, modified to work with flux objects. Refer: https://pkg.go.dev/github.com/fluxcd/pkg/runtime/conditions .

In Reconcile(), along with patching status at the very end, we may need to perform certain operations like summarizing the result, logging, emitting events, etc. In order to not clutter the body of Reconcile(), these operations can be grouped into a separate function, say summarizeAndPatch(), which also makes these operations testable independently of the main reconciliation code.

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
	...

	defer func() {
		result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
		...
	}()

	...
}

func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	// Log, emit event, summarize the result, etc.
	...

	// Finally, patch the resource.
	if err := patchHelper.Patch(ctx, obj); err != nil {
		...
	}
	...
}

Note that Reconcile() in the above code uses named return values to set the result and error in the deferred block. The summarizeAndPatch() method takes a few arguments related to the result and error of the reconciliation, and returns the final reconciler results. These are described in more detail in the following sections.

The next patterns involve the results of reconcilers and how they are used with the pattern described above.

Results of Reconciler

In a Reconcile function, a result can be returned from anywhere in the function, based on certain conditions. These results have different meanings, which are crucial for the proper functioning of the controller, so it's important to return the appropriate result under each condition.

Usually the Reconciler result can be used to compose the following types of results:

  1. Success
  2. Failure
  3. Immediate requeue
  4. Wait and requeue

These may have different meanings for different controllers. In flux controllers, a successful reconciliation returns a result with Result.RequeueAfter set to some interval. The following describes in detail what each result means for flux controllers. This can be applied to other controllers in general, but the examples are taken from flux controllers.

Success

All flux objects have a Spec.Interval property, which sets the period at which flux checks for updates in the absence of any object update event from the K8s API server. A successful Result has RequeueAfter set to the Spec.Interval value.

A successful result returned from Reconcile() looks like:

return ctrl.Result{RequeueAfter: obj.GetInterval().Duration}, nil

Failure

A failure result returned from Reconcile() due to an error looks like:

return ctrl.Result{}, err

Immediate Requeue

For cases when the controller needs to perform an immediate requeue, the Result.Requeue value is set to true:

return ctrl.Result{Requeue: true}, nil

Wait and Requeue

For cases when there's no failure, no error, but the controller needs to wait for certain conditions to be satisfied, the controller can ask for a requeue after a certain period by setting Result.RequeueAfter to some duration.

return ctrl.Result{RequeueAfter: 10*time.Second}, nil

This differs from the success result only in the value of Result.RequeueAfter, which need not match Spec.Interval.

Result abstraction

The reconciler result, as described above, carries many small details that make it harder to write correctly and to read easily. To reduce the cognitive load when thinking about results, a layer of abstraction can be added that represents the different results more explicitly and clearly.

A new internal Result type can be introduced to represent the same types of results with constants:

// Result is the abstracted reconcile result; its underlying type is int.
type Result int

const (
	// ResultEmpty indicates a reconcile result which does not requeue.
	ResultEmpty Result = iota
	// ResultRequeue indicates a reconcile result which should immediately
	// requeue.
	ResultRequeue
	// ResultSuccess indicates a reconcile result which should be
	// requeued on the interval as defined on the reconciled object.
	ResultSuccess
)

Using these constants in flux controllers, the results returned by the reconciliation code can be written as:

  • Successful result:
return ResultSuccess, nil
  • Failure result:
return ResultEmpty, err
  • Immediate requeue result:
return ResultRequeue, nil

NOTE: Although it would be considered idiomatic Go to return nil instead of ResultEmpty when there's no result, the underlying type of these constants is int, which can't be nil. Alternative implementations using interfaces and more custom types may feel more idiomatic.

When using these constants, the abstracted results can be passed to a helper function to build and return the result that controller-runtime understands. This can be done in a function, say BuildRuntimeResult():

func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctrl.Result {
	switch rr {
	case ResultRequeue:
		return ctrl.Result{Requeue: true}
	case ResultSuccess:
		return ctrl.Result{RequeueAfter: successInterval}
	default:
		return ctrl.Result{}
	}
}

The fourth type of result, wait and requeue, is harder to express with constants, since its RequeueAfter period can be an arbitrary duration. The accompanying error can carry this information instead: unlike the result constants, an error is a dynamic value that can store such details.

Before continuing, note that the BuildRuntimeResult() described above can be used in summarizeAndPatch() as:

func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	...

	// res - is of the abstracted result constant type.
	// recErr - is the reconciliation error to be returned.
	result := BuildRuntimeResult(obj.GetRequeueAfter(), res, recErr)
	...

	// Rest of the summarizeAndPatch.

	return result, recErr
}

Using custom errors to express the result

The result of a Reconcile() consists of a Result and an error. The error has a special meaning to the runtime: if it is not nil, the runtime logs the error message and puts the request back into the work queue to be processed again. Since we have defined an abstraction layer between the reconcile results and the values the runtime expects, we can also define more contextual errors and handle them accordingly. This makes the returned result more expressive, with context specific meanings.

For example, to express that a controller needs to wait before it can further process an object but without treating this as an actual runtime error, a Waiting error can be returned with information about the reason for the wait, wait duration, etc. In summarizeAndPatch() we can analyze the error and return an appropriate runtime result with the RequeueAfter value.

Similarly, expressing that an object has entered a stalled state becomes much easier with the abstraction. A context specific error, say a Stalling error, can be returned with information about the reason for the stalled state, and summarizeAndPatch() can handle this error accordingly. Since a stalled state means that the controller can't process the object any further with its current configuration, requeuing isn't needed, but users have to be notified about the state. So, summarizeAndPatch() can log and emit an event about the stalled state, notifying the user, and swallow the error to prevent requeuing by the runtime.

summarizeAndPatch() can be used like a middleware to add custom handling of the results returned from the reconciler, without cluttering the main reconcile code with custom result transformations to fit the expectations of the runtime. Any common or repeated patterns around the returned results can be added in this layer.

Contextual error examples

As described above, abstracting the result of a reconciliation makes it possible to express more in context specific ways. The following are examples of some context specific errors, with details about how to implement and use them.

Waiting error

When a reconciler needs to wait for a condition to become true before processing an object further, it needs to wait and requeue. This wait can differ from the object's reconcile interval. For example, an object may be configured to reconcile every 30 minutes, but a reconciliation in progress may depend on some other condition that isn't true yet, so it needs to wait for a minute and retry. In such situations, the reconciler needs to express to the runtime that this is not an actual error, but that it needs to wait and retry processing after some time.

This can be defined as:

// Waiting is the reconciliation wait state error. It contains an error, wait
// duration and a reason for the wait.
type Waiting struct {
	// RequeueAfter is the wait duration.
	RequeueAfter time.Duration
	// Err is the error that caused the wait.
	Err error
	// Reason is the reason for waiting. It can be used in status
	// conditions and events.
	Reason string
}

// Error implements error interface.
func (w *Waiting) Error() string {
	return w.Err.Error()
}

This can be returned from the reconciler as:

return ResultEmpty, &Waiting{
	Err:          fmt.Errorf("component X not ready: %w", err),
	RequeueAfter: time.Minute,
	Reason:       "ComponentNotReady",
}

Since a waiting error contributes to expressing the result, we can modify the result in BuildRuntimeResult() accordingly when a waiting error is received:

func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctrl.Result {
	// Handle special errors that contribute to expressing the result.
	if e, ok := err.(*serror.Waiting); ok {
		return ctrl.Result{RequeueAfter: e.RequeueAfter}
	}

	// Direct conversion of result constants to runtime result.
	switch rr {
	case ResultRequeue:
		return ctrl.Result{Requeue: true}
	case ResultSuccess:
		return ctrl.Result{RequeueAfter: successInterval}
	default:
		return ctrl.Result{}
	}
}

The reconcile error can also be modified accordingly, in summarizeAndPatch():

	// Analyze the reconcile error.
	switch t := recErr.(type) {
	...
	case *serror.Waiting:
		// The reconciler needs to wait and retry. Return no error.
		return result, nil

Stalling error

An object enters a stalled state when the controller can't process the object in its current state and retrying would not change anything. This requires human intervention to update the configuration so that the controller can process the object further. Examples include invalid or conflicting inputs.

This can be defined as:

// Stalling is the reconciliation stalled state error. It contains an error
// and a reason for the stalled condition.
type Stalling struct {
	// Reason is the stalled condition reason string.
	Reason string
	// Err is the error that caused stalling. This can be used as the message in
	// stalled condition.
	Err error
}

// Error implements error interface.
func (se *Stalling) Error() string {
	return se.Err.Error()
}

This can be returned from the reconciler as:

return ResultEmpty, &Stalling{
	Err:    fmt.Errorf("invalid configuration: %w", err),
	Reason: "InvalidConfig",
}

This error can be handled in summarizeAndPatch() as:

	...
	// Analyze the reconcile error.
	switch t := recErr.(type) {
	case *serror.Stalling:
		if res == ResultEmpty {
			return result, nil
		}
	...
}

The error is swallowed by returning nil, because a stalled result shouldn't cause a requeue.

Event error

When an error is returned to the runtime, the runtime logs the error internally and puts the object back in the work queue to be processed again. Depending on the environment and cluster setup, we may want to emit an event for an error so that it's recorded in the K8s event stream. This can be expressed with an Event type error that has context specific handlers.

This can be defined as:

// Event is an error event. It can be used to construct an event to be
// recorded.
type Event struct {
	// Reason to be used in the emitted event.
	Reason string
	// Err is the actual error.
	Err    error
}

// Implement the error interface.
...

This can be returned from the reconciler as:

return ResultEmpty, &Event{
	Err:    fmt.Errorf("x failed: %w", err),
	Reason: "Failed",
}

In summarizeAndPatch(), the contextual errors can be recorded by logging or event emitting, based on the type of the error. A helper function for this would look like:

func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj runtime.Object, err error) {
	switch e := err.(type) {
	case *serror.Event:
		// Event (not Eventf) since the error message is not a format string.
		recorder.Event(obj, corev1.EventTypeWarning, e.Reason, e.Error())
	case *serror.Waiting:
		// Waiting errors are not returned to the runtime. Log it explicitly.
		ctrl.LoggerFrom(ctx).Info("reconciliation waiting", "reason", e.Reason, "error", e.Err, "duration", e.RequeueAfter)
		recorder.Event(obj, corev1.EventTypeNormal, e.Reason, e.Error())
	case *serror.Stalling:
		// Stalling errors are not returned to the runtime. Log it explicitly.
		ctrl.LoggerFrom(ctx).Error(e, "reconciliation stalled")
		recorder.Event(obj, corev1.EventTypeWarning, e.Reason, e.Error())
	}
}

The helper handles recording the errors for different types of contextual errors. This can be called from summarizeAndPatch().

An event is emitted for the error and the result is returned as usual by the rest of the summarizeAndPatch().

These are a few examples of custom errors that can help express contextual information and help handle them accordingly. summarizeAndPatch() serves as a middleware between the core reconciler and the runtime.

Computing Reconcile Result

Based on the above examples of contextual errors, we can create a function to consolidate the computation of the result, say ComputeReconcileResult():

func ComputeReconcileResult(obj runtime.Object, successInterval time.Duration, res Result, recErr error) (ctrl.Result, error) {
	// Summarize and set the object status.
	...

	result := BuildRuntimeResult(successInterval, res, recErr)

	// Analyze the reconcile error.
	switch t := recErr.(type) {
	case *serror.Stalling:
		// Result must be empty for the object to be in stalled state.
		if res == ResultEmpty {
			return result, nil
		}
	case *serror.Waiting:
		// The reconciler needs to wait and requeue. Return no error.
		return result, nil
	default:
		...
	}
	return result, recErr
}

This is a simplified version of the actual ComputeReconcileResult() that's used in some of the controllers. Using this, the summarizeAndPatch() becomes:

func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	...

	// res - is of the abstracted result constant type.
	// recErr - is the reconciliation error to be returned.
	var result ctrl.Result
	result, recErr = ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr)
	...

	// Patching operation.
	...

	return result, recErr
}

These abstractions for reconciler results are important because they define the behavior of the controller. In the following sections, they are used extensively in various stages of reconciliation.

Sub-reconcilers

The Reconcile() method is the main function of a controller. It can easily become complex and hard to test and maintain. A well known pattern is to create another method, reconcile(), which contains most of the core business logic. Although reconcile() makes the code more testable, in certain situations it's still hard to maintain all the business logic and also adhere to certain controller standards in one long function. For example, when a controller implements status conditions, each step of the reconciliation may have a corresponding status condition, and the conditions have to be added and removed based on domain specific conditions. In flux source-controller, when an artifact is outdated, the ArtifactOutdated condition is added to the object; but if a new artifact is created successfully by the end of the reconciliation, the ArtifactOutdated condition has to be removed. Mixing multiple such steps in reconcile() and keeping them in order can be difficult.

The business logic in reconcile() can be divided into multiple smaller functions, called sub-reconcilers here. Each sub-reconciler implements one step of the reconciliation and returns its result. reconcile() iterates through the sub-reconcilers, collects their results and returns them to the main reconciler. Since the sub-reconcilers represent ordered steps, they are run sequentially. This is inspired by a pattern seen in the CAPI project.

In order to simplify how reconcile() handles the sub-reconcilers, a custom type can be defined for the sub-reconcilers, for example:

type reconcilerFunc func(ctx context.Context, obj client.Object, artifact *Artifact) (Result, error)

All the sub-reconcilers can implement this function type.

The reconcile() can then iterate through all the sub-reconcilers, for example:

func (r *FooReconciler) reconcile(ctx context.Context, obj client.Object, reconcilers []reconcilerFunc) (Result, error) {
	...

	var artifact Artifact

	// Run the sub-reconcilers and build the result of reconciliation.
	var res sreconcile.Result
	var resErr error
	for _, rec := range reconcilers {
		recResult, err := rec(ctx, obj, &artifact)
		// Exit immediately on ResultRequeue.
		if recResult == sreconcile.ResultRequeue {
			return sreconcile.ResultRequeue, nil
		}
		// If an error is received, prioritize the returned results because an
		// error also means immediate requeue.
		if err != nil {
			resErr = err
			res = recResult
			break
		}
		// Prioritize requeue request in the result.
		res = lowestRequeuingResult(res, recResult)
	}
	return res, resErr
}

In the above example, reconcile() receives a list of sub-reconcilers, reconcilers, executes them in order, and processes the result of each one.

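The results and errors from the sub-reconcilers are handled as follows: ResultRequeue (immediate requeue) ends the loop immediately, an error ends the loop and its accompanying result is returned, and the remaining non-immediate requeue results are merged by lowestRequeuingResult(), which keeps the one that requeues soonest.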
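lowestRequeuingResult() is not shown above. A minimal sketch, assuming the ordering implied by its name: an immediate requeue comes before a requeue at the interval, which comes before no requeue. The actual flux helper may differ in detail. It is paired with a dependency-free version of the loop, where subReconciler, runAll() and fetchFailed() are hypothetical.

```go
package main

import "errors"

type Result int

const (
	ResultEmpty Result = iota
	ResultRequeue
	ResultSuccess
)

// lowestRequeuingResult returns whichever of i and j requeues soonest:
// ResultRequeue, then ResultSuccess, then ResultEmpty (no requeue).
func lowestRequeuingResult(i, j Result) Result {
	switch {
	case i == ResultRequeue || j == ResultRequeue:
		return ResultRequeue
	case i == ResultSuccess || j == ResultSuccess:
		return ResultSuccess
	default:
		return ResultEmpty
	}
}

// subReconciler is a hypothetical, dependency-free sub-reconciler type.
type subReconciler func() (Result, error)

// runAll mirrors the reconcile() loop above.
func runAll(recs []subReconciler) (Result, error) {
	var res Result
	for _, rec := range recs {
		r, err := rec()
		// Exit immediately on an immediate requeue.
		if r == ResultRequeue {
			return ResultRequeue, nil
		}
		// An error stops the remaining steps.
		if err != nil {
			return r, err
		}
		res = lowestRequeuingResult(res, r)
	}
	return res, nil
}

// fetchFailed is a hypothetical sub-reconciler that always fails.
func fetchFailed() (Result, error) { return ResultEmpty, errors.New("fetch failed") }
```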

The following is an example of a sub-reconciler:

func (r *FooReconciler) reconcileStorage(ctx context.Context, obj client.Object, artifact *Artifact) (Result, error) {
	// Garbage collect previous advertised artifact(s) from storage
	_ = r.garbageCollect(ctx, obj)

	// Determine if the advertised artifact is still in storage
	if current := obj.GetArtifact(); current != nil && !r.Storage.ArtifactExist(*current) {
		obj.Status.Artifact = nil
		obj.Status.URL = ""
	}

	...

	return sreconcile.ResultSuccess, nil
}

Such sub-reconcilers can easily be tested independently for correctness, creating a separation of concerns between the steps of the whole reconciliation.

Sub-reconciliation results

Each sub-reconciler performs a core step in the reconciliation process of an object. The sub-reconcilers can adopt the result abstractions discussed above, and reconcile() respects the results returned by each of them.

A sub-reconcile function can have multiple return statements. It's better to catch any abnormalities and failures early in the function and end the function with a successful result return, for example:

func (r *FooReconciler) reconcileSource(ctx context.Context, obj client.Object, artifact *Artifact) (Result, error) {
	...
	if err := foo(); err != nil {
		return sreconcile.ResultEmpty, &serror.Event{Err: err, Reason: meta.FailedReason}
	}

	...

	return sreconcile.ResultSuccess, nil
}

Status conditions in sub-reconcilers

Status conditions are a way of reporting the status of an object in the API. Each of the sub-reconcilers can have a domain specific status condition that they add to the object being reconciled. Most of the time, the same sub-reconciler removes a status condition from the object on successful operation.

For example, sub-reconciler reconcileSource() fetches the source as per the given configuration. When there's no failure, no condition is added on the object. But when there's a failure to fetch the source, reconcileSource() can add a status condition FetchFailedCondition on the object to indicate the failure.

For example, when fetching a git repo source in reconcileSource():

// Checkout HEAD of reference in object
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
commit, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, authOpts)
if err != nil {
	e := &serror.Event{
		Err:    fmt.Errorf("failed to checkout and determine revision: %w", err),
		Reason: sourcev1.GitOperationFailedReason,
	}
	conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "%s", e.Err)
	return sreconcile.ResultEmpty, e
}
r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.GitOperationSucceedReason,
	"cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, commit.String())
conditions.Delete(obj, sourcev1.FetchFailedCondition)

In the above example, an Event error is created with the checkout error and used to set the FetchFailedCondition condition on the object before the error is returned. When the checkout succeeds, a success message is logged and emitted as an event, followed by removal of any FetchFailedCondition from the object.

Similarly, each sub-reconciler can be responsible for its own status conditions.

Logging and emitting events

Logging and emitting events are two ways to record what happens during a reconciliation. Since it's up to cluster administrators to set up log and event aggregators, it's hard for the controller to decide what to log and what to emit events for. It's safer to both log and emit an event by default, unless there's a good reason not to. To enable this, we can provide helpers that log and emit an event at the same time.

Since an event requires extra information compared to a log, the helper can have a front-end of an event recorder function. Internally, the event metadata can be used to log the message appropriately.

func (r *FooReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
	msg := fmt.Sprintf(messageFmt, args...)
	if eventType == corev1.EventTypeWarning {
		ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg)
	} else {
		ctrl.LoggerFrom(ctx).Info(msg)
	}
	r.Eventf(obj, eventType, reason, msg)
}

NOTE: This can be moved into a separate helper package.

This can be used to log and emit an event as follows:

r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.GitOperationSucceedReason,
		"cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, commit.String())

Trace events

Package fluxcd/pkg/runtime/events has an event recorder that's compatible with the K8s upstream EventRecorder. It adds a few flux specific behaviors to the events. It emits an event to the K8s API server, sends a notification request to the flux notification-controller and logs the emitted event if debug logs are enabled. This package also defines a new event type EventTypeTrace which is used to record the event in K8s only and not forward the event as a notification to the notification-controller.

In the eventLogf() code in the previous section, r.Eventf() can use this new event recorder. A trace event, which shouldn't result in a notification, can be sent as:

r.Eventf(obj, events.EventTypeTrace, reason, msg)
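The trace behavior described above can be illustrated with a hypothetical recorder: every event is recorded, but events of the trace type are not forwarded as notifications. The string value Trace for the trace event type is an assumption in this sketch, and the two slices only stand in for the K8s API server and notification-controller.

```go
package main

// eventTypeTrace is assumed to match the value of events.EventTypeTrace.
const eventTypeTrace = "Trace"

type event struct {
	Type, Reason, Message string
}

// traceAwareRecorder is a hypothetical recorder: recorded stands in for
// events written to the K8s API server, forwarded for notifications sent to
// notification-controller.
type traceAwareRecorder struct {
	recorded  []event
	forwarded []event
}

// Event records every event but forwards only non-trace events.
func (r *traceAwareRecorder) Event(eventType, reason, message string) {
	e := event{eventType, reason, message}
	r.recorded = append(r.recorded, e)
	if eventType != eventTypeTrace {
		r.forwarded = append(r.forwarded, e)
	}
}
```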
