Skip to content

Instantly share code, notes, and snippets.

@rossedman
Last active October 8, 2020 21:35
Show Gist options
  • Save rossedman/7d237209eabfda04d883e7d4ada8e215 to your computer and use it in GitHub Desktop.
Save rossedman/7d237209eabfda04d883e7d4ada8e215 to your computer and use it in GitHub Desktop.
Examples of concurrency patterns in Go
package main
import (
"log"
)
func main() {
// turn tasks into a channel
tasks := generator(
Task{Name: "one"},
Task{Name: "two"},
)
// chain stages to do some kind of processing
tasks = stage("stage3", stage("stage2", stage("stage1", tasks)))
// print the results
for t := range tasks {
log.Printf("final %s", t.Name)
}
}
// Tasks represents a unit of work
type Task struct {
Name string
}
// generator converts a group of tasks into a channel
func generator(tasks ...Task) <-chan Task {
response := make(chan Task)
go func() {
defer close(response)
for _, task := range tasks {
response <- task
}
}()
return response
}
// stage does some sort of processing on a Task
// and can be chained together in a number of ways
func stage(name string, tasks <-chan Task) <-chan Task {
t := make(chan Task)
go func() {
defer close(t)
for task := range tasks {
log.Printf("running %s in stage %s", task.Name, name)
t <- task
}
}()
return t
}
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
// simple timer function for measuring how
// long to process a group of tasks
start := time.Now()
defer func() {
log.Printf("processed resources in %s", time.Since(start))
}()
var tasks []Task
// generate some number of tasks
for i := 0; i < 100; i++ {
tasks = append(tasks, Task{
Name: fmt.Sprintf("task-%v", i),
Number: i,
})
}
// take all tasks and process them onto a channel
// so they can be processed concurrently
t := generator(tasks...)
// generate worker pool that will process all the
// tasks, this can be adjusted based on the amount
// of tasks to process
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go worker(&wg, fmt.Sprintf("worker-%v", i), t)
}
// wait for all workers to finish processing /
wg.Wait()
// print results for tasks
for task := range t {
log.Printf("result %s", task.Name)
}
}
// Task represents a unit of work
type Task struct {
Name string
Number int
}
// generator takes any number of tasks and transfers
// them to a channel
func generator(tasks ...Task) <-chan Task {
response := make(chan Task)
go func() {
defer close(response)
for _, task := range tasks {
response <- task
}
}()
return response
}
// worker will process tasks off of a channel and do
// some amount of work
func worker(wg *sync.WaitGroup, name string, tasks <-chan Task) {
defer wg.Done()
for task := range tasks {
log.Printf("running %s in stage %s", task.Name, name)
time.Sleep(time.Millisecond * 100)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment