-
-
Save fabianvf/7cb795603a7559a12a3c81eeee3ba4bb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pkg/ansible/controller/controller.go b/pkg/ansible/controller/controller.go | |
index 02accab2..28bdeb92 100644 | |
--- a/pkg/ansible/controller/controller.go | |
+++ b/pkg/ansible/controller/controller.go | |
@@ -28,6 +28,7 @@ import ( | |
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | |
"k8s.io/apimachinery/pkg/runtime" | |
"k8s.io/apimachinery/pkg/runtime/schema" | |
+ "sigs.k8s.io/controller-runtime/pkg/client" | |
"sigs.k8s.io/controller-runtime/pkg/controller" | |
crthandler "sigs.k8s.io/controller-runtime/pkg/handler" | |
"sigs.k8s.io/controller-runtime/pkg/manager" | |
@@ -57,9 +58,20 @@ func Add(mgr manager.Manager, options Options) *controller.Controller { | |
options.EventHandlers = []events.EventHandler{} | |
} | |
eventHandlers := append(options.EventHandlers, events.NewLoggingEventHandler(options.LoggingLevel)) | |
+ apiReader, err := client.New(mgr.GetConfig(), client.Options{}) | |
+ if err != nil { | |
+ log.Error(err, "Unable to get new api client") | |
+ } | |
aor := &AnsibleOperatorReconciler{ | |
- Client: mgr.GetClient(), | |
+ // The default client will use the DelegatingReader for reads | |
+ // this forces it to use the cache for unstructured types. | |
+ Client: client.DelegatingClient{ | |
+ Reader: mgr.GetCache(), | |
+ Writer: mgr.GetClient(), | |
+ StatusClient: mgr.GetClient(), | |
+ }, | |
+ APIReader: apiReader, | |
GVK: options.GVK, | |
Runner: options.Runner, | |
EventHandlers: eventHandlers, | |
@@ -68,7 +80,7 @@ func Add(mgr manager.Manager, options Options) *controller.Controller { | |
} | |
scheme := mgr.GetScheme() | |
- _, err := scheme.New(options.GVK) | |
+ _, err = scheme.New(options.GVK) | |
if runtime.IsNotRegisteredError(err) { | |
// Register the GVK with the schema | |
scheme.AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{}) | |
diff --git a/pkg/ansible/controller/reconcile.go b/pkg/ansible/controller/reconcile.go | |
index 1a26424a..81688d81 100644 | |
--- a/pkg/ansible/controller/reconcile.go | |
+++ b/pkg/ansible/controller/reconcile.go | |
@@ -55,6 +55,7 @@ type AnsibleOperatorReconciler struct { | |
GVK schema.GroupVersionKind | |
Runner runner.Runner | |
Client client.Client | |
+ APIReader client.Reader | |
EventHandlers []events.EventHandler | |
ReconcilePeriod time.Duration | |
ManageStatus bool | |
@@ -220,19 +221,16 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc | |
} | |
if r.ManageStatus { | |
err = r.markDone(u, request.NamespacedName, statusEvent, failureMessages) | |
- if err != nil { | |
- logger.Error(err, "Failed to mark status done") | |
+ if exit, err := determineReturn(err); exit { | |
+ return reconcileResult, err | |
} | |
+ | |
} | |
return reconcileResult, err | |
} | |
func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, namespacedName types.NamespacedName) error { | |
// Get the latest resource to prevent updating a stale status | |
- err := r.Client.Get(context.TODO(), namespacedName, u) | |
- if err != nil { | |
- return err | |
- } | |
statusInterface := u.Object["status"] | |
statusMap, _ := statusInterface.(map[string]interface{}) | |
crStatus := ansiblestatus.CreateFromMap(statusMap) | |
@@ -256,7 +254,7 @@ func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, na | |
) | |
ansiblestatus.SetCondition(&crStatus, *c) | |
u.Object["status"] = crStatus.GetJSONMap() | |
- err = r.Client.Status().Update(context.TODO(), u) | |
+ err := r.Client.Status().Update(context.TODO(), u) | |
if err != nil { | |
return err | |
} | |
@@ -306,16 +304,6 @@ func (r *AnsibleOperatorReconciler) markError(u *unstructured.Unstructured, name | |
} | |
func (r *AnsibleOperatorReconciler) markDone(u *unstructured.Unstructured, namespacedName types.NamespacedName, statusEvent eventapi.StatusJobEvent, failureMessages eventapi.FailureMessages) error { | |
- logger := logf.Log.WithName("markDone") | |
- // Get the latest resource to prevent updating a stale status | |
- err := r.Client.Get(context.TODO(), namespacedName, u) | |
- if apierrors.IsNotFound(err) { | |
- logger.Info("Resource not found, assuming it was deleted", err) | |
- return nil | |
- } | |
- if err != nil { | |
- return err | |
- } | |
statusInterface := u.Object["status"] | |
statusMap, _ := statusInterface.(map[string]interface{}) | |
crStatus := ansiblestatus.CreateFromMap(statusMap) | |
@@ -363,3 +351,21 @@ func contains(l []string, s string) bool { | |
} | |
return false | |
} | |
+ | |
+// determineReturn - if the object was updated outside of our controller | |
+// this means that the current reconcilation is over and we should use the | |
+// latest version. To do this, we just exit without error because the | |
+// latest version should be queued for update. | |
+func determineReturn(err error) (bool, error) { | |
+ exit := false | |
+ if err == nil { | |
+ return exit, err | |
+ } | |
+ exit = true | |
+ | |
+ if apierrors.IsConflict(err) { | |
+ log.V(1).Info("Conflict found during an update; re-running reconcilation") | |
+ return exit, nil | |
+ } | |
+ return exit, err | |
+} | |
diff --git a/pkg/ansible/controller/reconcile_test.go b/pkg/ansible/controller/reconcile_test.go | |
index 2218b579..e0130241 100644 | |
--- a/pkg/ansible/controller/reconcile_test.go | |
+++ b/pkg/ansible/controller/reconcile_test.go | |
@@ -487,6 +487,7 @@ func TestReconcile(t *testing.T) { | |
GVK: tc.GVK, | |
Runner: tc.Runner, | |
Client: tc.Client, | |
+ APIReader: tc.Client, | |
EventHandlers: tc.EventHandlers, | |
ReconcilePeriod: tc.ReconcilePeriod, | |
ManageStatus: tc.ManageStatus, | |
diff --git a/pkg/ansible/proxy/proxy_test.go b/pkg/ansible/proxy/proxy_test.go | |
index 65eab30d..3991cadb 100644 | |
--- a/pkg/ansible/proxy/proxy_test.go | |
+++ b/pkg/ansible/proxy/proxy_test.go | |
@@ -12,6 +12,8 @@ | |
// See the License for the specific language governing permissions and | |
// limitations under the License. | |
+// +build !openshiftci | |
+ | |
package proxy | |
import ( | |
diff --git a/pkg/ansible/run.go b/pkg/ansible/run.go | |
index 7b9bd53d..ecc4f719 100644 | |
--- a/pkg/ansible/run.go | |
+++ b/pkg/ansible/run.go | |
@@ -60,6 +60,8 @@ func printVersion() { | |
func Run(flags *aoflags.AnsibleOperatorFlags) error { | |
printVersion() | |
+ printVersion() | |
+ | |
namespace, found := os.LookupEnv(k8sutil.WatchNamespaceEnvVar) | |
log = log.WithValues("Namespace", namespace) | |
if found { | |
diff --git a/pkg/ansible/runner/internal/inputdir/inputdir.go b/pkg/ansible/runner/internal/inputdir/inputdir.go | |
index cca03b50..3a58c892 100644 | |
--- a/pkg/ansible/runner/internal/inputdir/inputdir.go | |
+++ b/pkg/ansible/runner/internal/inputdir/inputdir.go | |
@@ -22,9 +22,10 @@ import ( | |
"path/filepath" | |
"strings" | |
- "github.com/operator-framework/operator-sdk/internal/util/fileutil" | |
"github.com/spf13/afero" | |
+ "github.com/operator-framework/operator-sdk/internal/util/fileutil" | |
+ | |
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" | |
) | |
diff --git a/pkg/helm/controller/controller.go b/pkg/helm/controller/controller.go | |
index 3d57b76c..093904a0 100644 | |
--- a/pkg/helm/controller/controller.go | |
+++ b/pkg/helm/controller/controller.go | |
@@ -29,6 +29,7 @@ import ( | |
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | |
"k8s.io/apimachinery/pkg/runtime/schema" | |
rpb "k8s.io/helm/pkg/proto/hapi/release" | |
+ "sigs.k8s.io/controller-runtime/pkg/client" | |
"sigs.k8s.io/controller-runtime/pkg/controller" | |
"sigs.k8s.io/controller-runtime/pkg/event" | |
crthandler "sigs.k8s.io/controller-runtime/pkg/handler" | |
@@ -56,7 +57,13 @@ type WatchOptions struct { | |
// Add creates a new helm operator controller and adds it to the manager | |
func Add(mgr manager.Manager, options WatchOptions) error { | |
r := &HelmOperatorReconciler{ | |
- Client: mgr.GetClient(), | |
+ // The default client will use the DelegatingReader for reads | |
+ // this forces it to use the cache for unstructured types. | |
+ Client: client.DelegatingClient{ | |
+ Reader: mgr.GetCache(), | |
+ Writer: mgr.GetClient(), | |
+ StatusClient: mgr.GetClient(), | |
+ }, | |
GVK: options.GVK, | |
ManagerFactory: options.ManagerFactory, | |
ReconcilePeriod: options.ReconcilePeriod, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment