Skip to content

Instantly share code, notes, and snippets.

@marceloneppel
Created December 24, 2019 04:51
Show Gist options
  • Save marceloneppel/3c4f07b7af4a79e9b1dbb6817f468a1b to your computer and use it in GitHub Desktop.
Save marceloneppel/3c4f07b7af4a79e9b1dbb6817f468a1b to your computer and use it in GitHub Desktop.
Apache Beam Golang UDF
Marcelo Neppel
Henrique Neppel
package main
import (
"context"
"flag"
"reflect"
"github.com/apache/beam/sdks/go/pkg/beam"
_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/go/pkg/beam/x/debug"
udf "github.com/marceloneppel/apache-beam-golang-udf"
"github.com/marceloneppel/apache-beam-golang-udf/examples/common"
)
// parse is the DoFn type handed to beam.ParDo in main; its ProcessElement
// method is applied to each line read from the input file.
type parse struct {
	// Location is set from *common.Location when the DoFn is constructed in
	// main. ProcessElement does not read it in the code visible here.
	// NOTE(review): presumably exported so Beam can serialize the DoFn — confirm.
	Location string
}
// ProcessElement obtains the "Parse" function through udf.GetFunction (called
// with "./parse.go" and "csv"), applies it to line, and emits the result.
//
// If the function cannot be obtained, or the returned value is not a
// func(string) string, the problem is logged and the element is dropped
// rather than panicking on the type assertion.
func (c *parse) ProcessElement(ctx context.Context, line string, emit func(string)) {
	function, err := udf.GetFunction(ctx, "./parse.go", "csv", "Parse")
	if err != nil {
		// Pass the error as an argument, never as the format string: a '%' in
		// the message would otherwise be interpreted as a formatting verb.
		log.Errorf(ctx, "loading UDF Parse from ./parse.go: %v", err)
		return
	}

	// Checked assertion: a UDF with an unexpected signature must not crash
	// the worker.
	parseLine, ok := function.(func(string) string)
	if !ok {
		log.Errorf(ctx, "UDF Parse from ./parse.go has type %T, want func(string) string", function)
		return
	}
	emit(parseLine(line))
}
func init() {
beam.RegisterType(reflect.TypeOf((*parse)(nil)).Elem())
}
// main builds and runs the pipeline: it reads ./file.csv line by line with
// textio.Read, applies the parse DoFn to every line, and prints the results
// with debug.Print.
//
// The error returned by beamx.Run is checked; on failure it is logged and the
// process exits through log.Exitf instead of returning silently.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, "./file.csv")
	processedLines := beam.ParDo(s, &parse{
		Location: *common.Location,
	}, lines)
	debug.Print(s, processedLines)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
package csv
import (
"encoding/csv"
"io"
"log"
"strings"
)
// Parse reads the first CSV record from data and formats its first two
// fields as "Firstname: <field 0> - Lastname: <field 1>".
//
// It returns "" when data contains no record (the CSV reader reports io.EOF).
// A record with fewer than two fields does not panic; the missing fields are
// rendered as empty strings. Any other CSV error is fatal: log.Fatal logs it
// and terminates the process.
func Parse(data string) string {
	reader := csv.NewReader(strings.NewReader(data))
	record, err := reader.Read()
	if err == io.EOF {
		return ""
	}
	if err != nil {
		log.Fatal(err)
	}

	// Guard the indexing: a line such as "John" yields a single-field record,
	// and record[1] would be out of range.
	var first, last string
	if len(record) > 0 {
		first = record[0]
	}
	if len(record) > 1 {
		last = record[1]
	}
	return "Firstname: " + first + " - Lastname: " + last
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment