Last active
October 8, 2022 23:09
-
-
Save wolfeidau/5a78ad5514b56ca53df55eb75d528bd8 to your computer and use it in GitHub Desktop.
Apache Arrow Write Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"customers": [ | |
{ | |
"customer_id": "c7281d12-9ea0-4574-b41a-3a6de5aba119", | |
"name": "Party Foods", | |
"created_date": "2022-08-23T22:18:31.445Z" | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/wolfeidau/arrow-cookbook-golang | |
go 1.18 | |
require ( | |
github.com/alecthomas/kong v0.6.1 | |
github.com/apache/arrow/go/v10 v10.0.0-20220805182631-d26489c3c842 | |
github.com/rs/zerolog v1.27.0 | |
) | |
require ( | |
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect | |
github.com/andybalholm/brotli v1.0.4 // indirect | |
github.com/apache/thrift v0.16.0 // indirect | |
github.com/goccy/go-json v0.9.10 // indirect | |
github.com/golang/protobuf v1.5.2 // indirect | |
github.com/golang/snappy v0.0.4 // indirect | |
github.com/google/flatbuffers v2.0.6+incompatible // indirect | |
github.com/klauspost/asmfmt v1.3.2 // indirect | |
github.com/klauspost/compress v1.15.9 // indirect | |
github.com/klauspost/cpuid/v2 v2.0.9 // indirect | |
github.com/mattn/go-colorable v0.1.12 // indirect | |
github.com/mattn/go-isatty v0.0.14 // indirect | |
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect | |
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect | |
github.com/pierrec/lz4/v4 v4.1.15 // indirect | |
github.com/zeebo/xxh3 v1.0.2 // indirect | |
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect | |
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect | |
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect | |
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect | |
golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 // indirect | |
golang.org/x/text v0.3.7 // indirect | |
golang.org/x/tools v0.1.12 // indirect | |
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect | |
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect | |
google.golang.org/grpc v1.48.0 // indirect | |
google.golang.org/protobuf v1.28.1 // indirect | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"time" | |
"github.com/alecthomas/kong" | |
"github.com/apache/arrow/go/v10/arrow" | |
"github.com/apache/arrow/go/v10/arrow/array" | |
"github.com/apache/arrow/go/v10/arrow/memory" | |
"github.com/apache/arrow/go/v10/parquet" | |
"github.com/apache/arrow/go/v10/parquet/compress" | |
"github.com/apache/arrow/go/v10/parquet/pqarrow" | |
"github.com/rs/zerolog" | |
"github.com/rs/zerolog/log" | |
) | |
var ( | |
version = "unknown" | |
cfg struct { | |
Version kong.VersionFlag | |
InputFile string `kong:"required,arg"` | |
OutputFile string `kong:"required,arg"` | |
} | |
) | |
// CustomerEntry is a single customer object from the "customers" array of the
// input JSON document. Every field is carried as the raw string found in the
// file; no parsing or validation is applied to it.
type CustomerEntry struct {
	// CustomerID is the value of the "customer_id" key.
	CustomerID string `json:"customer_id,omitempty"`

	// Name is the value of the "name" key.
	Name string `json:"name,omitempty"`

	// CreatedDate is the date the customer was created in the system, kept as
	// the string that appears under the "created_date" key.
	CreatedDate string `json:"created_date,omitempty"`
}
type ImportFile struct { | |
Customers []CustomerEntry | |
} | |
func main() { | |
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.Kitchen}).With().Stack().Caller().Logger() | |
log.Logger.Level(zerolog.DebugLevel) | |
kong.Parse(&cfg, | |
kong.Vars{"version": version}, | |
) | |
data, err := ioutil.ReadFile(cfg.InputFile) | |
if err != nil { | |
log.Fatal().Err(err).Msg("failed to read input json file") | |
} | |
importFile := new(ImportFile) | |
err = json.Unmarshal(data, importFile) | |
if err != nil { | |
log.Fatal().Err(err).Msg("failed to unmarshal json file") | |
} | |
outFile, err := os.Create(cfg.OutputFile) | |
if err != nil { | |
log.Fatal().Err(err).Msg("failed to open output file") | |
} | |
schema := arrow.NewSchema( | |
[]arrow.Field{ | |
{Name: "customer_id", Type: arrow.BinaryTypes.String}, | |
{Name: "name", Type: arrow.BinaryTypes.String}, | |
{Name: "created_date", Type: arrow.BinaryTypes.String}, | |
{Name: "imported_date", Type: arrow.BinaryTypes.String}, | |
}, | |
nil, | |
) | |
props := parquet.NewWriterProperties( | |
parquet.WithCompression(compress.Codecs.Snappy), | |
parquet.WithRootName("spark_schema"), | |
parquet.WithRootRepetition(parquet.Repetitions.Required), | |
) | |
pqWriter, err := pqarrow.NewFileWriter(schema, outFile, props, pqarrow.DefaultWriterProps()) | |
if err != nil { | |
log.Fatal().Err(err).Msg("failed to create output writer") | |
} | |
defer pqWriter.Close() | |
pool := memory.NewGoAllocator() | |
b := array.NewRecordBuilder(pool, schema) | |
defer b.Release() | |
for _, customer := range importFile.Customers { | |
err = writeRecord(b, pqWriter, customer) | |
if err != nil { | |
log.Fatal().Err(err).Msg("failed to output to writer") | |
} | |
} | |
} | |
func writeRecord(b *array.RecordBuilder, pqWriter *pqarrow.FileWriter, customer CustomerEntry) error { | |
b.Field(0).(*array.StringBuilder).AppendString(customer.CustomerID) | |
b.Field(1).(*array.StringBuilder).AppendString(customer.Name) | |
b.Field(2).(*array.StringBuilder).AppendString(customer.CreatedDate) | |
b.Field(3).AppendNull() | |
rec := b.NewRecord() | |
defer rec.Release() | |
err := pqWriter.WriteBuffered(rec) | |
if err != nil { | |
return fmt.Errorf("failed to write to parquet file: %w", err) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment