Created
May 5, 2021 05:12
-
-
Save kulti/e3514db299f48b9f859b73a9a06ec1fc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package hw06pipelineexecution | |
import ( | |
"strconv" | |
"testing" | |
"time" | |
"github.com/stretchr/testify/require" | |
) | |
func TestPipeline(t *testing.T) { | |
g := func(_ string, f func(v interface{}) interface{}) Stage { | |
return func(in In) Out { | |
out := make(Bi) | |
go func() { | |
defer close(out) | |
for v := range in { | |
out <- f(v) | |
} | |
}() | |
return out | |
} | |
} | |
stages := []Stage{ | |
g("Dummy", func(v interface{}) interface{} { return v }), | |
g("Multiplier (* 2)", func(v interface{}) interface{} { return v.(int) * 2 }), | |
g("Adder (+ 100)", func(v interface{}) interface{} { return v.(int) + 100 }), | |
g("Stringifier", func(v interface{}) interface{} { return strconv.Itoa(v.(int)) }), | |
} | |
in := make(Bi) | |
data := []int{1, 2, 3, 4, 5} | |
go func() { | |
for _, v := range data { | |
in <- v | |
} | |
close(in) | |
}() | |
result := make([]string, 0, 10) | |
for s := range ExecutePipeline(in, nil, stages...) { | |
result = append(result, s.(string)) | |
} | |
require.Equal(t, []string{"102", "104", "106", "108", "110"}, result) | |
} | |
func TestPipelineConcurrency(t *testing.T) { | |
waitCh := make(chan struct{}) | |
defer close(waitCh) | |
stageFn := func(in In) Out { | |
out := make(Bi) | |
go func() { | |
defer close(out) | |
for v := range in { | |
out <- v | |
<-waitCh | |
} | |
}() | |
return out | |
} | |
lastStageFn := func(in In) Out { | |
out := make(Bi) | |
go func() { | |
defer close(out) | |
for v := range in { | |
out <- v | |
<-waitCh | |
} | |
}() | |
return out | |
} | |
in := make(Bi) | |
const testValue = "test" | |
go func() { | |
in <- testValue | |
close(in) | |
}() | |
var resValue interface{} | |
out := ExecutePipeline(in, nil, stageFn, stageFn, lastStageFn) | |
require.Eventually(t, func() bool { | |
select { | |
case resValue = <-out: | |
return true | |
default: | |
return false | |
} | |
}, time.Second, time.Millisecond) | |
require.EqualValues(t, testValue, resValue) | |
} | |
func TestPipelineDone(t *testing.T) { | |
waitCh := make(chan struct{}) | |
defer close(waitCh) | |
stageFn := func(in In) Out { | |
out := make(Bi) | |
go func() { | |
defer close(out) | |
for v := range in { | |
<-waitCh | |
out <- v | |
} | |
}() | |
return out | |
} | |
in := make(Bi) | |
const testValue = "test" | |
go func() { | |
in <- testValue | |
close(in) | |
}() | |
doneCh := make(Bi) | |
var resValue interface{} | |
out := ExecutePipeline(in, doneCh, stageFn, stageFn, stageFn) | |
close(doneCh) | |
require.Eventually(t, func() bool { | |
select { | |
case resValue = <-out: | |
return true | |
default: | |
return false | |
} | |
}, time.Second, time.Millisecond) | |
require.Nil(t, resValue) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
а тут вообще в принципе нужен
waitCh
?кажется, что его можно убрать и ничего не изменится. по сути ты убрал слипы и мы получили тест на неблокированную работу связки каналов (ждём теперь вместо времени слипа в стейдже время на оверхед по этой работе).
плюс если его убрать, сразу покроется случай с
ну в общем, те же яйца только в профиль, не?