Skip to content

Instantly share code, notes, and snippets.

@murphybytes
Last active November 6, 2016 02:16
Show Gist options
  • Save murphybytes/66972afb03fc4b969fa35cdbb10143c6 to your computer and use it in GitHub Desktop.
Save murphybytes/66972afb03fc4b969fa35cdbb10143c6 to your computer and use it in GitHub Desktop.
Synchronized access to a data using a closure over a channel
package main
import (
"errors"
"fmt"
"sync"
)
type counters struct {
sum int
modcount int
}
type countersResponse struct {
counters
err error
}
var errNoKey = errors.New("key does not exist")
type data map[string]*counters
type operation func(m data)
func get(key string, c chan<- operation) (ctr counters, e error) {
r := make(chan countersResponse)
defer close(r)
// function is passed to data update goroutine where it
// is called, we use return channel to block until we get response
c <- func(m data) {
if ctr, exists := m[key]; exists {
r <- countersResponse{counters{ctr.sum, ctr.modcount}, nil}
} else {
r <- countersResponse{err: errNoKey}
}
}
// block and wait for response
resp := <-r
return counters{resp.sum, resp.modcount}, resp.err
}
func updateSum(key string, n int, c chan<- operation) (ctr counters, e error) {
r := make(chan countersResponse)
defer close(r)
c <- func(m data) {
if ctr, exists := m[key]; exists {
ctr.sum += n
ctr.modcount++
r <- countersResponse{counters{ctr.sum, ctr.modcount}, nil}
} else {
r <- countersResponse{err: errNoKey}
}
}
resp := <-r
return counters{resp.sum, resp.modcount}, resp.err
}
// creates a counter returns true if key already exists
func createCounter(key string, c chan<- operation) bool {
r := make(chan bool)
defer close(r)
c <- func(m data) {
var exists bool
if _, exists = m[key]; !exists {
m[key] = &counters{}
}
r <- exists
}
return <-r
}
func main() {
var wg sync.WaitGroup
c := make(chan operation)
// single goroutine recieves functions that operate on data
go func(c <-chan operation) {
dataStore := make(data)
for operateOn := range c {
operateOn(dataStore)
}
}(c)
// testing
wg.Add(2)
go func(wg *sync.WaitGroup, c chan<- operation) {
defer wg.Done()
created := createCounter("k1", c)
fmt.Printf("k1 existed = %t\n", created)
created = createCounter("k1", c)
fmt.Printf("k1 existed = %t\n", created)
created = createCounter("k2", c)
fmt.Printf("k2 existed = %t\n", created)
if ctr, err := updateSum("k1", 10, c); err != nil {
fmt.Println(err.Error())
} else {
fmt.Printf("for k1 sum %d mod %d\n", ctr.sum, ctr.modcount)
}
}(&wg, c)
go func(wg *sync.WaitGroup) {
defer wg.Done()
if ctr, err := updateSum("k1", 10, c); err != nil {
fmt.Println(err.Error())
} else {
fmt.Printf("for k1 sum %d mod %d\n", ctr.sum, ctr.modcount)
}
}(&wg)
ctr, err := get("k1", c)
fmt.Printf("Get k1 before wait %+v %+v\n", ctr, err)
wg.Wait()
ctr, err = get("k1", c)
fmt.Printf("Get k1 after wait %+v %+v\n", ctr, err)
close(c)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment