-
-
Save gtrevg/3a13e2366fb5ee07f98593c23958f22c to your computer and use it in GitHub Desktop.
pipelines in go (http://blog.golang.org/pipelines)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
test.nt | |
test.ldj |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// +build OMIT | |
package main | |
import ( | |
"crypto/md5" | |
"errors" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"path/filepath" | |
"sort" | |
"sync" | |
) | |
// walkFiles walks the directory tree rooted at root in a background goroutine.
// The path of every regular file is sent on the returned string channel, which
// is closed once the walk has finished. The walk's final error (possibly nil)
// is delivered on the returned error channel. Closing done cancels the walk.
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	// Buffered so the goroutine can always deliver the walk result without a
	// receiver being ready.
	errc := make(chan error, 1)

	visit := func(path string, info os.FileInfo, err error) error {
		switch {
		case err != nil:
			return err
		case !info.Mode().IsRegular():
			return nil
		}
		select {
		case <-done:
			return errors.New("walk canceled")
		case paths <- path:
			return nil
		}
	}

	go func() {
		// paths is closed after the walk result has been queued on errc.
		defer close(paths)
		errc <- filepath.Walk(root, visit)
	}()
	return paths, errc
}
// result carries the outcome of reading one file and hashing it with MD5.
type result struct {
	path string         // file the digest belongs to
	sum  [md5.Size]byte // MD5 digest of the bytes that were read
	err  error          // error from reading the file, nil on success
}
// digester reads path names from paths and sends digests of the corresponding | |
// files on c until either paths or done is closed. | |
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { | |
for path := range paths { // HLpaths | |
data, err := ioutil.ReadFile(path) | |
select { | |
case c <- result{path, md5.Sum(data), err}: | |
case <-done: | |
return | |
} | |
} | |
} | |
// MD5All reads all the files in the file tree rooted at root and returns a map | |
// from file path to the MD5 sum of the file's contents. If the directory walk | |
// fails or any read operation fails, MD5All returns an error. In that case, | |
// MD5All does not wait for inflight read operations to complete. | |
func MD5All(root string) (map[string][md5.Size]byte, error) { | |
// MD5All closes the done channel when it returns; it may do so before | |
// receiving all the values from c and errc. | |
done := make(chan struct{}) | |
defer close(done) | |
paths, errc := walkFiles(done, root) | |
// Start a fixed number of goroutines to read and digest files. | |
c := make(chan result) // HLc | |
var wg sync.WaitGroup | |
const numDigesters = 20 | |
wg.Add(numDigesters) | |
for i := 0; i < numDigesters; i++ { | |
go func() { | |
digester(done, paths, c) // HLc | |
wg.Done() | |
}() | |
} | |
go func() { | |
wg.Wait() | |
close(c) // HLc | |
}() | |
// End of pipeline. OMIT | |
m := make(map[string][md5.Size]byte) | |
for r := range c { | |
if r.err != nil { | |
return nil, r.err | |
} | |
m[r.path] = r.sum | |
} | |
// Check whether the Walk failed. | |
if err := <-errc; err != nil { // HLerrc | |
return nil, err | |
} | |
return m, nil | |
} | |
func main() { | |
// Calculate the MD5 sum of all files under the specified directory, | |
// then print the results sorted by path name. | |
dir := "." | |
if len(os.Args) >= 2 { | |
dir = os.Args[1] | |
} | |
m, err := MD5All(dir) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
var paths []string | |
for path := range m { | |
paths = append(paths, path) | |
} | |
sort.Strings(paths) | |
for _, path := range paths { | |
fmt.Printf("%x %s\n", m[path], path) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
) | |
func main() { | |
ch1 := make(chan int) | |
go pump(ch1) | |
fmt.Println(<-ch1) | |
} | |
// pump is a generator: it sends 0, 1, 2, ... on ch forever, blocking on each
// send until a receiver takes the value. It never returns.
func pump(ch chan int) {
	next := 0
	for {
		ch <- next
		next++
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
) | |
// Reply is an empty placeholder type; nothing in this file uses it.
type Reply struct{}

// Request is one unit of work for the server: two operands and the channel
// on which the computed answer is sent back.
type Request struct {
	a, b   int
	replyc chan int
}

// binOp is a binary operation on two ints.
type binOp func(a, b int) int
func run(op binOp, req *Request) { | |
req.replyc <- op(req.a, req.b) | |
} | |
func server(op binOp, service chan *Request) { | |
for { | |
req := <-service | |
go run(op, req) | |
} | |
} | |
func startServer(op binOp) chan *Request { | |
reqChan := make(chan *Request) | |
go server(op, reqChan) | |
return reqChan | |
} | |
func main() { | |
adder := startServer(func(a, b int) int { return a + b }) | |
const N = 100 | |
var reqs [N]Request | |
for i := 0; i < N; i++ { | |
req := &reqs[i] | |
req.a = i | |
req.b = i + N | |
req.replyc = make(chan int) | |
adder <- req | |
} | |
for i := N - 1; i >= 0; i-- { | |
if <-reqs[i].replyc != N+2*i { | |
fmt.Println("fail at", i) | |
} else { | |
fmt.Println("request", i, "is ok") | |
} | |
} | |
fmt.Println("done") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// In main() | |
// About to sleep in main() | |
// Beginning longWait() | |
// Beginning shortWait() | |
// Exiting shortWait() | |
// Exiting longWait() | |
// End main() | |
import ( | |
"fmt" | |
"time" | |
) | |
func main() { | |
fmt.Println("In main()") | |
go longWait() | |
go shortWait() | |
fmt.Println("About to sleep in main()") | |
time.Sleep(10 * 1e9) | |
fmt.Println("End main()") | |
} | |
// longWait announces itself, sleeps for five seconds, and announces its exit.
func longWait() {
	fmt.Println("Beginning longWait()")
	time.Sleep(5 * time.Second)
	fmt.Println("Exiting longWait()")
}
// shortWait announces itself, sleeps for two seconds, and announces its exit.
func shortWait() {
	fmt.Println("Beginning shortWait()")
	time.Sleep(2 * time.Second)
	fmt.Println("Exiting shortWait()")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
func main() { | |
ch := make(chan string) | |
// remove one or both "go" keywords and get a | |
// fatal error: all goroutines are asleep - deadlock! | |
go sendData(ch) | |
go getData(ch) | |
time.Sleep(1e9) // time.Sleep(1e5) | |
fmt.Println() | |
} | |
// sendData sends five city names on ch, in a fixed order, and returns. It does
// not close ch.
func sendData(ch chan string) {
	for _, city := range []string{"Washington", "Tripoli", "London", "Beijing", "Tokio"} {
		ch <- city
	}
}
// getData receives strings from ch forever and prints each one followed by a
// space. It never returns.
func getData(ch chan string) {
	for {
		fmt.Printf("%s ", <-ch)
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
"time" | |
) | |
func main() { | |
runtime.GOMAXPROCS(4) | |
ch1 := make(chan int) | |
ch2 := make(chan int) | |
go pump1(ch1) | |
go pump2(ch2) | |
go suck(ch1, ch2) | |
time.Sleep(1e9) | |
} | |
// pump1 sends 0, 2, 4, ... on ch forever. It never returns.
func pump1(ch chan int) {
	i := 0
	for {
		ch <- i * 2
		i++
	}
}
// pump2 sends 5, 6, 7, ... on ch forever. It never returns.
func pump2(ch chan int) {
	i := 0
	for {
		ch <- i + 5
		i++
	}
}
// suck receives from whichever of ch1 and ch2 is ready and prints the value
// together with the line it arrived on. It never returns.
func suck(ch1, ch2 chan int) {
	for {
		var (
			label string
			value int
		)
		select {
		case value = <-ch1:
			label = "on line 1"
		case value = <-ch2:
			label = "on line 2"
		}
		fmt.Println(label, value)
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// test various marc access patterns | |
package main | |
import ( | |
"crypto/sha512" | |
"errors" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"runtime" | |
"strconv" | |
"sync" | |
// "time" | |
) | |
// recordLength reads the 24-byte MARC leader from reader and returns the
// record length stored in its first five bytes.
//
// It returns io.EOF unchanged when no bytes at all could be read, so callers
// can detect the end of the input. A leader that is cut short, or whose
// length field is not a decimal number, yields a descriptive error.
func recordLength(reader io.Reader) (length int64, err error) {
	leader := make([]byte, 24)
	// io.ReadFull keeps reading until the buffer is full; a single Read call
	// is allowed to return fewer bytes than requested even when more follow.
	n, err := io.ReadFull(reader, leader)
	switch {
	case err == io.ErrUnexpectedEOF:
		return 0, errors.New(fmt.Sprintf("MARC21: invalid leader: expected 24 bytes, read %d", n))
	case err != nil:
		// Includes io.EOF (nothing read) and genuine read errors.
		return 0, err
	}

	l, err := strconv.Atoi(string(leader[:5]))
	if err != nil {
		return 0, errors.New(fmt.Sprintf("MARC21: invalid record length: %s", err))
	}
	return int64(l), nil
}
// Worker receives byte slices from queue until it is closed, and for each one
// sends a pointer to the hex-encoded SHA-512 digest on out. It calls wg.Done
// when it returns.
func Worker(queue chan *[]byte, out chan *string, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		record, ok := <-queue
		if !ok {
			return
		}
		digest := sha512.Sum512(*record)
		hexDigest := fmt.Sprintf("%x", digest)
		out <- &hexDigest
	}
}
// Collector prints every string received from in. Once in is closed and
// drained it sends true on done.
func Collector(in chan *string, done chan bool) {
	for {
		line, ok := <-in
		if !ok {
			break
		}
		fmt.Println(*line)
	}
	done <- true
}
// func parseRecord(buffer *[]byte) string { | |
// // fmt.Printf("%x\n", sha512.Sum512(*buffer)) | |
// // return sha512.Sum512(*buffer) | |
// return fmt.Sprintf("%x", sha512.Sum512(*buffer)) | |
// } | |
// RecordCount count the number of records in marc file | |
func RecordCount(filename string) int64 { | |
// runtime.GOMAXPROCS(runtime.NumCPU()) | |
runtime.GOMAXPROCS(1) | |
handle, err := os.Open(filename) | |
if err != nil { | |
log.Fatalf("%s\n", err) | |
} | |
defer func() { | |
if err := handle.Close(); err != nil { | |
log.Fatalf("%s\n", err) | |
} | |
}() | |
var i, cumulative int64 | |
// queue := make(chan *[]byte) | |
// results := make(chan *string) | |
// done := make(chan bool) | |
// go Collector(results, done) | |
// var wg sync.WaitGroup | |
// for i := 0; i < runtime.NumCPU(); i++ { | |
// wg.Add(1) | |
// go Worker(queue, results, &wg) | |
// } | |
for { | |
length, err := recordLength(handle) | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
log.Fatalf("%s\n", err) | |
} | |
i += 1 | |
cumulative += length | |
handle.Seek(cumulative, 0) | |
// buf := make([]byte, length-24) | |
// _, err = handle.Read(buf) | |
// if err != nil { | |
// log.Fatalln(err) | |
// } | |
// queue <- &buf | |
} | |
// close(queue) | |
// wg.Wait() | |
// close(results) | |
// select { | |
// case <-time.After(1e9): | |
// break | |
// case <-done: | |
// break | |
// } | |
return i | |
} | |
func main() { | |
fmt.Println(RecordCount(os.Args[1])) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/md5" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"path/filepath" | |
"sort" | |
) | |
// MD5All walks the tree rooted at root sequentially and returns a map from
// the path of each regular file to the MD5 sum of its contents. The first
// walk or read error aborts the walk and is returned with a nil map.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	sums := make(map[string][md5.Size]byte)

	visit := func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.Mode().IsRegular() {
			data, readErr := ioutil.ReadFile(path)
			if readErr != nil {
				return readErr
			}
			sums[path] = md5.Sum(data)
		}
		return nil
	}

	if err := filepath.Walk(root, visit); err != nil {
		return nil, err
	}
	return sums, nil
}
func main() { | |
dir := "." | |
if len(os.Args) >= 2 { | |
dir = os.Args[1] | |
} | |
m, err := MD5All(dir) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
var paths []string | |
for path := range m { | |
paths = append(paths, path) | |
} | |
sort.Strings(paths) | |
for _, path := range paths { | |
fmt.Printf("%x %s\n", m[path], path) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// The MD5All implementation in parallel.go starts a new goroutine for each file. | |
// In a directory with many large files, this may allocate more memory than is available on the machine. | |
import ( | |
"crypto/md5" | |
"errors" | |
"fmt" | |
"io/ioutil" | |
"os" | |
"path/filepath" | |
"sort" | |
"sync" | |
) | |
// result is what one summing goroutine reports for one file.
type result struct {
	path string         // file that was read
	sum  [md5.Size]byte // MD5 digest of the data that was read
	err  error          // read error, nil on success
}
func MD5All(root string) (map[string][md5.Size]byte, error) { | |
// MD5All closes the done channel when it returns; it may do so before | |
// receiving all the values from c and errc | |
done := make(chan struct{}) | |
defer close(done) | |
c, errc := sumFiles(done, root) | |
m := make(map[string][md5.Size]byte) | |
for r := range c { | |
if r.err != nil { | |
return nil, r.err | |
} | |
m[r.path] = r.sum | |
} | |
if err := <-errc; err != nil { | |
return nil, err | |
} | |
return m, nil | |
} | |
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { | |
// For each regular file, start a goroutine that sums the file and sends | |
// the result on c. Send the result of the walk on errc. | |
c := make(chan result) | |
errc := make(chan error, 1) | |
go func() { | |
var wg sync.WaitGroup | |
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { | |
if err != nil { | |
return err | |
} | |
if !info.Mode().IsRegular() { | |
return nil | |
} | |
wg.Add(1) | |
go func() { | |
data, err := ioutil.ReadFile(path) | |
select { | |
case c <- result{path, md5.Sum(data), err}: | |
case <-done: | |
} | |
wg.Done() | |
}() | |
// Abort the walk if done is closed | |
select { | |
case <-done: | |
return errors.New("walk cancelled") | |
default: | |
return nil | |
} | |
}) | |
// Walk has returned, so all calls to wg.Add are done. Start a | |
// goroutine to close c once all the sends are done | |
go func() { | |
wg.Wait() | |
close(c) | |
}() | |
errc <- err | |
}() | |
return c, errc | |
} | |
func main() { | |
dir := "." | |
if len(os.Args) >= 2 { | |
dir = os.Args[1] | |
} | |
m, err := MD5All(dir) | |
if err != nil { | |
fmt.Println(err) | |
return | |
} | |
var paths []string | |
for path := range m { | |
paths = append(paths, path) | |
} | |
sort.Strings(paths) | |
for _, path := range paths { | |
fmt.Printf("%x %s\n", m[path], path) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
"time" | |
) | |
// DoPart simulates one unit of work: it prints a message, sleeps for one
// second, and then signals completion by sending 1 on sem.
func DoPart(sem chan int) {
	fmt.Println("Computing...")
	time.Sleep(time.Second)
	sem <- 1
}
func DoAll() { | |
NCPU := runtime.NumCPU() | |
sem := make(chan int, NCPU) | |
for i := 0; i < NCPU; i++ { | |
go DoPart(sem) | |
} | |
for i := 0; i < NCPU; i++ { | |
<-sem | |
} | |
// All done. | |
} | |
func main() { | |
DoAll() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
// processChannel turns every int received from in into the string
// "Hello <n>" and sends it on out. It returns when in is closed; out is left
// open.
func processChannel(in <-chan int, out chan<- string) {
	for {
		n, ok := <-in
		if !ok {
			return
		}
		out <- fmt.Sprintf("Hello %d", n)
	}
}
func main() { | |
receiveChan := make(chan string) | |
sendChan := make(chan int) | |
go func() { | |
for value := range receiveChan { | |
fmt.Println(value) | |
} | |
}() | |
go processChannel(sendChan, receiveChan) | |
for i := 0; i < 10; i++ { | |
sendChan <- i | |
} | |
close(sendChan) | |
time.Sleep(1e7) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
// "runtime" | |
) | |
// gen converts its int arguments into a channel that emits them in order and
// is then closed. The emitting goroutine stops early if done is closed.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	emit := func() {
		defer close(out)
		for i := range nums {
			select {
			case <-done:
				return
			case out <- nums[i]:
			}
		}
	}
	go emit()
	return out
}
// sq receives ints from in and sends the square of each one on the returned
// channel, which is closed once in is closed and drained. The goroutine also
// stops as soon as done is closed.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			// Send the square; forwarding n unchanged made this stage a no-op.
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}
// infiniteCounter returns a channel that yields 0, 1, 2, ... without end. The
// producing goroutine has no stop signal and blocks whenever nobody receives.
func infiniteCounter() <-chan int {
	out := make(chan int)
	go func() {
		for next := 0; ; next++ {
			out <- next
		}
	}()
	return out
}
// func main() { | |
// runtime.GOMAXPROCS(runtime.NumCPU()) | |
// var wg sync.WaitGroup | |
// counterC := infiniteCounter() | |
// for i := 0; i < 10000; i++ { | |
// wg.Add(1) | |
// go func(id int, counter <-chan int) { | |
// defer wg.Done() | |
// for i := 0; i < 10000; i++ { | |
// value := <-counter | |
// fmt.Printf("Worker %d: %d\n", id, value) | |
// } | |
// }(i, counterC) | |
// } | |
// wg.Wait() | |
// } | |
// func main() { | |
// c := gen(2, 3) | |
// out := sq(c) | |
// fmt.Println(<-out) | |
// fmt.Println(<-out) | |
// } | |
// func main() { | |
// // Set up the pipeline and consume the output. | |
// for n := range sq(sq(gen(2, 3))) { | |
// fmt.Println(n) // 16 then 81 | |
// } | |
// } | |
// func merge(cs ...<-chan int) <-chan int { | |
// var wg sync.WaitGroup | |
// out := make(chan int) | |
// // Start an output goroutine for each input channel in cs. | |
// output := func(c <-chan int) { | |
// for n := range c { | |
// out <- n | |
// } | |
// wg.Done() | |
// } | |
// wg.Add(len(cs)) | |
// for _, c := range cs { | |
// go output(c) | |
// } | |
// go func() { | |
// wg.Wait() | |
// close(out) | |
// }() | |
// return out | |
// } | |
// merge fans in the channels cs: every value received from any of them is
// forwarded on the returned channel, which is closed after all inputs are
// closed and drained. Closing done makes the forwarding goroutines return
// early.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// output copies values from c to out until c is closed or done fires.
	// wg.Done is called exactly once per goroutine, via defer; a second
	// explicit call would drive the counter negative or let out be closed
	// while another goroutine is still sending.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out only after every output goroutine has returned.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
func main() { | |
done := make(chan struct{}) | |
defer close(done) | |
in := gen(done, 5, 3) | |
// Distribute the sq work across two goroutines that both read from in. | |
c1 := sq(done, in) | |
c2 := sq(done, in) | |
// The merge function converts a list of channels to a single channel | |
// by starting a goroutine for each inbound channel that copies | |
// the values to the sole outbound channel. | |
// for n := range merge(c1, c2) { | |
// fmt.Println(n) | |
// } | |
// This is a resource leak: goroutines consume memory and runtime resources, | |
// and heap references in goroutine stacks keep data from being garbage | |
// collected. Goroutines are not garbage collected; they must exit on their own. | |
// out := merge(c1, c2) | |
// fmt.Println(<-out) // 4 or 9 | |
// panic(nil) | |
out := merge(done, c1, c2) | |
fmt.Println(<-out) | |
panic(nil) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// Empty is the element type stored in a semaphore; its value carries no
// information.
type Empty interface{}

// semaphore is a counting semaphore built on a channel. A resource is held
// for as long as a token sits in the channel, so the channel's capacity
// bounds how many resources can be held at once.
type semaphore chan Empty

// P acquires n resources by sending n tokens, blocking while the channel is
// full.
func (s semaphore) P(n int) {
	token := new(Empty)
	for ; n > 0; n-- {
		s <- token
	}
}

// V releases n resources by receiving n tokens, blocking while the channel is
// empty.
func (s semaphore) V(n int) {
	for ; n > 0; n-- {
		<-s
	}
}

// Lock acquires a single resource.
func (s semaphore) Lock() { s.P(1) }

// Unlock releases a single resource.
func (s semaphore) Unlock() { s.V(1) }

// Wait acquires n resources.
func (s semaphore) Wait(n int) { s.P(n) }

// Signal releases a single resource.
func (s semaphore) Signal() { s.V(1) }
func main() { | |
N := 4 | |
_ = make(semaphore, N) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
) | |
// generate sends 2, 3, 4, ... on ch forever. It never returns.
func generate(ch chan int) {
	n := 2
	for {
		ch <- n
		n++
	}
}
// filter copies values from in to out, dropping every value divisible by
// prime. It never returns.
func filter(in, out chan int, prime int) {
	for {
		candidate := <-in
		if candidate%prime == 0 {
			continue
		}
		out <- candidate
	}
}
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU() * 2) | |
ch := make(chan int) | |
go generate(ch) | |
for { | |
prime := <-ch | |
fmt.Print(prime, " ") | |
ch1 := make(chan int) | |
go filter(ch, ch1, prime) | |
ch = ch1 | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
) | |
// generate returns a channel on which a background goroutine sends
// 2, 3, 4, ... forever.
func generate() chan int {
	ch := make(chan int)
	go func() {
		n := 2
		for {
			ch <- n
			n++
		}
	}()
	return ch
}
// filter returns a channel carrying the values of in that are not divisible
// by prime. The copying goroutine runs forever.
func filter(in chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			candidate := <-in
			if candidate%prime == 0 {
				continue
			}
			out <- candidate
		}
	}()
	return out
}
func sieve() chan int { | |
runtime.GOMAXPROCS(runtime.NumCPU() * 2) | |
out := make(chan int) | |
go func() { | |
ch := generate() | |
for { | |
prime := <-ch | |
ch = filter(ch, prime) | |
out <- prime | |
} | |
}() | |
return out | |
} | |
func main() { | |
primes := sieve() | |
for { | |
fmt.Println(<-primes) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"runtime" | |
"strings" | |
"time" | |
) | |
// Task is the unit of work passed between the sender, the workers and the
// consumer.
type Task struct {
	Name string // text payload; process lower-cases it in place
}
func process(task *Task) { | |
task.Name = strings.ToLower(task.Name) | |
} | |
func Worker(in, out chan *Task) { | |
for { | |
t := <-in | |
process(t) | |
out <- t | |
} | |
} | |
func sendWork(out chan *Task) { | |
for i := 0; i < 1000000; i++ { | |
out <- &Task{Name: fmt.Sprintf("HELLO %d!", i)} | |
} | |
} | |
func consumeWork(in chan *Task) { | |
for task := range in { | |
fmt.Printf("RX: %+v\n", *task) | |
} | |
} | |
func main() { | |
runtime.GOMAXPROCS(4) | |
pending, done := make(chan *Task), make(chan *Task) | |
N := 16 | |
go sendWork(pending) // put tasks with work on the channel | |
for i := 0; i < N; i++ { | |
go Worker(pending, done) | |
} | |
go consumeWork(done) | |
time.Sleep(1e9) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
// main polls a 100ms ticker and a 500ms one-shot timer without blocking. It
// prints "tick" for each tick, "." (then sleeps 50ms) when neither is ready,
// and prints "BOOM" and returns when the timer fires.
func main() {
	ticks := time.Tick(100 * time.Millisecond)
	deadline := time.After(500 * time.Millisecond)
	for {
		select {
		case <-deadline:
			fmt.Println("BOOM")
			return
		case <-ticks:
			fmt.Println("tick")
		default:
			fmt.Println(".")
			time.Sleep(50 * time.Millisecond)
		}
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// no error handling | |
// 60s to convert 10M lines (1.3G) | |
// 10min for GND | |
// 60min for DBP | |
// 5h+ for Freebase | |
import ( | |
"bufio" | |
"encoding/json" | |
"fmt" | |
"log" | |
"os" | |
"runtime" | |
) | |
// Triple is a subject/predicate/object statement. The exported field names
// are the keys used when a Triple is encoded as JSON.
type Triple struct {
	Subject   string
	Predicate string
	Object    string
}

// Work is one job for a Worker.
type Work struct {
	Text      string                 // the input line to process
	Options   map[string]interface{} // shared options; not read by Worker in this file
	ReplyChan chan *Triple           // not used in this file
}
func CollectTriples(triples chan *Triple) { | |
for { | |
triple := <-triples | |
if triple == nil { | |
break | |
} | |
encoded, err := json.Marshal(triple) | |
if err != nil { | |
log.Fatalln(err) | |
} | |
fmt.Printf("%s\n", encoded) | |
} | |
} | |
// CollectStrings prints every received byte slice as one line and returns at
// the first nil pointer (or when lines is closed). It is kept deliberately
// lean because a single collector serves all workers.
func CollectStrings(lines chan *[]byte) {
	for line := range lines {
		if line == nil {
			return
		}
		fmt.Println(string(*line))
	}
}
// should do all the heavy lifting | |
func Worker(workChan chan *Work, resultChan chan *[]byte) { | |
for { | |
work := <-workChan | |
triple := Triple{Subject: "triple", Predicate: "length", Object: fmt.Sprintf("%d", len(work.Text))} | |
encoded, err := json.Marshal(triple) | |
if err != nil { | |
log.Fatalln(err) | |
} | |
// resultChan <- &triple | |
resultChan <- &encoded | |
} | |
} | |
func main() { | |
runtime.GOMAXPROCS(runtime.NumCPU()) | |
filename := "test.nt" | |
// fmt.Println(os.Args[0]) | |
file, err := os.Open(filename) | |
defer file.Close() | |
if err != nil { | |
log.Fatalln(err) | |
} | |
options := make(map[string]interface{}) | |
queueChan := make(chan *Work) | |
// resultChan := make(chan *Triple) | |
resultChan := make(chan *[]byte) | |
for i := 0; i < runtime.NumCPU(); i++ { | |
go Worker(queueChan, resultChan) | |
} | |
go CollectStrings(resultChan) | |
reader := bufio.NewReader(file) | |
for { | |
b, _, err := reader.ReadLine() | |
if err != nil || b == nil { | |
break | |
} | |
line := string(b) | |
work := Work{Text: line, Options: options} | |
queueChan <- &work | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"crypto/sha1" | |
"encoding/json" | |
"fmt" | |
"os" | |
"runtime" | |
"sync" | |
"time" | |
) | |
// Work is one job: an identifier and the hex SHA-1 digest computed from it.
// The exported field names are the keys used in the JSON output.
type Work struct {
	Id   string // input identifier
	Hash string // filled in by Worker
}
func Worker(id int, queue chan *Work, results chan *string, wg *sync.WaitGroup) { | |
defer wg.Done() | |
for work := range queue { | |
work.Hash = fmt.Sprintf("%x", sha1.Sum([]byte(work.Id))) | |
b, err := json.Marshal(work) | |
if err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
continue | |
} | |
line := string(b) | |
results <- &line | |
} | |
} | |
// Collector prints each line received from results and, once results has been
// closed and drained, sends true on done.
func Collector(results chan *string, done chan bool) {
	for {
		line, ok := <-results
		if !ok {
			break
		}
		fmt.Println(*line)
	}
	done <- true
}
func main() { | |
numWorkers := 4 // runtime.NumCPU() | |
runtime.GOMAXPROCS(numWorkers) | |
// runtime.GOMAXPROCS(1) | |
queue := make(chan *Work) | |
results := make(chan *string) | |
done := make(chan bool) | |
go Collector(results, done) | |
// worker waitgroup | |
wg := new(sync.WaitGroup) | |
for i := 0; i < numWorkers; i++ { | |
wg.Add(1) | |
go Worker(i, queue, results, wg) | |
} | |
// send tasks to the queue | |
for i := 0; i < 2000000; i++ { | |
work := Work{Id: fmt.Sprintf("%d", i)} | |
queue <- &work | |
} | |
// wait for the workers to finish | |
close(queue) | |
wg.Wait() | |
// wait for the collector to finish | |
close(results) | |
select { | |
case <-time.After(1e9): | |
break | |
case <-done: | |
break | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment