Go Concurrent Monoid Map Reduce
// Released under the Public Domain
// Where applicable, consider this CC-0
// Inspired by the Rope science documents from the Xi text editor:
// https://xi-editor.io/docs/rope_science_01.html
package main
import (
	"context"
	"fmt"
)
// ================================================
// A monoid is a simple mathematical structure: it has a binary
// operator that is associative:
//   a + (b + c) = (a + b) + c
// and it has an identity:
//   a + 0 = 0 + a = a
// Note that order must be strictly maintained, because a monoid gives no guarantee of commutativity:
//   a + b != b + a (not guaranteed, though it may hold for some values)
// This is handled below by applying the monoid operation in goroutines while
// strictly maintaining the ordering.
//
// This can be used with iterable structures to join them together when ordering matters,
// such as string concatenation.
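// For example, with string concatenation the empty string is the identity, and
//   ("ab" + "cd") + "ef" == "ab" + ("cd" + "ef") == "abcdef"
// holds, while "ab" + "cd" != "cd" + "ab".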
// MonoidOperator performs the monoid operation. In computer terms, this is the reduction operation.
type MonoidOperator[T any] func(a T, b T) T
// MonoidGenerator produces the initial monoid value. In computer terms, this is the mapping operation.
//
// Given some input, it generates the monoid value, which is then reduced.
// The generator is called by the map-reduce function inside a goroutine, so it must be
// safe to run concurrently.
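//
// For example, a generator that renders an int as a delimited string
// (matching genStr below):
//
//	func(n int) string { return fmt.Sprintf("=%d;", n) }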
type MonoidGenerator[S any, T any] func(arg S) T
// MonoidMapReduce generates the monoid values from the initial value list and produces a single result in the channel.
//
// This accepts a context to allow deadline and other cancelation operations while running.
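//
// For example (mirroring main below):
//
//	resCh := MonoidMapReduce[int, string](context.Background(), "", inp, genStr, reduceStr)
//	fmt.Println(<-resCh)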
func MonoidMapReduce[S any, T any](ctx context.Context, identity T, values []S, gen MonoidGenerator[S, T], op MonoidOperator[T]) <-chan T {
	done := make(chan T)

	// The reduction of two values is stored in a "group" which contains
	// the result from running across a contiguous range of initial values.
	// When two neighboring groups complete, the function joins them
	// together through the monoid operation.
	// The completed values are stored in an array, where the array index
	// contains the group whose start or end index is at that position.
	// As the operation runs, some of those array entries may reference a stale
	// group; what matters is that the entries at a group's current first and
	// last index reference the right group.
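	// For example, with three inputs the array has indices 0..4, with identity
	// sentinels at 0 and 4. If the generator for slot 2 finishes first, groups[2]
	// holds a group spanning [2,2]; when slot 1's result arrives, it merges left
	// with the sentinel at 0 and right with groups[2], leaving a single group
	// spanning [0,2] whose value is op(op(identity, v1), v2).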
	count := len(values)
	const firstIdx = 0
	lastIdx := count + 1
	queue := make(chan *monoidGroup[T])

	// The groups array is initialized with a first and last value containing the identity.
	// This eliminates the need for end-of-list checking on every join.
	groups := make([]*monoidGroup[T], count+2)
	groups[firstIdx] = &monoidGroup[T]{left: firstIdx, right: firstIdx, value: identity}
	groups[lastIdx] = &monoidGroup[T]{left: lastIdx, right: lastIdx, value: identity}
	for i, v := range values {
		idx := i + 1
		v := v
		groups[idx] = nil
		go func() {
			queue <- &monoidGroup[T]{left: idx, right: idx, value: gen(v)}
		}()
	}
	go func() {
		defer func() {
			if e := recover(); e != nil {
				fmt.Printf("bug: %v\n", e)
			}
			close(done)
		}()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("aborting due to ctx done")
				done <- identity
				return
			case n := <-queue:
				groups[n.left] = n
				groups[n.right] = n
				preIdx := n.left - 1
				sufIdx := n.right + 1
				if pre := groups[preIdx]; pre != nil {
					n.left = pre.left
					n.value = op(pre.value, n.value)
					groups[n.left] = n
				}
				if suf := groups[sufIdx]; suf != nil {
					n.right = suf.right
					n.value = op(n.value, suf.value)
					groups[n.right] = n
				}
				// There is still a first and last index check here, so the array padding
				// could be avoided, but removing it would require duplicating code
				// to eliminate the extra if statements.
				if n.left == firstIdx && n.right == lastIdx {
					//fmt.Printf("completed\n")
					done <- n.value
					return
				}
			}
		}
	}()
	return done
}
// MonoidMapReduceSync runs the MonoidMapReduce function and returns the result when completed.
func MonoidMapReduceSync[S any, T any](identity T, values []S, gen MonoidGenerator[S, T], op MonoidOperator[T]) T {
	return <-MonoidMapReduce[S, T](context.Background(), identity, values, gen, op)
}
type monoidGroup[T any] struct {
	left  int
	right int
	value T
}
// ================================================
// Example using this with string concatenation.
func genStr(s int) string {
//fmt.Printf("generating %d\n", s)
return fmt.Sprintf("=%d;", s)
}
func reduceStr(a, b string) string {
//fmt.Printf("joining '%s' and '%s'\n", a, b)
return a + b
}
func main() {
	count := 1000
	inp := make([]int, count)
	for i := 0; i < count; i++ {
		inp[i] = i
	}
	resCh := MonoidMapReduce[int, string](context.Background(), "", inp, genStr, reduceStr)
	fmt.Println(<-resCh)
}

An implementation of concurrent map-reduce for cases where the ordering of the reduction must be maintained.

This implementation has several limitations:

  • The number of input values must be known in advance.
  • The number of input values must be of a "manageable" size: an internal array sized to the total input is allocated, and because entries within that array are never released while the operation runs, unused group structures are not freed until the operation completes.

For cases where memory constraints matter, the algorithm should be adjusted to free blocks of groups that are no longer needed. If the indexes reference arbitrary bounds (known in advance), a map may be used in place of the array.
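
As a rough usage sketch (not part of the gist: it assumes the code above is compiled into the same package, with the gist's own `main` removed or renamed), the context argument can bound the whole operation with a deadline; on timeout the result channel yields the identity value:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Bound the entire map-reduce with a one-second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	words := []string{"concurrent", "monoid", "map", "reduce"}
	resCh := MonoidMapReduce[string, string](ctx, "", words,
		func(s string) string { return s + " " },  // generate: append a separator
		func(a, b string) string { return a + b }, // reduce: ordered concatenation
	)
	fmt.Println(<-resCh) // prints the words in order, or "" if the deadline fired
}
```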
