Created
November 3, 2015 22:45
-
-
Save tamird/6a2061cd7bb48dde1187 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit e84576027590e617e639c30e8bdf4a935a86fc6b | |
Author: Tamir Duberstein <[email protected]> | |
Date: Tue Nov 3 17:44:04 2015 -0500 | |
return a callback that unregisters | |
diff --git a/gossip/client.go b/gossip/client.go | |
index 6ba0ccd..e1945d2 100644 | |
--- a/gossip/client.go | |
+++ b/gossip/client.go | |
@@ -187,8 +187,7 @@ func (c *client) gossip(g *Gossip, stopper *stop.Stopper) error { | |
updateCallback := func(key string, content []byte) { | |
c.sendGossip(g, nodeID, addr, lAddr, done) | |
} | |
- unregisterCB := g.RegisterCallback(".*", updateCallback) | |
- defer g.UnregisterCallback(unregisterCB) | |
+ defer g.RegisterCallback(".*", updateCallback)() | |
// Loop until stopper is signalled, or until either the gossip or | |
// RPC clients are closed. getGossip is a hanging get, returning | |
diff --git a/gossip/gossip.go b/gossip/gossip.go | |
index 1876097..d0cab36 100644 | |
--- a/gossip/gossip.go | |
+++ b/gossip/gossip.go | |
@@ -305,24 +305,21 @@ type Callback func(key string, content []byte) | |
// RegisterCallback registers a callback for a key pattern to be | |
// invoked whenever new info for a gossip key matching pattern is | |
// received. The callback method is invoked with the info key which | |
-// matched pattern. Returns a value to use with UnregisterCallback(). | |
-func (g *Gossip) RegisterCallback(pattern string, method Callback) interface{} { | |
+// matched pattern. Returns a function that unregisters the callback. | |
+func (g *Gossip) RegisterCallback(pattern string, method Callback) func() { | |
if pattern == KeySystemConfig { | |
log.Warning("raw gossip callback registered on %s, consider using RegisterSystemConfigCallback", | |
KeySystemConfig) | |
} | |
g.mu.Lock() | |
- defer g.mu.Unlock() | |
- return g.is.registerCallback(pattern, method) | |
-} | |
- | |
-// UnregisterCallback unregisters an update callback. Use the value | |
-// returned by the call to RegisterCallback to unregister. | |
-func (g *Gossip) UnregisterCallback(unregister interface{}) { | |
- g.mu.Lock() | |
- defer g.mu.Unlock() | |
- g.is.unregisterCallback(unregister) | |
+ unregisterCB := g.is.registerCallback(pattern, method) | |
+ g.mu.Unlock() | |
+ return func() { | |
+ g.mu.Lock() | |
+ unregisterCB() | |
+ g.mu.Unlock() | |
+ } | |
} | |
// GetSystemConfig returns the local unmarshalled version of the | |
diff --git a/gossip/infostore.go b/gossip/infostore.go | |
index b3fb36d..48af0f6 100644 | |
--- a/gossip/infostore.go | |
+++ b/gossip/infostore.go | |
@@ -51,7 +51,7 @@ type infoStore struct { | |
NodeID roachpb.NodeID `json:"-"` // Owning node's ID | |
NodeAddr util.UnresolvedAddr `json:"-"` // Address of node owning this info store: "host:port" | |
highWaterStamps map[int32]int64 // High water timestamps for known gossip peers | |
- callbacks []*callback | |
+ callbacks map[*callback]struct{} | |
} | |
// monotonicUnixNano returns a monotonically increasing value for | |
@@ -105,6 +105,7 @@ func newInfoStore(nodeID roachpb.NodeID, nodeAddr util.UnresolvedAddr) infoStore | |
NodeID: nodeID, | |
NodeAddr: nodeAddr, | |
highWaterStamps: map[int32]int64{}, | |
+ callbacks: map[*callback]struct{}{}, | |
} | |
} | |
@@ -197,10 +198,10 @@ func (is *infoStore) getHighWaterStamps() map[int32]int64 { | |
// registerCallback compiles a regexp for pattern and adds it to | |
// the callbacks slice. | |
-func (is *infoStore) registerCallback(pattern string, method Callback) interface{} { | |
+func (is *infoStore) registerCallback(pattern string, method Callback) func() { | |
re := regexp.MustCompile(pattern) | |
cb := &callback{pattern: re, method: method} | |
- is.callbacks = append(is.callbacks, cb) | |
+ is.callbacks[cb] = struct{}{} | |
infosBytes := make(map[string][]byte) | |
if err := is.visitInfos(func(key string, i *Info) error { | |
if re.MatchString(key) { | |
@@ -220,15 +221,8 @@ func (is *infoStore) registerCallback(pattern string, method Callback) interface | |
method(key, bytes) | |
} | |
}() | |
- return cb | |
-} | |
- | |
-func (is *infoStore) unregisterCallback(cb interface{}) { | |
- for i := range is.callbacks { | |
- if is.callbacks[i] == cb { | |
- is.callbacks = append(is.callbacks[:i], is.callbacks[i+1:]...) | |
- return | |
- } | |
+ return func() { | |
+ delete(is.callbacks, cb) | |
} | |
} | |
@@ -237,7 +231,7 @@ func (is *infoStore) unregisterCallback(cb interface{}) { | |
// the corresponding callback method on a match. | |
func (is *infoStore) processCallbacks(key string, content []byte) { | |
var matches []*callback | |
- for _, cb := range is.callbacks { | |
+ for cb := range is.callbacks { | |
if cb.pattern.MatchString(key) { | |
matches = append(matches, cb) | |
} | |
diff --git a/gossip/infostore_test.go b/gossip/infostore_test.go | |
index 23680ec..6e31762 100644 | |
--- a/gossip/infostore_test.go | |
+++ b/gossip/infostore_test.go | |
@@ -335,7 +335,7 @@ func TestCallbacks(t *testing.T) { | |
cb2 := callbackRecord{wg: wg} | |
cbAll := callbackRecord{wg: wg} | |
- cb1CB := is.registerCallback("key1", cb1.Add) | |
+ unregisterCB1 := is.registerCallback("key1", cb1.Add) | |
is.registerCallback("key2", cb2.Add) | |
is.registerCallback("key.*", cbAll.Add) | |
@@ -425,7 +425,7 @@ func TestCallbacks(t *testing.T) { | |
} | |
// Unregister a callback and verify nothing is invoked on it. | |
- is.unregisterCallback(cb1CB) | |
+ unregisterCB1() | |
iNew := is.newInfo([]byte("a"), time.Second) | |
wg.Add(2) // for the two cbAll callbacks | |
if err := is.addInfo("key1", iNew); err != nil { | |
diff --git a/gossip/server.go b/gossip/server.go | |
index 386be84..cec3fac 100644 | |
--- a/gossip/server.go | |
+++ b/gossip/server.go | |
@@ -161,12 +161,12 @@ func (s *server) start(rpcServer *rpc.Server, stopper *stop.Stopper) { | |
// stop sets the server's closed bool to true and broadcasts to | |
// waiting gossip clients to wakeup and finish. | |
-func (s *server) stop(unregisterCB interface{}) { | |
+func (s *server) stop(unregisterCB func()) { | |
s.mu.Lock() | |
defer s.mu.Unlock() | |
s.closed = true | |
s.ready.Broadcast() // wake up clients | |
- s.is.unregisterCallback(unregisterCB) | |
+ unregisterCB() | |
} | |
// onClose is invoked by the rpcServer each time a connected client |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment