@deads2k
Created March 29, 2022 21:17
diff --git a/.ci-operator.yaml b/.ci-operator.yaml
new file mode 100644
index 000000000..9271a8208
--- /dev/null
+++ b/.ci-operator.yaml
@@ -0,0 +1,4 @@
+build_root_image:
+ name: release
+ namespace: openshift
+ tag: rhel-8-release-golang-1.16-openshift-4.10
diff --git a/Dockerfile.art b/Dockerfile.art
new file mode 100644
index 000000000..a2faba94f
--- /dev/null
+++ b/Dockerfile.art
@@ -0,0 +1,23 @@
+FROM registry.ci.openshift.org/ocp/builder:rhel-8-etcd-golang-1.16 AS builder
+
+COPY $REMOTE_SOURCES $REMOTE_SOURCES_DIR
+WORKDIR $REMOTE_SOURCES_DIR/cachito-gomod-with-deps/app
+RUN ls -lR $REMOTE_SOURCES_DIR
+RUN cat $REMOTE_SOURCES_DIR/cachito-gomod-with-deps/cachito.env
+RUN source $REMOTE_SOURCES_DIR/cachito-gomod-with-deps/cachito.env && GOFLAGS='-mod=readonly' GO_BUILD_FLAGS='-v' ./build.sh
+
+RUN mkdir -p /go/src/go.etcd.io/
+RUN ln -s $REMOTE_SOURCES_DIR/cachito-gomod-with-deps/app /go/src/go.etcd.io/etcd
+
+# stage 2 (note: any changes should reflect in Dockerfile.rhel)
+FROM registry.ci.openshift.org/ocp/4.10:base
+
+ENTRYPOINT ["/usr/bin/etcd"]
+
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/etcd /usr/bin/
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/etcdctl /usr/bin/
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/discover-etcd-initial-cluster /usr/bin/
+
+LABEL io.k8s.display-name="etcd server" \
+ io.k8s.description="etcd is a distributed key-value store which stores the persistent master state for Kubernetes and OpenShift." \
+ maintainer="Sam Batschelet <[email protected]>"
diff --git a/Dockerfile.rhel b/Dockerfile.rhel
new file mode 100644
index 000000000..eee97f052
--- /dev/null
+++ b/Dockerfile.rhel
@@ -0,0 +1,20 @@
+FROM registry.ci.openshift.org/ocp/builder:rhel-8-etcd-golang-1.16 AS builder
+
+WORKDIR /go/src/go.etcd.io/etcd
+
+COPY . .
+
+RUN GOFLAGS='-mod=readonly' GO_BUILD_FLAGS='-v' ./build.sh
+
+# stage 2 (note: any changes should reflect in Dockerfile.art)
+FROM registry.ci.openshift.org/ocp/4.9:base
+
+ENTRYPOINT ["/usr/bin/etcd"]
+
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/etcd /usr/bin/
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/etcdctl /usr/bin/
+COPY --from=builder /go/src/go.etcd.io/etcd/bin/discover-etcd-initial-cluster /usr/bin/
+
+LABEL io.k8s.display-name="etcd server" \
+ io.k8s.description="etcd is a distributed key-value store which stores the persistent master state for Kubernetes and OpenShift." \
+ maintainer="Sam Batschelet <[email protected]>"
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 000000000..8492ebf1d
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,12 @@
+approvers:
+- deads2k
+- hexfusion
+- hasbro17
+reviewers:
+- deads2k
+- dusk125
+- hasbro17
+- hexfusion
+- smarterclayton
+- wking
+component: "Etcd"
diff --git a/ROADMAP.md b/ROADMAP.md
deleted file mode 100644
index d9898166c..000000000
--- a/ROADMAP.md
+++ /dev/null
@@ -1,21 +0,0 @@
-# etcd roadmap
-
-**work in progress**
-
-This document defines a high level roadmap for etcd development.
-
-The dates below should not be considered authoritative, but rather indicative of the projected timeline of the project. The [milestones defined in GitHub](https://github.com/etcd-io/etcd/milestones) represent the most up-to-date and issue-for-issue plans.
-
-etcd 3.3 is our current stable branch. The roadmap below outlines new features that will be added to etcd, and while subject to change, define what future stable will look like.
-
-### etcd 3.4 (2019)
-
-- Stabilization of 3.3 experimental features
-- Support/document downgrade
-- Snapshot restore as Go library
-- Improved client balancer with new gRPC balancer interface
-- Improve single-client put performance
-- Improve large response handling
-- Improve test coverage
-- Decrease test runtime
-- Migrate to Go module for dependency management
diff --git a/build.sh b/build.sh
index 7a4f8670e..160aa0bdc 100755
--- a/build.sh
+++ b/build.sh
@@ -1,5 +1,7 @@
#!/usr/bin/env bash
+GO_BUILD_FLAGS="${GO_BUILD_FLAGS} ${GOFLAGS:+-$GOFLAGS}"
+
source ./scripts/test_lib.sh
GIT_SHA=$(git rev-parse --short HEAD || echo "GitNotFound")
@@ -66,6 +68,17 @@ etcd_build() {
"-ldflags=${GO_LDFLAGS[*]}" \
-o="../${out}/etcdctl" . || return 2
) || return 2
+
+ run rm -f "${out}/discover-etcd-initial-cluster"
+ # shellcheck disable=SC2086
+ (
+ cd ./openshift-tools/discover-etcd-initial-cluster
+ run env GO_BUILD_FLAGS="${GO_BUILD_FLAGS}" "${GO_BUILD_ENV[@]}" go build $GO_BUILD_FLAGS \
+ -installsuffix=cgo \
+ "-ldflags=${GO_LDFLAGS[*]}" \
+ -o="../../${out}/discover-etcd-initial-cluster" . || return 2
+ ) || return 2
+
# Verify whether symbol we overriden exists
# For cross-compiling we cannot run: ${out}/etcd --version | grep -q "Git SHA: ${GIT_SHA}"
diff --git a/client/v3/cluster.go b/client/v3/cluster.go
index 92d7cdb56..3387f8bfc 100644
--- a/client/v3/cluster.go
+++ b/client/v3/cluster.go
@@ -124,7 +124,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
// it is safe to retry on list.
- resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{Linearizable: true}, c.callOpts...)
+ resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
if err == nil {
return (*MemberListResponse)(resp), nil
}
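
For context on this carry, a minimal, hypothetical sketch of the call path it affects: clientv3's Cluster.MemberList no longer sets the Linearizable flag, so the listing below is served from whichever member the client reaches rather than through a linearizable read. The endpoint is a placeholder and TLS settings are omitted.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    // Placeholder endpoint; production callers would also configure TLS.
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"https://127.0.0.1:2379"},
        DialTimeout: 2 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // With the change above, this request is sent without Linearizable set.
    resp, err := cli.MemberList(context.TODO())
    if err != nil {
        log.Fatal(err)
    }
    for _, m := range resp.Members {
        fmt.Printf("%x %s %v\n", m.ID, m.Name, m.PeerURLs)
    }
}
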
diff --git a/etcdctl/ctlv3/command/check.go b/etcdctl/ctlv3/command/check.go
index a2a5ca315..b7f9d750f 100644
--- a/etcdctl/ctlv3/command/check.go
+++ b/etcdctl/ctlv3/command/check.go
@@ -107,13 +107,10 @@ var checkDatascaleCfgMap = map[string]checkDatascaleCfg{
// NewCheckCommand returns the cobra command for "check".
func NewCheckCommand() *cobra.Command {
cc := &cobra.Command{
- Use: "check <subcommand>",
- Short: "commands for checking properties of the etcd cluster",
+ Use: "check <subcommand> is no longer supported in OpenShift. Performance analysis should be performed using metrics; please see the etcd dashboards",
+ Short: "command no longer supported in OpenShift",
}
- cc.AddCommand(NewCheckPerfCommand())
- cc.AddCommand(NewCheckDatascaleCommand())
-
return cc
}
diff --git a/go.mod b/go.mod
index e791486f2..c3d739d29 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
github.com/bgentry/speakeasy v0.1.0
github.com/dustin/go-humanize v1.0.0
github.com/spf13/cobra v1.1.3
+ github.com/spf13/pflag v1.0.5 // indirect
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
diff --git a/openshift-tools/discover-etcd-initial-cluster/main.go b/openshift-tools/discover-etcd-initial-cluster/main.go
new file mode 100644
index 000000000..544c666d9
--- /dev/null
+++ b/openshift-tools/discover-etcd-initial-cluster/main.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ goflag "flag"
+ "fmt"
+ "math/rand"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/spf13/pflag"
+ discover_etcd_initial_cluster "go.etcd.io/etcd/v3/openshift-tools/pkg/discover-etcd-initial-cluster"
+)
+
+// copy from `utilflag "k8s.io/component-base/cli/flag"`
+// WordSepNormalizeFunc changes all flags that contain "_" separators
+func WordSepNormalizeFunc(f *pflag.FlagSet, name string) pflag.NormalizedName {
+ if strings.Contains(name, "_") {
+ return pflag.NormalizedName(strings.Replace(name, "_", "-", -1))
+ }
+ return pflag.NormalizedName(name)
+}
+
+func main() {
+ rand.Seed(time.Now().UTC().UnixNano())
+
+ pflag.CommandLine.SetNormalizeFunc(WordSepNormalizeFunc)
+ pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
+
+ command := discover_etcd_initial_cluster.NewDiscoverEtcdInitialClusterCommand()
+ if err := command.Execute(); err != nil {
+ fmt.Fprintf(os.Stderr, "%v\n", err)
+ os.Exit(1)
+ }
+}
diff --git a/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go
new file mode 100644
index 000000000..91630e5d1
--- /dev/null
+++ b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go
@@ -0,0 +1,323 @@
+package discover_etcd_initial_cluster
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/spf13/cobra"
+ "github.com/spf13/pflag"
+
+ "google.golang.org/grpc"
+
+ "go.etcd.io/etcd/api/v3/etcdserverpb"
+ "go.etcd.io/etcd/client/pkg/v3/transport"
+ "go.etcd.io/etcd/client/v3"
+)
+
+type DiscoverEtcdInitialClusterOptions struct {
+ // TargetPeerURLHost is the host portion of the peer URL (either an IP or a hostname). It is used to match against existing members.
+ TargetPeerURLHost string
+ // TargetPeerURLScheme is the host scheme of the peer URL.
+ TargetPeerURLScheme string
+ // TargetPeerURLPort is the host port of the peer URL.
+ TargetPeerURLPort string
+ // TargetName is the name to assign to this peer if we create it.
+ TargetName string
+
+ // CABundleFile is the file to use to trust the etcd server
+ CABundleFile string
+ // ClientCertFile is the client cert to use to authenticate this binary to etcd
+ ClientCertFile string
+ // ClientKeyFile is the client key to use to authenticate this binary to etcd
+ ClientKeyFile string
+ // Endpoints is a list of all the endpoints to use to try to contact etcd
+ Endpoints []string
+
+ // DataDir is the directory created when etcd starts the first time
+ DataDir string
+}
+
+func NewDiscoverEtcdInitialCluster() *DiscoverEtcdInitialClusterOptions {
+ return &DiscoverEtcdInitialClusterOptions{
+ TargetPeerURLScheme: "https",
+ TargetPeerURLPort: "2380",
+ }
+}
+
+func NewDiscoverEtcdInitialClusterCommand() *cobra.Command {
+ o := NewDiscoverEtcdInitialCluster()
+
+ cmd := &cobra.Command{
+ Use: "discover-etcd-initial-cluster",
+ Short: "output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod",
+ Long: `output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod
+
+Please see docs for more details:
+https://github.com/openshift/cluster-etcd-operator/tree/master/docs/discover-etcd-initial-cluster.md
+`,
+ Run: func(cmd *cobra.Command, args []string) {
+ if err := o.Validate(); err != nil {
+ fmt.Fprint(os.Stderr, err)
+ os.Exit(1)
+ }
+
+ if err := o.Run(); err != nil {
+ fmt.Fprint(os.Stderr, err)
+ os.Exit(1)
+ }
+ },
+ }
+ o.BindFlags(cmd.Flags())
+
+ return cmd
+}
+
+func (o *DiscoverEtcdInitialClusterOptions) BindFlags(flags *pflag.FlagSet) {
+ flags.StringVar(&o.CABundleFile, "cacert", o.CABundleFile, "file to use to verify the identity of the etcd server")
+ flags.StringVar(&o.ClientCertFile, "cert", o.ClientCertFile, "client cert to use to authenticate this binary to etcd")
+ flags.StringVar(&o.ClientKeyFile, "key", o.ClientKeyFile, "client key to use to authenticate this binary to etcd")
+ flags.StringSliceVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd")
+ flags.StringVar(&o.DataDir, "data-dir", o.DataDir, "dir to stat for existence of the member directory")
+ flags.StringVar(&o.TargetPeerURLHost, "target-peer-url-host", o.TargetPeerURLHost, "host portion of the peer URL (either an IP or a hostname) used to match against existing members")
+ flags.StringVar(&o.TargetName, "target-name", o.TargetName, "name to assign to this peer if we create it")
+}
+
+func (o *DiscoverEtcdInitialClusterOptions) Validate() error {
+ if len(o.CABundleFile) == 0 {
+ return fmt.Errorf("missing --cacert")
+ }
+ if len(o.ClientCertFile) == 0 {
+ return fmt.Errorf("missing --cert")
+ }
+ if len(o.ClientKeyFile) == 0 {
+ return fmt.Errorf("missing --key")
+ }
+ if len(o.Endpoints) == 0 {
+ return fmt.Errorf("missing --endpoints")
+ }
+ if len(o.DataDir) == 0 {
+ return fmt.Errorf("missing --data-dir")
+ }
+ if len(o.TargetPeerURLHost) == 0 {
+ return fmt.Errorf("missing --target-peer-url-host")
+ }
+ if len(o.TargetName) == 0 {
+ return fmt.Errorf("missing --target-name")
+ }
+ if len(o.TargetPeerURLPort) == 0 {
+ return fmt.Errorf("missing TargetPeerURLPort")
+ }
+ if len(o.TargetPeerURLScheme) == 0 {
+ return fmt.Errorf("missing TargetPeerURLScheme")
+ }
+ return nil
+}
+
+func (o *DiscoverEtcdInitialClusterOptions) Run() error {
+ var dataDirExists bool
+ // check if dataDir structure exists
+ _, err := os.Stat(filepath.Join(o.DataDir, "member/snap"))
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+ if err == nil {
+ fmt.Fprintf(os.Stderr, "dataDir is present on %s\n", o.TargetName)
+ dataDirExists = true
+ }
+
+ client, err := o.getClient()
+
+ // Condition: client creation failed and the dataDir exists.
+ // Possible reasons for this condition:
+ // 1.) single node etcd cluster
+ // 2.) transient networking problem
+ // 3.) on and off flow
+ // Result: start etcd with empty initial config
+ if err != nil && dataDirExists {
+ fmt.Fprintf(os.Stderr, "failed to create etcd client, but the server is already initialized as member %q before, starting as etcd member: %v", o.TargetName, err.Error())
+ return nil
+ }
+ // Condition: client creation failed and no dataDir exists.
+ // Possible reasons for this condition include a transient network partition.
+ // Result: return error and restart container
+ if err != nil {
+ return fmt.Errorf("failed to create etcd client: %v", err)
+ }
+ defer client.Close()
+
+ for i := 0; i < 10; i++ {
+ fmt.Fprintf(os.Stderr, "#### attempt %d\n", i)
+
+ // Check member list on each iteration for changes.
+ cluster, err := client.MemberList(context.TODO())
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "member list request failed: %v", err)
+ continue
+ }
+ logCurrentMembership(cluster.Members)
+
+ initialCluster, memberFound, err := o.getInitialCluster(cluster.Members, dataDirExists)
+ if err != nil && memberFound {
+ return err
+ }
+ // If member is not yet part of the cluster print to stderr and retry.
+ if err != nil && !memberFound {
+ fmt.Fprintf(os.Stderr, " %s\n#### sleeping...\n", err.Error())
+ time.Sleep(1 * time.Second)
+ continue
+ }
+ // Empty string value for initialCluster is valid.
+ fmt.Println(initialCluster)
+
+ return nil
+ }
+ return fmt.Errorf("timed out")
+}
+
+func (o *DiscoverEtcdInitialClusterOptions) getInitialCluster(members []*etcdserverpb.Member, dataDirExists bool) (string, bool, error) {
+ target := url.URL{
+ Scheme: o.TargetPeerURLScheme,
+ Host: fmt.Sprintf("%s:%s", o.TargetPeerURLHost, o.TargetPeerURLPort),
+ }
+
+ targetMember, memberFound := checkTargetMember(target, members)
+
+ // Condition: unstarted member found, no dataDir
+ // This member is part of the cluster but has not yet started. We know this because a member's name is
+ // only populated at runtime, and this member has no name yet.
+ // Result: populate initial cluster so etcd can communicate with peers during startup
+ if memberFound && targetMember.Name == "" && !dataDirExists {
+ return formatInitialCluster(o.TargetName, targetMember, members), memberFound, nil
+ }
+
+ // Condition: unstarted member found with dataDir
+ // This member is part of the cluster but has not yet started, even though a dataDir exists.
+ // Result: archive the old dataDir and return an error, which will restart the container
+ if memberFound && targetMember.Name == "" && dataDirExists {
+ archivedDir, err := archiveDataDir(o.DataDir)
+ if err != nil {
+ return "", memberFound, err
+ }
+ return "", memberFound, fmt.Errorf("member %q is unstarted but previous members dataDir exists: archiving to %q", target.String(), archivedDir)
+ }
+
+ // Condition: started member found with dataDir
+ // Result: start etcd with empty initial config
+ if memberFound && dataDirExists {
+ return "", memberFound, nil
+ }
+
+ // Condition: started member found, no dataDir
+ // A member is not actually gone until it is removed from the cluster with MemberRemove or its dataDir is
+ // destroyed. Since this is the latter case, do not let etcd start, and report the condition as an error.
+ // Result: return error and restart container
+ if memberFound && !dataDirExists {
+ return "", memberFound, fmt.Errorf("member %q dataDir has been destroyed and must be removed from the cluster", target.String())
+ }
+
+ // Condition: member not found with dataDir
+ // The member has been removed from the cluster. The dataDir will be archived once the operator
+ // scales up etcd.
+ // Result: retry member check allowing operator time to scale up etcd again on this node.
+ if !memberFound && dataDirExists {
+ return "", memberFound, fmt.Errorf("member %q not found in member list but dataDir exists, check operator logs for possible scaling problems\n", target.String())
+ }
+
+ // Condition: member not found, no dataDir
+ // The member list does not reflect the target member as it is waiting to be scaled up.
+ // Result: retry
+ if !memberFound && !dataDirExists {
+ return "", memberFound, fmt.Errorf("member %q not found in member list, check operator logs for possible scaling problems", target.String())
+ }
+
+ return "", memberFound, nil
+}
+
+func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) {
+ dialOptions := []grpc.DialOption{
+ grpc.WithBlock(), // block until the underlying connection is up
+ }
+
+ tlsInfo := transport.TLSInfo{
+ CertFile: o.ClientCertFile,
+ KeyFile: o.ClientKeyFile,
+ TrustedCAFile: o.CABundleFile,
+ }
+ tlsConfig, err := tlsInfo.ClientConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ cfg := &clientv3.Config{
+ DialOptions: dialOptions,
+ Endpoints: o.Endpoints,
+ DialTimeout: 2 * time.Second, // fail fast
+ TLS: tlsConfig,
+ }
+
+ return clientv3.New(*cfg)
+}
+
+func archiveDataDir(dataDir string) (string, error) {
+ // for testing
+ if strings.HasPrefix(dataDir, "/tmp") {
+ return "/tmp-removed-archive", nil
+ }
+ sourceDir := filepath.Join(dataDir, "member")
+ targetDir := filepath.Join(sourceDir + "-removed-archive-" + time.Now().Format("2006-01-02-030405"))
+
+ fmt.Fprintf(os.Stderr, "attempting to archive %s to %s", sourceDir, targetDir)
+ if err := os.Rename(sourceDir, targetDir); err != nil {
+ return "", err
+ }
+ return targetDir, nil
+}
+
+func stringifyMember(member *etcdserverpb.Member) string {
+ return fmt.Sprintf("{name=%q, peerURLs=[%s}, clientURLs=[%s]", member.Name, strings.Join(member.PeerURLs, ","), strings.Join(member.ClientURLs, ","))
+}
+
+// checkTargetMember returns the target member if it is part of the member list and prints member details to the etcd log.
+func checkTargetMember(target url.URL, members []*etcdserverpb.Member) (*etcdserverpb.Member, bool) {
+ for _, member := range members {
+ for _, peerURL := range member.PeerURLs {
+ if peerURL == target.String() {
+ fmt.Fprintf(os.Stderr, " target=%s\n", stringifyMember(member))
+ return member, true
+ }
+ }
+ }
+ return nil, false
+}
+
+// logCurrentMembership prints the current etcd membership to the etcd logs.
+func logCurrentMembership(members []*etcdserverpb.Member) {
+ for _, member := range members {
+ fmt.Fprintf(os.Stderr, " member=%s\n", stringifyMember(member))
+ }
+ return
+}
+
+// formatInitialCluster builds the comma-delimited initial cluster string in the format <peerName>=<peerUrl>.
+func formatInitialCluster(targetName string, target *etcdserverpb.Member, members []*etcdserverpb.Member) string {
+ var initialCluster []string
+ for _, member := range members {
+ if member.Name == "" { // this is the signal for whether or not a given peer is started
+ continue
+ }
+ for _, peerURL := range member.PeerURLs {
+ initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", member.Name, peerURL))
+ }
+ }
+ if target.Name == "" {
+ // Adding unstarted member to the end of list
+ initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", targetName, target.PeerURLs[0]))
+ }
+
+ return strings.Join(initialCluster, ",")
+}
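
To make the command's output concrete, here is a minimal sketch (member names and hostnames are hypothetical) that mirrors formatInitialCluster above: started members are listed under their runtime names, the unstarted target is appended under the --target-name value, and the result is the comma-delimited <peerName>=<peerUrl> string the static pod exports as ETCD_INITIAL_CLUSTER.

package main

import (
    "fmt"
    "strings"

    "go.etcd.io/etcd/api/v3/etcdserverpb"
)

func main() {
    // Hypothetical membership: two started voters plus the unstarted target,
    // whose Name is empty because it has not joined raft yet.
    members := []*etcdserverpb.Member{
        {Name: "etcd-0", PeerURLs: []string{"https://etcd-0.example.com:2380"}},
        {Name: "etcd-1", PeerURLs: []string{"https://etcd-1.example.com:2380"}},
        {Name: "", PeerURLs: []string{"https://etcd-2.example.com:2380"}},
    }
    target := members[2]

    var initialCluster []string
    for _, m := range members {
        if m.Name == "" { // unstarted members are skipped, as in formatInitialCluster
            continue
        }
        for _, peerURL := range m.PeerURLs {
            initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", m.Name, peerURL))
        }
    }
    // The unstarted target is appended under the name passed via --target-name.
    initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", "etcd-2", target.PeerURLs[0]))

    // Prints: etcd-0=https://etcd-0.example.com:2380,etcd-1=https://etcd-1.example.com:2380,etcd-2=https://etcd-2.example.com:2380
    fmt.Println(strings.Join(initialCluster, ","))
}
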
diff --git a/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster_test.go b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster_test.go
new file mode 100644
index 000000000..f0b562d4d
--- /dev/null
+++ b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster_test.go
@@ -0,0 +1,104 @@
+package discover_etcd_initial_cluster
+
+import (
+ "regexp"
+ "testing"
+
+ "go.etcd.io/etcd/api/v3/etcdserverpb"
+)
+
+var (
+ emptyInitialCluster = ""
+ startedEtcdMember = &etcdserverpb.Member{Name: "etcd-0", PeerURLs: []string{"https://etcd-0:2380"}}
+ unstartedEtcdMember = &etcdserverpb.Member{Name: "", PeerURLs: []string{"https://etcd-0:2380"}}
+ notFoundEtcdMember = &etcdserverpb.Member{Name: "not-found", PeerURLs: []string{"https://not-found:2380"}}
+)
+
+func Test_ensureValidMember(t *testing.T) {
+ tests := map[string]struct {
+ member *etcdserverpb.Member
+ dataDirExists bool
+ wantMemberFound bool
+ wantInitialCluster string
+ wantErr bool
+ wantErrString string
+ }{
+ "started member found no dataDir": {
+ member: startedEtcdMember,
+ wantMemberFound: true,
+ dataDirExists: false,
+ wantInitialCluster: emptyInitialCluster,
+ wantErr: true,
+ wantErrString: "dataDir has been destroyed and must be removed from the cluster",
+ },
+ "started member found with dataDir": {
+ member: startedEtcdMember,
+ wantMemberFound: true,
+ dataDirExists: true,
+ wantInitialCluster: emptyInitialCluster,
+ wantErr: false,
+ },
+ "member not found with dataDir": {
+ member: notFoundEtcdMember,
+ wantMemberFound: false,
+ dataDirExists: true,
+ wantInitialCluster: emptyInitialCluster,
+ wantErr: true,
+ wantErrString: "check operator logs for possible scaling problems",
+ },
+ "member not found no dataDir": {
+ member: notFoundEtcdMember,
+ wantMemberFound: false,
+ dataDirExists: false,
+ wantInitialCluster: emptyInitialCluster,
+ wantErr: true,
+ wantErrString: "check operator logs for possible scaling problems",
+ },
+ "unstarted member found with dataDir": {
+ member: unstartedEtcdMember,
+ wantMemberFound: true,
+ dataDirExists: true,
+ wantInitialCluster: emptyInitialCluster,
+ wantErr: true,
+ wantErrString: "previous members dataDir exists: archiving",
+ },
+ "unstarted member found no dataDir": {
+ member: unstartedEtcdMember,
+ wantMemberFound: true,
+ dataDirExists: false,
+ wantInitialCluster: "etcd-0=https://etcd-0:2380",
+ wantErr: false,
+ },
+ }
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ o := DiscoverEtcdInitialClusterOptions{
+ TargetPeerURLHost: "etcd-0",
+ TargetPeerURLScheme: "https",
+ TargetPeerURLPort: "2380",
+ TargetName: "etcd-0",
+ DataDir: "/tmp",
+ }
+ gotInitialCluster, gotMemberFound, err := o.getInitialCluster([]*etcdserverpb.Member{test.member}, test.dataDirExists)
+ if gotInitialCluster != test.wantInitialCluster {
+ t.Fatalf("initialCluster: want: %q, got: %q", test.wantInitialCluster, gotInitialCluster)
+ }
+ if err != nil && !test.wantErr {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err == nil && test.wantErr {
+ t.Fatal("expected error got nil")
+ }
+ if gotMemberFound != test.wantMemberFound {
+ t.Fatalf("memberFound: want %v, got %v", gotMemberFound, test.wantMemberFound)
+ }
+ if test.wantErrString != "" {
+ regex := regexp.MustCompile(test.wantErrString)
+ if len(regex.FindAll([]byte(err.Error()), -1)) != 1 {
+ t.Fatalf("unexpected error wanted %q in %q", test.wantErrString, err.Error())
+ }
+ }
+ })
+ }
+
+}
diff --git a/raft/OWNERS b/raft/OWNERS
index ab781066e..35fc8f687 100644
--- a/raft/OWNERS
+++ b/raft/OWNERS
@@ -1,19 +1,5 @@
approvers:
-- heyitsanthony
-- philips
-- fanminshi
-- gyuho
-- mitake
-- jpbetz
-- xiang90
-- bdarnell
+ - hexfusion
+ - smarterclayton
reviewers:
-- heyitsanthony
-- philips
-- fanminshi
-- gyuho
-- mitake
-- jpbetz
-- xiang90
-- bdarnell
-- tschottdorf
+ - hexfusion
diff --git a/scripts/test_lib.sh b/scripts/test_lib.sh
index 9053f9ce8..be86396d2 100644
--- a/scripts/test_lib.sh
+++ b/scripts/test_lib.sh
@@ -302,7 +302,7 @@ function tool_exists {
}
# Ensure gobin is available, as it runs majority of the tools
-if ! command -v "gobin" >/dev/null; then
+if ! command -v "gobin" >/dev/null && [[ ! "$GO_BUILD_FLAGS" =~ 'mod=readonly' ]]; then
run env GO111MODULE=off go get github.com/myitcv/gobin || exit 1
fi
diff --git a/server/config/config.go b/server/config/config.go
index b6e2109c2..ad6c38902 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -183,6 +183,9 @@ type ServerConfig struct {
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
+ // ExperimentalMaxLearners sets a limit to the number of learner members that can exist in the cluster membership.
+ ExperimentalMaxLearners int `json:"experimental-max-learners"`
+
// V2Deprecation defines a phase of v2store deprecation process.
V2Deprecation V2DeprecationEnum `json:"v2-deprecation"`
}
diff --git a/server/embed/config.go b/server/embed/config.go
index 380c0c3aa..9dcefbae2 100644
--- a/server/embed/config.go
+++ b/server/embed/config.go
@@ -35,6 +35,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
bolt "go.etcd.io/bbolt"
@@ -324,6 +325,8 @@ type Config struct {
// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
+ // ExperimentalMaxLearners sets a limit to the number of learner members that can exist in the cluster membership.
+ ExperimentalMaxLearners int `json:"experimental-max-learners"`
// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
@@ -488,6 +491,7 @@ func NewConfig() *Config {
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
+ ExperimentalMaxLearners: membership.DefaultMaxLearners,
V2Deprecation: config.V2_DEPR_DEFAULT,
}
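
For embedded-etcd users, a minimal sketch of how the new knob is consumed; NewConfig() defaults it to membership.DefaultMaxLearners as shown above, and overriding it here is equivalent to passing --experimental-max-learners on the command line. The data directory is a placeholder and other cluster settings are left at their defaults.

package main

import (
    "log"

    "go.etcd.io/etcd/server/v3/embed"
)

func main() {
    cfg := embed.NewConfig()
    cfg.Dir = "/tmp/etcd-data" // placeholder data directory
    // Allow up to two learner members instead of the default of one.
    cfg.ExperimentalMaxLearners = 2

    e, err := embed.StartEtcd(cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer e.Close()
    <-e.Server.ReadyNotify()
    log.Println("embedded etcd is ready")
}
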
diff --git a/server/embed/etcd.go b/server/embed/etcd.go
index 001302f99..49d45b69b 100644
--- a/server/embed/etcd.go
+++ b/server/embed/etcd.go
@@ -223,7 +223,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
- V2Deprecation: cfg.V2DeprecationEffective(),
+ ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
+ V2Deprecation: cfg.V2DeprecationEffective(),
}
if srvcfg.ExperimentalEnableDistributedTracing {
@@ -345,6 +346,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),
zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
+ zap.Int("max-learners", sc.ExperimentalMaxLearners),
)
}
diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go
index f9c91d9f9..1092181a7 100644
--- a/server/etcdmain/config.go
+++ b/server/etcdmain/config.go
@@ -29,6 +29,7 @@ import (
"go.etcd.io/etcd/pkg/v3/flags"
cconfig "go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/embed"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.uber.org/zap"
@@ -288,6 +289,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
+ fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go
index 8ebcd7430..9a078ef67 100644
--- a/server/etcdmain/grpc_proxy.go
+++ b/server/etcdmain/grpc_proxy.go
@@ -357,7 +357,7 @@ func newTLS(ca, cert, key string, requireEmptyCN bool) *transport.TLSInfo {
if ca == "" && cert == "" && key == "" {
return nil
}
- return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: requireEmptyCN}
+ return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key}
}
func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go
index dc5b55fae..56ca5285c 100644
--- a/server/etcdmain/help.go
+++ b/server/etcdmain/help.go
@@ -241,11 +241,13 @@ Experimental feature:
--experimental-watch-progress-notify-interval '10m'
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
- Warning is generated if requests take more than this duration.
+ Warning is generated if requests take more than this duration.
--experimental-txn-mode-write-with-shared-buffer 'true'
Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
+ --experimental-max-learners '1'
+ Set the max number of learner members allowed in the cluster membership.
Unsafe feature:
--force-new-cluster 'false'
diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index 3df9588be..63732807d 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
+ "errors"
"fmt"
"path"
"sort"
@@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
@@ -41,8 +43,6 @@ import (
"go.uber.org/zap"
)
-const maxLearners = 1
-
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
lg *zap.Logger
@@ -61,6 +61,7 @@ type RaftCluster struct {
removed map[types.ID]bool
downgradeInfo *DowngradeInfo
+ maxLearners int
}
// ConfigChangeContext represents a context for confChange.
@@ -81,8 +82,8 @@ const (
// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
// cluster with raft learner member.
-func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
- c := NewCluster(lg)
+func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap, opts ...ClusterOption) (*RaftCluster, error) {
+ c := NewCluster(lg, opts...)
for name, urls := range urlsmap {
m := NewMember(name, urls, token, nil)
if _, ok := c.members[m.ID]; ok {
@@ -97,8 +98,8 @@ func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap)
return c, nil
}
-func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member) *RaftCluster {
- c := NewCluster(lg)
+func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member, opts ...ClusterOption) *RaftCluster {
+ c := NewCluster(lg, opts...)
c.cid = id
for _, m := range membs {
c.members[m.ID] = m
@@ -106,15 +107,18 @@ func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member) *RaftCl
return c
}
-func NewCluster(lg *zap.Logger) *RaftCluster {
+func NewCluster(lg *zap.Logger, opts ...ClusterOption) *RaftCluster {
if lg == nil {
lg = zap.NewNop()
}
+ clOpts := newClusterOpts(opts...)
+
return &RaftCluster{
lg: lg,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
downgradeInfo: &DowngradeInfo{Enabled: false},
+ maxLearners: clOpts.maxLearners,
}
}
@@ -254,12 +258,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
- if c.be != nil {
- c.version = clusterVersionFromBackend(c.lg, c.be)
- c.members, c.removed = membersFromBackend(c.lg, c.be)
- } else {
+ if c.v2store != nil {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
+ } else {
+ c.version = clusterVersionFromBackend(c.lg, c.be)
+ c.members, c.removed = membersFromBackend(c.lg, c.be)
}
if c.be != nil {
@@ -279,6 +283,7 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
zap.String("local-member-id", c.localID.String()),
zap.String("recovered-remote-peer-id", m.ID.String()),
zap.Strings("recovered-remote-peer-urls", m.PeerURLs),
+ zap.Bool("recovered-remote-peer-is-learner", m.IsLearner),
)
}
if c.version != nil {
@@ -293,9 +298,9 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
// TODO: this must be switched to backend as well.
- members, removed := membersFromStore(c.lg, c.v2store)
+ membersMap, removedMap := membersFromStore(c.lg, c.v2store)
id := types.ID(cc.NodeID)
- if removed[id] {
+ if removedMap[id] {
return ErrIDRemoved
}
switch cc.Type {
@@ -306,19 +311,21 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
if confChangeContext.IsPromote { // promoting a learner member to voting member
- if members[id] == nil {
+ if membersMap[id] == nil {
return ErrIDNotFound
}
- if !members[id].IsLearner {
+ if !membersMap[id].IsLearner {
return ErrMemberNotLearner
}
} else { // adding a new member
- if members[id] != nil {
+ if membersMap[id] != nil {
return ErrIDExists
}
+ var members []*Member
urls := make(map[string]bool)
- for _, m := range members {
+ for _, m := range membersMap {
+ members = append(members, m)
for _, u := range m.PeerURLs {
urls[u] = true
}
@@ -329,29 +336,24 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
}
- if confChangeContext.Member.IsLearner { // the new member is a learner
- numLearners := 0
- for _, m := range members {
- if m.IsLearner {
- numLearners++
- }
- }
- if numLearners+1 > maxLearners {
- return ErrTooManyLearners
+ if confChangeContext.Member.RaftAttributes.IsLearner && cc.Type == raftpb.ConfChangeAddLearnerNode { // the new member is a learner
+ scaleUpLearners := true
+ if err := ValidateMaxLearnerConfig(c.maxLearners, members, scaleUpLearners); err != nil {
+ return err
}
}
}
case raftpb.ConfChangeRemoveNode:
- if members[id] == nil {
+ if membersMap[id] == nil {
return ErrIDNotFound
}
case raftpb.ConfChangeUpdateNode:
- if members[id] == nil {
+ if membersMap[id] == nil {
return ErrIDNotFound
}
urls := make(map[string]bool)
- for _, m := range members {
+ for _, m := range membersMap {
if m.ID == id {
continue
}
@@ -381,11 +383,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
+
+ var v2Err, beErr error
if c.v2store != nil {
- mustSaveMemberToStore(c.lg, c.v2store, m)
+ v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
+ if v2Err != nil {
+ if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
+ c.lg.Panic(
+ "failed to save member to store",
+ zap.String("member-id", m.ID.String()),
+ zap.Error(v2Err),
+ )
+ }
+ }
}
if c.be != nil && shouldApplyV3 {
- mustSaveMemberToBackend(c.lg, c.be, m)
+ beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
+ if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
+ c.lg.Panic(
+ "failed to save member to backend",
+ zap.String("member-id", m.ID.String()),
+ zap.Error(beErr),
+ )
+ }
+ }
+ // Panic if both storeV2 and backend report that the member already exists.
+ if v2Err != nil && (beErr != nil || c.be == nil) {
+ c.lg.Panic(
+ "failed to save member to store",
+ zap.String("member-id", m.ID.String()),
+ zap.Error(v2Err),
+ )
}
c.members[m.ID] = m
@@ -396,6 +424,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
zap.String("local-member-id", c.localID.String()),
zap.String("added-peer-id", m.ID.String()),
zap.Strings("added-peer-peer-urls", m.PeerURLs),
+ zap.Bool("added-peer-is-learner", m.IsLearner),
)
}
@@ -404,11 +433,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
+ var v2Err, beErr error
if c.v2store != nil {
- mustDeleteMemberFromStore(c.lg, c.v2store, id)
+ v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
+ if v2Err != nil {
+ if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
+ c.lg.Panic(
+ "failed to delete member from store",
+ zap.String("member-id", id.String()),
+ zap.Error(v2Err),
+ )
+ }
+ }
}
if c.be != nil && shouldApplyV3 {
- mustDeleteMemberFromBackend(c.be, id)
+ beErr = unsafeDeleteMemberFromBackend(c.be, id)
+ if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
+ c.lg.Panic(
+ "failed to delete member from backend",
+ zap.String("member-id", id.String()),
+ zap.Error(beErr),
+ )
+ }
+ }
+ // Panic if both storeV2 and backend report that the member was not found.
+ if v2Err != nil && (beErr != nil || c.be == nil) {
+ c.lg.Panic(
+ "failed to delete member from store",
+ zap.String("member-id", id.String()),
+ zap.Error(v2Err),
+ )
}
m, ok := c.members[id]
@@ -422,6 +476,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
zap.String("local-member-id", c.localID.String()),
zap.String("removed-remote-peer-id", id.String()),
zap.Strings("removed-remote-peer-urls", m.PeerURLs),
+ zap.Bool("removed-remote-peer-is-learner", m.IsLearner),
)
} else {
c.lg.Warn(
@@ -443,7 +498,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
- mustSaveMemberToBackend(c.lg, c.be, m)
+ unsafeSaveMemberToBackend(c.lg, c.be, m)
}
return
}
@@ -476,7 +531,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
- mustSaveMemberToBackend(c.lg, c.be, c.members[id])
+ unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@@ -495,7 +550,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
- mustSaveMemberToBackend(c.lg, c.be, c.members[id])
+ unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@@ -504,6 +559,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
zap.String("local-member-id", c.localID.String()),
zap.String("updated-remote-peer-id", id.String()),
zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
+ zap.Bool("updated-remote-peer-is-learner", raftAttr.IsLearner),
)
}
@@ -870,7 +926,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
- mustSaveMemberToBackend(c.lg, c.be, m)
+ unsafeSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {
@@ -880,3 +936,24 @@ func (c *RaftCluster) PushMembershipToStorage() {
}
}
}
+
+// ValidateMaxLearnerConfig verifies that the existing learner members in the cluster membership, plus an
+// optional N+1 learner scale-up, do not exceed maxLearners.
+func ValidateMaxLearnerConfig(maxLearners int, members []*Member, scaleUpLearners bool) error {
+ numLearners := 0
+ for _, m := range members {
+ if m.IsLearner {
+ numLearners++
+ }
+ }
+ // Validate config can accommodate scale up.
+ if scaleUpLearners {
+ numLearners++
+ }
+
+ if numLearners > maxLearners {
+ return ErrTooManyLearners
+ }
+
+ return nil
+}
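
A minimal sketch of the new helper's semantics (member literals are illustrative only): with maxLearners set to 1, a membership that already contains one learner passes the plain check, but the same membership fails once a learner scale-up is requested.

package main

import (
    "fmt"

    "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
)

func main() {
    // Illustrative membership: one voting member and one learner.
    members := []*membership.Member{
        {RaftAttributes: membership.RaftAttributes{IsLearner: false}},
        {RaftAttributes: membership.RaftAttributes{IsLearner: true}},
    }

    // Startup-style check: no scale-up requested, one learner <= maxLearners, so nil.
    fmt.Println(membership.ValidateMaxLearnerConfig(1, members, false))

    // Config-change-style check: adding one more learner would exceed the limit,
    // so this returns ErrTooManyLearners.
    fmt.Println(membership.ValidateMaxLearnerConfig(1, members, true))
}
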
diff --git a/server/etcdserver/api/membership/cluster_opts.go b/server/etcdserver/api/membership/cluster_opts.go
new file mode 100644
index 000000000..204fbf04d
--- /dev/null
+++ b/server/etcdserver/api/membership/cluster_opts.go
@@ -0,0 +1,43 @@
+// Copyright 2021 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package membership
+
+const DefaultMaxLearners = 1
+
+type ClusterOptions struct {
+ maxLearners int
+}
+
+// ClusterOption are options which can be applied to the raft cluster.
+type ClusterOption func(*ClusterOptions)
+
+func newClusterOpts(opts ...ClusterOption) *ClusterOptions {
+ clOpts := &ClusterOptions{}
+ clOpts.applyOpts(opts)
+ return clOpts
+}
+
+func (co *ClusterOptions) applyOpts(opts []ClusterOption) {
+ for _, opt := range opts {
+ opt(co)
+ }
+}
+
+// WithMaxLearners sets the maximum number of learners that can exist in the cluster membership.
+func WithMaxLearners(max int) ClusterOption {
+ return func(co *ClusterOptions) {
+ co.maxLearners = max
+ }
+}
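
A minimal sketch of how these options are consumed; later in this diff, raft.go and server.go construct clusters with membership.WithMaxLearners(cfg.ExperimentalMaxLearners) in exactly this shape. The zap logger here is only for illustration.

package main

import (
    "go.uber.org/zap"

    "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
)

func main() {
    lg := zap.NewExample()

    // Without options, newClusterOpts leaves maxLearners at its zero value.
    _ = membership.NewCluster(lg)

    // With an explicit limit, mirroring how the server threads
    // cfg.ExperimentalMaxLearners into the cluster.
    _ = membership.NewCluster(lg, membership.WithMaxLearners(2))
}
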
diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go
index 23d81fec1..e152e06a1 100644
--- a/server/etcdserver/api/membership/cluster_test.go
+++ b/server/etcdserver/api/membership/cluster_test.go
@@ -20,8 +20,12 @@ import (
"path"
"reflect"
"testing"
+ "time"
"github.com/coreos/go-semver/semver"
+ "github.com/stretchr/testify/assert"
+ betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
+ "go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/client/pkg/v3/testutil"
@@ -29,8 +33,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mock/mockstore"
-
- "go.uber.org/zap"
)
func TestClusterMember(t *testing.T) {
@@ -279,10 +281,14 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {
}
func TestClusterValidateConfigurationChange(t *testing.T) {
- cl := NewCluster(zaptest.NewLogger(t))
+ cl := NewCluster(zaptest.NewLogger(t), WithMaxLearners(1))
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
- attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}}
+ var isLearner bool
+ if i == 1 {
+ isLearner = true
+ }
+ attr := RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", i)}, IsLearner: isLearner}
cl.AddMember(&Member{ID: types.ID(i), RaftAttributes: attr}, true)
}
cl.RemoveMember(4, true)
@@ -327,6 +333,17 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
t.Fatal(err)
}
+ attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 7)}, IsLearner: true}
+ ctx7, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(7), RaftAttributes: attr}})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}, IsLearner: true}
+ ctx8, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(1), RaftAttributes: attr}, IsPromote: true})
+ if err != nil {
+ t.Fatal(err)
+ }
tests := []struct {
cc raftpb.ConfChange
werr error
@@ -424,6 +441,22 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
},
ErrIDNotFound,
},
+ {
+ raftpb.ConfChange{
+ Type: raftpb.ConfChangeAddLearnerNode,
+ NodeID: 7,
+ Context: ctx7,
+ },
+ ErrTooManyLearners,
+ },
+ {
+ raftpb.ConfChange{
+ Type: raftpb.ConfChangeAddNode,
+ NodeID: 1,
+ Context: ctx8,
+ },
+ nil,
+ },
}
for i, tt := range tests {
err := cl.ValidateConfigurationChange(tt.cc)
@@ -1019,3 +1052,193 @@ func TestIsVersionChangable(t *testing.T) {
})
}
}
+
+func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
+ now := time.Now()
+ alice := NewMember("", nil, "alice", &now)
+
+ tcs := []struct {
+ name string
+
+ storeV2Nil bool
+ backendNil bool
+ storeV2Members []*Member
+ backendMembers []*Member
+
+ expectPanics bool
+ expectMembers map[types.ID]*Member
+ }{
+ {
+ name: "Adding new member should succeed",
+ },
+ {
+ name: "Adding member should succeed if it was only in storeV2",
+ storeV2Members: []*Member{alice},
+ },
+ {
+ name: "Adding member should succeed if it was only in backend",
+ backendMembers: []*Member{alice},
+ },
+ {
+ name: "Adding member should fail if it exists in both",
+ storeV2Members: []*Member{alice},
+ backendMembers: []*Member{alice},
+ expectPanics: true,
+ },
+ {
+ name: "Adding member should fail if it exists in storeV2 and backend is nil",
+ storeV2Members: []*Member{alice},
+ backendNil: true,
+ expectPanics: true,
+ },
+ {
+ name: "Adding member should succeed if it exists in backend and storageV2 is nil",
+ storeV2Nil: true,
+ backendMembers: []*Member{alice},
+ },
+ {
+ name: "Adding new member should succeed if backend is nil",
+ storeV2Members: []*Member{},
+ backendNil: true,
+ },
+ {
+ name: "Adding new member should fail if storageV2 is nil",
+ storeV2Nil: true,
+ backendMembers: []*Member{},
+ },
+ }
+ for _, tc := range tcs {
+ t.Run(tc.name, func(t *testing.T) {
+ lg := zaptest.NewLogger(t)
+ be, _ := betesting.NewDefaultTmpBackend(t)
+ defer be.Close()
+ mustCreateBackendBuckets(be)
+ st := v2store.New()
+ for _, m := range tc.backendMembers {
+ unsafeSaveMemberToBackend(lg, be, m)
+ }
+ be.ForceCommit()
+ for _, m := range tc.storeV2Members {
+ mustSaveMemberToStore(lg, st, m)
+ }
+ cluster := NewCluster(lg)
+ if !tc.backendNil {
+ cluster.SetBackend(be)
+ }
+ if !tc.storeV2Nil {
+ cluster.SetStore(st)
+ }
+ if tc.expectPanics {
+ assert.Panics(t, func() {
+ cluster.AddMember(alice, ApplyBoth)
+ })
+ } else {
+ cluster.AddMember(alice, ApplyBoth)
+ }
+ if !tc.storeV2Nil {
+ storeV2Members, _ := membersFromStore(lg, st)
+ assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members)
+ }
+ if !tc.backendNil {
+ be.ForceCommit()
+ beMembers, _ := mustReadMembersFromBackend(lg, be)
+ assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers)
+ }
+ })
+ }
+}
+
+func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
+ now := time.Now()
+ alice := NewMember("", nil, "alice", &now)
+
+ tcs := []struct {
+ name string
+
+ storeV2Nil bool
+ backendNil bool
+ storeV2Members []*Member
+ backendMembers []*Member
+
+ expectMembers []*Member
+ expectPanics bool
+ }{
+ {
+ name: "Removing new member should fail",
+ expectPanics: true,
+ },
+ {
+ name: "Removing member should succeed if it was only in storeV2",
+ storeV2Members: []*Member{alice},
+ },
+ {
+ name: "Removing member should succeed if it was only in backend",
+ backendMembers: []*Member{alice},
+ },
+ {
+ name: "Removing member should succeed if it exists in both",
+ storeV2Members: []*Member{alice},
+ backendMembers: []*Member{alice},
+ },
+ {
+ name: "Removing new member should fail if backend is nil",
+ storeV2Members: []*Member{},
+ backendNil: true,
+ expectPanics: true,
+ },
+ {
+ name: "Removing new member should succeed if storageV2 is nil",
+ storeV2Nil: true,
+ backendMembers: []*Member{},
+ },
+ {
+ name: "Removing member should succeed if it exists in v2storage and backend is nil",
+ storeV2Members: []*Member{alice},
+ backendNil: true,
+ },
+ {
+ name: "Removing member should succeed if it exists in backend and storageV2 is nil",
+ storeV2Nil: true,
+ backendMembers: []*Member{alice},
+ },
+ }
+ for _, tc := range tcs {
+ t.Run(tc.name, func(t *testing.T) {
+ lg := zaptest.NewLogger(t)
+ be, _ := betesting.NewDefaultTmpBackend(t)
+ defer be.Close()
+ mustCreateBackendBuckets(be)
+ st := v2store.New()
+ for _, m := range tc.backendMembers {
+ unsafeSaveMemberToBackend(lg, be, m)
+ }
+ be.ForceCommit()
+ for _, m := range tc.storeV2Members {
+ mustSaveMemberToStore(lg, st, m)
+ }
+ cluster := NewCluster(lg)
+ if !tc.backendNil {
+ cluster.SetBackend(be)
+ }
+ if !tc.storeV2Nil {
+ cluster.SetStore(st)
+ }
+ if tc.expectPanics {
+ assert.Panics(t, func() {
+ cluster.RemoveMember(alice.ID, ApplyBoth)
+ })
+ } else {
+ cluster.RemoveMember(alice.ID, ApplyBoth)
+ }
+ if !tc.storeV2Nil {
+ storeV2Members, _ := membersFromStore(lg, st)
+ assert.Equal(t, map[types.ID]*Member{}, storeV2Members)
+ }
+ if !tc.backendNil {
+ be.ForceCommit()
+ beMembers, _ := mustReadMembersFromBackend(lg, be)
+ assert.Equal(t, map[types.ID]*Member{}, beMembers)
+ }
+ })
+ }
+}
diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go
index 0bab3e42e..a0cdf370a 100644
--- a/server/etcdserver/api/membership/store.go
+++ b/server/etcdserver/api/membership/store.go
@@ -15,6 +15,7 @@
package membership
import (
+ "bytes"
"encoding/json"
"fmt"
"path"
@@ -39,9 +40,11 @@ const (
var (
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
+ errMemberAlreadyExist = fmt.Errorf("member already exists")
+ errMemberNotFound = fmt.Errorf("member not found")
)
-func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
+func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
mkey := backendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
@@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
+ if unsafeMemberExists(tx, mkey) {
+ return errMemberAlreadyExist
+ }
tx.UnsafePut(buckets.Members, mkey, mvalue)
+ return nil
}
// TrimClusterFromBackend removes all information about cluster (versions)
@@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error {
return nil
}
-func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
+func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
- tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
+ if !unsafeMemberExists(tx, mkey) {
+ return errMemberNotFound
+ }
+ tx.UnsafeDelete(buckets.Members, mkey)
+ return nil
+}
+
+func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
+ var found bool
+ tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
+ if bytes.Equal(k, mkey) {
+ found = true
+ }
+ return nil
+ })
+ return found
}
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
@@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
}
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
- b, err := json.Marshal(m.RaftAttributes)
+ err := unsafeSaveMemberToStore(lg, s, m)
if err != nil {
- lg.Panic("failed to marshal raftAttributes", zap.Error(err))
- }
- p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
- if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save member to store",
- zap.String("path", p),
+ zap.String("member-id", m.ID.String()),
zap.Error(err),
)
}
}
-func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
+func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
+ b, err := json.Marshal(m.RaftAttributes)
+ if err != nil {
+ lg.Panic("failed to marshal raftAttributes", zap.Error(err))
+ }
+ p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
+ _, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
+ return err
+}
+
+func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
- lg.Panic(
- "failed to delete member from store",
- zap.String("path", MemberStoreKey(id)),
- zap.Error(err),
- )
+ return err
}
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
- lg.Panic(
- "failed to create removedMember",
- zap.String("path", RemovedMemberStoreKey(id)),
- zap.Error(err),
- )
+ return err
}
+ return nil
}
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go
index 8b9600d39..3c013e815 100644
--- a/server/etcdserver/raft.go
+++ b/server/etcdserver/raft.go
@@ -486,7 +486,7 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID,
zap.String("local-member-id", id.String()),
zap.Uint64("commit-index", st.Commit),
)
- cl := membership.NewCluster(cfg.Logger)
+ cl := membership.NewCluster(cfg.Logger, membership.WithMaxLearners(cfg.ExperimentalMaxLearners))
cl.SetID(id, cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
@@ -560,7 +560,7 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot)
zap.Uint64("commit-index", st.Commit),
)
- cl := membership.NewCluster(cfg.Logger)
+ cl := membership.NewCluster(cfg.Logger, membership.WithMaxLearners(cfg.ExperimentalMaxLearners))
cl.SetID(id, cid)
s := raft.NewMemoryStorage()
if snapshot != nil {
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 56e288cc5..e52b43a91 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -410,7 +410,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if err = cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
- cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+ cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.ExperimentalMaxLearners))
if err != nil {
return nil, err
}
@@ -424,7 +424,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
return nil, fmt.Errorf("incompatible with current running cluster")
}
-
+ scaleUpLearners := false
+ if err := membership.ValidateMaxLearnerConfig(cfg.ExperimentalMaxLearners, existingCluster.Members(), scaleUpLearners); err != nil {
+ return nil, err
+ }
remotes = existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
@@ -436,7 +439,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if err = cfg.VerifyBootstrap(); err != nil {
return nil, err
}
- cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+ cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap, membership.WithMaxLearners(cfg.ExperimentalMaxLearners))
if err != nil {
return nil, err
}
@@ -458,7 +461,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if config.CheckDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
}
- if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
+ if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap, membership.WithMaxLearners(cfg.ExperimentalMaxLearners)); err != nil {
return nil, err
}
}
@@ -535,6 +538,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
cl.SetStore(st)
cl.SetBackend(be)
cl.Recover(api.UpdateCapability)
+
+ scaleUpLearners := false
+ if err := membership.ValidateMaxLearnerConfig(cfg.ExperimentalMaxLearners, cl.Members(), scaleUpLearners); err != nil {
+ return nil, err
+ }
+
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go
index b7207c171..529f83b0f 100644
--- a/server/mvcc/backend/backend.go
+++ b/server/mvcc/backend/backend.go
@@ -432,6 +432,8 @@ func (b *backend) Defrag() error {
func (b *backend) defrag() error {
now := time.Now()
+ isDefragActive.Set(1)
+ defer isDefragActive.Set(0)
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
diff --git a/server/mvcc/backend/metrics.go b/server/mvcc/backend/metrics.go
index d9641af7a..9d58c0063 100644
--- a/server/mvcc/backend/metrics.go
+++ b/server/mvcc/backend/metrics.go
@@ -83,6 +83,13 @@ var (
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
})
+
+ isDefragActive = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "disk",
+ Name: "defrag_inflight",
+ Help: "Whether or not defrag is active on the member. 1 means active, 0 means not.",
+ })
)
func init() {
@@ -92,4 +99,5 @@ func init() {
prometheus.MustRegister(writeSec)
prometheus.MustRegister(defragSec)
prometheus.MustRegister(snapshotTransferSec)
+ prometheus.MustRegister(isDefragActive)
}
diff --git a/tests/integration/clientv3/cluster_test.go b/tests/integration/clientv3/cluster_test.go
index eff15cf7d..d624a56d7 100644
--- a/tests/integration/clientv3/cluster_test.go
+++ b/tests/integration/clientv3/cluster_test.go
@@ -16,6 +16,7 @@ package clientv3test
import (
"context"
+ "fmt"
"math/rand"
"reflect"
"strings"
@@ -376,18 +377,28 @@ func TestMemberPromoteMemberNotExist(t *testing.T) {
}
}
-// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster is 1
+// TestMaxLearnerInCluster verifies that the configured maximum number of learners in a cluster is enforced
func TestMaxLearnerInCluster(t *testing.T) {
integration.BeforeTest(t)
- // 1. start with a cluster with 3 voting member and 0 learner member
- clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+ // 1. start with a cluster of 3 voting members and a max-learner limit of 2
+ clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2})
defer clus.Terminate(t)
- // 2. adding a learner member should succeed
- resp1, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{"http://127.0.0.1:1234"})
+ // 2. adding 2 learner members should succeed
+ for i := 0; i < 2; i++ {
+ _, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
+ if err != nil {
+ t.Fatalf("failed to add learner member %v", err)
+ }
+ }
+
+ // ensure client endpoint is voting member
+ leaderIdx := clus.WaitLeader(t)
+ capi := clus.Client(leaderIdx)
+ resp1, err := capi.MemberList(context.Background())
if err != nil {
- t.Fatalf("failed to add learner member %v", err)
+ t.Fatalf("failed to get member list")
}
numberOfLearners := 0
for _, m := range resp1.Members {
@@ -395,12 +406,12 @@ func TestMaxLearnerInCluster(t *testing.T) {
numberOfLearners++
}
}
- if numberOfLearners != 1 {
- t.Fatalf("Added 1 learner node to cluster, got %d", numberOfLearners)
+ if numberOfLearners != 2 {
+ t.Fatalf("added 2 learner node to cluster, got %d", numberOfLearners)
}
- // 3. cluster has 3 voting member and 1 learner, adding another learner should fail
- _, err = clus.Client(0).MemberAddAsLearner(context.Background(), []string{"http://127.0.0.1:2345"})
+ // 3. cluster has 3 voting members and 2 learners, adding another learner should fail
+ _, err = clus.Client(0).MemberAddAsLearner(context.Background(), []string{"http://127.0.0.1:2342"})
if err == nil {
t.Fatalf("expect member add to fail, got no error")
}
@@ -410,7 +421,7 @@ func TestMaxLearnerInCluster(t *testing.T) {
}
// 4. cluster has 3 voting member and 1 learner, adding a voting member should succeed
- _, err = clus.Client(0).MemberAdd(context.Background(), []string{"http://127.0.0.1:3456"})
+ _, err = clus.Client(0).MemberAdd(context.Background(), []string{"http://127.0.0.1:3453"})
if err != nil {
t.Errorf("failed to add member %v", err)
}
diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go
index b7e36817b..26d39696e 100644
--- a/tests/integration/cluster.go
+++ b/tests/integration/cluster.go
@@ -44,6 +44,7 @@ import (
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
+ "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2http"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
@@ -158,6 +159,7 @@ type ClusterConfig struct {
LeaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
+ ExperimentalMaxLearners int
}
type cluster struct {
@@ -316,6 +318,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
+ ExperimentalMaxLearners: c.cfg.ExperimentalMaxLearners,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
@@ -608,6 +611,7 @@ type memberConfig struct {
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
+ ExperimentalMaxLearners int
}
// mustNewMember return an inited member with the given name. If peerTLS is
@@ -705,7 +709,10 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.InitialCorruptCheck = true
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
-
+ m.ExperimentalMaxLearners = membership.DefaultMaxLearners
+ if mcfg.ExperimentalMaxLearners != 0 {
+ m.ExperimentalMaxLearners = mcfg.ExperimentalMaxLearners
+ }
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.Logger = memberLogger(t, mcfg.name)