Skip to content

Instantly share code, notes, and snippets.

@wolfeidau
Created July 5, 2023 21:40
Show Gist options
  • Save wolfeidau/158d8390dae1a3b79693c756d5fc8cc9 to your computer and use it in GitHub Desktop.
Save wolfeidau/158d8390dae1a3b79693c756d5fc8cc9 to your computer and use it in GitHub Desktop.
Copy columns to new file.
package main
import (
"fmt"
"os"
"github.com/alecthomas/kong"
"github.com/apache/arrow/go/v13/parquet"
"github.com/apache/arrow/go/v13/parquet/compress"
"github.com/apache/arrow/go/v13/parquet/file"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// version is bound to the kong "version" variable in main.
// NOTE(review): presumably injected at build time (e.g. -ldflags) — confirm.
var version string

// flags holds the parsed command-line options.
var flags struct {
	// Version is kong's built-in version flag.
	Version kong.VersionFlag
	// Source is the path of the parquet file to read.
	Source string
	// Destination is the path of the parquet file to create.
	Destination string
	// NoMemoryMap disables memory-mapping when opening Source.
	NoMemoryMap bool
}
// main parses the command-line flags, opens the source parquet file and
// copies every row group and column into a new Snappy-compressed parquet
// file (with statistics enabled) at the destination path. Any failure is
// logged and terminates the process via log.Fatal.
func main() {
	kong.Parse(&flags,
		kong.Vars{"version": version}, // bind a var for version
		kong.Name("parquet-normaliser"),
	)

	log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).With().Caller().Stack().Logger().Level(zerolog.InfoLevel)

	inputRdr, err := file.OpenParquetFile(flags.Source, !flags.NoMemoryMap)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to open parquet file")
	}
	defer inputRdr.Close()

	destFile, err := os.Create(flags.Destination)
	if err != nil {
		log.Fatal().Err(err).Msg("failed to create destination file")
	}

	wrtp := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Snappy),
		parquet.WithStats(true),
	)

	// Reuse the source schema so the destination has the same columns.
	schema := inputRdr.MetaData().Schema

	// NewParquetWriter does not return an error, so there is nothing to check here.
	destWrt := file.NewParquetWriter(destFile, schema.Root(), file.WithWriterProps(wrtp))

	if err := copyColumns(inputRdr, destWrt); err != nil {
		log.Fatal().Err(err).Msg("failed to copy columns")
	}

	// Closing the writer finalises the output file; a failure here means the
	// destination is incomplete, so it must not be ignored.
	if err := destWrt.Close(); err != nil {
		log.Fatal().Err(err).Msg("failed to close parquet writer")
	}

	// NOTE(review): the parquet writer appears to close its sink as part of
	// Close, in which case this second close only reports "already closed".
	// The error is deliberately ignored for that reason — confirm.
	_ = destFile.Close()

	log.Info().Int64("num_rows", destWrt.FileMetadata.NumRows).Msg("file done")
}
// copyColumns copies every column of every row group from inputRdr into
// destWrt. For each source row group it appends a buffered row group to the
// destination, copies the columns in index order with copyColumn, and then
// closes the destination row group.
//
// It returns the first error encountered, wrapped with context.
func copyColumns(inputRdr *file.Reader, destWrt *file.Writer) error {
	log.Info().Int("count", inputRdr.NumRowGroups()).Msg("row groups")

	for rg := 0; rg < inputRdr.NumRowGroups(); rg++ {
		src := inputRdr.RowGroup(rg)
		dst := destWrt.AppendBufferedRowGroup()

		log.Info().Int64("count", src.NumRows()).Msg("row groups rows")
		log.Info().Int("count", src.NumColumns()).Msg("columns")

		// Source and destination columns are paired by index.
		for c := 0; c < src.NumColumns(); c++ {
			srcCol, err := src.Column(c)
			if err != nil {
				return fmt.Errorf("failed get next column: %w", err)
			}

			log.Info().Str("name", srcCol.Descriptor().Name()).Int("idx", c).Msg("copying column")

			dstCol, err := dst.Column(c)
			if err != nil {
				return fmt.Errorf("failed to get next destination column: %w", err)
			}

			if err := copyColumn(srcCol, dstCol); err != nil {
				return fmt.Errorf("failed to copy column: %w", err)
			}
		}

		rows, err := dst.NumRows()
		if err != nil {
			return fmt.Errorf("failed to read row group row number: %w", err)
		}

		log.Info().Int64("total_bytes_written", dst.TotalBytesWritten()).Int("rows", rows).Msg("rgWriter done")

		if err := dst.Close(); err != nil {
			return fmt.Errorf("failed to close row group: %w", err)
		}

		log.Info().Msg("row group done")
	}

	return nil
}
// copyColumn copies all values from the source column chunk reader col into
// the destination column chunk writer cw and then closes cw.
//
// The physical types of col and cw must match. Only byte array, int32, int64,
// int96, float32 and float64 columns are supported; any other writer type
// yields an error instead of silently producing an empty column.
func copyColumn(col file.ColumnChunkReader, cw file.ColumnChunkWriter) error {
	log.Debug().Str("source", col.Type().String()).Str("dest", cw.Type().String()).Msg("column types")

	if cw.Type() != col.Type() {
		return fmt.Errorf("column type mismatch source: %s destination: %s", col.Type(), cw.Type())
	}

	// Dispatch on the concrete writer type; the reader assertion relies on the
	// physical type equality checked above.
	var err error
	switch w := cw.(type) {
	case *file.ByteArrayColumnChunkWriter:
		err = copyArrayColumn[parquet.ByteArray](w, col.(*file.ByteArrayColumnChunkReader))
	case *file.Int64ColumnChunkWriter:
		err = copyArrayColumn[int64](w, col.(*file.Int64ColumnChunkReader))
	case *file.Int32ColumnChunkWriter:
		err = copyArrayColumn[int32](w, col.(*file.Int32ColumnChunkReader))
	case *file.Float32ColumnChunkWriter:
		err = copyArrayColumn[float32](w, col.(*file.Float32ColumnChunkReader))
	case *file.Float64ColumnChunkWriter:
		err = copyArrayColumn[float64](w, col.(*file.Float64ColumnChunkReader))
	case *file.Int96ColumnChunkWriter:
		err = copyArrayColumn[parquet.Int96](w, col.(*file.Int96ColumnChunkReader))
	default:
		// Previously this case only logged and fell through with a nil error,
		// so an unsupported column was reported as copied while no data was
		// written. Fail loudly instead.
		return fmt.Errorf("unsupported column type source: %s destination: %s", col.Type(), cw.Type())
	}
	if err != nil {
		return fmt.Errorf("failed to write column: %w", err)
	}

	log.Info().Int("rows_written", cw.RowsWritten()).Msg("cw done")

	if err := cw.Close(); err != nil {
		return fmt.Errorf("failed to close column: %w", err)
	}

	log.Info().Msg("column done")

	return nil
}
// value is the type constraint listing the element types that
// copyArrayColumn can copy.
type value interface {
	int32 | int64 | float32 | float64 | parquet.Int96 | parquet.ByteArray
}
// reader is the subset of a typed column chunk reader that copyArrayColumn
// needs: a check for remaining data and a batched read into caller-supplied
// value and level buffers.
type reader[V value] interface {
	HasNext() bool
	ReadBatch(batchSize int64, values []V, defLvls, repLvls []int16) (total int64, valuesRead int, err error)
}
// writer is the subset of a typed column chunk writer that copyArrayColumn
// needs: a batched write of values together with their definition and
// repetition levels.
type writer[V value] interface {
	WriteBatch(values []V, defLevels, repLevels []int16) (valueOffset int64, err error)
}
// batchSize is the length of the scratch buffers used by copyArrayColumn and
// the maximum number of entries requested from each ReadBatch call.
const batchSize = 1024
// copyArrayColumn streams a column from r to w in batches of up to batchSize
// entries, reusing one set of value and level buffers for every batch.
//
// ReadBatch reports two counts: the number of levels read (t) and the number
// of values read (vr). For nullable or repeated columns vr can be smaller
// than t, because null entries have a level but no value. The level slices
// passed to WriteBatch are therefore bounded by t and only the value slice by
// vr; bounding the levels by vr would drop entries whenever a batch contains
// nulls.
//
// It returns the first read or write error, wrapped with context.
func copyArrayColumn[V value](w writer[V], r reader[V]) error {
	values := make([]V, batchSize)
	defLvls := make([]int16, batchSize)
	repLvls := make([]int16, batchSize)

	for r.HasNext() {
		t, vr, err := r.ReadBatch(batchSize, values, defLvls, repLvls)
		if err != nil {
			return fmt.Errorf("failed to read batch: %w", err)
		}

		log.Debug().Int64("t", t).Int("vr", vr).Msg("read batch")

		// Levels cover every entry (including nulls); values only the
		// non-null ones.
		vo, err := w.WriteBatch(values[:vr], defLvls[:t], repLvls[:t])
		if err != nil {
			return fmt.Errorf("failed to write batch: %w", err)
		}

		log.Debug().Int64("vo", vo).Msg("wrote values")
	}

	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment