Last active
October 8, 2020 21:35
-
-
Save rossedman/7d237209eabfda04d883e7d4ada8e215 to your computer and use it in GitHub Desktop.
Examples of concurrency patterns in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
) | |
func main() { | |
// turn tasks into a channel | |
tasks := generator( | |
Task{Name: "one"}, | |
Task{Name: "two"}, | |
) | |
// chain stages to do some kind of processing | |
tasks = stage("stage3", stage("stage2", stage("stage1", tasks))) | |
// print the results | |
for t := range tasks { | |
log.Printf("final %s", t.Name) | |
} | |
} | |
// Tasks represents a unit of work | |
type Task struct { | |
Name string | |
} | |
// generator converts a group of tasks into a channel | |
func generator(tasks ...Task) <-chan Task { | |
response := make(chan Task) | |
go func() { | |
defer close(response) | |
for _, task := range tasks { | |
response <- task | |
} | |
}() | |
return response | |
} | |
// stage does some sort of processing on a Task | |
// and can be chained together in a number of ways | |
func stage(name string, tasks <-chan Task) <-chan Task { | |
t := make(chan Task) | |
go func() { | |
defer close(t) | |
for task := range tasks { | |
log.Printf("running %s in stage %s", task.Name, name) | |
t <- task | |
} | |
}() | |
return t | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
func main() { | |
// simple timer function for measuring how | |
// long to process a group of tasks | |
start := time.Now() | |
defer func() { | |
log.Printf("processed resources in %s", time.Since(start)) | |
}() | |
var tasks []Task | |
// generate some number of tasks | |
for i := 0; i < 100; i++ { | |
tasks = append(tasks, Task{ | |
Name: fmt.Sprintf("task-%v", i), | |
Number: i, | |
}) | |
} | |
// take all tasks and process them onto a channel | |
// so they can be processed concurrently | |
t := generator(tasks...) | |
// generate worker pool that will process all the | |
// tasks, this can be adjusted based on the amount | |
// of tasks to process | |
var wg sync.WaitGroup | |
for i := 0; i < 5; i++ { | |
wg.Add(1) | |
go worker(&wg, fmt.Sprintf("worker-%v", i), t) | |
} | |
// wait for all workers to finish processing / | |
wg.Wait() | |
// print results for tasks | |
for task := range t { | |
log.Printf("result %s", task.Name) | |
} | |
} | |
// Task represents a unit of work | |
type Task struct { | |
Name string | |
Number int | |
} | |
// generator takes any number of tasks and transfers | |
// them to a channel | |
func generator(tasks ...Task) <-chan Task { | |
response := make(chan Task) | |
go func() { | |
defer close(response) | |
for _, task := range tasks { | |
response <- task | |
} | |
}() | |
return response | |
} | |
// worker will process tasks off of a channel and do | |
// some amount of work | |
func worker(wg *sync.WaitGroup, name string, tasks <-chan Task) { | |
defer wg.Done() | |
for task := range tasks { | |
log.Printf("running %s in stage %s", task.Name, name) | |
time.Sleep(time.Millisecond * 100) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment