Created
September 18, 2019 14:05
-
-
Save kylebrandt/0cfc1d7ad7436c7b9a7adb8fb1d036b3 to your computer and use it in GitHub Desktop.
Old DF (dataframe) to Arrow Diff
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pkg/dataframe/csv.go b/pkg/dataframe/csv.go | |
new file mode 100644 | |
index 0000000000..0838a7c413 | |
--- /dev/null | |
+++ b/pkg/dataframe/csv.go | |
@@ -0,0 +1,48 @@ | |
+package dataframe | |
+ | |
+import ( | |
+ "encoding/csv" | |
+ "io" | |
+) | |
+ | |
+// FromCSV is a simple CSV loader, primarily for testing. | |
+// | |
+// It reads every record from reader and converts each field with the Extract | |
+// method of the schema entry at the same position. Fields beyond len(schema) | |
+// are ignored. When hasHeader is true the first record is not treated as data: | |
+// its values are applied to the schema entries via SetName. | |
+// | |
+// The returned frame's Schema holds the schema entries that were matched | |
+// against the first record, whether or not that record was a header. | |
+// The first read or conversion error is returned together with a nil frame. | |
+func FromCSV(reader io.Reader, hasHeader bool, schema Schema) (*DataFrame, error) { | |
+	df := new(DataFrame) | |
+	csvReader := csv.NewReader(reader) | |
+	firstRecord := true | |
+	for { | |
+		record, err := csvReader.Read() | |
+		if err == io.EOF { | |
+			break | |
+		} | |
+		if err != nil { | |
+			return nil, err | |
+		} | |
+ | |
+		// Only columns present in both the record and the schema are used. | |
+		width := len(record) | |
+		if len(schema) < width { | |
+			width = len(schema) | |
+		} | |
+ | |
+		if firstRecord { | |
+			firstRecord = false | |
+			if hasHeader { | |
+				for fieldIdx, header := range record[:width] { | |
+					schema[fieldIdx].SetName(header) | |
+				} | |
+			} | |
+			// Record the schema even without a header row, so the frame | |
+			// always describes its own columns. | |
+			df.Schema = append(df.Schema, schema[:width]...) | |
+			if hasHeader { | |
+				continue | |
+			} | |
+		} | |
+ | |
+		row := make(Fields, 0, width) | |
+		for fieldIdx, fieldValue := range record[:width] { | |
+			v, err := schema[fieldIdx].Extract(fieldValue) | |
+			if err != nil { | |
+				return nil, err | |
+			} | |
+			row = append(row, v) | |
+		} | |
+		df.Records = append(df.Records, row) | |
+	} | |
+	return df, nil | |
+} | |
diff --git a/pkg/dataframe/dataframe.go b/pkg/dataframe/dataframe.go | |
new file mode 100644 | |
index 0000000000..6402195d57 | |
--- /dev/null | |
+++ b/pkg/dataframe/dataframe.go | |
@@ -0,0 +1,102 @@ | |
+// Package dataframe provides the DataFrame type. | |
+// The DataFrame type is used to hold data returned from Grafana Datasources. | |
+// This type is meant to be tightly integrated with the DataFrame type in Grafana's Frontend. | |
+package dataframe | |
+ | |
+import ( | |
+ "fmt" | |
+ | |
+ "github.com/apache/arrow/go/arrow" | |
+ "github.com/apache/arrow/go/arrow/array" | |
+ "github.com/apache/arrow/go/arrow/memory" | |
+) | |
+ | |
+// DataFrame holds Table data. | |
+type DataFrame struct { | |
+ // Schema describes the columns; ToArrow builds one Arrow field per entry. | |
+ Schema Schema | |
+ // Type indicates the kind of data the frame holds (see FrameType). | |
+ Type FrameType | |
+ // Records holds the rows; each Fields value is one row of column values. | |
+ Records []Fields | |
+} | |
+ | |
+// FrameType indicates the type of data the Dataframe holds. | |
+// The zero value is NumericFrame. | |
+type FrameType int | |
+ | |
+const ( | |
+ // NumericFrame indicates the Dataframe holds numeric values. | |
+ NumericFrame FrameType = iota | |
+ | |
+ // TimeSeriesFrame indicates the Dataframe holds timeseries data. | |
+ TimeSeriesFrame | |
+ | |
+ // HistogramFrame indicates the Dataframe holds histograms data. | |
+ HistogramFrame | |
+ | |
+ // OtherFrame indicates the DataFrame holds mixed or another data type. | |
+ OtherFrame | |
+) | |
+ | |
+// String returns the display name of the frame type: "Number", "TimeSeries" | |
+// or "Histogram" for the corresponding constants, and "Other" for OtherFrame | |
+// and any value outside the declared constants. | |
+func (ft FrameType) String() string { | |
+	names := [...]string{ | |
+		NumericFrame:    "Number", | |
+		TimeSeriesFrame: "TimeSeries", | |
+		HistogramFrame:  "Histogram", | |
+	} | |
+	if ft >= 0 && int(ft) < len(names) { | |
+		return names[ft] | |
+	} | |
+	return "Other" | |
+} | |
+ | |
+// MarshalJSON encodes the frame type as a JSON string containing the value | |
+// returned by String. It never returns an error. | |
+func (ft FrameType) MarshalJSON() ([]byte, error) { | |
+ return []byte(fmt.Sprintf(`"%v"`, ft.String())), nil | |
+} | |
+ | |
+// DataFrames is a slice of DataFrame. | |
+type DataFrames []DataFrame | |
+ | |
+// Fields is a slice of Field. | |
+type Fields []Field | |
+ | |
+// Field is a single value within a dataframe, located by its column and record position. | |
+// It is an empty interface, so any value (including nil) can be stored. | |
+type Field interface{} | |
+ | |
+// ToArrow is an experiment to create an arrow Table from the dataframe. | |
+// | |
+// One Arrow field is created per Schema entry, and one Arrow record is built | |
+// per row of d.Records. Only string and float64 columns are converted, from | |
+// *string and *float64 field values respectively. A value is written as an | |
+// Arrow null when the field is nil, the row is shorter than the schema, the | |
+// field's Go type does not match the column, or the column's Arrow type is | |
+// not supported. Values beyond the schema width are ignored. | |
+// | |
+// The caller owns the returned reader and should call Release on it when done. | |
+func (d *DataFrame) ToArrow() *array.TableReader { | |
+	// Number of rows per chunk handed out by the returned reader. | |
+	const chunkSize = 3 | |
+ | |
+	arrowFields := make([]arrow.Field, len(d.Schema)) | |
+	for i, cs := range d.Schema { | |
+		arrowFields[i] = arrow.Field{Name: cs.GetName(), Type: cs.ArrowType()} | |
+	} | |
+	schema := arrow.NewSchema(arrowFields, nil) | |
+ | |
+	rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema) | |
+	defer rb.Release() | |
+ | |
+	records := make([]array.Record, 0, len(d.Records)) | |
+	// The records must stay alive until the table has been built from them, | |
+	// so they are released together on return rather than per iteration. | |
+	defer func() { | |
+		for _, rec := range records { | |
+			rec.Release() | |
+		} | |
+	}() | |
+ | |
+	for _, row := range d.Records { | |
+		// Append exactly one value to every column so all builders stay the | |
+		// same length; NewRecord requires equal column lengths. | |
+		for fieldIdx := range arrowFields { | |
+			var field Field | |
+			if fieldIdx < len(row) { | |
+				field = row[fieldIdx] | |
+			} | |
+			appendArrowValue(rb.Field(fieldIdx), field) | |
+		} | |
+		records = append(records, rb.NewRecord()) | |
+	} | |
+ | |
+	table := array.NewTableFromRecords(schema, records) | |
+	defer table.Release() | |
+ | |
+	return array.NewTableReader(table, chunkSize) | |
+} | |
+ | |
+// appendArrowValue appends field to builder, or a null when field cannot be | |
+// represented by that builder. | |
+func appendArrowValue(builder array.Builder, field Field) { | |
+	switch b := builder.(type) { | |
+	case *array.StringBuilder: | |
+		if v, ok := field.(*string); ok && v != nil { | |
+			b.Append(*v) | |
+			return | |
+		} | |
+	case *array.Float64Builder: | |
+		if v, ok := field.(*float64); ok && v != nil { | |
+			b.Append(*v) | |
+			return | |
+		} | |
+	} | |
+	builder.AppendNull() | |
+} | |
diff --git a/pkg/dataframe/dataframe_test.go b/pkg/dataframe/dataframe_test.go | |
new file mode 100644 | |
index 0000000000..fcc459b799 | |
--- /dev/null | |
+++ b/pkg/dataframe/dataframe_test.go | |
@@ -0,0 +1,103 @@ | |
+package dataframe | |
+ | |
+import ( | |
+ "bufio" | |
+ "encoding/json" | |
+ "fmt" | |
+ "os" | |
+ "testing" | |
+ "time" | |
+ | |
+ "github.com/apache/arrow/go/arrow/ipc" | |
+) | |
+ | |
+// Not really tests ... just my repl/notebook for now | |
+ | |
+// TestLoadingDataFrameFromCSV loads the time series fixture with a header row | |
+// and checks the resulting frame's shape and that it can be marshalled to JSON. | |
+func TestLoadingDataFrameFromCSV(t *testing.T) { | |
+	data, err := os.Open("./testdata/simpleTimeSeries.csv") | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	defer data.Close() | |
+ | |
+	schema := Schema{ | |
+		NewTimeColumn(time.RFC3339), | |
+		NewNumberColumn(), | |
+		NewStringColumn(), | |
+	} | |
+	df, err := FromCSV(bufio.NewReader(data), true, schema) | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+ | |
+	// The fixture has a header plus eight data rows of three columns. | |
+	if got := len(df.Schema); got != 3 { | |
+		t.Errorf("schema columns: got %d, want 3", got) | |
+	} | |
+	if got := len(df.Records); got != 8 { | |
+		t.Errorf("records: got %d, want 8", got) | |
+	} | |
+ | |
+	df.Type = TimeSeriesFrame | |
+	v, err := json.MarshalIndent(df, "", " ") | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	fmt.Println(string(v)) | |
+} | |
+ | |
+// TestLoadingNumberDataFrameFromCSVAndWritingToArrow loads the string/number | |
+// fixture, converts it with ToArrow and writes the records to an Arrow IPC | |
+// file in the system temporary directory. | |
+func TestLoadingNumberDataFrameFromCSVAndWritingToArrow(t *testing.T) { | |
+	data, err := os.Open("./testdata/stringNumber.csv") | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	defer data.Close() | |
+ | |
+	schema := Schema{ | |
+		NewStringColumn(), | |
+		NewNumberColumn(), | |
+	} | |
+	df, err := FromCSV(bufio.NewReader(data), true, schema) | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	// The fixture has no time column, so this is not a time series frame. | |
+	df.Type = NumericFrame | |
+	if _, err := json.MarshalIndent(df, "", " "); err != nil { | |
+		t.Fatal(err) | |
+	} | |
+ | |
+	tableReader := df.ToArrow() | |
+	defer tableReader.Release() | |
+ | |
+	// Write to a per-process temp file instead of a developer-specific path, | |
+	// and truncate rather than append so reruns never stack several Arrow | |
+	// files into one. | |
+	outPath := os.TempDir() + string(os.PathSeparator) + fmt.Sprintf("arrowstuff-%d", os.Getpid()) | |
+	outFile, err := os.OpenFile(outPath, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, os.FileMode(0644)) | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	defer os.Remove(outPath) | |
+	// Best-effort close for early exits; the checked Close below is the one | |
+	// that reports write errors on the success path. | |
+	defer outFile.Close() | |
+ | |
+	writer, err := ipc.NewFileWriter(outFile, ipc.WithSchema(tableReader.Schema())) | |
+	if err != nil { | |
+		t.Fatal(err) | |
+	} | |
+ | |
+	for tableReader.Next() { | |
+		if err := writer.Write(tableReader.Record()); err != nil { | |
+			t.Fatal(err) | |
+		} | |
+	} | |
+	if err := writer.Close(); err != nil { | |
+		t.Fatal(err) | |
+	} | |
+	if err := outFile.Close(); err != nil { | |
+		t.Fatal(err) | |
+	} | |
+} | |
diff --git a/pkg/dataframe/schema.go b/pkg/dataframe/schema.go | |
new file mode 100644 | |
index 0000000000..bf5b42e17b | |
--- /dev/null | |
+++ b/pkg/dataframe/schema.go | |
@@ -0,0 +1,143 @@ | |
+package dataframe | |
+ | |
+import ( | |
+ "fmt" | |
+ "strconv" | |
+ "time" | |
+ | |
+ "github.com/apache/arrow/go/arrow" | |
+) | |
+ | |
+// Schema is a slice of ColumnSchema. | |
+type Schema []ColumnSchema | |
+ | |
+// ColumnSchema describes a single column of a DataFrame. | |
+type ColumnSchema interface { | |
+ // GetName returns the column's name. | |
+ GetName() string | |
+ // SetName sets the column's name. | |
+ SetName(name string) | |
+ // ColumnType returns the kind of data the column holds. | |
+ ColumnType() ColumnType | |
+ // Extract converts the raw string v into the column's value. | |
+ Extract(v string) (interface{}, error) | |
+ // ArrowType returns the Arrow data type used for the column by ToArrow. | |
+ ArrowType() arrow.DataType | |
+} | |
+ | |
+// BaseSchema holds the name and type information shared by the column schema | |
+// types in this file, which embed it. It provides every ColumnSchema method | |
+// except Extract. | |
+type BaseSchema struct { | |
+ Name string | |
+ Type ColumnType | |
+ ArrowDataType arrow.DataType | |
+} | |
+ | |
+// GetName returns the column's name. | |
+func (b *BaseSchema) GetName() string { | |
+ return b.Name | |
+} | |
+ | |
+// SetName sets the column's name. | |
+func (b *BaseSchema) SetName(name string) { | |
+ b.Name = name | |
+} | |
+ | |
+// ColumnType returns the column's Type field. | |
+func (b *BaseSchema) ColumnType() ColumnType { | |
+ return b.Type | |
+} | |
+ | |
+// ArrowType returns the column's ArrowDataType field. | |
+func (b *BaseSchema) ArrowType() arrow.DataType { | |
+ return b.ArrowDataType | |
+} | |
+ | |
+// TimeColumnSchema is the column schema for time values parsed from strings. | |
+type TimeColumnSchema struct { | |
+	BaseSchema | |
+	// Format is the layout Extract passes to time.Parse. | |
+	Format string | |
+} | |
+ | |
+// NewTimeColumn returns a DateTime column schema whose values are parsed with | |
+// the given time layout and whose Arrow type is Date64. | |
+func NewTimeColumn(format string) *TimeColumnSchema { | |
+	return &TimeColumnSchema{ | |
+		BaseSchema: BaseSchema{ | |
+			Type:          DateTime, | |
+			ArrowDataType: arrow.PrimitiveTypes.Date64, | |
+		}, | |
+		Format: format, | |
+	} | |
+} | |
+ | |
+// ColumnType always returns DateTime, regardless of the embedded BaseSchema's | |
+// Type field. | |
+func (tcs *TimeColumnSchema) ColumnType() ColumnType { | |
+ return DateTime | |
+} | |
+ | |
+// Extract parses v using the schema's Format and returns a *time.Time. | |
+// An empty string yields a nil value and no error; a string that does not | |
+// match the layout yields the error from time.Parse. | |
+func (tcs *TimeColumnSchema) Extract(v string) (interface{}, error) { | |
+	if len(v) == 0 { | |
+		return nil, nil | |
+	} | |
+	parsed, parseErr := time.Parse(tcs.Format, v) | |
+	if parseErr != nil { | |
+		return nil, parseErr | |
+	} | |
+	return &parsed, nil | |
+} | |
+ | |
+// NumberColumnSchema is the column schema for numeric values. | |
+type NumberColumnSchema struct{ BaseSchema } | |
+ | |
+// NewNumberColumn returns a Number column schema whose Arrow type is Float64. | |
+func NewNumberColumn() *NumberColumnSchema { | |
+	return &NumberColumnSchema{ | |
+		BaseSchema: BaseSchema{ | |
+			Type:          Number, | |
+			ArrowDataType: &arrow.Float64Type{}, | |
+		}, | |
+	} | |
+} | |
+ | |
+// Extract parses v as a 64-bit float and returns a *float64. | |
+// An empty string yields a nil value and no error; an unparsable string | |
+// yields the error from strconv.ParseFloat. | |
+func (ncs *NumberColumnSchema) Extract(v string) (interface{}, error) { | |
+	if len(v) == 0 { | |
+		return nil, nil | |
+	} | |
+	parsed, parseErr := strconv.ParseFloat(v, 64) | |
+	if parseErr != nil { | |
+		return nil, parseErr | |
+	} | |
+	return &parsed, nil | |
+} | |
+ | |
+// StringColumnSchema is the column schema for string values. | |
+type StringColumnSchema struct{ BaseSchema } | |
+ | |
+// NewStringColumn returns a String column schema whose Arrow type is String. | |
+func NewStringColumn() *StringColumnSchema { | |
+	return &StringColumnSchema{ | |
+		BaseSchema: BaseSchema{ | |
+			Type:          String, | |
+			ArrowDataType: &arrow.StringType{}, | |
+		}, | |
+	} | |
+} | |
+ | |
+// Extract returns a pointer to a copy of v. Unlike the time and number | |
+// schemas, an empty string is returned as a pointer to "" rather than nil. | |
+// It never returns an error. | |
+func (scs *StringColumnSchema) Extract(v string) (interface{}, error) { | |
+ return &v, nil | |
+ | |
+} | |
+ | |
+// ColumnType is the type of Data that a DataFrame column holds. | |
+type ColumnType int | |
+ | |
+const ( | |
+ // DateTime is the ColumnType that indicates the column holds a representation of absolute time. | |
+ DateTime ColumnType = iota | |
+ | |
+ // Number is the ColumnType that indicates the column will have integers and floats. | |
+ Number | |
+ | |
+ // String is the ColumnType that indicates the column will have string values. | |
+ String | |
+ | |
+ // Bool is the ColumnType that indicates the column will have boolean values. | |
+ Bool | |
+ | |
+ // Other is the ColumnType that indicates the column has an unknown type or mix of value types. | |
+ Other | |
+) | |
+ | |
+// String returns the name of the column type: "DateTime", "Number", "String" | |
+// or "Bool" for the corresponding constants, and "Other" for Other and any | |
+// value outside the declared constants. | |
+func (c ColumnType) String() string { | |
+	switch c { | |
+	case DateTime: | |
+		return "DateTime" | |
+	case Number: | |
+		return "Number" | |
+	case String: | |
+		return "String" | |
+	case Bool: | |
+		// Bool is a declared ColumnType and must not be reported as "Other". | |
+		return "Bool" | |
+	default: | |
+		return "Other" | |
+	} | |
+} | |
+ | |
+// MarshalJSON encodes the column type as a JSON string containing the value | |
+// returned by String. It never returns an error. | |
+func (c ColumnType) MarshalJSON() ([]byte, error) { | |
+ return []byte(fmt.Sprintf(`"%v"`, c.String())), nil | |
+} | |
diff --git a/pkg/dataframe/testdata/simpleTimeSeries.csv b/pkg/dataframe/testdata/simpleTimeSeries.csv | |
new file mode 100644 | |
index 0000000000..7f54305f26 | |
--- /dev/null | |
+++ b/pkg/dataframe/testdata/simpleTimeSeries.csv | |
@@ -0,0 +1,9 @@ | |
+time,value,color | |
+2006-01-02T15:04:05Z,3,blue | |
+2006-01-02T15:04:06Z,5, | |
+2006-01-02T15:04:07Z,6,yellow | |
+2006-01-02T15:04:08Z,1,blue | |
+2006-01-02T15:04:09Z,,blue | |
+,1, | |
+2006-01-02T15:04:11Z,2,blue | |
+,, | |
\ No newline at end of file | |
diff --git a/pkg/dataframe/testdata/stringNumber.csv b/pkg/dataframe/testdata/stringNumber.csv | |
new file mode 100644 | |
index 0000000000..971968a914 | |
--- /dev/null | |
+++ b/pkg/dataframe/testdata/stringNumber.csv | |
@@ -0,0 +1,4 @@ | |
+thing,value | |
+foo,1 | |
+bar,3 | |
+why,7 | |
\ No newline at end of file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment