Skip to content

Instantly share code, notes, and snippets.

@willnode
Created December 29, 2024 09:32
Show Gist options
  • Save willnode/75a96840ff33ada3f2aa3db3d28cde07 to your computer and use it in GitHub Desktop.
Streaming CSV inside a ZIP File with Go Channel
package main
import (
"archive/zip"
"bytes"
"context"
"errors"
"fmt"
"io"
"path/filepath"
"sync"
"cloud.google.com/go/storage"
"github.com/gocarina/gocsv"
"github.com/google/uuid"
"gorm.io/gorm"
)
// GcpStorage wraps a Google Cloud Storage client and exposes
// streaming-read helpers for objects stored in GCS buckets.
type GcpStorage struct {
	client *storage.Client
}
// Read opens a streaming reader for object inside bucket.
// The caller is responsible for closing the returned reader.
func (s *GcpStorage) Read(bucket string, object string) (io.ReadCloser, error) {
	// NOTE(review): consider threading a context.Context parameter through
	// here so callers can cancel/timeout the download; context.Background()
	// is never cancelled.
	rc, err := s.client.Bucket(bucket).Object(object).NewReader(context.Background())
	if err != nil {
		return nil, err
	}
	// *storage.Reader already implements io.ReadCloser; the original's
	// explicit io.ReadCloser(rc) conversion was a no-op.
	return rc, nil
}
// ReadFileCsvInZipStream buffers the ZIP archive from reader in memory and
// returns the first entry whose name ends in ".csv". It always closes the
// passed reader. The caller must call Open() on the returned *zip.File to
// read its contents.
func (s *GcpStorage) ReadFileCsvInZipStream(reader io.ReadCloser) (*zip.File, error) {
	defer reader.Close()
	// zip.NewReader needs random access (io.ReaderAt), so the whole archive
	// is read into memory here (~100 MB allocation for a 100 MB object).
	fileBytes, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}
	// These readers don't need Close() since the data lives in memory.
	byteReader := bytes.NewReader(fileBytes)
	zipReader, err := zip.NewReader(byteReader, int64(len(fileBytes)))
	if err != nil {
		return nil, err
	}
	for _, f := range zipReader.File {
		if filepath.Ext(f.Name) == ".csv" {
			// Bug fix: the original called f.Open() here and discarded both
			// return values, leaking an open decompressing reader and
			// ignoring its error. The caller opens f itself, so the call
			// was pure waste.
			return f, nil
		}
	}
	return nil, errors.New("CSV File not found")
}
// SimulationData is one CSV row of the simulation file, persisted via gorm.
// The csv tags map Name/Metadata/Properties from the CSV header; ID and
// NthIndex carry no csv tag because they are assigned by ReadCsvStreamed
// (a fresh UUID and the running row index) rather than parsed from the file.
type SimulationData struct {
	ID         uuid.UUID `gorm:"type:uuid;column:id;primaryKey" json:"id"`
	Name       string    `gorm:"type:text;column:name" json:"name" csv:"Name"`
	Metadata   string    `gorm:"type:text;column:metadata" json:"metadata" csv:"Metadata"`
	Properties string    `gorm:"type:text;column:properties" json:"properties" csv:"Properties"`
	NthIndex   int       `gorm:"type:int;column:nth_index" json:"nth_index"`
}
// ReadCsvStreamed decodes CSV rows from reader and delivers them to
// payloadChan in batches of at most batchSize rows, stamping each row with a
// fresh UUID and its zero-based row index. It closes reader before returning
// and guarantees payloadChan is closed once all rows have been delivered.
func ReadCsvStreamed(reader io.ReadCloser, payloadChan chan []SimulationData) error {
	defer reader.Close()
	// Batch size was a magic 100 in two places in the original; name it once.
	const batchSize = 100
	csvRowChan := make(chan SimulationData)
	// The batching consumer runs on a separate goroutine.
	go func() {
		// payloadChan is closed here because this goroutine is its only sender.
		defer close(payloadChan)
		// Pre-size to the known batch bound to avoid append regrowth.
		rows := make([]SimulationData, 0, batchSize)
		index := 0
		for row := range csvRowChan {
			input := row
			input.ID = uuid.New()
			input.NthIndex = index
			index++
			rows = append(rows, input)
			// Flush when the current batch is full.
			if len(rows) >= batchSize {
				payloadChan <- rows
				rows = make([]SimulationData, 0, batchSize)
			}
		}
		// Send the remaining partial batch, if any.
		if len(rows) > 0 {
			payloadChan <- rows
		}
	}()
	// gocsv closes csvRowChan inside this call, which in turn lets the
	// goroutine above drain, flush, and close payloadChan.
	if err := gocsv.UnmarshalToChan(reader, csvRowChan); err != nil {
		return err
	}
	return nil
}
// Handler downloads filename from the "mybucket" GCS bucket, extracts the
// first CSV entry from the ZIP archive, streams its rows in batches, and
// inserts each batch into db on a separate goroutine. wg is incremented
// before the insert goroutine starts and decremented when it drains, so the
// caller can wg.Wait() for all inserts to finish. The returned error covers
// download/extract/parse failures; per-batch insert errors are only logged.
func Handler(storage *GcpStorage, db *gorm.DB, wg *sync.WaitGroup, filename string) (err error) {
	zipReader, err := storage.Read("mybucket", filename)
	if err != nil {
		return
	}
	// zipReader is closed inside ReadFileCsvInZipStream.
	zipHandle, err := storage.ReadFileCsvInZipStream(zipReader)
	if err != nil {
		return
	}
	fileReader, err := zipHandle.Open()
	if err != nil {
		return
	}
	pchan := make(chan []SimulationData)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for p := range pchan {
			// NOTE(review): insert errors are swallowed here (only logged);
			// surface them to the caller if partial writes are unacceptable.
			if err := db.Create(p).Error; err != nil {
				// Bug fix: the original Printf had no trailing newline, so
				// consecutive errors ran together on one log line.
				fmt.Printf("%+v\n", err)
			}
		}
	}()
	// pchan and fileReader are both closed inside ReadCsvStreamed.
	err = ReadCsvStreamed(fileReader, pchan)
	return
}
// main is a demo driver: it runs Handler directly, then waits for the
// background insert goroutine (registered on wg inside Handler) to drain.
func main() {
	storage := GcpStorage{}
	db := gorm.DB{}
	wg := sync.WaitGroup{}
	// init storage and db from envar
	// ....
	// NOTE(review): the zero values above are placeholders — GcpStorage.client
	// and gorm.DB must be initialized before Handler runs, or the client/DB
	// calls will nil-pointer panic; confirm the elided init fills them in.
	// normally you place the Handler inside HTTP function
	// but for brevity we call this directly
	if err := Handler(&storage, &db, &wg, "myfile.csv.zip"); err != nil {
		panic(err)
	}
	// Handler returning means parsing finished; DB inserts may still be in
	// flight until wg.Wait() below returns.
	fmt.Println("All CSV data is parsed")
	wg.Wait()
	fmt.Println("All CSV data is saved")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment