Created
November 3, 2022 13:55
-
-
Save dragonsinth/038c72f1bc4dc6f9a35eeabe0e3816b8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package concurrency | |
import ( | |
"context" | |
"fmt" | |
"golang.org/x/sync/errgroup" | |
) | |
// Fan-out widths used by the examples in this file.
const (
	// doThingWorkers is the number of goroutines DoThing starts.
	doThingWorkers = 4

	// doSubThingWorkers is the upper bound of the worker loop in DoSubThing.
	doSubThingWorkers = 10
)
// 1) Go offers unique advantages in concurrency composition and non-leaky abstraction | |
// DoThing ... | |
// | |
// 2) The basic contract between decoupled components in a Go server is | |
// | |
// func DoThing(ctx, args...) (value, error) { | |
// } | |
// | |
// - If the caller cancels the context, the callee had better exit quickly. | |
// | |
// - When this method returns, it better have closed everything it opened. | |
// | |
// - errgroups fit well into this model because they compose | |
func DoThing(ctx context.Context, args ...any) (any, error) { | |
g, gCtx := errgroup.WithContext(ctx) | |
results := make([]any, doThingWorkers) // could also be a result-collecting channel, or a mutexed data structure like a map. | |
for i := 0; i < doThingWorkers; i++ { | |
i, ctx := i, gCtx | |
g.Go(func() error { | |
ret, err := DoSubThing(ctx, i, args...) | |
results[i] = ret | |
return err | |
}) | |
} | |
return results, g.Wait() | |
} | |
func DoSubThing(ctx context.Context, i int, args ...any) (any, error) { | |
// Can have its own group. | |
g, _ := errgroup.WithContext(ctx) | |
for i := 0; i < doSubThingWorkers; i++ { | |
// stuff | |
} | |
return nil, g.Wait() | |
} | |
// 3) Prefer callback functions over channels in APIs.

// doc is an empty placeholder for the documents streamed by the examples
// in this file.
type doc struct{}
// StreamDocumentsChannel bad: forces the caller to use channels and multiple goroutines. | |
func StreamDocumentsChannel(ctx context.Context, out chan<- *doc) error { | |
defer close(out) | |
for i := 0; i < 10; i++ { | |
// Most people will forget the select | |
select { | |
case out <- &doc{}: | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
return nil | |
} | |
// docCallback can be called concurrently | |
type docCallback func(ctx context.Context, d *doc) error | |
// StreamDocumentsCallback better: the caller can still use channels if it wants to. | |
func StreamDocumentsCallback(ctx context.Context, f docCallback) error { | |
for i := 0; i < 10; i++ { | |
if err := f(ctx, &doc{}); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
func simpleCaller(ctx context.Context) error { | |
return StreamDocumentsCallback(ctx, func(ctx context.Context, d *doc) error { | |
fmt.Println(d) | |
return nil | |
}) | |
} | |
// 4) 99% of the time: mutexes should be structure scoped, concurrency controls (go func, channel, errgroup, context) | |
// should be method scoped. | |
func complexCaller(ctx context.Context) error { | |
g, ctx := errgroup.WithContext(ctx) | |
resultChan := make(chan *doc, 64) | |
// producer | |
g.Go(func() error { | |
defer close(resultChan) | |
return StreamDocumentsCallback(ctx, func(ctx context.Context, d *doc) error { | |
select { | |
case resultChan <- &doc{}: | |
return nil | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
}) | |
}) | |
// consumer | |
g.Go(func() error { | |
for { | |
select { | |
case d, ok := <-resultChan: | |
if !ok { | |
return nil // done | |
} | |
fmt.Println(d) | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
} | |
}) | |
return g.Wait() | |
} | |
// Also possible, but document that `f` can be called concurrently. | |
func StreamDocumentsCallbackParallel(ctx context.Context, f func(ctx context.Context, d *doc) error) error { | |
g, gCtx := errgroup.WithContext(ctx) | |
for i := 0; i < 10; i++ { | |
ctx := gCtx | |
g.Go(func() error { | |
if err := f(ctx, &doc{}); err != nil { | |
return err | |
} | |
return nil | |
}) | |
} | |
return g.Wait() | |
} | |
// 5) If you see a channel, errgroup, or context embedded in a structure, be suspicious.
// There are legitimate use cases (think task queue), but you should be sure there's a good reason.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment