Skip to content

Instantly share code, notes, and snippets.

@mayooot
Last active September 29, 2024 06:17
Show Gist options
  • Save mayooot/12a889c6625c558fd724eae753008fa5 to your computer and use it in GitHub Desktop.
Save mayooot/12a889c6625c558fd724eae753008fa5 to your computer and use it in GitHub Desktop.
Annotated the Update function in Kubernetes Store.go to explain how version conflicts are handled.
const (
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
resourceCountPollPeriodJitter = 1.2
)
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, false, err
}
var (
creatingObj runtime.Object
creating = false
)
qualifiedResource := e.qualifiedResourceFromContext(ctx)
storagePreconditions := &storage.Preconditions{}
if preconditions := objInfo.Preconditions(); preconditions != nil {
storagePreconditions.UID = preconditions.UID
storagePreconditions.ResourceVersion = preconditions.ResourceVersion
}
out := e.NewFunc()
// deleteObj is only used in case a deletion is carried out
var deleteObj runtime.Object
err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
// 下面的逻辑都在一个函数变量 tryUpdate 里,作为变量传递给 GuaranteedUpdate 函数
// GuaranteedUpdate 会根据 key 在 etcd 中查找到最新版本的 obj(existing 变量),然后作为函数变量传递给 tryUpdate 函数,也就是当前函数
// 获取当前资源存储在 etcd 中的 mod_revision,直接调用函数返回的,没有从 etcd 中再次获取
existingResourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(existing)
if err != nil {
return nil, nil, err
}
if existingResourceVersion == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
return nil, nil, apierrors.NewNotFound(qualifiedResource, name)
}
}
// Given the existing object, get the new object
// objInfo 是用户传递的更新对象
// UpdatedObject 函数需要传递一个 context 和一个 oldObj,其中 oldObj 是当前资源存储在 etcd 中的最新版本
// 返回的是一个新的 obj,和一个 error
// 其实可以理解为是一个 merge 函数,将 oldObj 和 objInfo 中的 obj 合并为一个新的 obj
obj, err := objInfo.UpdatedObject(ctx, existing)
if err != nil {
return nil, nil, err
}
// If AllowUnconditionalUpdate() is true and the object specified by
// the user does not have a resource version, then we populate it with
// the latest version. Else, we check that the version specified by
// the user matches the version of latest storage object.
// obj 是 UpdatedObject 函数返回的新的资源对象
// 它的 ResourceVersion 会使用我们传递 objInfo 中的版本号
newResourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
if err != nil {
return nil, nil, err
}
doUnconditionalUpdate := newResourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()
if existingResourceVersion == 0 {
// Init metadata as early as possible.
if objectMeta, err := meta.Accessor(obj); err != nil {
return nil, nil, err
} else {
rest.FillObjectMetaSystemFields(objectMeta)
}
var finishCreate FinishFunc = finishNothing
if e.BeginCreate != nil {
fn, err := e.BeginCreate(ctx, obj, newCreateOptionsFromUpdateOptions(options))
if err != nil {
return nil, nil, err
}
finishCreate = fn
defer func() {
finishCreate(ctx, false)
}()
}
creating = true
creatingObj = obj
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, nil, err
}
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, nil, err
}
// The operation has succeeded. Call the finish function if there is one,
// and then make sure the defer doesn't call it again.
fn := finishCreate
finishCreate = finishNothing
fn(ctx, true)
return obj, &ttl, nil
}
creating = false
creatingObj = nil
if doUnconditionalUpdate {
// Update the object's resource version to match the latest
// storage object's resource version.
err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
if err != nil {
return nil, nil, err
}
} else {
// Check if the object's resource version matches the latest
// resource version.
if newResourceVersion == 0 {
// TODO: The Invalid error should have a field for Resource.
// After that field is added, we should fill the Resource and
// leave the Kind field empty. See the discussion in #18526.
qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), newResourceVersion, "must be specified for an update")}
return nil, nil, apierrors.NewInvalid(qualifiedKind, name, fieldErrList)
}
// 如果资源版本不匹配,则返回冲突错误
if newResourceVersion != existingResourceVersion {
return nil, nil, apierrors.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
}
}
var finishUpdate FinishFunc = finishNothing
if e.BeginUpdate != nil {
fn, err := e.BeginUpdate(ctx, obj, existing, options)
if err != nil {
return nil, nil, err
}
finishUpdate = fn
defer func() {
finishUpdate(ctx, false)
}()
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}
// Ignore changes that only affect managed fields timestamps.
// FieldManager can't know about changes like normalized fields, defaulted
// fields and other mutations.
obj, err = fieldmanager.IgnoreManagedFieldsTimestampsTransformer(ctx, obj, existing)
if err != nil {
return nil, nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if updateValidation != nil {
if err := updateValidation(ctx, obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
return nil, nil, err
}
}
// Check the default delete-during-update conditions, and store-specific conditions if provided
if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
(e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
deleteObj = obj
return nil, nil, errEmptiedFinalizers
}
ttl, err := e.calculateTTL(obj, res.TTL, true)
if err != nil {
return nil, nil, err
}
// The operation has succeeded. Call the finish function if there is one,
// and then make sure the defer doesn't call it again.
fn := finishUpdate
finishUpdate = finishNothing
fn(ctx, true)
if int64(ttl) != res.TTL {
return obj, &ttl, nil
}
return obj, nil, nil
}, dryrun.IsDryRun(options.DryRun), nil)
if err != nil {
// delete the object
if err == errEmptiedFinalizers {
return e.deleteWithoutFinalizers(ctx, name, key, deleteObj, storagePreconditions, newDeleteOptionsFromUpdateOptions(options))
}
if creating {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, creatingObj)
} else {
err = storeerr.InterpretUpdateError(err, qualifiedResource, name)
}
return nil, false, err
}
if creating {
if e.AfterCreate != nil {
e.AfterCreate(out, newCreateOptionsFromUpdateOptions(options))
}
} else {
if e.AfterUpdate != nil {
e.AfterUpdate(out, options)
}
}
if e.Decorator != nil {
e.Decorator(out)
}
return out, creating, nil
}
@mayooot
Copy link
Author

mayooot commented Sep 29, 2024

SourceCode: registry/store.go

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment