Skip to content

Instantly share code, notes, and snippets.

@aswinkm-tc
Created October 23, 2024 09:24
Show Gist options
  • Save aswinkm-tc/d0b87648897d5433d989adad6dad2c41 to your computer and use it in GitHub Desktop.
Save aswinkm-tc/d0b87648897d5433d989adad6dad2c41 to your computer and use it in GitHub Desktop.
This program reads a file line by line, processes each line, and writes the processed lines to another file.
package main
import (
"bufio"
"context"
"fmt"
"io"
"log/slog"
"os"
"sync"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
outCh := make(chan []byte)
reader, err := os.Open("../testdata/input.txt")
if err != nil {
panic("failed to read input.txt")
}
defer reader.Close()
writer, err := os.Create("../testdata/output.txt")
if err != nil {
panic("failed to read output.txt")
}
defer writer.Close()
scanner := bufio.NewScanner(reader)
t := &Transformer{}
inCh := read(ctx, cancel, scanner)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
t.Transform(ctx, cancel, inCh, outCh)
}()
}
go func() {
wg.Wait()
close(outCh)
}()
write(cancel, writer, outCh)
fmt.Println("finished processing")
}
// Transformer applies a per-line transformation to data flowing through the
// pipeline. It has no fields.
type Transformer struct{}

// Apply returns a newly allocated slice containing "processed: ", the bytes
// of in, and a trailing newline. The current implementation always returns a
// nil error; the error result lets a transformation reject a line.
func (t *Transformer) Apply(in []byte) ([]byte, error) {
	const prefix = "processed: "
	result := make([]byte, 0, len(prefix)+len(in)+1)
	result = append(result, prefix...)
	result = append(result, in...)
	result = append(result, '\n')
	return result, nil
}

// Transform receives lines from in until it is closed, skips empty ones,
// runs Apply on the rest and sends each result on out.
//
// If Apply fails, the error is logged, cancel is called and Transform
// returns. Transform also returns, without sending, when ctx is done while
// it is waiting to send on out.
func (t *Transformer) Transform(ctx context.Context, cancel context.CancelFunc, in <-chan []byte, out chan []byte) {
	for {
		line, ok := <-in
		if !ok {
			// Input exhausted.
			return
		}
		if len(line) == 0 {
			continue
		}

		transformed, err := t.Apply(line)
		if err != nil {
			slog.Error("could not apply transformation", "error", err)
			cancel()
			return
		}

		select {
		case <-ctx.Done():
			return
		case out <- transformed:
		}
	}
}
// read starts a goroutine that scans tokens from scanner and sends each one
// on the returned channel. The channel is closed when the goroutine exits.
//
// Every token is copied before it is sent: scanner.Bytes returns a slice into
// the scanner's internal buffer, which later Scan calls may overwrite, so
// handing that slice to concurrent receivers would corrupt lines.
//
// The goroutine stops early, without calling cancel, when ctx is done. If the
// scanner reports an error, the error is logged and cancel is called.
func read(ctx context.Context, cancel context.CancelFunc, scanner *bufio.Scanner) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for scanner.Scan() {
			if ctx.Err() != nil {
				slog.Error("context cancelled", "error", ctx.Err())
				return
			}
			// Scan can return a final partial token after a read error;
			// do not forward it.
			if err := scanner.Err(); err != nil {
				slog.Error("could not read from scanner", "error", err)
				cancel()
				return
			}
			token := scanner.Bytes()
			line := make([]byte, len(token))
			copy(line, token)
			select {
			case out <- line:
			case <-ctx.Done():
				return
			}
		}
		if err := scanner.Err(); err != nil {
			slog.Error("could not read from scanner", "error", err)
			cancel()
		}
	}()
	return out
}
// write drains in, writing each received chunk to writer, and returns once
// in is closed. On the first failed Write it logs the error, calls cancel
// and returns without reading the remaining chunks.
func write(cancel context.CancelFunc, writer io.Writer, in <-chan []byte) {
	for {
		chunk, open := <-in
		if !open {
			return
		}
		if _, err := writer.Write(chunk); err != nil {
			slog.Error("could not write to output.txt", "error", err)
			cancel()
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment