Created
September 1, 2016 16:28
-
-
Save mark-rushakoff/2e1c160a15d0c7500548d9aae1773c6d to your computer and use it in GitHub Desktop.
Result parsing for InfluxDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package resultparser | |
import ( | |
"encoding/json" | |
"fmt" | |
) | |
// Field is implemented by TimeField, IntField, FloatField, and StringField | |
type Field interface { | |
columnIndex([]string) (int, error) | |
set(interface{}) | |
} | |
var ( | |
_ Field = &TimeField{} | |
_ Field = &IntField{} | |
_ Field = &FloatField{} | |
_ Field = &StringField{} | |
) | |
// TimeField represents the "time" field in a row in an InfluxDB response. | |
type TimeField struct { | |
Nanoseconds int64 | |
} | |
func (f *TimeField) columnIndex(columns []string) (int, error) { | |
return findStringIndex(columns, "time") | |
} | |
func (f *TimeField) set(v interface{}) { | |
// Assuming decoder.UseNumber was called. | |
var err error | |
f.Nanoseconds, err = v.(json.Number).Int64() | |
if err != nil { | |
panic(err) | |
} | |
} | |
// IntField represents an integer field with the given Name in a row in an InfluxDB response. | |
type IntField struct { | |
Name string | |
Value int64 | |
} | |
func (f *IntField) columnIndex(columns []string) (int, error) { | |
return findStringIndex(columns, f.Name) | |
} | |
func (f *IntField) set(v interface{}) { | |
// Assuming decoder.UseNumber was called. | |
var err error | |
f.Value, err = v.(json.Number).Int64() | |
if err != nil { | |
panic(err) | |
} | |
} | |
// FloatField represents a float field with the given Name in a row in an InfluxDB response. | |
type FloatField struct { | |
Name string | |
Value float64 | |
} | |
func (f *FloatField) columnIndex(columns []string) (int, error) { | |
return findStringIndex(columns, f.Name) | |
} | |
func (f *FloatField) set(v interface{}) { | |
// Assuming decoder.UseNumber was called. | |
var err error | |
f.Value, err = v.(json.Number).Float64() | |
if err != nil { | |
panic(err) | |
} | |
} | |
// StringField represents a string field with the given Name in a row in an InfluxDB response. | |
type StringField struct { | |
Name string | |
Value string | |
} | |
func (f *StringField) columnIndex(columns []string) (int, error) { | |
return findStringIndex(columns, f.Name) | |
} | |
func (f *StringField) set(v interface{}) { | |
f.Value = v.(string) | |
} | |
func findStringIndex(columns []string, name string) (int, error) { | |
for i, col := range columns { | |
if col == name { | |
return i, nil | |
} | |
} | |
return -1, fmt.Errorf("Field not found: %s", name) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package resultparser contains utilities to simplify handling of InfluxDB query results. | |
package resultparser | |
import ( | |
"bytes" | |
"encoding/json" | |
"fmt" | |
) | |
// ResultParser manages mapping named fields to actual values from InfluxDB query results. | |
type ResultParser struct { | |
// Extracted from the response json. | |
values [][]interface{} | |
// Which row we're currently pointing at. | |
rowIndex int | |
// Map column index to underlying Field. | |
columnsToFields map[int]Field | |
} | |
// NewResultParser returns a new ResultParser based on the given raw response and the given fields. | |
func NewResultParser(rawResp []byte, fields []Field) (*ResultParser, error) { | |
resp := new(resp) | |
dec := json.NewDecoder(bytes.NewReader(rawResp)) | |
dec.UseNumber() | |
if err := dec.Decode(resp); err != nil { | |
return nil, err | |
} | |
if len(resp.Results) != 1 { | |
return nil, fmt.Errorf("Expected exactly one result in response, got %d", len(resp.Results)) | |
} | |
if len(resp.Results[0].Series) != 1 { | |
return nil, fmt.Errorf("Expected exactly one series in result, got %d", len(resp.Results[0].Series)) | |
} | |
columns := resp.Results[0].Series[0].Columns | |
columnsToFields := make(map[int]Field, len(fields)) | |
for _, f := range fields { | |
idx, err := f.columnIndex(columns) | |
if err != nil { | |
return nil, err | |
} | |
columnsToFields[idx] = f | |
} | |
return &ResultParser{ | |
values: resp.Results[0].Series[0].Values, | |
rowIndex: -1, // so that the first call to Next() sets to 0 | |
columnsToFields: columnsToFields, | |
}, nil | |
} | |
// Next returns true if there is another row of values to parse. | |
func (p *ResultParser) Next() bool { | |
p.rowIndex++ | |
return p.rowIndex < len(p.values) | |
} | |
// ParseFields assigns the correct values into the original passed-in Fields based on the Values in the current row. | |
// ParseFields currently will panic if a type conversion fails. | |
func (p *ResultParser) ParseFields() { | |
for colIdx, field := range p.columnsToFields { | |
// This can panic if the type conversion fails. | |
// If we start running into that in experiments, let's revisit returning an error from set(). | |
field.set(p.values[p.rowIndex][colIdx]) | |
} | |
} | |
// resp is a container for the raw InfluxDB responses. | |
type resp struct { | |
Results []struct { | |
Series []struct { | |
Columns []string `json:"columns"` | |
Values [][]interface{} `json:"values"` | |
} `json:"series"` | |
} `json:"results"` | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package resultparser_test | |
import ( | |
"testing" | |
"github.com/influxdata/laboratory/resultparser" | |
) | |
func TestResultParser_Fields(t *testing.T) { | |
rawResp := []byte(`{ | |
"results": [ | |
{ | |
"series": [ | |
{ | |
"name": "timetonpoints", | |
"columns": [ | |
"time", | |
"expid", | |
"latNs", | |
"linesperbatch", | |
"numpoints", | |
"numwriters", | |
"reqBytes", | |
"someFloat", | |
"status" | |
], | |
"values": [ | |
[ | |
1459985225813556804, | |
"b3blkgssusuw_1qn5vnc", | |
14917972, | |
"100", | |
"10000", | |
"1", | |
5690, | |
1.5, | |
204 | |
], | |
[ | |
1459985225815570190, | |
"b3c81v86pkqu_1nakltx", | |
1993293, | |
"100", | |
"10000", | |
"1", | |
5800, | |
2.75, | |
204 | |
] | |
] | |
} | |
] | |
} | |
] | |
}`) | |
timeF := &resultparser.TimeField{} | |
reqBytesF := &resultparser.IntField{Name: "reqBytes"} | |
expIDF := &resultparser.StringField{Name: "expid"} | |
floatF := &resultparser.FloatField{Name: "someFloat"} | |
p, err := resultparser.NewResultParser(rawResp, []resultparser.Field{ | |
timeF, | |
reqBytesF, | |
expIDF, | |
floatF, | |
}) | |
if err != nil { | |
t.Fatalf("unexpected error: %s", err.Error()) | |
} | |
rowCount := 0 | |
for p.Next() { | |
rowCount++ | |
p.ParseFields() | |
var ( | |
expTime int64 | |
expReqBytes int64 | |
expID string | |
expFloat float64 | |
) | |
if rowCount == 1 { | |
expTime = int64(1459985225813556804) | |
expReqBytes = int64(5690) | |
expID = "b3blkgssusuw_1qn5vnc" | |
expFloat = float64(1.5) | |
} else { | |
expTime = int64(1459985225815570190) | |
expReqBytes = int64(5800) | |
expID = "b3c81v86pkqu_1nakltx" | |
expFloat = float64(2.75) | |
} | |
if timeF.Nanoseconds != expTime { | |
t.Fatalf("time field: exp %d, got %d", expTime, timeF.Nanoseconds) | |
} | |
if reqBytesF.Value != expReqBytes { | |
t.Fatalf("req bytes field: exp %d, got %d", expReqBytes, reqBytesF.Value) | |
} | |
if expIDF.Value != expID { | |
t.Fatalf("id field: exp %d, got %d", expID, expIDF.Value) | |
} | |
if floatF.Value != expFloat { | |
t.Fatalf("float field: exp %d, got %d", expFloat, floatF.Value) | |
} | |
} | |
if rowCount != 2 { | |
t.Fatalf("exp 2 rows, got: %d", rowCount) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment