Skip to content

Instantly share code, notes, and snippets.

@dele454
Created August 6, 2022 03:42
Show Gist options
  • Save dele454/10110d24f748c88eac639a58e203dff4 to your computer and use it in GitHub Desktop.
Save dele454/10110d24f748c88eac639a58e203dff4 to your computer and use it in GitHub Desktop.
Part 1 - CSV Transformation
// ProcessRecord consumes rows from record until done is signaled, then writes
// every successfully unmarshalled row to the output file.
//
// Per-row outcomes are tracked on tr.reporter: an empty row or an Unmarshal
// error is counted via RecordFailed and stored via AddError; a good row is
// counted via RecordTransformed and collected for output.
//
// A closed record channel is not treated as a stream of empty rows: the
// receive case is disabled and the loop keeps waiting for done. Once done
// fires, rows that are already available on record are drained (without
// blocking) so they are not lost to select's random choice between two ready
// cases.
//
// When the function returns, the deferred block finalizes the report (base
// filename, elapsed seconds, completed mark), writes it to stdout, and only
// then calls wg.Done.
func (tr *HTMLTransformer) ProcessRecord(wg *sync.WaitGroup, record <-chan []string, done <-chan bool) {
	var (
		start = time.Now()
		data  []utils.SalesRecord
		ctx   = context.Background()
	)

	defer func() {
		tr.reporter.SetFilename(filepath.Base(tr.reporter.GetFilename()))
		tr.reporter.AddDuration(time.Since(start).Seconds())
		tr.reporter.Completed()

		if err := tr.reporter.WriteReportToStdOut(ctx); err != nil {
			utils.Log(utils.ColorError, err)
		}

		// Signal completion last so waiters observe a fully written report.
		wg.Done()
	}()

	// handle validates and unmarshals a single row, updating the reporter and
	// appending to data on success.
	handle := func(row []string) {
		if len(row) == 0 {
			tr.reporter.RecordFailed()
			tr.reporter.AddError(errs.ErrorEmptyRowFound)
			utils.Log(utils.ColorError, errs.ErrorEmptyRowFound)
			return
		}

		var sr utils.SalesRecord
		sr, err := tr.processor.Unmarshal(row, sr)
		if err != nil {
			tr.reporter.RecordFailed()
			tr.reporter.AddError(err)
			return
		}

		tr.reporter.RecordTransformed()
		data = append(data, sr)
	}

	// Process the pipeline until the parser signals that reading has completed.
receive:
	for {
		select {
		case <-done:
			break receive
		case row, ok := <-record:
			if !ok {
				// Receiving from a closed channel never blocks and yields nil;
				// a nil channel blocks forever, which disables this case while
				// we keep waiting for done.
				record = nil
				continue
			}
			handle(row)
		}
	}

	// Pick up rows that were already queued when done was selected.
drain:
	for {
		select {
		case row, ok := <-record:
			if !ok {
				break drain
			}
			handle(row)
		default:
			break drain
		}
	}

	// Send output to file; a failure is recorded so the deferred report shows it.
	out := &Output{
		FileName:     filepath.Base(tr.reporter.GetFilename()),
		TotalHeaders: len(tr.reporter.GetHeaders()),
		Headers:      tr.reporter.GetHeaders(),
		Data:         data,
	}
	if err := tr.WriteOutputToFile(out); err != nil {
		tr.reporter.AddError(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment