Last active
December 12, 2019 02:27
-
-
Save rodaine/d627e4b67285eb5aaa72f3df2b344ad2 to your computer and use it in GitHub Desktop.
Code snippets for my blog post "The X-Files: Avoiding Concurrency Boilerplate with golang.org/x/sync"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BenchmarkMutexCache/10-8 10000000 180 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/100-8 10000000 187 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/1000-8 10000000 214 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/10000-8 10000000 231 ns/op 0 B/op 0 allocs/op | |
BenchmarkMutexCache/100000-8 5000000 254 ns/op 2 B/op 0 allocs/op | |
BenchmarkMutexCache/1000000-8 1000000 1159 ns/op 102 B/op 1 allocs/op | |
BenchmarkMutexCache/10000000-8 1000000 1481 ns/op 184 B/op 2 allocs/op | |
BenchmarkMutexCache/100000000-8 1000000 1655 ns/op 187 B/op 3 allocs/op | |
BenchmarkSyncMapCache/10-8 5000000 221 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/100-8 10000000 235 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/1000-8 10000000 235 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/10000-8 10000000 246 ns/op 0 B/op 0 allocs/op | |
BenchmarkSyncMapCache/100000-8 5000000 264 ns/op 5 B/op 0 allocs/op | |
BenchmarkSyncMapCache/1000000-8 1000000 1378 ns/op 146 B/op 3 allocs/op | |
BenchmarkSyncMapCache/10000000-8 1000000 1939 ns/op 237 B/op 5 allocs/op | |
BenchmarkSyncMapCache/100000000-8 1000000 2090 ns/op 241 B/op 6 allocs/op |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Debounce wraps e, preventing duplicate NamedActions from running | |
// concurrently, even from separate calls to Execute. | |
func Debounce(e Executor) Executor { | |
return debouncer{ | |
ex: e, | |
sf: new(singleflight.Group), | |
} | |
} | |
type debouncer struct { | |
ex Executor | |
sf *singleflight.Group | |
} | |
// Execute attaches a singleflight.Group to any NamedActions, effectively debouncing | |
// identical Actions if ran concurrently. | |
func (d debouncer) Execute(ctx context.Context, actions []Action) error { | |
wrapped := make([]Action, len(actions)) | |
for i, a := range actions { | |
if na, ok := a.(NamedAction); ok { | |
// compose the NamedAction with the singleflight.Group | |
wrapped[i] = debouncedAction{ | |
NamedAction: na, | |
sf: d.sf, | |
} | |
} else { | |
// otherwise, pass it through untouched | |
wrapped[i] = actions[i] | |
} | |
} | |
// delegate wrapped Actions to decorated Executor | |
return d.ex.Execute(ctx, wrapped) | |
} | |
type debouncedAction struct { | |
NamedAction | |
sf *singleflight.Group | |
} | |
func (da debouncedAction) Execute(ctx context.Context) error { | |
// map the composed Action's Execute function with the expected signature | |
// for singleflight.Group.Do. | |
fn := func() (interface{}, error) { | |
return nil, da.NamedAction.Execute(ctx) | |
} | |
_, err, _ := da.sf.Do(da.ID(), fn) | |
return err | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type (
	// An Action performs a single arbitrary task.
	Action interface {
		// Execute does the Action's work. Implementations should make a best
		// effort to stop once the provided ctx is cancelled.
		Execute(ctx context.Context) error
	}

	// An Executor performs a set of Actions. The implementing type decides
	// how the actions are scheduled (concurrency) and whether it fails open
	// or closed when one of them errors.
	Executor interface {
		// Execute performs all provided actions by calling their Execute
		// method. Implementations should make a best effort to cancel
		// outstanding actions once the provided ctx is cancelled.
		Execute(ctx context.Context, actions []Action) error
	}

	// ActionFunc lets a standalone function be used as an Action.
	ActionFunc func(context.Context) error
)

// Execute satisfies the Action interface by calling fn with ctx and returning
// whatever fn returns.
func (fn ActionFunc) Execute(ctx context.Context) error {
	return fn(ctx)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type flow struct { | |
maxActions int64 | |
actions *semaphore.Weighted | |
calls *semaphore.Weighted | |
ex Executor | |
} | |
// ControlFlow decorates an Executor, limiting it to a maximum concurrent | |
// number of calls and actions. | |
func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { | |
return &flow{ | |
maxActions: maxActions, | |
calls: semaphore.NewWeighted(maxCalls), | |
actions: semaphore.NewWeighted(maxActions), | |
ex: e, | |
} | |
} | |
// Execute attempts to acquire the semaphores for the concurrent calls and | |
// actions before delegating to the decorated Executor. If Execute is called | |
// with more actions than maxActions, an error is returned. | |
func (f *flow) Execute(ctx context.Context, actions []Action) error { | |
qty := int64(len(actions)) | |
if qty > f.maxActions { | |
return fmt.Errorf("maximum %d actions allowed", f.maxActions) | |
} | |
// limit concurrent calls to Executor.Execute | |
if err := f.calls.Acquire(ctx, 1); err != nil { | |
return err | |
} | |
defer f.calls.Release(1) | |
// limit total in-flight Actions, independent of Execute calls | |
if err := f.actions.Acquire(ctx, qty); err != nil { | |
return err | |
} | |
defer f.actions.Release(qty) | |
// delegate Actions to decorated Executor | |
return f.ex.Execute(ctx, actions) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type metrics struct { | |
ex Executor | |
stats statCache | |
} | |
// Execute emits latency, success, and error metrics for every action delegated to the | |
// decorated Executor. For NamedActions, additional name-scoped stats are also emitted. | |
func (m *metrics) Execute(ctx context.Context, actions []Action) error { | |
wrapped := make([]Action, len(actions)) | |
global := m.stats.get("all_actions") | |
for i, a := range actions { | |
if na, ok := a.(NamedAction); ok { | |
// composed the NamedAction with global and name-scoped stats | |
wrapped[i] = namedStatAction{ | |
NamedAction: na, | |
global: global, | |
stats: m.stats.get(na.ID()), | |
} | |
} else { | |
// otherwise, just compose with global stats | |
wrapped[i] = statAction{ | |
Action: a, | |
global: global, | |
} | |
} | |
} | |
// delegate wrapped Actions to decorated Executor | |
return m.ex.Execute(ctx, wrapped) | |
} | |
type namedStatAction struct { | |
NamedAction | |
global *statSet | |
stats *statSet | |
} | |
func (a namedStatAction) Execute(ctx context.Context) error { | |
return captureMetrics(ctx, a.NamedAction, a.global, a.stats) | |
} | |
type statAction struct { | |
Action | |
global *statSet | |
} | |
func (a statAction) Execute(ctx context.Context) error { | |
return captureMetrics(ctx, a.Action, a.global, nil) | |
} | |
func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error { | |
// execute the action, timing its latency | |
start := time.Now() | |
err := a.Execute(ctx) | |
lat := time.Now().Sub(start) | |
// create our counter values for error/success | |
var errored, succeeded int | |
if err != nil { | |
errored = 1 | |
} else { | |
succeeded = 1 | |
} | |
// emit the global stats | |
global.Latency(lat) | |
global.Success(succeeded) | |
global.Error(errored) | |
// if there are name-scoped stats, emit those, too | |
if stats != nil { | |
stats.Latency(lat) | |
stats.Success(succeeded) | |
stats.Error(errored) | |
} | |
return err | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A NamedAction describes an Action that also has a unique identifier. This | |
// interface is used by the Debounce Executor to prevent duplicate actions from | |
// running concurrently. | |
type NamedAction interface { | |
Action | |
// ID returns the name for this Action. Identical actions | |
// should return the same ID value. | |
ID() string | |
} | |
type namedAction struct { | |
ActionFunc | |
name string | |
} | |
func (a namedAction) ID() string { return a.name } | |
// Named creates a NamedAction from fn, with n as its name. This function is | |
// just a helper to simplify creating NamedActions. | |
func Named(n string, fn ActionFunc) NamedAction { | |
return namedAction{ | |
ActionFunc: fn, | |
name: n, | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Parallel is a concurrent implementation of Executor | |
type Parallel struct{} | |
// Execute performs all provided actions in concurrently, failing closed on the | |
// first error or if ctx is cancelled. | |
func (p Parallel) Execute(ctx context.Context, actions []Action) error { | |
grp, ctx := errgroup.WithContext(ctx) | |
for _, a := range actions { | |
grp.Go(p.execFn(ctx, a)) | |
} | |
return grp.Wait() | |
} | |
// execFn binds the Context and Action to the proper function signature for the | |
// errgroup.Group. | |
func (p Parallel) execFn(ctx context.Context, a Action) func() error { | |
return func() error { return a.Execute(ctx) } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type pool struct { | |
done <-chan struct{} | |
in chan poolAction | |
} | |
// Pool creates an Executor backed by a concurrent worker pool. Up to n Actions | |
// can be in-flight simultaneously; if n is less than or equal to zero, | |
// runtime.NumCPU is used. The done channel should be closed to release | |
// resources held by the Executor. | |
func Pool(n int, done <-chan struct{}) Executor { | |
if n <= 0 { | |
n = runtime.NumCPU() | |
} | |
p := pool{done: done, in: make(chan poolAction, n)} | |
for i := 0; i < n; i++ { | |
go p.work(p.in, p.done) | |
} | |
return p | |
} | |
// Execute enqueues all Actions on the worker pool, failing closed on the | |
// first error or if ctx is cancelled. This method blocks until all enqueued | |
// Actions have returned. In the event of an error, not all Actions may be | |
// executed. | |
func (p pool) Execute(ctx context.Context, actions []Action) error { | |
qty := len(actions) | |
if qty == 0 { | |
return nil | |
} | |
ctx, cancel := context.WithCancel(ctx) | |
defer cancel() | |
res := make(chan error, qty) | |
var err error | |
var queued uint64 | |
enqueue: | |
for _, action := range actions { | |
pa := poolAction{ctx: ctx, act: action, res: res} | |
select { | |
case <-p.done: // pool is closed | |
cancel() | |
return errors.New("pool is closed") | |
case <-ctx.Done(): // ctx is closed by caller | |
err = ctx.Err() | |
break enqueue | |
case p.in <- pa: // enqueue action | |
queued++ | |
} | |
} | |
for ; queued > 0; queued-- { | |
if r := <-res; r != nil { | |
if err == nil { | |
err = r | |
cancel() | |
} | |
} | |
} | |
return err | |
} | |
func (p pool) work(in <-chan poolAction, done <-chan struct{}) { | |
for { | |
select { | |
case <-done: | |
return | |
case a := <-in: | |
a.res <- a.act.Execute(a.ctx) | |
} | |
} | |
} | |
type poolAction struct { | |
ctx context.Context | |
act Action | |
res chan<- error | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Sequential implements Executor, performing each Action in series | |
type Sequential struct{} | |
// Execute performs each action in order, exiting on the first error or if the | |
// context is cancelled/deadlined. | |
func (Sequential) Execute(ctx context.Context, actions []Action) error { | |
for _, a := range actions { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
default: | |
if err := a.Execute(ctx); err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type (
	// StatSource creates metrics with the given name. The returned metrics
	// should be concurrency-safe.
	StatSource interface {
		Timer(name string) Timer
		Counter(name string) Counter
	}

	// Timer emits the duration of a particular event. The duration value is
	// typically used to measure latencies and create histograms thereof.
	Timer func(duration time.Duration)

	// Counter emits any number of events happening at a given time. For
	// example, Counters are often used to measure RPS.
	Counter func(delta int)

	// A statSet is the value held by a statCache: the group of metrics
	// emitted for one name.
	statSet struct {
		// Latency measures how long an Action takes
		Latency Timer
		// Success is incremented when an Action does not return an error
		Success Counter
		// Error is incremented when an Action results in an error
		Error Counter
	}

	// statCache describes a read-through cache from which shared statSets
	// are obtained by name.
	statCache interface {
		// get returns a shared statSet for the given name, either from the
		// cache or from a provided StatSource.
		get(name string) *statSet
	}
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// mutexCache implements statCache, backed by a map and sync.RWMutex | |
type mutexCache struct { | |
src StatSource | |
mtx sync.RWMutex | |
lookup map[string]*statSet | |
} | |
func (mc *mutexCache) get(name string) *statSet { | |
// take a read lock to see if the set already exists | |
mc.mtx.RLock() | |
set, ok := mc.lookup[name] | |
mc.mtx.RUnlock() | |
if ok { // the set exists, return it | |
return set | |
} | |
// need to take a write lock to update the map | |
mc.mtx.Lock() | |
// While waiting for the write lock, another goroutine may have created the | |
// set. Here, we check again after obtaining the lock before making a new one | |
if set, ok = mc.lookup[name]; !ok { | |
set = newStatSet(mc.src, name) | |
mc.lookup[name] = set | |
} | |
mc.mtx.Unlock() | |
return set | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// syncMapCache implements statCache, backed by a sync.Map | |
type syncMapCache struct { | |
src StatSource | |
lookup sync.Map | |
} | |
func (smc *syncMapCache) get(name string) *statSet { | |
val, _ := smc.lookup.Load(name) | |
if set, ok := val.(*statSet); ok { | |
return set | |
} | |
// create a new statSet, but don't store it if one was added since the last | |
// load. This is not ideal since we can't atomically create the set and | |
// write it. | |
set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name)) | |
return set.(*statSet) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment