Import a huge CSV file into PostgreSQL. A sized channel serves as a queue of chunks: the reader pushes chunks onto the queue in one goroutine while the dumper writes the records to the database in another.
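The core of the gist is a classic bounded producer/consumer pipeline. Before the real code, here is a minimal, self-contained sketch of just that pattern (item type and sizes are made up for illustration, not taken from the gist):

package main

import (
	"fmt"
	"sync"
)

func main() {
	queue := make(chan []int, 4) // sized channel: the producer blocks once 4 chunks are waiting
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // producer: push chunks onto the queue
		defer wg.Done()
		defer close(queue) // closing the channel tells the consumer no more data is coming
		for i := 0; i < 10; i++ {
			queue <- []int{i, i * i}
		}
	}()

	wg.Add(1)
	go func() { // consumer: drain the queue until it is closed
		defer wg.Done()
		for chunk := range queue {
			fmt.Println("got chunk:", chunk)
		}
	}()

	wg.Wait()
}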
package psql

import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"regexp"
	"sync"

	"github.com/lib/pq"
)

const (
	chunkSize = 1024
	queueSize = 1024
)

type InvalidPassportRecord struct {
	Serial, Number string
}

type Chunk []InvalidPassportRecord

var (
	queue chan Chunk
	wg    sync.WaitGroup
)
// InvalidPassportsCleanAndImportFile drops broken records and imports the
// dictionary into the invalid_passports table. (2 hours) 1.5M records.
func (ths PgRepo) InvalidPassportsCleanAndImportFile(dictionaryFile string) error {
	// shell equivalent:
	// grep -E '^[0-9]+,[0-9]+$' /podft/list_of_expired_passports.csv | psql -Upostgres -hlocalhost profiles -c "COPY invalid_passports (serial, number) FROM STDIN (FORCE_NOT_NULL(serial, number), DELIMITER ',', FORMAT csv);"
	file, err := os.Open(dictionaryFile)
	if err != nil {
		return err
	}
	defer file.Close()
	// truncate the table before importing the new data
	if err := ths.db.Exec("TRUNCATE TABLE invalid_passports;").Error; err != nil {
		return err
	}
	// drop the index so duplicates can be imported
	if err := ths.db.Exec("DROP INDEX IF EXISTS idx_passport_serial_number;").Error; err != nil {
		return err
	}
	db, err := ths.db.DB()
	if err != nil {
		return err
	}
	parser := csv.NewReader(file)
	queue = make(chan Chunk, queueSize) // closed by the reader once the file is exhausted
	wg.Add(2)
	go func() {
		if err := readChunk(parser, validateRecord); err != nil {
			fmt.Printf("reader error: %v\n", err)
		}
	}()
	go func() {
		if err := dumpChunk(db); err != nil {
			fmt.Printf("dumper error: %v\n", err)
		}
	}()
	// wait a couple of hours while everything loads
	wg.Wait()
	if err := ths.removeDuplicates(); err != nil {
		return err
	}
	if err := ths.generateIndices(); err != nil {
		return err
	}
	return nil
}
// readChunk reads records from the file, fills chunks with valid records and
// pushes them onto the queue. It closes the queue when the file is exhausted,
// which tells the dumper that no more data is coming.
func readChunk(r *csv.Reader, validator func([]string) bool) error {
	defer wg.Done()
	defer close(queue)
	chunk := make(Chunk, 0, chunkSize)
	for {
		var err error
		for iRecord := chunkSize; iRecord != 0; {
			var record []string
			record, err = r.Read()
			if err != nil {
				if err == io.EOF {
					break
				}
				return err
			}
			if validator(record) {
				chunk = append(chunk, InvalidPassportRecord{Serial: record[0], Number: record[1]})
				iRecord--
			}
		}
		fmt.Printf("chunk size to queue: %d\n", len(chunk))
		if len(chunk) > 0 {
			queue <- chunk
			// allocate a fresh chunk: the dumper may still be reading the one
			// just sent, so its backing array must not be reused
			chunk = make(Chunk, 0, chunkSize)
		}
		if err == io.EOF {
			return nil
		}
	}
}
// dumpChunk takes chunks from the queue and dumps them into the database with
// COPY, one transaction per chunk, until the queue is closed. (The original
// version deferred tx.Commit and stmt.Close inside the loop, leaking one open
// connection and COPY stream per chunk — a likely cause of the hang noted in
// the original TODO.)
func dumpChunk(db *sql.DB) error {
	defer wg.Done()
	for nextChunk := range queue {
		fmt.Printf("chunk size to dump: %d\n", len(nextChunk))
		tx, err := db.Begin()
		if err != nil {
			return err
		}
		stmt, err := tx.Prepare(pq.CopyIn("invalid_passports", "serial", "number"))
		if err != nil {
			tx.Rollback()
			return err
		}
		for _, row := range nextChunk {
			// buffer one COPY row; argument order must match the column list
			// given to pq.CopyIn ("serial", "number")
			if _, err := stmt.Exec(row.Serial, row.Number); err != nil {
				tx.Rollback()
				return err
			}
		}
		// an Exec with no arguments flushes the buffered COPY data to the server
		if _, err := stmt.Exec(); err != nil {
			tx.Rollback()
			return err
		}
		if err := stmt.Close(); err != nil {
			tx.Rollback()
			return err
		}
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}
// digitsOnly mirrors the grep filter above: a field is valid only if it
// consists entirely of digits. Compiled once instead of on every record.
var digitsOnly = regexp.MustCompile(`^[0-9]+$`)

func validateRecord(rec []string) bool {
	return len(rec) == 2 && digitsOnly.MatchString(rec[0]) && digitsOnly.MatchString(rec[1])
}
// removeDuplicates deletes duplicate rows. (10 minutes)
func (ths PgRepo) removeDuplicates() error {
	trx := ths.db.Exec(`
		-- delete duplicates, keeping the row with the highest id
		-- per (serial, number) pair
		DELETE FROM invalid_passports a
		USING invalid_passports b
		WHERE a.id < b.id
		  AND a.serial = b.serial
		  AND a.number = b.number;
	`)
	if trx.Error != nil {
		return trx.Error
	}
	return nil
}
// generateIndices creates the index used for fast record lookup. (10 minutes)
func (ths PgRepo) generateIndices() error {
	trx := ths.db.Exec("CREATE UNIQUE INDEX idx_passport_serial_number ON public.invalid_passports USING btree (serial, number);")
	if trx.Error != nil {
		return trx.Error
	}
	return nil
}
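For reference, a hypothetical call site. NewPgRepo and the connection string are assumptions for illustration, not part of this gist; PgRepo appears to wrap a *gorm.DB, since ths.db.Exec(...).Error and ths.db.DB() match gorm's API.

// Hypothetical usage sketch; NewPgRepo is assumed, not defined in this gist.
repo, err := NewPgRepo("host=localhost user=postgres dbname=profiles")
if err != nil {
	log.Fatal(err)
}
if err := repo.InvalidPassportsCleanAndImportFile("/podft/list_of_expired_passports.csv"); err != nil {
	log.Fatal(err)
}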