Skip to content

Instantly share code, notes, and snippets.

@didof
Last active November 12, 2022 13:03
Show Gist options
  • Select an option

  • Save didof/e3b31f35a749dc1cbfb2876bd2a4f0a7 to your computer and use it in GitHub Desktop.

Select an option

Save didof/e3b31f35a749dc1cbfb2876bd2a4f0a7 to your computer and use it in GitHub Desktop.
go patterns
package main
import (
"context"
"fmt"
"sync"
)
// main wires the pipeline together: a generator feeds ten add workers
// (fan-out), whose outputs are merged into one stream (fan-in) and printed.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Cancelling on exit releases any pipeline goroutine still selecting on ctx.
	defer cancel()

	source := generator(ctx, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
	workers := fanOut(ctx, 10, source)
	merged := fanIn(ctx, workers...)

	for {
		res, open := <-merged
		if !open {
			return
		}
		fmt.Println("res", res)
	}
}
// generator emits every element of inputs, in order, on the returned
// unbuffered channel. The channel is closed once all elements have been sent
// or as soon as ctx is cancelled, whichever happens first.
func generator(ctx context.Context, inputs []int) chan int {
	stream := make(chan int)

	emit := func() {
		defer close(stream)
		for idx := 0; idx < len(inputs); idx++ {
			select {
			case stream <- inputs[idx]:
			case <-ctx.Done():
				return
			}
		}
	}
	go emit()

	return stream
}
// add starts a worker that reads values from inputCh, adds one to each, and
// sends the result on the returned unbuffered channel. The returned channel is
// closed when inputCh is closed or when ctx is cancelled while a result is
// waiting to be sent.
func add(ctx context.Context, inputCh <-chan int) chan int {
	results := make(chan int)

	go func() {
		defer close(results)
		for {
			v, open := <-inputCh
			if !open {
				return
			}
			select {
			case results <- v + 1:
			case <-ctx.Done():
				return
			}
		}
	}()

	return results
}
// fanOut starts n add workers that all consume from the same inputCh, so each
// input value is handled by exactly one of them. It returns the workers'
// output channels, one per worker.
func fanOut(ctx context.Context, n int, inputCh <-chan int) []chan int {
	outputs := make([]chan int, n)
	for slot := range outputs {
		outputs[slot] = add(ctx, inputCh)
	}
	return outputs
}
// fanIn merges the values of all chs into a single returned channel.
//
// One forwarding goroutine is started per input channel. The returned channel
// is closed once every forwarder has finished, which happens when its input
// channel is closed or when ctx is cancelled. Values from different inputs are
// interleaved in no particular order.
func fanIn(ctx context.Context, chs ...chan int) chan int {
	out := make(chan int)

	var wg sync.WaitGroup
	wg.Add(len(chs))
	for _, ch := range chs {
		go func(ch <-chan int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case value, ok := <-ch:
					if !ok {
						return
					}
					// The send must also watch ctx: if the consumer stops
					// reading after cancellation, a bare send would block
					// forever and out would never be closed.
					select {
					case <-ctx.Done():
						return
					case out <- value:
					}
				}
			}
		}(ch)
	}

	// Close out only after every forwarder is done, so no send can hit a
	// closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
// main runs the wood pipeline for five seconds: raw wood is generated, refined
// by one refinery per CPU, merged back into a single stream and printed.
func main() {
	// Keep the cancel func and always call it: discarding it leaks the
	// context's timer until the deadline fires (go vet reports "lostcancel").
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	woodCh := generateWood(ctx)
	refineries := fanOut(ctx, runtime.NumCPU(), woodCh, refineWood)
	for w := range fanIn(ctx, refineries...) {
		fmt.Println(w)
	}
}
// refineWood starts a refinery goroutine that reads wood from in, marks each
// piece as refined and stamps it with id, and sends it on the returned
// unbuffered channel.
//
// The returned channel is closed when in is closed or when ctx is cancelled.
func refineWood(ctx context.Context, in <-chan wood, id int) chan wood {
	out := make(chan wood)

	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case w, ok := <-in:
				if !ok {
					return
				}
				w.refined = true
				w.processedBy = id
				// Guard the send with ctx as well: once downstream stops
				// reading after cancellation, a bare send would block this
				// goroutine forever.
				select {
				case <-ctx.Done():
					return
				case out <- w:
				}
			}
		}
	}()

	return out
}
// generateWood starts a producer goroutine that emits one piece of raw wood on
// the returned unbuffered channel every 300ms, measured from the previous
// successful send.
//
// When ctx is cancelled the producer prints a shutdown message and closes the
// returned channel.
func generateWood(ctx context.Context) chan wood {
	const interval = 300 * time.Millisecond

	out := make(chan wood)

	go func() {
		defer close(out)
		// Cancellation is the only way out of the loop below, so the shutdown
		// message can be deferred; it runs before close(out).
		defer fmt.Println("shutting down wood production")

		// One reusable timer instead of a fresh time.After per iteration.
		timer := time.NewTimer(interval)
		defer timer.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
			}

			// Guard the send with ctx: once the refineries have stopped
			// reading, a bare send would block forever and the channel would
			// never be closed.
			select {
			case <-ctx.Done():
				return
			case out <- wood{refined: false}:
			}

			// The timer has fired and its channel was drained above, so Reset
			// is safe here.
			timer.Reset(interval)
		}
	}()

	return out
}
// fanOut starts n workers by calling fn once per worker, passing the shared
// input channel in and the worker's index (0..n-1) as its id. It returns the
// channels produced by fn, in worker order.
func fanOut(ctx context.Context, n int, in <-chan wood, fn func(ctx context.Context, in <-chan wood, id int) chan wood) []chan wood {
	outputs := make([]chan wood, n)
	for id := range outputs {
		outputs[id] = fn(ctx, in, id)
	}
	return outputs
}
// fanIn merges the wood arriving on all ins into a single returned channel.
//
// One forwarding goroutine is started per input channel. The returned channel
// is closed once every forwarder has finished, which happens when its input
// channel is closed or when ctx is cancelled. Pieces from different inputs are
// interleaved in no particular order.
func fanIn(ctx context.Context, ins ...chan wood) chan wood {
	out := make(chan wood)

	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(ch <-chan wood) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case w, ok := <-ch:
					if !ok {
						return
					}
					// The send must also watch ctx: if the consumer stops
					// reading after cancellation, a bare send would block
					// forever and out would never be closed.
					select {
					case <-ctx.Done():
						return
					case out <- w:
					}
				}
			}
		}(in)
	}

	// Close out only after every forwarder is done, so no send can hit a
	// closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
// wood is a unit of work flowing through the pipeline.
type wood struct {
	// refined reports whether a refinery has processed this piece.
	refined bool
	// processedBy is the id of the refinery that processed this piece; it is
	// only meaningful when refined is true.
	processedBy int
}

// String implements fmt.Stringer, describing the piece as either raw or
// refined by a specific refinery.
func (w wood) String() string {
	if !w.refined {
		return "raw wood"
	}
	return fmt.Sprintf("wood refined by %d", w.processedBy)
}
// Source: https://github.com/karanpratapsingh/learn-go#worker-pool
package main
import (
"fmt"
"runtime"
"sync"
)
// totalJobs is the number of jobs main enqueues, the capacity of the jobs
// channel, and the number of results main waits for before exiting.
const totalJobs = 10000
// main starts one worker per CPU, enqueues totalJobs jobs, and then drains
// exactly totalJobs results before closing the results channel and exiting.
func main() {
	workerCount := runtime.NumCPU()

	// jobs is large enough to hold every job, so enqueueing never blocks.
	jobs := make(chan int, totalJobs)
	out := make(chan int, workerCount)

	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, out)
	}

	for job := 1; job <= totalJobs; job++ {
		jobs <- job
	}
	// Closing jobs lets each worker's receive loop terminate.
	close(jobs)

	// Every job yields exactly one result; discard them as they arrive.
	for remaining := totalJobs; remaining > 0; remaining-- {
		<-out
	}
	close(out)
}
// worker receives jobs until the jobs channel is closed. Each job is handled
// in its own goroutine, which logs the start, sends job*2 on out, and logs
// completion. worker returns only after all goroutines it started have
// finished.
func worker(id int, jobs <-chan int, out chan<- int) {
	var pending sync.WaitGroup

	handle := func(job int) {
		defer pending.Done()
		fmt.Printf("[Worker #%d] started job #%d\n", id, job)
		out <- job * 2
		fmt.Printf("[Worker #%d] finished job #%d\n", id, job)
	}

	for {
		job, more := <-jobs
		if !more {
			break
		}
		pending.Add(1)
		go handle(job)
	}

	pending.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment