Created
March 9, 2023 09:30
-
-
Save menghan/fee89adabfedef8b97130c5fc45262b6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Write a golang struct called "watchUpdator". The struct creates a etcd connection, looping watch a given key, with timeout. if the key changes or timeouts, call corresponded callback | |
Here's an example of a Go struct called "watchUpdator" that creates an etcd connection, loops to watch a given key, and calls a corresponding callback function if the key changes or times out: | |
*/ | |
package main | |
import ( | |
"context" | |
"fmt" | |
"go.etcd.io/etcd/clientv3" | |
"go.etcd.io/etcd/mvcc/mvccpb" | |
"time" | |
) | |
type watchUpdator struct { | |
client *clientv3.Client | |
key string | |
timeout time.Duration | |
onUpdate func(string) | |
onTimeout func() | |
stopSignal chan bool | |
} | |
func (wu *watchUpdator) watchLoop() { | |
watchCh := wu.client.Watch(context.Background(), wu.key, clientv3.WithPrefix(), clientv3.WithPrevKV()) | |
timer := time.NewTimer(wu.timeout) | |
for { | |
select { | |
case <-wu.stopSignal: | |
return | |
case resp := <-watchCh: | |
if resp.Err() != nil { | |
fmt.Printf("error watching key %s: %v\n", wu.key, resp.Err()) | |
return | |
} | |
for _, ev := range resp.Events { | |
switch ev.Type { | |
case mvccpb.PUT: | |
wu.onUpdate(string(ev.Kv.Value)) | |
case mvccpb.DELETE: | |
wu.onUpdate("") | |
default: | |
fmt.Printf("unexpected etcd event type: %v\n", ev.Type) | |
} | |
} | |
// Reset the timer when we receive an update | |
if !timer.Stop() { | |
<-timer.C | |
} | |
timer.Reset(wu.timeout) | |
case <-timer.C: | |
wu.onTimeout() | |
return | |
} | |
} | |
} | |
func NewWatchUpdator(endpoints []string, key string, timeout time.Duration, onUpdate func(string), onTimeout func()) (*watchUpdator, error) { | |
client, err := clientv3.New(clientv3.Config{ | |
Endpoints: endpoints, | |
DialTimeout: 5 * time.Second, | |
}) | |
if err != nil { | |
return nil, err | |
} | |
wu := &watchUpdator{ | |
client: client, | |
key: key, | |
timeout: timeout, | |
onUpdate: onUpdate, | |
onTimeout: onTimeout, | |
stopSignal: make(chan bool), | |
} | |
go wu.watchLoop() | |
return wu, nil | |
} | |
func main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment