-
-
Save mcheviron/427f7dda652254687968e077a80156ec to your computer and use it in GitHub Desktop.
Golang array map, concurrency problem
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"fmt"
	"runtime"
	"sync"
	"testing"
	"time"
)
// Map returns a new slice whose i-th element is f(arr[i], i). It runs
// sequentially on the calling goroutine. The result always has length
// len(arr) and is non-nil, even when arr is empty.
func Map[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	out := make([]T2, len(arr))
	for idx := range out {
		out[idx] = f(arr[idx], idx)
	}
	return out
}
// MapConcurrent applies f to every element of arr and returns the results in
// input order, so that result[i] == f(arr[i], i). It launches one goroutine
// per element, an unbounded number that grows with len(arr), and blocks until
// all of them have finished. f is invoked from many goroutines at once and
// must tolerate concurrent calls.
func MapConcurrent[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	results := make([]T2, len(arr))

	var pending sync.WaitGroup
	pending.Add(len(arr))

	for idx := range arr {
		// Each goroutine writes only results[pos], so no two writes overlap.
		go func(pos int, item T1) {
			defer pending.Done()
			results[pos] = f(item, pos)
		}(idx, arr[idx])
	}

	pending.Wait()
	return results
}
// MapConcurrentWorkerPool applies f to every element of arr using a fixed
// pool of goroutines and returns the results in input order, so that
// result[i] == f(arr[i], i). The result always has length len(arr) and is
// non-nil.
//
// The input is split into one contiguous shard per worker. Each worker writes
// only to its own disjoint range of the output, so no locking is needed.
// f is invoked from several goroutines at once and must tolerate concurrent
// calls.
func MapConcurrentWorkerPool[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 {
	arrT2 := make([]T2, len(arr))

	// GOMAXPROCS(0) is the number of goroutines the scheduler will actually
	// run in parallel (it honours the GOMAXPROCS env var and
	// runtime.GOMAXPROCS calls). NumCPU only reports logical CPUs.
	numWorkers := runtime.GOMAXPROCS(0)
	// Never start more workers than there are elements. Extra workers would
	// only receive empty shards.
	if numWorkers > len(arr) {
		numWorkers = len(arr)
	}
	if numWorkers == 0 {
		return arrT2
	}

	// Balanced split: every shard gets `base` elements and the first `extra`
	// shards get one more, so shard sizes differ by at most one. A plain
	// len/numWorkers chunk size would be 0 for short inputs, leaving all work
	// to the last worker, and would dump the whole remainder on it otherwise.
	base, extra := len(arr)/numWorkers, len(arr)%numWorkers

	var wg sync.WaitGroup
	wg.Add(numWorkers)

	start := 0
	for w := 0; w < numWorkers; w++ {
		end := start + base
		if w < extra {
			end++
		}
		go func(start, end int) {
			defer wg.Done()
			for i := start; i < end; i++ {
				arrT2[i] = f(arr[i], i)
			}
		}(start, end)
		start = end
	}

	wg.Wait()
	return arrT2
}
// Number is a single-field struct used as the element type for the map
// timings and benchmarks. Its only field, a, holds the integer payload.
type Number struct{ a int }
func main() { | |
const n = 10000000 | |
arr := make([]Number, 0, n) | |
for i := range n { | |
arr = append(arr, Number{i}) | |
} | |
returnInt := func(t Number, index int) int { | |
return t.a | |
} | |
func() { | |
defer timer("normal map")() | |
Map(arr, returnInt) | |
}() | |
func() { | |
defer timer("infinite goroutines map")() | |
MapConcurrent(arr, returnInt) | |
}() | |
func() { | |
defer timer("worker pool map")() | |
MapConcurrentWorkerPool(arr, returnInt) | |
}() | |
} | |
// timer records the current time and returns a function that, when called,
// prints "<name> took <elapsed>" to standard output. The usual pattern is
// `defer timer("label")()`, which times the rest of the enclosing function.
func timer(name string) func() {
	begin := time.Now()
	report := func() {
		elapsed := time.Since(begin)
		fmt.Printf("%s took %v\n", name, elapsed)
	}
	return report
}
func BenchmarkMap(b *testing.B) { | |
n := 10000000 | |
input := make([]Number, 0, n) | |
for i := 0; i < n; i++ { | |
input = append(input, Number{i}) | |
} | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
_ = Map(input, func(item Number, index int) Number { | |
return Number{item.a * 2} | |
}) | |
} | |
} | |
func BenchmarkMapConcurrentWorkerPool(b *testing.B) { | |
n := 10000000 | |
input := make([]Number, 0, n) | |
for i := 0; i < n; i++ { | |
input = append(input, Number{i}) | |
} | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
_ = MapConcurrentWorkerPool(input, func(item Number, index int) Number { | |
return Number{item.a * 2} | |
}) | |
} | |
} | |
func BenchmarkMapConcurrent(b *testing.B) { | |
n := 10000000 | |
input := make([]Number, 0, n) | |
for i := 0; i < n; i++ { | |
input = append(input, Number{i}) | |
} | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
_ = MapConcurrent(input, func(item Number, index int) Number { | |
return Number{a: item.a * 2} | |
}) | |
} | |
} | |
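// Results with the earlier channel-based MapConcurrentWorkerPool (the version shown in the pprof listing below):
// Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMap|BenchmarkMapConcurrentWorkerPool|BenchmarkMapConcurrent)$ go-perf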
// goos: darwin | |
// goarch: arm64 | |
// pkg: go-perf | |
// BenchmarkMap-8 86 13761745 ns/op 80003094 B/op 1 allocs/op | |
// BenchmarkMapConcurrentWorkerPool-8 1 2435593875 ns/op 80013424 B/op 32 allocs/op | |
// BenchmarkMapConcurrent-8 1 1676149917 ns/op 880347800 B/op 10000741 allocs/op | |
// PASS | |
// ok      go-perf 7.178s
// Total: 87.45s | |
// ROUTINE ======================== go-perf.BenchmarkMapConcurrentWorkerPool in /Users/mostafaelataby-cheviron/code/go-perf/main_test.go | |
// 0 630ms (flat, cum) 0.72% of Total | |
// . . 21:func BenchmarkMapConcurrentWorkerPool(b *testing.B) { | |
// . . 22: n := 10000000 | |
// . . 23: input := make([]Number, 0, n) | |
// . . 24: for i := 0; i < n; i++ { | |
// . . 25: input = append(input, Number{i}) | |
// . . 26: } | |
// . . 27: | |
// . . 28: b.ResetTimer() | |
// . . 29: for i := 0; i < b.N; i++ { | |
// . 630ms 30: _ = MapConcurrentWorkerPool(input, func(item Number, index int) Number { | |
// . . 31: return Number{item.a * 2} | |
// . . 32: }) | |
// . . 33: } | |
// . . 34:} | |
// . . 35: | |
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }] in /Users/mostafaelataby-cheviron/code/go-perf/main.go | |
// 0 630ms (flat, cum) 0.72% of Total | |
// . . 40:func MapConcurrentWorkerPool[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 { | |
// . . 41: arrT2 := make([]T2, len(arr)) | |
// . . 42: | |
// . . 43: numworkers := runtime.NumCPU() | |
// . . 44: | |
// . . 45: type indexT1 struct { | |
// . . 46: index int | |
// . . 47: t1 T1 | |
// . . 48: } | |
// . . 49: | |
// . . 50: type indexT2 struct { | |
// . . 51: index int | |
// . . 52: t2 T2 | |
// . . 53: } | |
// . . 54: | |
// . . 55: inputs := make(chan indexT1, numworkers) | |
// . . 56: results := make(chan indexT2, numworkers) | |
// . . 57: | |
// . . 58: var wg sync.WaitGroup | |
// . . 59: wg.Add(numworkers) | |
// . . 60: | |
// . . 61: worker := func() { | |
// . . 62: for t1 := range inputs { | |
// . . 63: t2 := f(t1.t1, t1.index) | |
// . . 64: results <- indexT2{t1.index, t2} | |
// . . 65: } | |
// . . 66: | |
// . . 67: wg.Done() | |
// . . 68: } | |
// . . 69: | |
// . . 70: for range numworkers { | |
// . . 71: go worker() | |
// . . 72: } | |
// . . 73: | |
// . . 74: go func() { | |
// . . 75: wg.Wait() | |
// . . 76: close(results) | |
// . . 77: }() | |
// . . 78: | |
// . . 79: go func() { | |
// . . 80: for i, t := range arr { | |
// . . 81: inputs <- indexT1{i, t} | |
// . . 82: } | |
// . . 83: close(inputs) | |
// . . 84: }() | |
// . . 85: | |
// . 630ms 86: for t2 := range results { | |
// . . 87: arrT2[t2.index] = t2.t2 | |
// . . 88: } | |
// . . 89: | |
// . . 90: return arrT2 | |
// . . 91:} | |
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }].func1 in /Users/mostafaelataby-cheviron/code/go-perf/main.go | |
// 0 17.52s (flat, cum) 20.03% of Total | |
// . . 61: worker := func() { | |
// . 10.15s 62: for t1 := range inputs { | |
// . . 63: t2 := f(t1.t1, t1.index) | |
// . 7.37s 64: results <- indexT2{t1.index, t2} | |
// . . 65: } | |
// . . 66: | |
// . . 67: wg.Done() | |
// . . 68: } | |
// . . 69: | |
// ROUTINE ======================== go-perf.MapConcurrentWorkerPool[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }].func3 in /Users/mostafaelataby-cheviron/code/go-perf/main.go | |
// 0 1.04s (flat, cum) 1.19% of Total | |
// . . 79: go func() { | |
// . . 80: for i, t := range arr { | |
// . 1.04s 81: inputs <- indexT1{i, t} | |
// . . 82: } | |
// . . 83: close(inputs) | |
// . . 84: }() | |
// . . 85: | |
// . . 86: for t2 := range results { | |
// ROUTINE ======================== go-perf.MapConcurrent[go.shape.struct { go-perf.a int },go.shape.struct { go-perf.a int }] in /Users/mostafaelataby-cheviron/code/go-perf/main.go | |
// 0 200ms (flat, cum) 0.23% of Total | |
// . . 21:func MapConcurrent[T1, T2 any](arr []T1, f func(item T1, index int) T2) []T2 { | |
// . . 22: var wg sync.WaitGroup | |
// . . 23: wg.Add(len(arr)) | |
// . . 24: | |
// . . 25: arrT2 := make([]T2, len(arr)) | |
// . . 26: | |
// . . 27: for i, t := range arr { | |
// . 200ms 28: go func() { | |
// . . 29: t2 := f(t, i) | |
// . . 30: arrT2[i] = t2 | |
// . . 31: | |
// . . 32: wg.Done() | |
// . . 33: }() | |
// After changing the worker pool to split the slice into per-worker shards instead of passing items over channels:
// Running tool: /usr/local/go/bin/go test -benchmem -run=^$ -bench ^(BenchmarkMap|BenchmarkMapConcurrentWorkerPool|BenchmarkMapConcurrent)$ go-perf | |
// goos: darwin | |
// goarch: arm64 | |
// pkg: go-perf | |
// BenchmarkMap-8 68 16442178 ns/op 80003086 B/op 1 allocs/op | |
// BenchmarkMapConcurrentWorkerPool-8 213 5634531 ns/op 80003768 B/op 11 allocs/op | |
// BenchmarkMapConcurrent-8 1 1601082708 ns/op 881047088 B/op 10002197 allocs/op | |
// PASS | |
// ok go-perf 5.310s |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment