Last active
October 14, 2023 19:15
-
-
Save groboclown/0456ba6ba572e0500f8af63cb856aee2 to your computer and use it in GitHub Desktop.
Go Concurrent Monoid Map Reduce
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Released under the Public Domain | |
// Where applicable, consider this CC-0 | |
// Inspired by the Rope science documents from the Xi text editor: | |
// https://xi-editor.io/docs/rope_science_01.html | |
package main | |
import ( | |
"context" | |
"fmt" | |
) | |
// ================================================
// A monoid is a simple mathematical concept: a binary operator that is
// associative:
//   a + (b + c) = (a + b) + c
// together with an identity element:
//   a + 0 = 0 + a = a
// Note that order must be strictly maintained, because a monoid gives no
// guarantee of commutativity:
//   a + b != b + a  (not guaranteed, though it may hold for some values)
// The code below therefore applies the monoid operation in goroutines while
// strictly maintaining the ordering.
//
// This can be used with iterable structures to join them together when
// ordering matters, such as string concatenation.
// MonoidOperator performs the monoid operation. In computer terms, this is the
// reduction operation.
//
// It must be associative. It need not be commutative: MonoidMapReduce always
// passes the value covering the earlier inputs as a and the later one as b.
type MonoidOperator[T any] func(a T, b T) T

// MonoidGenerator produces the initial monoid value for one input.
//
// After being generated, the value is reduced with its neighbors.
// MonoidMapReduce calls the generator from a separate goroutine for every
// input, so it must be safe to run concurrently. A panic inside the generator
// is not recovered and terminates the program.
type MonoidGenerator[S any, T any] func(arg S) T

// MonoidMapReduce generates a monoid value for every element of values, each in
// its own goroutine. It reduces those values in input order into a single
// result and delivers it on the returned channel. The channel receives exactly
// one value and is then closed.
//
// The result is identity when values is empty. If ctx is canceled before the
// reduction completes, identity is sent in place of the result. A run canceled
// near the end may still deliver the full result, because cancellation and
// incoming values are not prioritized against each other. If op panics, the
// panic is printed and the channel is closed without a value, so the receiver
// observes the zero value of T.
//
// The returned channel is buffered, so a caller may stop listening without
// leaking the reducing goroutine.
func MonoidMapReduce[S any, T any](ctx context.Context, identity T, values []S, gen MonoidGenerator[S, T], op MonoidOperator[T]) <-chan T {
	// Capacity 1: the single result can always be delivered, letting the
	// reducer exit even if nobody ever receives it.
	done := make(chan T, 1)

	count := len(values)
	if count == 0 {
		// Nothing would ever be queued, so the reducer below would wait forever.
		done <- identity
		close(done)
		return done
	}

	// The reduction of a continuous range of inputs is stored in a "group".
	// When two neighboring groups complete, they are joined through the monoid
	// operation. Groups are indexed in an array by their start and end
	// positions. Interior slots may go stale as groups merge, but a group's
	// current left and right endpoints always reference it, and only endpoints
	// are ever looked up.
	//
	// Input i lives at index i+1. Slots 0 and count+1 hold identity sentinels,
	// which removes the need for end-of-list checks on every join.
	const firstIdx = 0
	lastIdx := count + 1

	// Capacity count: every generator can deposit its value without blocking.
	// This keeps generator goroutines from leaking once the reducer stops
	// reading after cancellation.
	queue := make(chan *monoidGroup[T], count)

	groups := make([]*monoidGroup[T], count+2)
	groups[firstIdx] = &monoidGroup[T]{left: firstIdx, right: firstIdx, value: identity}
	groups[lastIdx] = &monoidGroup[T]{left: lastIdx, right: lastIdx, value: identity}

	for i, v := range values {
		idx := i + 1
		v := v // per-iteration copy for Go versions before 1.22
		go func() {
			queue <- &monoidGroup[T]{left: idx, right: idx, value: gen(v)}
		}()
	}

	// The reducer is the only goroutine that touches groups, so it needs no locking.
	go func() {
		defer func() {
			if e := recover(); e != nil {
				fmt.Printf("bug: %v", e)
			}
			close(done)
		}()
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("aborting due to ctx done")
				done <- identity
				// Return so the value is sent once; the deferred close then
				// signals the end of the stream.
				return
			case n := <-queue:
				// Register the new group at both of its endpoints.
				groups[n.left] = n
				groups[n.right] = n
				// Absorb a completed neighbor on the left. Its right endpoint is
				// n.left-1, and that slot points to the neighbor's current group.
				if pre := groups[n.left-1]; pre != nil {
					n.left = pre.left
					n.value = op(pre.value, n.value)
					groups[n.left] = n
				}
				// Likewise absorb a completed neighbor on the right.
				if suf := groups[n.right+1]; suf != nil {
					n.right = suf.right
					n.value = op(n.value, suf.value)
					groups[n.right] = n
				}
				// Spanning both sentinels means every input has been reduced.
				if n.left == firstIdx && n.right == lastIdx {
					done <- n.value
					return
				}
			}
		}
	}()
	return done
}

// MonoidMapReduceSync runs MonoidMapReduce with a background context and blocks
// until the result is available.
func MonoidMapReduceSync[S any, T any](identity T, values []S, gen MonoidGenerator[S, T], op MonoidOperator[T]) T {
	return <-MonoidMapReduce[S, T](context.Background(), identity, values, gen, op)
}

// monoidGroup holds the reduced value of the contiguous, inclusive index range
// [left, right] in MonoidMapReduce's padded groups array.
type monoidGroup[T any] struct {
	left  int
	right int
	value T
}
// ================================================ | |
// Example using this with string concatenation. | |
// genStr is the map step of the example: it renders one input as "=<n>;".
func genStr(s int) string {
	digits := fmt.Sprint(s)
	return "=" + digits + ";"
}
// reduceStr is the reduce step of the example: order-sensitive string
// concatenation, placing a before b.
func reduceStr(a, b string) (joined string) {
	joined = a
	joined += b
	return joined
}
func main() { | |
count := 1000 | |
inp := make([]int, count) | |
for i := 0; i < count; i++ { | |
inp[i] = i | |
} | |
resCh := MonoidMapReduce[int, string](context.Background(), "", inp, genStr, reduceStr) | |
fmt.Printf(<-resCh) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
An implementation of concurrent map-reduce for cases where the order of the reduction must be maintained.
This implementation has several limitations:
Where memory is constrained, the algorithm should be adjusted to free unused blocks of groups. If the indexes span arbitrary bounds that are known in advance, a map may be used instead of the array.