Skip to content

Instantly share code, notes, and snippets.

@jonmorehouse
Created October 16, 2018 05:15
Show Gist options
  • Save jonmorehouse/b28a37a01bfe164664809389d9b674e2 to your computer and use it in GitHub Desktop.
Save jonmorehouse/b28a37a01bfe164664809389d9b674e2 to your computer and use it in GitHub Desktop.
// executeQuery runs queryStr through the sink's BigQuery client and streams
// the resulting rows to cb from a background goroutine.
//
// The read is bounded by a context derived from b.clientContext with a timeout
// of b.opts.ReadTransactionTimeout.
//
// cb is invoked once per call to RowIterator.Next, receiving the row values
// and the error returned by Next. The last invocation therefore carries either
// iterator.Done (rows exhausted) or the error that ended iteration. If cb
// returns a non-nil error, iteration stops early.
//
// The returned CancelFunc aborts the in-flight read. The background goroutine
// also calls it when iteration finishes; calling it more than once is safe.
//
// Errors:
//   - ErrDataSinkNotInitialized (with a nil CancelFunc) when the sink is not
//     initialized.
//   - The error from query.Read when the query cannot be started. In that case
//     the context has already been released; the CancelFunc is still returned
//     for backward compatibility and calling it is a no-op.
func (b *bigqueryGithubDataSink) executeQuery(queryStr string, cb func([]bigquery.Value, error) error) (context.CancelFunc, error) {
	if !b.isInitialized() {
		return nil, ErrDataSinkNotInitialized
	}

	transactionContext, cancelFunc := context.WithTimeout(b.clientContext, b.opts.ReadTransactionTimeout)

	query := b.client.Query(queryStr)
	rowIterator, err := query.Read(transactionContext)
	if err != nil {
		// Release the timeout context right away instead of relying on the
		// caller to do so; otherwise its timer lingers until the deadline.
		cancelFunc()
		return cancelFunc, err
	}

	// NOTE(review): presumably lets the sink cancel outstanding reads later
	// (e.g. on shutdown) — confirm against addCancelFunc.
	b.addCancelFunc(cancelFunc)

	// Do NOT defer cancelFunc() in this function: it returns as soon as the
	// goroutine is started, and cancelling here would abort the row iterator
	// before it has streamed anything. The goroutine owns the cancellation.
	go func(it *bigquery.RowIterator) {
		defer cancelFunc()
		for {
			var values []bigquery.Value
			err := it.Next(&values)
			// Always hand the result to the callback first so it observes the
			// terminal error (including iterator.Done) as well as every row.
			if cbErr := cb(values, err); cbErr != nil {
				return
			}
			if err == iterator.Done {
				// Normal end of the result set.
				return
			}
			if err != nil {
				// Any other error also ends iteration: retrying Next would
				// only hand the callback the same failure again, forever.
				return
			}
		}
	}(rowIterator)

	return cancelFunc, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment