Created
December 29, 2024 09:32
-
-
Save willnode/75a96840ff33ada3f2aa3db3d28cde07 to your computer and use it in GitHub Desktop.
Streaming a CSV File Inside a ZIP Archive with Go Channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"archive/zip" | |
"bytes" | |
"context" | |
"errors" | |
"fmt" | |
"io" | |
"path/filepath" | |
"sync" | |
"cloud.google.com/go/storage" | |
"github.com/gocarina/gocsv" | |
"github.com/google/uuid" | |
"gorm.io/gorm" | |
) | |
type GcpStorage struct { | |
client *storage.Client | |
} | |
func (s *GcpStorage) Read(bucket string, object string) (io.ReadCloser, error) { | |
rc, err := s.client.Bucket(bucket).Object(object).NewReader(context.Background()) | |
if err != nil { | |
return nil, err | |
} | |
return io.ReadCloser(rc), nil | |
} | |
func (s *GcpStorage) ReadFileCsvInZipStream(reader io.ReadCloser) (*zip.File, error) { | |
defer reader.Close() | |
// this reads the whole zip file! (~100 MB allocation) | |
fileBytes, err := io.ReadAll(reader) | |
if err != nil { | |
return nil, err | |
} | |
// these readers don't need Close() since the data lives on memory | |
byteReader := bytes.NewReader(fileBytes) | |
zipReader, err := zip.NewReader(byteReader, int64(len(fileBytes))) | |
if err != nil { | |
return nil, err | |
} | |
for _, f := range zipReader.File { | |
if filepath.Ext(f.Name) == ".csv" { | |
f.Open() | |
return f, nil | |
} | |
} | |
return nil, errors.New("CSV File not found") | |
} | |
type SimulationData struct { | |
ID uuid.UUID `gorm:"type:uuid;column:id;primaryKey" json:"id"` | |
Name string `gorm:"type:text;column:name" json:"name" csv:"Name"` | |
Metadata string `gorm:"type:text;column:metadata" json:"metadata" csv:"Metadata"` | |
Properties string `gorm:"type:text;column:properties" json:"properties" csv:"Properties"` | |
NthIndex int `gorm:"type:int;column:nth_index" json:"nth_index"` | |
} | |
func ReadCsvStreamed(reader io.ReadCloser, payloadChan chan []SimulationData) error { | |
defer reader.Close() | |
var csvRowChan = make(chan SimulationData) | |
// this function runs on separate thread | |
go func() { | |
// we close it here since we send the channel data from here | |
defer close(payloadChan) | |
rows := make([]SimulationData, 0) | |
index := 0 | |
for row := range csvRowChan { | |
input := row | |
input.ID = uuid.New() | |
input.NthIndex = index | |
index += 1 | |
rows = append(rows, input) | |
// when current batch is over 100 | |
if len(rows) >= 100 { | |
payloadChan <- rows | |
rows = make([]SimulationData, 0) | |
} | |
} | |
// send remaining data if any | |
if len(rows) > 0 { | |
payloadChan <- rows | |
} | |
}() | |
// csvRowChan is closed inside this function | |
if err := gocsv.UnmarshalToChan(reader, csvRowChan); err != nil { | |
return err | |
} | |
return nil | |
} | |
func Handler(storage *GcpStorage, db *gorm.DB, wg *sync.WaitGroup, filename string) (err error) { | |
zipReader, err := storage.Read("mybucket", filename) | |
if err != nil { | |
return | |
} | |
// zipReader is closed here | |
zipHandle, err := storage.ReadFileCsvInZipStream(zipReader) | |
if err != nil { | |
return | |
} | |
fileReader, err := zipHandle.Open() | |
if err != nil { | |
return | |
} | |
pchan := make(chan []SimulationData) | |
wg.Add(1) | |
go func() { | |
for p := range pchan { | |
if err := db.Create(p).Error; err != nil { | |
fmt.Printf("%+v", err) | |
} | |
} | |
wg.Done() | |
}() | |
// pChan and fileReader is closed here | |
err = ReadCsvStreamed(fileReader, pchan) | |
return | |
} | |
func main() { | |
storage := GcpStorage{} | |
db := gorm.DB{} | |
wg := sync.WaitGroup{} | |
// init storage and db from envar | |
// .... | |
// normally you place the Handler inside HTTP function | |
// but for brevity we call this directly | |
if err := Handler(&storage, &db, &wg, "myfile.csv.zip"); err != nil { | |
panic(err) | |
} | |
fmt.Println("All CSV data is parsed") | |
wg.Wait() | |
fmt.Println("All CSV data is saved") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment