Import a huge CSV file into PostgreSQL. A sized (buffered) channel is used as a queue of chunks: the reader pushes chunks onto the queue in one goroutine while the dumper writes the records to the database in another.
package psql

import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"regexp"
	"sync"

	"github.com/lib/pq"
)
const (
	chunkSize = 1024 // records per chunk
	queueSize = 1024 // chunks the reader may queue ahead of the dumper
)

type InvalidPassportRecord struct {
	Serial, Number string
}

type Chunk []InvalidPassportRecord

var (
	queue chan Chunk     // buffered channel carrying chunks from the reader to the dumper
	wg    sync.WaitGroup // waits for the reader and the dumper goroutines
)
// InvalidPassportsCleanAndImportFile drops broken records and imports the file into the
// invalid_passports dictionary table (~2 hours for ~1.5 million records).
func (ths PgRepo) InvalidPassportsCleanAndImportFile(dictionaryFile string) error {
	// Shell equivalent:
	// grep -E '^[0-9]+,[0-9]+$' /podft/list_of_expired_passports.csv | psql -Upostgres -hlocalhost profiles -c "COPY invalid_passports (serial, number) FROM STDIN (FORCE_NOT_NULL(serial, number), DELIMITER ',', FORMAT csv);"
	file, err := os.Open(dictionaryFile)
	if err != nil {
		return err
	}
	defer file.Close()
	// Truncate the table before importing the new data.
	if err := ths.db.Exec("TRUNCATE TABLE invalid_passports;").Error; err != nil {
		return err
	}
	// Drop the unique index so duplicates can be imported; they are removed afterwards.
	if err := ths.db.Exec("DROP INDEX IF EXISTS idx_passport_serial_number;").Error; err != nil {
		return err
	}
	parser := csv.NewReader(file)
	// The reader closes the queue once the whole file has been read.
	queue = make(chan Chunk, queueSize)
	wg.Add(1)
	go readChunk(parser, validateRecord)
	db, err := ths.db.DB()
	if err != nil {
		return err
	}
	wg.Add(1)
	go dumpChunk(db)
	// Wait (a couple of hours) until everything has been loaded.
	wg.Wait()
	if err := ths.removeDuplicates(); err != nil {
		return err
	}
	if err := ths.generateIndices(); err != nil {
		return err
	}
	return nil
}
// readChunk reads records from the CSV file, fills chunks with valid records and
// pushes them onto the queue. Closing the queue tells the dumper that no more
// chunks are coming.
func readChunk(r *csv.Reader, validator func([]string) bool) error {
	defer wg.Done()
	defer close(queue)
	chunk := make(Chunk, 0, chunkSize)
	for {
		record, err := r.Read()
		if err == io.EOF {
			// Push the final, possibly incomplete chunk and finish.
			if len(chunk) > 0 {
				fmt.Printf("chunk size to queue: %d\n", len(chunk))
				queue <- chunk
			}
			return nil
		}
		if err != nil {
			return err
		}
		if validator(record) {
			chunk = append(chunk, InvalidPassportRecord{Serial: record[0], Number: record[1]})
		}
		if len(chunk) == chunkSize {
			fmt.Printf("chunk size to queue: %d\n", len(chunk))
			queue <- chunk
			// Allocate a fresh chunk: the one just sent is still owned by the dumper,
			// so its backing array must not be reused.
			chunk = make(Chunk, 0, chunkSize)
		}
	}
}
// dumpChunk takes chunks from the queue and dumps them into the database via COPY.
func dumpChunk(db *sql.DB) error {
	defer wg.Done()
	var firstErr error
	// The loop ends once the reader has closed the queue and all chunks are drained.
	for nextChunk := range queue {
		if firstErr != nil {
			// Keep draining the queue so the reader never blocks on a full channel.
			continue
		}
		fmt.Printf("chunk size to dump: %d\n", len(nextChunk))
		if err := copyChunk(db, nextChunk); err != nil {
			firstErr = err
		}
	}
	return firstErr
}

// copyChunk writes a single chunk inside its own transaction using the COPY protocol.
func copyChunk(db *sql.DB, chunk Chunk) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	stmt, err := tx.Prepare(pq.CopyIn("invalid_passports", "serial", "number"))
	if err != nil {
		tx.Rollback()
		return err
	}
	for _, row := range chunk {
		// The argument order must match the column order given to pq.CopyIn.
		if _, err := stmt.Exec(row.Serial, row.Number); err != nil {
			tx.Rollback()
			return err
		}
	}
	// An Exec with no arguments flushes the buffered COPY data.
	if _, err := stmt.Exec(); err != nil {
		tx.Rollback()
		return err
	}
	if err := stmt.Close(); err != nil {
		tx.Rollback()
		return err
	}
	// Close and commit per chunk; deferring them inside a loop would keep every
	// transaction open until the goroutine returns.
	return tx.Commit()
}
// recordPattern matches values consisting of digits only, mirroring the
// grep filter ^[0-9]+,[0-9]+$ from the shell one-liner above.
var recordPattern = regexp.MustCompile(`^[0-9]+$`)

// validateRecord accepts a record only when both the serial and the number are numeric.
func validateRecord(rec []string) bool {
	return len(rec) == 2 && recordPattern.MatchString(rec[0]) && recordPattern.MatchString(rec[1])
}
// removeDuplicates removes duplicate rows. (~10 minutes)
func (ths PgRepo) removeDuplicates() error {
	trx := ths.db.Exec(`
		-- delete duplicates, keeping the row with the largest id
		DELETE FROM
			invalid_passports a
		USING invalid_passports b
		WHERE
			a.id < b.id
			AND a.serial = b.serial
			AND a.number = b.number;
	`)
	if trx.Error != nil {
		return trx.Error
	}
	return nil
}
// generateIndices creates a unique index for fast record lookups. (~10 minutes)
func (ths PgRepo) generateIndices() error {
	trx := ths.db.Exec("CREATE UNIQUE INDEX idx_passport_serial_number ON public.invalid_passports USING btree (serial, number);")
	if trx.Error != nil {
		return trx.Error
	}
	return nil
}
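Usage: a minimal sketch of how the import might be invoked. It assumes PgRepo wraps a *gorm.DB (the code above calls ths.db.Exec(...).Error and ths.db.DB(), which matches the gorm v2 API); the NewPgRepo constructor, the module import path and the connection string are hypothetical placeholders, while the database name, host, user and CSV path are taken from the shell one-liner in the comment above.

package main

import (
	"log"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"

	"example.com/profiles/psql" // hypothetical import path for the package above
)

func main() {
	// Connection parameters follow the psql one-liner in the gist: user postgres,
	// host localhost, database profiles. The DSN itself is illustrative.
	gdb, err := gorm.Open(postgres.Open("host=localhost user=postgres dbname=profiles sslmode=disable"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	// NewPgRepo is a hypothetical constructor; PgRepo is assumed to keep the *gorm.DB in its db field.
	repo := psql.NewPgRepo(gdb)

	if err := repo.InvalidPassportsCleanAndImportFile("/podft/list_of_expired_passports.csv"); err != nil {
		log.Fatal(err)
	}
}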