Created
October 23, 2024 09:24
-
-
Save aswinkm-tc/d0b87648897d5433d989adad6dad2c41 to your computer and use it in GitHub Desktop.
This code reads a file line by line, processes each line, and writes the results to another file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"context" | |
"fmt" | |
"io" | |
"log/slog" | |
"os" | |
"sync" | |
) | |
func main() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
outCh := make(chan []byte) | |
reader, err := os.Open("../testdata/input.txt") | |
if err != nil { | |
panic("failed to read input.txt") | |
} | |
defer reader.Close() | |
writer, err := os.Create("../testdata/output.txt") | |
if err != nil { | |
panic("failed to read output.txt") | |
} | |
defer writer.Close() | |
scanner := bufio.NewScanner(reader) | |
t := &Transformer{} | |
inCh := read(ctx, cancel, scanner) | |
var wg sync.WaitGroup | |
for i := 0; i < 10; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
t.Transform(ctx, cancel, inCh, outCh) | |
}() | |
} | |
go func() { | |
wg.Wait() | |
close(outCh) | |
}() | |
write(cancel, writer, outCh) | |
fmt.Println("finished processing") | |
} | |
// Transformer applies a per-line transformation. It carries no state.
type Transformer struct{}

// Apply returns a new slice holding "processed: ", followed by in, followed
// by a newline. The returned error is currently always nil.
func (t *Transformer) Apply(in []byte) ([]byte, error) {
	// Disabled sample failure path, kept for experimenting with cancellation:
	//
	//	if string(in) == "stu" {
	//		return nil, fmt.Errorf("cannot process 'stu'")
	//	}
	const prefix = "processed: "

	result := make([]byte, 0, len(prefix)+len(in)+1)
	result = append(result, prefix...)
	result = append(result, in...)
	result = append(result, '\n')
	return result, nil
}

// Transform receives lines from in until it is closed, runs Apply on every
// non-empty line, and sends each result on out.
//
// If Apply fails, the error is logged, cancel is called and Transform
// returns. Transform also returns if ctx is done while it is waiting to send
// a result on out.
func (t *Transformer) Transform(ctx context.Context, cancel context.CancelFunc, in <-chan []byte, out chan []byte) {
	for {
		line, open := <-in
		if !open {
			return
		}
		// Empty lines are skipped rather than transformed.
		if len(line) == 0 {
			continue
		}

		result, err := t.Apply(line)
		if err != nil {
			slog.Error("could not apply transformation", "error", err)
			cancel()
			return
		}

		select {
		case <-ctx.Done():
			return
		case out <- result:
		}
	}
}
// read starts a goroutine that scans tokens from scanner and sends each one
// on the returned channel. The channel is closed when the goroutine exits.
//
// Every token is copied before it is sent: the slice returned by
// scanner.Bytes points into the scanner's internal buffer, which a later
// call to Scan may overwrite while a receiver is still using the data.
//
// The goroutine stops when the input is exhausted, when ctx is cancelled, or
// when the scanner reports an error; in the error case it logs the error and
// calls cancel.
func read(ctx context.Context, cancel context.CancelFunc, scanner *bufio.Scanner) <-chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for scanner.Scan() {
			if ctx.Err() != nil {
				slog.Error("context cancelled", "error", ctx.Err())
				return
			}
			// Scan can hand out a final token while a read error is pending;
			// treat that as a failure instead of forwarding the token.
			if err := scanner.Err(); err != nil {
				slog.Error("could not read from scanner", "error", err)
				cancel()
				return
			}

			// Detach the token from the scanner's reusable buffer.
			token := scanner.Bytes()
			line := make([]byte, len(token))
			copy(line, token)

			select {
			case out <- line:
			case <-ctx.Done():
				return
			}
		}
		if err := scanner.Err(); err != nil {
			slog.Error("could not read from scanner", "error", err)
			cancel()
		}
	}()
	return out
}
// write drains in, passing every received buffer to writer in the order it
// arrives, and returns once in is closed.
//
// If a Write call fails, the error is logged, cancel is called and write
// returns without consuming the rest of in.
func write(cancel context.CancelFunc, writer io.Writer, in <-chan []byte) {
	for {
		chunk, open := <-in
		if !open {
			return
		}
		if _, err := writer.Write(chunk); err != nil {
			slog.Error("could not write to output.txt", "error", err)
			cancel()
			return
		}
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment