Created
June 21, 2016 01:42
-
-
Save taterbase/c349d8facf40564e7d54f53ec56c6960 to your computer and use it in GitHub Desktop.
Recursively watch a ZooKeeper node tree and pick up child nodes as they are added
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"path" | |
"sync" | |
"time" | |
"github.com/samuel/go-zookeeper/zk" | |
) | |
// Connection settings used by main when it calls zk.Connect.
var (
	// zk_addrs is the list of ZooKeeper server addresses to connect to.
	zk_addrs = []string{"127.0.0.1"}

	// zk_conn_timeout is passed to zk.Connect as its timeout argument.
	zk_conn_timeout = 1 * time.Second
)
type ZKScanner struct { | |
conn *zk.Conn | |
total uint | |
Event <-chan string | |
} | |
func NewZKScanner(conn *zk.Conn) *ZKScanner { | |
return &ZKScanner{conn: conn} | |
} | |
func (s *ZKScanner) watch(ch <-chan zk.Event) { | |
event := <-ch | |
if event.Err != nil { | |
log.Fatal("Child Watch Event Error:", event.Err) | |
} | |
if event.Type == zk.EventNodeDeleted { | |
log.Println("Child Node Removed:", event.Path) | |
return | |
} | |
log.Println("Child changed:", event.Path) | |
err := s.partialScan(event.Path) | |
if err != nil { | |
log.Fatal("Error on partial scan:", err) | |
} | |
} | |
func (s *ZKScanner) scan(prefix, node string) error { | |
node = path.Join(prefix, node) | |
children, _, ch, err := s.conn.ChildrenW(node) | |
if err != nil { | |
return err | |
} | |
go s.watch(ch) | |
for _, child := range children { | |
err := s.scan(node, child) | |
if err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func (s *ZKScanner) partialScan(node string) error { | |
children, parent_stat, ch, err := s.conn.ChildrenW(node) | |
if err != nil { | |
return err | |
} | |
go s.watch(ch) | |
for _, child := range children { | |
_, stat, err := s.conn.Get(path.Join(node, child)) | |
if err != nil { | |
return err | |
} | |
if stat.Czxid == parent_stat.Pzxid { | |
s.scan(node, child) | |
} | |
} | |
return nil | |
} | |
func main() { | |
c, _, err := zk.Connect(zk_addrs, zk_conn_timeout) | |
if err != nil { | |
log.Fatal("Unable to connect:", err) | |
} | |
scanner := NewZKScanner(c) | |
err = scanner.scan("/", "") | |
if err != nil { | |
log.Fatal("Unable to scan nodes:", err) | |
} | |
wg := &sync.WaitGroup{} | |
wg.Add(1) | |
wg.Wait() | |
log.Println("finished") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment