Skip to content

Instantly share code, notes, and snippets.

@v
Created February 25, 2021 23:47
Show Gist options
  • Save v/a2b25604a44792ea754814a6deb55a77 to your computer and use it in GitHub Desktop.
Save v/a2b25604a44792ea754814a6deb55a77 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"log"
"reflect"
"strconv"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)
func processLine(
line string,
emitQuery func(date string, queries int),
emitSUN func(date string, sunID int),
) {
parts := strings.Split(line, ",")
sunID, err := strconv.Atoi(strings.TrimSpace(parts[2]))
if err != nil {
sunID = -1
}
emitQuery(parts[0], 1)
emitSUN(parts[0], sunID)
}
type countDistinctAccum struct {
Values map[int64]bool
}
// countDistinctFn is a combineFn that returns the count of distinct numbers in a PCollection
type countDistinctFn struct{}
func (f *countDistinctFn) CreateAccumulator() countDistinctAccum {
return countDistinctAccum{
Values: make(map[int64]bool),
}
}
func (f *countDistinctFn) AddInput(a countDistinctAccum, val beam.T) countDistinctAccum {
v := reflect.ValueOf(val.(interface{})).Convert(reflectx.Int64).Interface().(int64)
a.Values[v] = true
return a
}
func (f *countDistinctFn) MergeAccumulators(a, b countDistinctAccum) countDistinctAccum {
c := countDistinctAccum{}
for aa := range a.Values {
c.Values[aa] = true
}
for bb := range b.Values {
c.Values[bb] = true
}
return c
}
func (f *countDistinctFn) ExtractOutput(a countDistinctAccum) int {
return len(a.Values)
}
func processFn(
date string,
counts func(*int) bool,
suns func(*int) bool,
) string {
var count int
var sun int
// grab first (and only) count, if any
counts(&count)
// grab first (and only) sun, if any
suns(&sun)
result := fmt.Sprintf("date: %s, counts: %d, suns: %d", date, count, sun)
return result
}
func main() {
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()
// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "words.txt")
dateQueries, dateSunPairs := beam.ParDo2(s, processLine, lines)
fmt.Println(dateQueries)
counted := stats.SumPerKey(s, dateQueries)
dateToSuns := beam.CombinePerKey(s, &countDistinctFn{}, dateSunPairs)
fmt.Println(dateToSuns)
beam.ParDo0(s, func(date string, suns int) {
fmt.Printf("%s: %v\n", date, suns)
}, dateToSuns)
joinedBack := beam.ParDo(s, processFn, beam.CoGroupByKey(s, counted, dateToSuns))
textio.Write(s, "wordcounts.txt", joinedBack)
// Run the pipeline on the direct runner.
if err := beamx.Run(context.Background(), p); err != nil {
log.Fatalf("wtff; %+v", err)
}
fmt.Println("DONE!")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment