Skip to content

Instantly share code, notes, and snippets.

@yanmhlv
Last active January 4, 2024 17:45
Show Gist options
  • Save yanmhlv/890d2e60f5cebd6d65601394a993ab1f to your computer and use it in GitHub Desktop.
Save yanmhlv/890d2e60f5cebd6d65601394a993ab1f to your computer and use it in GitHub Desktop.
Concurrency patterns
import "sync"
type Cond struct {
lock sync.RWMutex
changed chan struct{}
}
func (c *Cond) Listen() <-chan struct{} {
c.lock.RLock()
defer c.lock.RUnlock()
return c.changed
}
func (c *Cond) Broadcast() {
c.lock.Lock()
close(c.changed)
c.changed = make(chan struct{})
c.lock.Unlock()
}
package main
import (
"fmt"
"reflect"
)
func merge[T any](ch ...<-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
cases := make([]reflect.SelectCase, len(ch))
for i, c := range ch {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)}
}
for len(cases) > 0 {
chosen, value, ok := reflect.Select(cases)
if !ok {
cases = append(cases[:chosen], cases[chosen+1:]...)
continue
}
out <- value.Interface().(T)
}
}()
return out
}
func fanIn[T any](inputs ...<-chan T) <-chan T {
output := make(chan T)
var wg sync.WaitGroup
// Increment the wait group for each input channel
wg.Add(len(inputs))
// Start a goroutine for each input channel
for _, input := range inputs {
go func(ch <-chan T) {
defer wg.Done()
for num := range ch {
output <- num
}
}(input)
}
// Start a goroutine to wait for all inputs to be closed and then close the output channel
go func() {
wg.Wait()
close(output)
}()
return output
}
func fanOut[T any](input <-chan T, numWorkers int) []<-chan T {
outputs := make([]<-chan T, numWorkers)
for i := 0; i < numWorkers; i++ {
output := make(chan T)
outputs[i] = output
go func() {
defer close(output)
for num := range input {
// Send the processed result to the output channel
output <- num
}
}()
}
return outputs
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment