Skip to content

Instantly share code, notes, and snippets.

@ivan-loh
Last active August 8, 2019 06:44
Show Gist options
  • Save ivan-loh/7304138bb633de572adeddbca7f0d20e to your computer and use it in GitHub Desktop.
Save ivan-loh/7304138bb633de572adeddbca7f0d20e to your computer and use it in GitHub Desktop.
Sample BigQuery Streaming
package main
import (
"cloud.google.com/go/bigquery"
"context"
"google.golang.org/api/googleapi"
"log"
"time"
)
// Coordinates of the BigQuery table this sample writes to.
var (
	// ProjectID is the project passed to bigquery.NewClient.
	ProjectID = "some-project-id"
	// DatasetID names the dataset that holds the table.
	DatasetID = "dataset"
	// TableID names the table that is created and streamed into.
	TableID = "rankings"
)
// Record is a single row of the rankings table. Together with its Save
// method it implements the bigquery.ValueSaver interface, so values of
// *Record can be handed directly to a streaming insert.
type Record struct {
	// Name is stored in the "name" column.
	Name string
	// Rank is stored in the "rank" column.
	Rank int
}
// Save converts the record into a row keyed by column name ("name" and
// "rank"). It always returns an empty insert ID and a nil error.
func (rec *Record) Save() (map[string]bigquery.Value, string, error) {
	row := make(map[string]bigquery.Value, 2)
	row["name"] = rec.Name
	row["rank"] = rec.Rank
	return row, "", nil
}
// main runs the sample end to end: it connects to BigQuery, makes sure the
// target table exists, and streams two records into it. Any failure is
// logged and terminates the process with a non-zero exit status.
func main() {
	ctx := context.Background()

	// 1. Create connection to BigQuery.
	bqClient, err := bigquery.NewClient(ctx, ProjectID)
	if err != nil {
		log.Fatal("error connecting to bq, ", err)
	}
	table := bqClient.Dataset(DatasetID).Table(TableID)

	// 2. Create the table for storing data.
	if err := ensureTable(ctx, table); err != nil {
		log.Fatal("Error creating table, ", err)
	}

	// 3. Try the streaming insert API. The timestamp suffix makes the names
	// differ from one run to the next.
	records := []*Record{
		{Name: "Simone " + time.Now().String(), Rank: 2},
		{Name: "Darth " + time.Now().String(), Rank: 3},
	}
	log.Println("Inserting, ", records)

	// NOTE(review): newer releases of the bigquery package deprecate Uploader
	// in favour of Inserter; confirm the pinned version before switching.
	if err := table.Uploader().Put(ctx, records); err != nil {
		logRowErrors(err)
		log.Fatal(err)
	}
	log.Println("done.")
}

// ensureTable creates the table with the sample's two-column schema
// (name STRING, rank INTEGER). An HTTP 409 response from the API is treated
// as success (presumably "table already exists") so the sample can be run
// repeatedly. Every other error is returned unchanged, including errors that
// are not *googleapi.Error (for example transport or context failures).
func ensureTable(ctx context.Context, table *bigquery.Table) error {
	schema := bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "rank", Type: bigquery.IntegerFieldType},
	}
	err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema})
	// Checked assertion: an unchecked one would panic on non-API errors.
	if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == 409 {
		return nil
	}
	return err
}

// logRowErrors logs one line per individual row failure (insert ID, row
// index, cause) when err is a bigquery.PutMultiError. Errors of any other
// type carry no per-row detail, so nothing is logged for them here.
func logRowErrors(err error) {
	rowErrs, ok := err.(bigquery.PutMultiError)
	if !ok {
		return
	}
	for _, rowErr := range rowErrs {
		for _, cause := range rowErr.Errors {
			log.Println(rowErr.InsertID, rowErr.RowIndex, cause)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment