This document describes some controller patterns built on top of the controller-runtime/kubebuilder tooling to help simplify how controller code is written and organized. It does so by introducing abstractions and patterns that reduce the cognitive load of reading and writing controller code. Some of these are inspired by ideas from the Cluster API (CAPI) controllers, modified to fit the needs of flux controllers because of differences in their domain of operations. Familiarity with the controller code structure created by the kubebuilder scaffold is assumed.
NOTE: Some of the recommendations and best practices are highlighted in bold and italic.
In controller-runtime controllers, the main control loop, `Reconcile()`, is an event handler. A controller subscribes to events for the objects it manages, and whenever it receives an event, the control loop processes the object to reconcile it to the desired state. The status of the reconciliation is reported by updating the status API of the object. At the end of a reconciliation, the control loop can either return a successful result (desired state = actual state), ask for a requeue in order to wait for a certain condition, or, in case of a failure, return an error so that the runtime retries the reconciliation.
An example of a controller's `Reconcile()` method generated by the kubebuilder scaffold looks like:
```go
func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	// TODO(user): your logic here

	return ctrl.Result{}, nil
}
```
`Request` contains the details about the event that the reconciler handles. It contains the Namespace/Name of the object the event is about, which can be used to fetch that object.
`Result` contains the result of a reconcile invocation: whether the reconciliation succeeded or needs to be retried. `Result` is composed of `Result.Requeue` and `Result.RequeueAfter`.
`Result.Requeue` is a boolean, used to request an immediate requeue (the request still goes through the work queue's rate limiter). `Result.RequeueAfter` is a duration, used to request a requeue after a certain period. Only one of `Requeue` or `RequeueAfter` should be used; `RequeueAfter` takes precedence over `Requeue` when both are set.
The `Reconcile()` method also returns an error, which is used to signal a reconciliation failure and results in a retry. This is similar to setting `Result.Requeue` to `true`. When there's an error, the `Result` value is ignored, so there's no need to set `Result` when the error is not nil; refer to controller-runtime/pkg/internal/controller/controller.go.
While reconciling an object, the reconciler performs actions to bring the state of the world closer to the declared state in the object's spec. The state of the world may or may not be reconciled with the declared state within the lifetime of a reconciliation. It may take a few reconciliations before the desired state is achieved. Since the reconciler is responsible for the object it operates on, it is also responsible for reporting the status of the object in the object status API. The object status tells an observer about the status of an object, whether it's ready, succeeded in what it's supposed to do, not ready, failed, stalled, etc. This can be done by patching the object's status. For patching, the change in the object compared with the object in the K8s API server needs to be calculated. For flux controllers, there are some helpers for this.
In `Reconcile()`, a patch helper can be initialized with the initial version of the object, before processing it:
```go
...
import (
	...
	"github.com/fluxcd/pkg/runtime/patch"
	...
)

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Fetch the latest version of the object.
	...

	// Initialize the patch helper with the current version of the object.
	patchHelper, err := patch.NewHelper(obj, r.Client)
	if err != nil {
		return ctrl.Result{}, err
	}

	// Process the object and update the status of the object.
	...

	// Patch the object in the K8s API server using the patch helper.
	if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
		...
	}
	...
}
```
After processing the object, the updated object status can be saved in the K8s API server. When using this pattern, it's better to patch the object status once, with all the aggregated updates, at the end of a reconciliation; this avoids extra requests to the API server and avoids exposing intermediate states to observers. Go's `defer` statement is very useful for this purpose: the final `Patch()` can be called in a deferred block.
The status of an object can be populated with various information. Depending on the controller, there may be certain expectations from the status. For example, kstatus defines certain standards for the object status. Conforming to such standards makes it easier to integrate with other systems. A client compatible with kstatus can read any object status that conforms to kstatus.
For flux controllers, there are some status condition helpers that can be used to construct kstatus-compatible object status. These are based on similar helpers in the CAPI project, modified to work with flux objects. Refer to https://pkg.go.dev/github.com/fluxcd/pkg/runtime/conditions.
In `Reconcile()`, along with patching the status at the very end, we may need to perform certain operations like summarizing the result, logging, emitting events, etc. In order not to clutter the body of `Reconcile()`, these operations can be grouped into a separate function, say `summarizeAndPatch()`, which also makes them testable independently of the main reconciliation code.
```go
func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
	...
	defer func() {
		result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
		...
	}()
	...
}

func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	// Log, emit event, summarize the result, etc.
	...

	// Finally, patch the resource.
	if err := patchHelper.Patch(ctx, obj); err != nil {
		...
	}
	...
}
```
Note that `Reconcile()` in the above code uses named return values to set the result and error in the deferred block. The `summarizeAndPatch()` method takes a few arguments and returns the final reconciler result. These arguments are related to the result and error of the reconciliation; more details about them follow in the next sections.
The next patterns involve the results of reconcilers and how they are used with the pattern described above.
In a `Reconcile()` function, a result can be returned from anywhere in the function, based on certain conditions. These results have different meanings that are crucial for the proper functioning of the controller, so it's important to return the appropriate result under each condition.
Usually, the reconciler result can be used to compose the following types of results:
- Success
- Failure
- Immediate requeue
- Wait and requeue
They may have different meanings for different controllers. In flux controllers, a successful reconciliation returns a result with `Result.RequeueAfter` set to some interval. The following describes in detail what each result means for flux controllers. These can also be applied to other controllers in general, but the examples are taken from flux controllers.
All flux objects have a `Spec.Interval` property, which sets the period at which flux checks for updates in the absence of an object update event from the K8s API server. A successful `Result` has `RequeueAfter` set to the `Spec.Interval` value.
A successful result returned from `Reconcile()` looks like:
```go
return ctrl.Result{RequeueAfter: obj.GetInterval().Duration}, nil
```
A failure result returned from `Reconcile()` due to an error looks like:
```go
return ctrl.Result{}, err
```
For cases when the controller needs to perform an immediate requeue, `Result.Requeue` is set to `true`:
```go
return ctrl.Result{Requeue: true}, nil
```
For cases when there's no failure and no error, but the controller needs to wait for certain conditions to be satisfied, the controller can ask for a requeue after a certain period by setting `Result.RequeueAfter` to some duration:
```go
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
```
This differs from the success result only in the value of `Result.RequeueAfter`, which may or may not match `Spec.Interval`.
The reconciler result, as described above, has many small details that can make it hard to write correctly and to read. To reduce the cognitive load when thinking about results, they can be simplified by adding a layer of abstraction that represents the different results more explicitly.
A new internal `Result` type can be introduced to represent the same types of results with constants:
```go
// Result is an abstraction over the controller-runtime reconcile result.
type Result int

const (
	// ResultEmpty indicates a reconcile result which does not requeue.
	ResultEmpty Result = iota
	// ResultRequeue indicates a reconcile result which should immediately
	// requeue.
	ResultRequeue
	// ResultSuccess indicates a reconcile result which should be
	// requeued on the interval as defined on the reconciled object.
	ResultSuccess
)
```
Using these constants in flux controllers, the results returned by the reconciliation code, which are later converted into a `ctrl.Result`, can be written as:
- Successful result: `return ResultSuccess, nil`
- Failure result: `return ResultEmpty, err`
- Immediate requeue result: `return ResultRequeue, nil`
NOTE: Although it would be considered idiomatic Go to return `nil` instead of `ResultEmpty` when there's no result, the underlying type of these constants is `int`, which can't be `nil`. Alternate implementations using interfaces and more custom types could feel more like idiomatic Go.
When using these constants, the abstracted results can be passed to a helper function to build and return the result that controller-runtime understands. This can be done in a function, say `BuildRuntimeResult()`:
```go
func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctrl.Result {
	switch rr {
	case ResultRequeue:
		return ctrl.Result{Requeue: true}
	case ResultSuccess:
		return ctrl.Result{RequeueAfter: successInterval}
	default:
		return ctrl.Result{}
	}
}
```
For the fourth type of result, wait and requeue, the `RequeueAfter` period could be any arbitrary duration, so it's hard to carry the duration in the result once the result is replaced with constants. The accompanying error can express this information instead: unlike the constants, an error is a dynamic value that can store such details.
Before continuing, the `BuildRuntimeResult()` described above can be used in `summarizeAndPatch()` as:
```go
func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	...
	// res - is of the abstracted result constant type.
	// recErr - is the reconciliation error to be returned.
	result := BuildRuntimeResult(obj.GetRequeueAfter(), res, recErr)
	...
	// Rest of the summarizeAndPatch.
	return result, recErr
}
```
The result of `Reconcile()` consists of a `Result` and an `error`. The error has a special meaning to the runtime: if it is not nil, the runtime logs the error message and puts the request back into the work queue to be processed again. Since we have defined an abstraction layer between the values that the runtime expects and the reconcile results, we can also define more contextual errors and handle them accordingly. This makes the returned result more expressive, with context-specific meanings.
For example, to express that a controller needs to wait before it can further process an object, without treating this as an actual runtime error, a `Waiting` error can be returned with information about the reason for the wait, the wait duration, etc. In `summarizeAndPatch()`, we can analyze the error and return an appropriate runtime result with the `RequeueAfter` value.
Similarly, the abstraction makes it much easier to express that an object has entered a stalled state. A context-specific error, say a `Stalling` error, can be returned with information about the reason for the stalled state, and `summarizeAndPatch()` can handle it accordingly. Since a stalled state means that the controller can't process the object any further with the current configuration, requeuing isn't needed, but users have to be notified about the state. So, `summarizeAndPatch()` can log and emit an event about the stalled state, notifying the user, and swallow the error to prevent requeuing by the runtime.
`summarizeAndPatch()` can be used like a middleware to add custom handling of the results returned from the reconciler, without cluttering the main reconcile code with result transformations that fit the expectations of the runtime. Any common or repeated patterns around the returned results can be added in this layer.
As described above, abstracting the result of a reconciliation makes it possible to express more in context-specific ways. The following are examples of some context-specific errors, with details about how to implement and use them.
When a reconciler needs to wait for a condition to be true before further processing an object, it needs to wait and requeue. This wait could be different from the object's reconcile interval. For example, an object can be configured to reconcile every 30 minutes, but while a reconciliation is in progress, it may depend on another condition being true and need to wait for a minute before retrying. In such situations, the reconciler needs to express to the runtime that this is not an actual error, but that it needs to wait and retry processing after some time.
This can be defined as:
```go
// Waiting is the reconciliation wait state error. It contains an error, wait
// duration and a reason for the wait.
type Waiting struct {
	// RequeueAfter is the wait duration.
	RequeueAfter time.Duration
	// Err is the error that caused the wait.
	Err error
	// Reason is the reason for waiting. It can be used in status
	// conditions and events.
	Reason string
}

// Error implements the error interface.
func (se *Waiting) Error() string {
	return se.Err.Error()
}
```
This can be used in the result of reconcile as:
```go
return ResultEmpty, &Waiting{
	Err:          fmt.Errorf("component X not ready: %w", err),
	RequeueAfter: time.Minute,
	Reason:       "ComponentNotReady",
}
```
Since a waiting error contributes to expressing the result, `BuildRuntimeResult()` can be modified to adjust the result when a waiting error is received:
```go
func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctrl.Result {
	// Handle special errors that contribute to expressing the result.
	if e, ok := err.(*serror.Waiting); ok {
		return ctrl.Result{RequeueAfter: e.RequeueAfter}
	}

	// Direct conversion of result constants to runtime result.
	switch rr {
	case ResultRequeue:
		return ctrl.Result{Requeue: true}
	case ResultSuccess:
		return ctrl.Result{RequeueAfter: successInterval}
	default:
		return ctrl.Result{}
	}
}
```
The reconcile error can also be modified accordingly in `summarizeAndPatch()`:
```go
// Analyze the reconcile error.
switch t := recErr.(type) {
...
case *serror.Waiting:
	// The reconciler needs to wait and retry. Return no error.
	return result, nil
...
}
```
An object enters a stalled state when the controller can't process the object in its current state and retrying would not change anything, for example, because of an invalid input or conflicting inputs. This requires human intervention to update the configuration so that the controller can process the object further.
This can be defined as:
```go
// Stalling is the reconciliation stalled state error. It contains an error
// and a reason for the stalled condition.
type Stalling struct {
	// Reason is the stalled condition reason string.
	Reason string
	// Err is the error that caused stalling. This can be used as the message in
	// stalled condition.
	Err error
}

// Error implements the error interface.
func (se *Stalling) Error() string {
	return se.Err.Error()
}
```
This can be used in the result of reconcile as:
```go
return ResultEmpty, &Stalling{
	Err:    fmt.Errorf("invalid configuration: %w", err),
	Reason: "InvalidConfig",
}
```
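This error can be handled in `summarizeAndPatch()` as:
```go
...
// Analyze the reconcile error.
switch t := recErr.(type) {
case *serror.Stalling:
	if res == ResultEmpty {
		return result, nil
	}
...
}
```
The error is swallowed by returning nil, because a stalled result shouldn't cause a requeue. The check for `ResultEmpty` ensures that the object is only treated as stalled when the reconciliation also didn't ask for a requeue.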
When an error is returned to the runtime, the runtime logs the error internally and puts the object back in the work queue to be processed again. Depending on the environment and cluster setup, we may want to emit an event for an error so that it's recorded in the K8s event stream. We can express this with an `Event` type error and a context-specific handler for it.
This can be defined as:
```go
// Event is an error event. It can be used to construct an event to be
// recorded.
type Event struct {
	// Reason to be used in the emitted event.
	Reason string
	// Err is the actual error.
	Err error
}

// Implement the error interface.
...
```
This can be used in the result of reconcile as:
```go
return ResultEmpty, &Event{
	Err:    fmt.Errorf("x failed: %w", err),
	Reason: "Failed",
}
```
In `summarizeAndPatch()`, the contextual errors can be recorded by logging or emitting events, based on the type of the error. A helper function for this would look like:
```go
// RecordContextualError uses Event instead of Eventf because the messages
// are already formatted and may contain '%' characters.
func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj runtime.Object, err error) {
	switch e := err.(type) {
	case *serror.Event:
		recorder.Event(obj, corev1.EventTypeWarning, e.Reason, e.Error())
	case *serror.Waiting:
		// Waiting errors are not returned to the runtime. Log it explicitly.
		ctrl.LoggerFrom(ctx).Info("reconciliation waiting", "reason", e.Reason, "error", e.Err.Error(), "duration", e.RequeueAfter)
		recorder.Event(obj, corev1.EventTypeNormal, e.Reason, e.Error())
	case *serror.Stalling:
		// Stalling errors are not returned to the runtime. Log it explicitly.
		ctrl.LoggerFrom(ctx).Error(e, "reconciliation stalled")
		recorder.Event(obj, corev1.EventTypeWarning, e.Reason, e.Error())
	}
}
```
The helper records the different types of contextual errors and can be called from `summarizeAndPatch()`. For an `Event` error, an event is emitted, and the result is then returned as usual by the rest of `summarizeAndPatch()`.
These are a few examples of custom errors that help express contextual information and handle it accordingly. `summarizeAndPatch()` serves as a middleware between the core reconciler and the runtime.
Based on the above examples of contextual errors, we can create a function that consolidates the computation of the result, say `ComputeReconcileResult()`:
```go
func ComputeReconcileResult(obj runtime.Object, successInterval time.Duration, res Result, recErr error) (ctrl.Result, error) {
	// Summarize and set the object status.
	...

	result := BuildRuntimeResult(successInterval, res, recErr)

	// Analyze the reconcile error.
	switch t := recErr.(type) {
	case *serror.Stalling:
		// Result must be empty for the object to be in stalled state.
		if res == ResultEmpty {
			return result, nil
		}
	case *serror.Waiting:
		// The reconciler needs to wait and requeue. Return no error.
		return result, nil
	default:
		...
	}
	return result, recErr
}
```
This is a simplified version of the actual `ComputeReconcileResult()` that's used in some of the controllers. Using it, `summarizeAndPatch()` becomes:
```go
func (r *FooReconciler) summarizeAndPatch(ctx context.Context, obj *Foo, patchHelper *patch.Helper, res sreconcile.Result, recErr error) (ctrl.Result, error) {
	...
	// res - is of the abstracted result constant type.
	// recErr - is the reconciliation error to be returned.
	var result ctrl.Result
	result, recErr = ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr)
	...

	// Patching operation.
	...
	return result, recErr
}
```
These abstractions for reconciler results are important because they define the behavior of the controller. In the following sections, they are used extensively in various stages of reconciliation.
The `Reconcile()` method is the main function of a controller. It can easily become complex and hard to test and maintain. A well-known pattern is to create another method, `reconcile()`, which contains most of the core business logic.
Although `reconcile()` makes the code more testable, in certain situations it's still not easy to maintain all the business logic, and also adhere to certain controller standards, in one long function. For example, when a controller implements status conditions, each step of the reconciliation may have a corresponding status condition. The conditions have to be added and removed based on domain-specific conditions. In the case of flux source-controller, when an artifact is outdated, the `ArtifactOutdated` condition is added to the object. But at the end of the reconciliation, if a new artifact was created successfully, the `ArtifactOutdated` condition has to be removed. Mixing multiple steps in `reconcile()` and keeping them in order can be difficult.
The business logic in `reconcile()` can be divided into multiple smaller functions; let's call them sub-reconcilers. Each sub-reconciler implements one step of the reconciliation and returns its result. `reconcile()` can iterate through these sub-reconcilers, collect the results and return them to the main reconciler. Since the sub-reconcilers represent ordered steps, they are run sequentially. This is inspired by a pattern seen in the CAPI project.
In order to simplify how `reconcile()` handles the sub-reconcilers, a custom type can be defined for them, for example:
```go
type reconcilerFunc func(ctx context.Context, obj client.Object, artifact *Artifact) (sreconcile.Result, error)
```
All the sub-reconcilers can implement this function type. `reconcile()` can then iterate through all the sub-reconcilers, for example:
```go
func (r *FooReconciler) reconcile(ctx context.Context, obj client.Object, reconcilers []reconcilerFunc) (sreconcile.Result, error) {
	...
	var artifact Artifact

	// Run the sub-reconcilers and build the result of reconciliation.
	var res sreconcile.Result
	var resErr error
	for _, rec := range reconcilers {
		recResult, err := rec(ctx, obj, &artifact)
		// Exit immediately on ResultRequeue.
		if recResult == sreconcile.ResultRequeue {
			return sreconcile.ResultRequeue, nil
		}
		// If an error is received, prioritize the returned results because an
		// error also means immediate requeue.
		if err != nil {
			resErr = err
			res = recResult
			break
		}
		// Prioritize requeue request in the result.
		res = lowestRequeuingResult(res, recResult)
	}
	return res, resErr
}
```
In the above example, `reconcile()` receives a list of sub-reconcilers, `reconcilers`, executes them in order, and processes the result of each one. `ResultRequeue` (immediate requeue) is returned right away, an error stops the iteration and is returned together with that sub-reconciler's result, and the remaining results are combined with `lowestRequeuingResult()`, which keeps the result with the lowest requeue period.
The following is an example of a sub-reconciler:
```go
func (r *FooReconciler) reconcileStorage(ctx context.Context, obj client.Object, artifact *Artifact) (sreconcile.Result, error) {
	// Garbage collect previous advertised artifact(s) from storage.
	_ = r.garbageCollect(ctx, obj)

	// Determine if the advertised artifact is still in storage.
	if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
		obj.Status.Artifact = nil
		obj.Status.URL = ""
	}
	...
	return sreconcile.ResultSuccess, nil
}
```
Such sub-reconcilers can easily be tested independently for correctness, which creates a separation of concerns between the steps of the whole reconciliation.
Each sub-reconciler performs a core step in the reconciliation of an object. The sub-reconcilers can adopt the result abstractions discussed above, and `reconcile()` respects the results returned by each of them.
A sub-reconciler can have multiple return statements. It's better to catch any abnormalities and failures early in the function and end it with a successful result, for example:
```go
func (r *FooReconciler) reconcileSource(ctx context.Context, obj client.Object, artifact *Artifact) (sreconcile.Result, error) {
	...
	if err := foo(); err != nil {
		return sreconcile.ResultEmpty, &serror.Event{Err: err, Reason: meta.FailedReason}
	}
	...
	return sreconcile.ResultSuccess, nil
}
```
Status conditions are a way of reporting the status of an object in the API. Each sub-reconciler can have a domain-specific status condition that it adds to the object being reconciled. Most of the time, the same sub-reconciler removes that status condition from the object when its operation succeeds.
For example, the sub-reconciler `reconcileSource()` fetches the source as per the given configuration. When there's no failure, no condition is added to the object. But when fetching the source fails, `reconcileSource()` can add a `FetchFailedCondition` status condition to the object to indicate the failure.
For example, when fetching a Git repository source in `reconcileSource()`:
```go
// Checkout HEAD of reference in object.
gitCtx, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
commit, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, authOpts)
if err != nil {
	e := &serror.Event{
		Err:    fmt.Errorf("failed to checkout and determine revision: %w", err),
		Reason: sourcev1.GitOperationFailedReason,
	}
	// Pass the message as an argument, since it may contain '%' characters.
	conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "%s", e.Err.Error())
	return sreconcile.ResultEmpty, e
}
r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.GitOperationSucceedReason,
	"cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, commit.String())
conditions.Delete(obj, sourcev1.FetchFailedCondition)
```
In the above example, an `Event` error is created with the checkout error. It is used to set a `FetchFailedCondition` condition on the object before returning the error. When the checkout is successful, a success message is logged and emitted as an event, followed by the removal of any `FetchFailedCondition` from the object.
Similarly, each sub-reconciler can be responsible for its own status conditions.
Logging and emitting events are ways to record what happens at any point in time. Since it's up to the cluster administrators to set up log and event aggregators, it's hard for the controller to decide what to log and what to emit events for. It's much safer to both log and emit an event by default, unless there's a good reason not to. To enable this, we can provide helpers that log and emit an event at the same time.
Since an event requires extra information compared to a log, the helper can have the same interface as an event recorder function. Internally, the event metadata can be used to log the message appropriately.
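```go
func (r *FooReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
	msg := fmt.Sprintf(messageFmt, args...)
	// Log warning events as errors and all other events as info.
	if eventType == corev1.EventTypeWarning {
		ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg)
	} else {
		ctrl.LoggerFrom(ctx).Info(msg)
	}
	// Pass the format and arguments as-is, so that a '%' in an argument
	// isn't treated as a formatting directive.
	r.Eventf(obj, eventType, reason, messageFmt, args...)
}
```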
NOTE: `eventLogf()` can be moved into a separate helper package.
`eventLogf()` can be used to log and emit an event with:
```go
r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.GitOperationSucceedReason,
	"cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, commit.String())
```
Package `fluxcd/pkg/runtime/events` has an event recorder that's compatible with the K8s upstream `EventRecorder`. It adds a few flux-specific behaviors to the events: it emits an event to the K8s API server, sends a notification request to the flux notification-controller, and logs the emitted event if debug logs are enabled. This package also defines a new event type, `EventTypeTrace`, which is used to record the event in K8s only, without forwarding it as a notification to the notification-controller.
In the `eventLogf()` code in the previous section, the recorder behind `r.Eventf()` can be this new event recorder. A trace event, which shouldn't result in a notification, can be sent as:
```go
r.Event(obj, events.EventTypeTrace, reason, msg)
```
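The routing described above, where a trace event is recorded in Kubernetes but not forwarded as a notification, can be modeled as a recorder that fans out to two sinks. This is only a hypothetical sketch of the idea: `EventTypeTrace` is given the value `"Trace"` here as an assumption, and `fanoutRecorder`, `k8sEvents` and `notifications` are illustrative names, not the `fluxcd/pkg/runtime/events` implementation.

```go
package main

import "fmt"

// Event types. The value of EventTypeTrace is an assumption of this sketch.
const (
	EventTypeNormal  = "Normal"
	EventTypeWarning = "Warning"
	EventTypeTrace   = "Trace"
)

// sentEvent is a hypothetical record of an emitted event.
type sentEvent struct {
	Type, Reason, Message string
}

// fanoutRecorder records every event to a Kubernetes-like sink and forwards
// all but trace events to a notification-like sink.
type fanoutRecorder struct {
	k8sEvents     []sentEvent
	notifications []sentEvent
}

func (f *fanoutRecorder) Event(eventType, reason, message string) {
	ev := sentEvent{eventType, reason, message}
	f.k8sEvents = append(f.k8sEvents, ev)
	if eventType == EventTypeTrace {
		// Trace events stay in the Kubernetes event stream only.
		return
	}
	f.notifications = append(f.notifications, ev)
}

func main() {
	r := &fanoutRecorder{}
	r.Event(EventTypeTrace, "Progressing", "building artifact")
	r.Event(EventTypeNormal, "Succeeded", "stored artifact")
	fmt.Println(len(r.k8sEvents), len(r.notifications)) // 2 1
}
```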