You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
// gen is the first stage of the pipeline: it converts a list of integers
// into a channel that emits them in order.
//
// A goroutine performs the sends and closes the returned channel once every
// value has been delivered, so callers can simply range over the result.
// The channel is unbuffered, so the goroutine blocks on each send until the
// value is received.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}
// sq receives integers from in and returns a channel that emits the square
// of each one, in the order received.
//
// The returned channel is closed after in has been closed and every squared
// value has been sent. Because sq has the same inbound and outbound channel
// type, stages can be composed: sq(sq(c)).
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
funcmain() {
// Set up the pipeline and consume the output.forn:=rangesq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
Fan-in/Fan-out
// merge fans in any number of input channels onto a single output channel.
//
// One goroutine per input copies values to the returned channel; the order
// in which values from different inputs interleave is not defined. The
// returned channel is closed once every input channel has been closed and
// drained. With no inputs, the returned channel is closed right away.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
funcmain() {
in:=gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.c1:=sq(in)
c2:=sq(in)
// Consume the merged output from c1 and c2.forn:=rangemerge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
Stopping Short / Explicit Cancellation
// sq receives integers from in and returns a channel that emits the square
// of each one, stopping early if done is closed.
//
// Each send is paired with a receive on done in a select, so the goroutine
// never stays blocked on a send that no one will receive once done is
// closed. The returned channel is closed on every exit path via defer:
// either in was closed and drained, or done was closed.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}
// merge fans in any number of input channels onto a single output channel,
// abandoning the work if done is closed.
//
// One goroutine per input copies values to the returned channel; the order
// in which values from different inputs interleave is not defined. The
// returned channel is closed once every copying goroutine has exited, which
// happens when its input is closed and drained or when done is closed.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c or done is closed, then calls
	// wg.Done.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
funcmain() {
// Set up a done channel that's shared by the whole pipeline,// and close that channel when this pipeline exits, as a signal// for all the goroutines we started to exit.done:=make(chanstruct{})
deferclose(done)
in:=gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.c1:=sq(done, in)
c2:=sq(done, in)
// Consume the first value from output.out:=merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9// done will be closed by the deferred call.
}
Bounded Parallelism
// +build OMIT

package main
import (
	"crypto/md5"
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"sync"
)
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
//
// The paths channel is closed when the walk finishes, whether it succeeded,
// failed, or was canceled. The error channel has a buffer of one and receives
// exactly one value: nil on success, the first error reported by the walk, or
// a "walk canceled" error when done was closed while a path was being sent.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	errc := make(chan error, 1)
	go func() { // HL
		// Close the paths channel after Walk returns.
		defer close(paths) // HL
		// No select needed for this send, since errc is buffered.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
			if err != nil {
				return err
			}
			// Directories, symlinks, devices, etc. are skipped.
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path: // HL
			case <-done: // HL
				return errors.New("walk canceled")
			}
			return nil
		})
	}()
	return paths, errc
}
// A result is the product of reading and summing a file using MD5.
//
// Field order matters: digester builds values with an unkeyed composite
// literal (path, sum, err).
type result struct {
	path string         // path of the file that was read
	sum  [md5.Size]byte // MD5 digest of the bytes that were read
	err  error          // error returned by the read, or nil
}
// digester reads path names from paths and sends digests of the corresponding// files on c until either paths or done is closed.funcdigester(done<-chanstruct{}, paths<-chanstring, cchan<-result) {
forpath:=rangepaths { // HLpathsdata, err:=ioutil.ReadFile(path)
select {
casec<-result{path, md5.Sum(data), err}:
case<-done:
return
}
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map// from file path to the MD5 sum of the file's contents. If the directory walk// fails or any read operation fails, MD5All returns an error. In that case,// MD5All does not wait for inflight read operations to complete.funcMD5All(rootstring) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before// receiving all the values from c and errc.done:=make(chanstruct{})
deferclose(done)
paths, errc:=walkFiles(done, root)
// Start a fixed number of goroutines to read and digest files.c:=make(chanresult) // HLcvarwg sync.WaitGroupconstnumDigesters=20wg.Add(numDigesters)
fori:=0; i<numDigesters; i++ {
gofunc() {
digester(done, paths, c) // HLcwg.Done()
}()
}
gofunc() {
wg.Wait()
close(c) // HLc
}()
// End of pipeline. OMITm:=make(map[string][md5.Size]byte)
forr:=rangec {
ifr.err!=nil {
returnnil, r.err
}
m[r.path] =r.sum
}
// Check whether the Walk failed.iferr:=<-errc; err!=nil { // HLerrcreturnnil, err
}
returnm, nil
}
funcmain() {
// Calculate the MD5 sum of all files under the specified directory,// then print the results sorted by path name.m, err:=MD5All(os.Args[1])
iferr!=nil {
fmt.Println(err)
return
}
varpaths []stringforpath:=rangem {
paths=append(paths, path)
}
sort.Strings(paths)
for_, path:=rangepaths {
fmt.Printf("%x %s\n", m[path], path)
}
}