Created
February 25, 2021 23:47
-
-
Save v/a2b25604a44792ea754814a6deb55a77 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"context"
	"flag"
	"fmt"
	"log"
	"reflect"
	"strconv"
	"strings"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)
// processLine parses one comma-separated input line and emits two keyed
// values for it: (date, 1) on emitQuery and (date, sunID) on emitSUN.
//
// The date is the first field, used verbatim. The SUN ID is the third field
// with surrounding whitespace trimmed; if it does not parse as an integer it
// is emitted as -1. Lines with fewer than three fields (for example blank
// lines) are skipped rather than causing an index-out-of-range panic.
//
// NOTE(review): the -1 sentinel reaches countDistinctFn like any other ID, so
// a date with unparseable IDs reports one extra distinct SUN — confirm intent.
func processLine(
	line string,
	emitQuery func(date string, queries int),
	emitSUN func(date string, sunID int),
) {
	// Only the first three fields are needed; SplitN leaves the rest unsplit.
	parts := strings.SplitN(line, ",", 4)
	if len(parts) < 3 {
		return
	}
	date := parts[0]
	sunID, err := strconv.Atoi(strings.TrimSpace(parts[2]))
	if err != nil {
		sunID = -1
	}
	emitQuery(date, 1)
	emitSUN(date, sunID)
}
// countDistinctAccum is the accumulator type of countDistinctFn: the distinct
// int64 values seen so far, kept as the keys of Values. The field is exported,
// presumably so Beam can encode the accumulator — TODO confirm.
type countDistinctAccum struct{ Values map[int64]bool }

// countDistinctFn is a CombineFn that yields how many distinct numbers appear
// among its inputs; main applies it per key via beam.CombinePerKey.
type countDistinctFn struct{}

// CreateAccumulator returns a fresh accumulator whose set is empty but
// already allocated, so values can be recorded into it immediately.
func (f *countDistinctFn) CreateAccumulator() countDistinctAccum {
	var acc countDistinctAccum
	acc.Values = map[int64]bool{}
	return acc
}
func (f *countDistinctFn) AddInput(a countDistinctAccum, val beam.T) countDistinctAccum { | |
v := reflect.ValueOf(val.(interface{})).Convert(reflectx.Int64).Interface().(int64) | |
a.Values[v] = true | |
return a | |
} | |
func (f *countDistinctFn) MergeAccumulators(a, b countDistinctAccum) countDistinctAccum { | |
c := countDistinctAccum{} | |
for aa := range a.Values { | |
c.Values[aa] = true | |
} | |
for bb := range b.Values { | |
c.Values[bb] = true | |
} | |
return c | |
} | |
func (f *countDistinctFn) ExtractOutput(a countDistinctAccum) int { | |
return len(a.Values) | |
} | |
// processFn renders the CoGroupByKey result for one date as a text line.
// counts yields the date's query total and suns its distinct-SUN count; only
// the first value of each is read, and a missing value is reported as 0.
func processFn(
	date string,
	counts func(*int) bool,
	suns func(*int) bool,
) string {
	queries, distinctSuns := 0, 0
	// Both joined inputs are per-key aggregates (SumPerKey / CombinePerKey in
	// main), so each iterator holds at most one value; the "found" flag is not
	// needed because the zero default is the desired fallback.
	_ = counts(&queries)
	_ = suns(&distinctSuns)
	return fmt.Sprintf("date: %s, counts: %d, suns: %d", date, queries, distinctSuns)
}
func main() { | |
// beam.Init() is an initialization hook that must be called on startup. | |
beam.Init() | |
// Create the Pipeline object and root scope. | |
p := beam.NewPipeline() | |
s := p.Root() | |
lines := textio.Read(s, "words.txt") | |
dateQueries, dateSunPairs := beam.ParDo2(s, processLine, lines) | |
fmt.Println(dateQueries) | |
counted := stats.SumPerKey(s, dateQueries) | |
dateToSuns := beam.CombinePerKey(s, &countDistinctFn{}, dateSunPairs) | |
fmt.Println(dateToSuns) | |
beam.ParDo0(s, func(date string, suns int) { | |
fmt.Printf("%s: %v\n", date, suns) | |
}, dateToSuns) | |
joinedBack := beam.ParDo(s, processFn, beam.CoGroupByKey(s, counted, dateToSuns)) | |
textio.Write(s, "wordcounts.txt", joinedBack) | |
// Run the pipeline on the direct runner. | |
if err := beamx.Run(context.Background(), p); err != nil { | |
log.Fatalf("wtff; %+v", err) | |
} | |
fmt.Println("DONE!") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment