Last active
September 8, 2022 13:40
-
-
Save cideM/aab27b385643193975dfe7c61766f923 to your computer and use it in GitHub Desktop.
Add error handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/main.go b/main.go | |
index 3888a36..8794caa 100644 | |
--- a/main.go | |
+++ b/main.go | |
@@ -2,6 +2,7 @@ package main | |
import ( | |
"context" | |
+ "errors" | |
"log" | |
"runtime" | |
"strings" | |
@@ -28,11 +29,13 @@ func producer(ctx context.Context, strings []string) (<-chan string, error) { | |
return outChannel, nil | |
} | |
-func transformToLower(ctx context.Context, values <-chan string) (<-chan string, error) { | |
+func transformToLower(ctx context.Context, values <-chan string) (<-chan string, <-chan error, error) { | |
outChannel := make(chan string) | |
+ errorChannel := make(chan error) | |
go func() { | |
defer close(outChannel) | |
+ defer close(errorChannel) | |
select { | |
case <-ctx.Done(): | |
@@ -48,14 +51,16 @@ func transformToLower(ctx context.Context, values <-chan string) (<-chan string, | |
} | |
}() | |
- return outChannel, nil | |
+ return outChannel, errorChannel, nil | |
} | |
-func transformToTitle(ctx context.Context, values <-chan string) (<-chan string, error) { | |
+func transformToTitle(ctx context.Context, values <-chan string) (<-chan string, <-chan error, error) { | |
outChannel := make(chan string) | |
+ errorChannel := make(chan error) | |
go func() { | |
defer close(outChannel) | |
+ defer close(errorChannel) | |
select { | |
case <-ctx.Done(): | |
@@ -64,22 +69,31 @@ func transformToTitle(ctx context.Context, values <-chan string) (<-chan string, | |
if ok { | |
log.Println("transformToTitle input: ", s) | |
time.Sleep(time.Second * 3) | |
- outChannel <- strings.Title(s) | |
+ if s == "foo" { | |
+ errorChannel <- errors.New("error in transformToTitle") | |
+ } else { | |
+ outChannel <- strings.ToTitle(s) | |
+ } | |
} else { | |
return | |
} | |
} | |
}() | |
- return outChannel, nil | |
+ return outChannel, errorChannel, nil | |
} | |
-func sink(ctx context.Context, values <-chan string) { | |
+func sink(ctx context.Context, cancel context.CancelFunc, values <-chan string, errors <-chan error) { | |
for { | |
select { | |
case <-ctx.Done(): | |
log.Print(ctx.Err().Error()) | |
return | |
+ case err, ok := <-errors: | |
+ if ok { | |
+ cancel() | |
+ log.Print(err.Error()) | |
+ } | |
case val, ok := <-values: | |
if ok { | |
log.Println("Sink: ", val) | |
@@ -102,28 +116,32 @@ func main() { | |
} | |
stage1Channels := []<-chan string{} | |
+ errors := []<-chan error{} | |
for i := 0; i < runtime.NumCPU(); i++ { | |
- lowerCaseChannel, err := transformToLower(ctx, outputChannel) | |
+ lowerCaseChannel, lowerCaseErrors, err := transformToLower(ctx, outputChannel) | |
if err != nil { | |
log.Fatal(err) | |
} | |
stage1Channels = append(stage1Channels, lowerCaseChannel) | |
+ errors = append(errors, lowerCaseErrors) | |
} | |
stage1Merged := mergeStringChans(ctx, stage1Channels...) | |
stage2Channels := []<-chan string{} | |
for i := 0; i < runtime.NumCPU(); i++ { | |
- titleCaseChannel, err := transformToTitle(ctx, stage1Merged) | |
+ titleCaseChannel, titleCaseErrors, err := transformToTitle(ctx, stage1Merged) | |
if err != nil { | |
log.Fatal(err) | |
} | |
stage2Channels = append(stage2Channels, titleCaseChannel) | |
+ errors = append(errors, titleCaseErrors) | |
} | |
stage2Merged := mergeStringChans(ctx, stage2Channels...) | |
- sink(ctx, stage2Merged) | |
+ errorsMerged := mergeErrorChans(ctx, errors...) | |
+ sink(ctx, cancel, stage2Merged, errorsMerged) | |
} | |
func mergeStringChans(ctx context.Context, cs ...<-chan string) <-chan string { | |
@@ -153,3 +171,31 @@ func mergeStringChans(ctx context.Context, cs ...<-chan string) <-chan string { | |
return out | |
} | |
+ | |
+// mergeErrorChans fans the error channels in cs into one output channel. | |
+// One forwarding goroutine is started per input channel; the returned | |
+// channel is closed once every forwarder has finished. | |
+// A forwarder stops as soon as ctx is done and does not drain what is left | |
+// on its input channel. | |
+func mergeErrorChans(ctx context.Context, cs ...<-chan error) <-chan error { | |
+ var wg sync.WaitGroup | |
+ out := make(chan error) | |
+ | |
+ // output forwards every error received from c to out until c is closed | |
+ // or ctx is cancelled, then marks itself done on wg. | |
+ output := func(c <-chan error) { | |
+ defer wg.Done() | |
+ for n := range c { | |
+ select { | |
+ case out <- n: | |
+ case <-ctx.Done(): | |
+ return | |
+ } | |
+ } | |
+ } | |
+ | |
+ // Register all forwarders before starting any, so wg.Wait below cannot | |
+ // observe a zero counter while forwarders are still being launched. | |
+ wg.Add(len(cs)) | |
+ for _, c := range cs { | |
+ go output(c) | |
+ } | |
+ | |
+ // Close out only after every forwarder has returned, so no send can hit | |
+ // a closed channel. | |
+ go func() { | |
+ wg.Wait() | |
+ close(out) | |
+ }() | |
+ | |
+ return out | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment