Skip to content

Instantly share code, notes, and snippets.

@alexyslozada
Created October 23, 2022 23:06
Show Gist options
  • Save alexyslozada/43cfd4d20974d5da9f4a5ab6d0b2cf5d to your computer and use it in GitHub Desktop.
Save alexyslozada/43cfd4d20974d5da9f4a5ab6d0b2cf5d to your computer and use it in GitHub Desktop.
Example benchmark for bulk insert with pgx
package main
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"io"
"log"
"os"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)
// main loads cienaa.csv, drops its header row, and times three ways of
// inserting the remaining rows through a pgx connection pool: one statement
// per row, a single pipelined batch, and COPY. Any failure aborts the
// program via log.Fatalln.
func main() {
	dataCSV, err := os.ReadFile("cienaa.csv")
	if err != nil {
		log.Fatalln("couldn't open or read the file cienaa.csv", err)
	}

	dataRead, err := readRecords(dataCSV)
	if err != nil {
		log.Fatalln("couldn't read the record", err)
	}
	// Guard the header skip below: slicing [1:] on an empty result panics.
	if len(dataRead) == 0 {
		log.Fatalln("the file cienaa.csv has no rows, expected at least a header")
	}
	dataRead = dataRead[1:] // the first record is the header

	pool, err := createPool()
	if err != nil {
		log.Fatalln("couldn't connect to database", err)
	}
	defer pool.Close()

	start := time.Now()
	err = insertOneByOne(dataRead, pool)
	if err != nil {
		log.Fatalln("couldn't insert one by one", err)
	}
	ends := time.Since(start)
	fmt.Println("insert one by one is", ends.String())

	// Pause between strategies (presumably so one run does not disturb the
	// timing of the next).
	time.Sleep(time.Second * 30)

	start = time.Now()
	err = insertBatch(dataRead, pool)
	if err != nil {
		log.Fatalln("couldn't run batch", err)
	}
	ends = time.Since(start)
	fmt.Println("insert via batch is", ends.String())

	time.Sleep(time.Second * 30)

	start = time.Now()
	err = insertBulk(dataRead, pool)
	if err != nil {
		log.Fatalln("couldn't bulk", err)
	}
	ends = time.Since(start)
	fmt.Println("bulk is", ends.String())
	fmt.Println("fin")
}

// readRecords parses data as comma-separated CSV (lines starting with '#'
// are treated as comments) and returns every record, header included, with
// each field boxed as any so rows can be passed directly as query arguments.
func readRecords(data []byte) ([][]any, error) {
	reader := csv.NewReader(bytes.NewReader(data))
	reader.Comma = ','
	reader.Comment = '#'

	var rows [][]any
	for {
		record, err := reader.Read()
		if err == io.EOF {
			return rows, nil
		}
		if err != nil {
			return nil, err
		}
		// The field count is known up front, so size the row exactly.
		row := make([]any, len(record))
		for i, field := range record {
			row[i] = field
		}
		rows = append(rows, row)
	}
}
// InsertClause is the parameterized INSERT statement for the migrations
// table; it takes four positional values ($1..$4).
const InsertClause = "INSERT INTO migrations VALUES ($1, $2, $3, $4)"
// insertOneByOne executes InsertClause once per row, using the first four
// values of each row as the statement arguments.
//
// It stops at the first failure and returns an error that identifies the
// offending row. Rows with fewer than four values are rejected with an error
// instead of panicking on an out-of-range index.
func insertOneByOne(dataRead [][]any, pool *pgxpool.Pool) error {
	ctx := context.Background()
	for i, v := range dataRead {
		if len(v) < 4 {
			return fmt.Errorf("row %d has %d columns, want at least 4", i, len(v))
		}
		if _, err := pool.Exec(ctx, InsertClause, v[0], v[1], v[2], v[3]); err != nil {
			// Report exactly the values that were sent (the original logged
			// v[6], which is out of range for rows with fewer than 7 columns).
			return fmt.Errorf("inserting row %d %v: %w", i, v[:4], err)
		}
	}
	return nil
}
// insertBatch queues one InsertClause per row into a single pgx batch, sends
// it over the pool, and prints the total number of rows affected across all
// statements in the batch.
//
// Rows with fewer than four values are rejected before anything is sent.
// Any error raised while running or closing the batch is returned to the
// caller rather than terminating the process.
func insertBatch(dataRead [][]any, pool *pgxpool.Pool) error {
	var (
		batch pgx.Batch
		total int64
	)
	for i, v := range dataRead {
		if len(v) < 4 {
			return fmt.Errorf("row %d has %d columns, want at least 4", i, len(v))
		}
		// The callback runs when this statement's result is read, so the
		// count covers every statement rather than just the first one.
		batch.Queue(InsertClause, v[0], v[1], v[2], v[3]).Exec(func(ct pgconn.CommandTag) error {
			total += ct.RowsAffected()
			return nil
		})
	}

	// Per the pgx documentation, Close reads all unread results and invokes
	// the callbacks registered on the queued queries, returning the first
	// error encountered.
	if err := pool.SendBatch(context.Background(), &batch).Close(); err != nil {
		return fmt.Errorf("executing batch: %w", err)
	}

	fmt.Println(total)
	return nil
}
// insertBulk loads all rows into the migrations table with a single CopyFrom
// call, mapping the first four values of each row to the year_month,
// month_of_release, passenger_type and country_of_residence columns, and
// prints the row count reported by CopyFrom.
//
// A row with fewer than four values aborts the copy with an error instead of
// panicking on an out-of-range index.
func insertBulk(dataRead [][]any, pool *pgxpool.Pool) error {
	copyCount, err := pool.CopyFrom(
		context.Background(),
		pgx.Identifier{"migrations"},
		[]string{"year_month", "month_of_release", "passenger_type", "country_of_residence"},
		pgx.CopyFromSlice(len(dataRead), func(i int) ([]any, error) {
			row := dataRead[i]
			if len(row) < 4 {
				return nil, fmt.Errorf("row %d has %d columns, want at least 4", i, len(row))
			}
			return []any{row[0], row[1], row[2], row[3]}, nil
		}),
	)
	if err != nil {
		return err
	}

	fmt.Println(copyCount)
	return nil
}
// createPool parses the hard-coded connection string, tags the connection
// with the application_name runtime parameter "alexys-pruebas", and returns a
// new pgx connection pool built from that configuration.
func createPool() (*pgxpool.Pool, error) {
	const dsn = "user=alexys password=alexys host=localhost port=5432 dbname=alexys sslmode=disable pool_max_conns=10 pool_min_conns=3"

	cfg, parseErr := pgxpool.ParseConfig(dsn)
	if parseErr != nil {
		return nil, parseErr
	}

	// Sent to the server as a runtime parameter when connecting.
	cfg.ConnConfig.RuntimeParams["application_name"] = "alexys-pruebas"

	ctx := context.Background()
	return pgxpool.NewWithConfig(ctx, cfg)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment