Skip to content

Instantly share code, notes, and snippets.

@tamird
Created November 3, 2015 22:45
Show Gist options
  • Save tamird/6a2061cd7bb48dde1187 to your computer and use it in GitHub Desktop.
commit e84576027590e617e639c30e8bdf4a935a86fc6b
Author: Tamir Duberstein <[email protected]>
Date: Tue Nov 3 17:44:04 2015 -0500
return a callback that unregisters
diff --git a/gossip/client.go b/gossip/client.go
index 6ba0ccd..e1945d2 100644
--- a/gossip/client.go
+++ b/gossip/client.go
@@ -187,8 +187,7 @@ func (c *client) gossip(g *Gossip, stopper *stop.Stopper) error {
updateCallback := func(key string, content []byte) {
c.sendGossip(g, nodeID, addr, lAddr, done)
}
- unregisterCB := g.RegisterCallback(".*", updateCallback)
- defer g.UnregisterCallback(unregisterCB)
+ defer g.RegisterCallback(".*", updateCallback)()
// Loop until stopper is signalled, or until either the gossip or
// RPC clients are closed. getGossip is a hanging get, returning
diff --git a/gossip/gossip.go b/gossip/gossip.go
index 1876097..d0cab36 100644
--- a/gossip/gossip.go
+++ b/gossip/gossip.go
@@ -305,24 +305,21 @@ type Callback func(key string, content []byte)
// RegisterCallback registers a callback for a key pattern to be
// invoked whenever new info for a gossip key matching pattern is
// received. The callback method is invoked with the info key which
-// matched pattern. Returns a value to use with UnregisterCallback().
-func (g *Gossip) RegisterCallback(pattern string, method Callback) interface{} {
+// matched pattern. Returns a function that unregisters the callback.
+func (g *Gossip) RegisterCallback(pattern string, method Callback) func() {
if pattern == KeySystemConfig {
log.Warning("raw gossip callback registered on %s, consider using RegisterSystemConfigCallback",
KeySystemConfig)
}
g.mu.Lock()
- defer g.mu.Unlock()
- return g.is.registerCallback(pattern, method)
-}
-
-// UnregisterCallback unregisters an update callback. Use the value
-// returned by the call to RegisterCallback to unregister.
-func (g *Gossip) UnregisterCallback(unregister interface{}) {
- g.mu.Lock()
- defer g.mu.Unlock()
- g.is.unregisterCallback(unregister)
+ unregisterCB := g.is.registerCallback(pattern, method)
+ g.mu.Unlock()
+ return func() {
+ g.mu.Lock()
+ unregisterCB()
+ g.mu.Unlock()
+ }
}
// GetSystemConfig returns the local unmarshalled version of the
diff --git a/gossip/infostore.go b/gossip/infostore.go
index b3fb36d..48af0f6 100644
--- a/gossip/infostore.go
+++ b/gossip/infostore.go
@@ -51,7 +51,7 @@ type infoStore struct {
NodeID roachpb.NodeID `json:"-"` // Owning node's ID
NodeAddr util.UnresolvedAddr `json:"-"` // Address of node owning this info store: "host:port"
highWaterStamps map[int32]int64 // High water timestamps for known gossip peers
- callbacks []*callback
+ callbacks map[*callback]struct{}
}
// monotonicUnixNano returns a monotonically increasing value for
@@ -105,6 +105,7 @@ func newInfoStore(nodeID roachpb.NodeID, nodeAddr util.UnresolvedAddr) infoStore
NodeID: nodeID,
NodeAddr: nodeAddr,
highWaterStamps: map[int32]int64{},
+ callbacks: map[*callback]struct{}{},
}
}
@@ -197,10 +198,10 @@ func (is *infoStore) getHighWaterStamps() map[int32]int64 {
// registerCallback compiles a regexp for pattern and adds it to
// the callbacks slice.
-func (is *infoStore) registerCallback(pattern string, method Callback) interface{} {
+func (is *infoStore) registerCallback(pattern string, method Callback) func() {
re := regexp.MustCompile(pattern)
cb := &callback{pattern: re, method: method}
- is.callbacks = append(is.callbacks, cb)
+ is.callbacks[cb] = struct{}{}
infosBytes := make(map[string][]byte)
if err := is.visitInfos(func(key string, i *Info) error {
if re.MatchString(key) {
@@ -220,15 +221,8 @@ func (is *infoStore) registerCallback(pattern string, method Callback) interface
method(key, bytes)
}
}()
- return cb
-}
-
-func (is *infoStore) unregisterCallback(cb interface{}) {
- for i := range is.callbacks {
- if is.callbacks[i] == cb {
- is.callbacks = append(is.callbacks[:i], is.callbacks[i+1:]...)
- return
- }
+ return func() {
+ delete(is.callbacks, cb)
}
}
@@ -237,7 +231,7 @@ func (is *infoStore) unregisterCallback(cb interface{}) {
// the corresponding callback method on a match.
func (is *infoStore) processCallbacks(key string, content []byte) {
var matches []*callback
- for _, cb := range is.callbacks {
+ for cb := range is.callbacks {
if cb.pattern.MatchString(key) {
matches = append(matches, cb)
}
diff --git a/gossip/infostore_test.go b/gossip/infostore_test.go
index 23680ec..6e31762 100644
--- a/gossip/infostore_test.go
+++ b/gossip/infostore_test.go
@@ -335,7 +335,7 @@ func TestCallbacks(t *testing.T) {
cb2 := callbackRecord{wg: wg}
cbAll := callbackRecord{wg: wg}
- cb1CB := is.registerCallback("key1", cb1.Add)
+ unregisterCB1 := is.registerCallback("key1", cb1.Add)
is.registerCallback("key2", cb2.Add)
is.registerCallback("key.*", cbAll.Add)
@@ -425,7 +425,7 @@ func TestCallbacks(t *testing.T) {
}
// Unregister a callback and verify nothing is invoked on it.
- is.unregisterCallback(cb1CB)
+ unregisterCB1()
iNew := is.newInfo([]byte("a"), time.Second)
wg.Add(2) // for the two cbAll callbacks
if err := is.addInfo("key1", iNew); err != nil {
diff --git a/gossip/server.go b/gossip/server.go
index 386be84..cec3fac 100644
--- a/gossip/server.go
+++ b/gossip/server.go
@@ -161,12 +161,12 @@ func (s *server) start(rpcServer *rpc.Server, stopper *stop.Stopper) {
// stop sets the server's closed bool to true and broadcasts to
// waiting gossip clients to wakeup and finish.
-func (s *server) stop(unregisterCB interface{}) {
+func (s *server) stop(unregisterCB func()) {
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
s.ready.Broadcast() // wake up clients
- s.is.unregisterCallback(unregisterCB)
+ unregisterCB()
}
// onClose is invoked by the rpcServer each time a connected client
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment