Created
March 23, 2021 14:28
-
-
Save d-kuro/f87094d2dab9dee5cb24f5c85f3b847e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package leaderelection | |
import ( | |
"context" | |
"time" | |
coordinationv1 "k8s.io/api/coordination/v1" | |
"k8s.io/apimachinery/pkg/api/errors" | |
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/util/wait" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/leaderelection" | |
"k8s.io/client-go/tools/leaderelection/resourcelock" | |
"sigs.k8s.io/controller-runtime/pkg/client" | |
) | |
// leaseLockNamespace is the namespace of the Lease object used as the
// leader election lock.
const leaseLockNamespace string = "foo-namespace"
type Client struct { | |
*leaderelection.LeaderElector | |
k8sClient client.Client | |
lockName string | |
identity string | |
} | |
func NewClient(config *rest.Config, k8sClient client.Client, lockName string, identity string) (*Client, error) { | |
client, err := kubernetes.NewForConfig(config) | |
if err != nil { | |
return nil, err | |
} | |
lock := &resourcelock.LeaseLock{ | |
LeaseMeta: metav1.ObjectMeta{ | |
Name: lockName, | |
Namespace: leaseLockNamespace, | |
}, | |
Client: client.CoordinationV1(), | |
LockConfig: resourcelock.ResourceLockConfig{ | |
Identity: identity, | |
}, | |
} | |
leaderElectionConf := leaderelection.LeaderElectionConfig{ | |
Lock: lock, | |
// release the lock by canceling the context | |
ReleaseOnCancel: true, | |
LeaseDuration: 60 * time.Second, | |
RenewDeadline: 15 * time.Second, | |
RetryPeriod: 5 * time.Second, | |
Callbacks: leaderelection.LeaderCallbacks{ | |
OnStartedLeading: func(_ context.Context) {}, | |
OnStoppedLeading: func() {}, | |
}, | |
} | |
elector, err := leaderelection.NewLeaderElector(leaderElectionConf) | |
if err != nil { | |
return nil, err | |
} | |
return &Client{ | |
LeaderElector: elector, | |
k8sClient: k8sClient, | |
identity: identity, | |
lockName: lockName, | |
}, nil | |
} | |
// maxBackoffInterval caps how long to wait between successive attempts to
// become the leader.
const maxBackoffInterval = 16 * time.Second
// Become will run a leader election and wait until it gets the leader. | |
// Cancel context to release the lock. | |
func (l *Client) Become(ctx context.Context) error { | |
go l.Run(ctx) | |
backoff := time.Second | |
for { | |
var lease coordinationv1.Lease | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
default: | |
err := l.k8sClient.Get(ctx, client.ObjectKey{Name: l.lockName, Namespace: leaseLockNamespace}, &lease) | |
if !errors.IsNotFound(err) && err != nil { | |
return err | |
} | |
} | |
if l.isLeader(lease) { | |
return nil | |
} | |
select { | |
case <-time.After(wait.Jitter(backoff, .2)): | |
if backoff < maxBackoffInterval { | |
backoff *= 2 | |
} | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
} | |
func (l *Client) isLeader(lease coordinationv1.Lease) bool { | |
if lease.Spec.HolderIdentity == nil { | |
return false | |
} | |
return l.identity == *lease.Spec.HolderIdentity | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment