Copy the columns of a Parquet file to a new file, row group by row group, using the Apache Arrow Go Parquet library.
package main

import (
    "fmt"
    "os"

    "github.com/alecthomas/kong"
    "github.com/apache/arrow/go/v13/parquet"
    "github.com/apache/arrow/go/v13/parquet/compress"
    "github.com/apache/arrow/go/v13/parquet/file"
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)
var (
    version string

    flags struct {
        Version     kong.VersionFlag
        Source      string
        Destination string
        NoMemoryMap bool
    }
)
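// kong derives the CLI flags from the struct fields above:
// --source, --destination, --no-memory-map and --version.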
func main() {
    kong.Parse(&flags,
        kong.Vars{"version": version}, // bind a var for version
        kong.Name("parquet-normaliser"),
    )

    log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr}).With().Caller().Stack().Logger().Level(zerolog.InfoLevel)

    inputRdr, err := file.OpenParquetFile(flags.Source, !flags.NoMemoryMap)
    if err != nil {
        log.Fatal().Err(err).Msg("failed to open parquet file")
    }
    defer inputRdr.Close()

    destFile, err := os.Create(flags.Destination)
    if err != nil {
        log.Fatal().Err(err).Msg("failed to create destination file")
    }

    wrtp := parquet.NewWriterProperties(
        parquet.WithCompression(compress.Codecs.Snappy),
        parquet.WithStats(true),
    )

    // reuse the source schema for the destination; NewParquetWriter does not return an error
    schema := inputRdr.MetaData().Schema
    destWrt := file.NewParquetWriter(destFile, schema.Root(), file.WithWriterProps(wrtp))

    if err := copyColumns(inputRdr, destWrt); err != nil {
        log.Fatal().Err(err).Msg("failed to copy columns")
    }

    if err := destWrt.Close(); err != nil {
        log.Fatal().Err(err).Msg("failed to close parquet writer")
    }
    if err := destFile.Close(); err != nil {
        log.Fatal().Err(err).Msg("failed to close destination file")
    }

    log.Info().Int64("num_rows", destWrt.FileMetadata.NumRows).Msg("file done")
}
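// copyColumns iterates over every row group in the source file, appending a
// buffered row group to the destination and copying the column chunks across
// one at a time.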
func copyColumns(inputRdr *file.Reader, destWrt *file.Writer) error {
    log.Info().Int("count", inputRdr.NumRowGroups()).Msg("row groups")

    // for each row group in the source, append a buffered row group to the destination
    for r := 0; r < inputRdr.NumRowGroups(); r++ {
        rgReader := inputRdr.RowGroup(r)
        rgWriter := destWrt.AppendBufferedRowGroup()

        log.Info().Int64("count", rgReader.NumRows()).Msg("row group rows")
        log.Info().Int("count", rgReader.NumColumns()).Msg("columns")

        for c := 0; c < rgReader.NumColumns(); c++ {
            col, err := rgReader.Column(c)
            if err != nil {
                return fmt.Errorf("failed to get next column: %w", err)
            }

            log.Info().Str("name", col.Descriptor().Name()).Int("idx", c).Msg("copying column")

            cwr, err := rgWriter.Column(c)
            if err != nil {
                return fmt.Errorf("failed to get next destination column: %w", err)
            }

            if err := copyColumn(col, cwr); err != nil {
                return fmt.Errorf("failed to copy column: %w", err)
            }
        }

        numRows, err := rgWriter.NumRows()
        if err != nil {
            return fmt.Errorf("failed to read row group row count: %w", err)
        }

        log.Info().Int64("total_bytes_written", rgWriter.TotalBytesWritten()).Int("rows", numRows).Msg("rgWriter done")

        if err := rgWriter.Close(); err != nil {
            return fmt.Errorf("failed to close row group: %w", err)
        }

        log.Info().Msg("row group done")
    }

    return nil
}
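// copyColumn checks that the source and destination physical types agree, then
// dispatches on the concrete column chunk writer type so the generic
// copyArrayColumn helper can stream the data across.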
func copyColumn(col file.ColumnChunkReader, cw file.ColumnChunkWriter) error {
    log.Debug().Str("source", col.Type().String()).Str("dest", cw.Type().String()).Msg("column types")

    if cw.Type() != col.Type() {
        return fmt.Errorf("column type mismatch source: %s destination: %s", col.Type(), cw.Type())
    }

    var err error

    switch w := cw.(type) {
    case *file.ByteArrayColumnChunkWriter:
        err = copyArrayColumn[parquet.ByteArray](w, col.(*file.ByteArrayColumnChunkReader))
    case *file.Int64ColumnChunkWriter:
        err = copyArrayColumn[int64](w, col.(*file.Int64ColumnChunkReader))
    case *file.Int32ColumnChunkWriter:
        err = copyArrayColumn[int32](w, col.(*file.Int32ColumnChunkReader))
    case *file.Float32ColumnChunkWriter:
        err = copyArrayColumn[float32](w, col.(*file.Float32ColumnChunkReader))
    case *file.Float64ColumnChunkWriter:
        err = copyArrayColumn[float64](w, col.(*file.Float64ColumnChunkReader))
    case *file.Int96ColumnChunkWriter:
        err = copyArrayColumn[parquet.Int96](w, col.(*file.Int96ColumnChunkReader))
    default:
        // fail rather than silently skipping a column type we can't copy
        return fmt.Errorf("unsupported column type source: %s destination: %s", col.Type(), cw.Type())
    }
    if err != nil {
        return fmt.Errorf("failed to write column: %w", err)
    }

    log.Info().Int("rows_written", cw.RowsWritten()).Msg("cw done")

    if err := cw.Close(); err != nil {
        return fmt.Errorf("failed to close column: %w", err)
    }

    log.Info().Msg("column done")

    return nil
}
type value interface {
    parquet.ByteArray | parquet.Int96 | int64 | int32 | float32 | float64
}

type reader[V value] interface {
    HasNext() bool
    ReadBatch(batchSize int64, values []V, defLvls []int16, repLvls []int16) (total int64, valuesRead int, err error)
}

type writer[V value] interface {
    WriteBatch(values []V, defLevels []int16, repLevels []int16) (valueOffset int64, err error)
}

const (
    batchSize = 1024
)
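// copyArrayColumn streams values and their definition/repetition levels from
// the source column to the destination column in batches of batchSize.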
func copyArrayColumn[V value](w writer[V], r reader[V]) error {
    values := make([]V, batchSize)
    defLvls := make([]int16, batchSize)
    repLvls := make([]int16, batchSize)

    for r.HasNext() {
        // total is the number of definition/repetition levels read; valuesRead is
        // the number of non-null values, which can be smaller for nullable columns
        total, valuesRead, err := r.ReadBatch(batchSize, values, defLvls, repLvls)
        if err != nil {
            return fmt.Errorf("failed to read batch: %w", err)
        }

        log.Debug().Int64("total", total).Int("values_read", valuesRead).Msg("read batch")

        // pass all total levels but only the valuesRead non-null values,
        // so null slots survive the copy
        vo, err := w.WriteBatch(values[:valuesRead], defLvls[:total], repLvls[:total])
        if err != nil {
            return fmt.Errorf("failed to write batch: %w", err)
        }

        log.Debug().Int64("values_written", vo).Msg("wrote values")
    }

    return nil
}
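Assuming kong's default flag naming, invoking the tool looks something like this (the file names are placeholders):

go run . --source input.parquet --destination output.parquet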