Skip to content

Instantly share code, notes, and snippets.

@huangsam
Last active January 28, 2019 17:45
Show Gist options
  • Save huangsam/776b6650ebd554811c1f9f2905e1d6f9 to your computer and use it in GitHub Desktop.
Save huangsam/776b6650ebd554811c1f9f2905e1d6f9 to your computer and use it in GitHub Desktop.
Go channels in action
package main
import "fmt"
// main demonstrates a two-channel handshake: it hands a single job to a
// goroutine over jobs and then blocks on done until that goroutine reports
// completion.
func main() {
	jobs := make(chan int)
	done := make(chan bool)

	// Wait for one job, report it, then send the done signal.
	go func() {
		j := <-jobs
		fmt.Println("received job", j)
		done <- true
	}()

	// Both channels are unbuffered, so this send blocks until the goroutine
	// above is ready to receive.
	jobs <- 1

	// The received bool is itself the condition; comparing it against true
	// is redundant.
	if <-done {
		fmt.Println("done")
	} else {
		fmt.Println("not done")
	}
}
package main
import "log"
// integers returns a channel that yields twenty values and is then closed.
// Values arrive in pairs: a multiple of ten (0, 10, ..., 90) followed by the
// zero-based index of that pair (0, 1, ..., 9).
func integers() chan int {
	out := make(chan int)
	go func() {
		// Closing on exit is what ends a consumer's range loop.
		defer close(out)
		// The channel is unbuffered, so every send waits for the consumer;
		// only this goroutine blocks, never the caller of integers.
		for step, tens := 0, 0; tens < 100; step, tens = step+1, tens+10 {
			out <- tens
			out <- step
		}
	}()
	return out
}
// main drains the channel returned by integers and logs every value.
func main() {
	// The range loop ends once the producer closes the channel.
	for n := range integers() {
		log.Println(n)
	}
}
package main
import "fmt"
// main shows that a buffered channel can be filled, closed and then drained
// by a single goroutine.
func main() {
	words := []string{"one", "two", "three", "four"}

	// The capacity matches the number of sends, so none of them block even
	// though nothing is receiving yet.
	queue := make(chan string, len(words))
	for _, w := range words {
		queue <- w
	}
	// Closing does not discard buffered values; they can still be received.
	close(queue)

	// Take the first two messages with explicit receives.
	first := <-queue
	fmt.Println(first)
	second := <-queue
	fmt.Println(second)

	// Range drains whatever is left and stops because the channel is closed.
	for msg := range queue {
		fmt.Println(msg)
	}
}
package main
import (
"fmt"
"sync"
"time"
)
// worker simulates a unit of work: it pauses for 50ms, sends its identifier
// i on numbers, and then prints a completion line.
//
// wg.Done is deferred so the WaitGroup counter is decremented on every
// return path. The send blocks until a receiver (or a free buffer slot) is
// available on numbers.
func worker(wg *sync.WaitGroup, numbers chan int, i int) {
	defer wg.Done()
	// Spell the pause as a typed duration rather than a raw nanosecond count.
	time.Sleep(50 * time.Millisecond)
	numbers <- i
	fmt.Printf("goroutine %v done\n", i)
}
// monitor blocks until the counter of wg reaches zero, prints a notice, and
// then closes numbers so that receivers ranging over it can stop.
func monitor(wg *sync.WaitGroup, numbers chan int) {
	// Deferred so the close happens last, after the notice is printed.
	defer close(numbers)
	wg.Wait()
	fmt.Println("all goroutines done")
}
// main fans out ten workers, prints each result as it arrives, and stops
// once monitor has closed the results channel.
func main() {
	const workers = 10

	var wg sync.WaitGroup
	numbers := make(chan int)

	// Register all workers with the WaitGroup, then launch them.
	wg.Add(workers)
	for id := 0; id < workers; id++ {
		go worker(&wg, numbers, id)
	}

	// monitor waits on the workers in the background and closes numbers
	// when they are all done.
	go monitor(&wg, numbers)

	// Blocks until numbers is closed.
	for n := range numbers {
		fmt.Println(n)
	}
}
package main
import (
"fmt"
"time"
)
// Payload carries two integer identifiers: one for a message and one for a
// job.
type Payload struct {
	msgID, jobID int
}

// String implements fmt.Stringer, rendering the payload as
// "{msg: <msgID>, job: <jobID>}".
func (p Payload) String() string {
	const layout = "{msg: %v, job: %v}"
	return fmt.Sprintf(layout, p.msgID, p.jobID)
}
// submit announces goroutine i and then sends job IDs on queue forever,
// starting at 0 and stepping by 25, pausing one millisecond after each
// send. It never returns.
func submit(queue chan int, i int) {
	fmt.Printf("goroutine %v start\n", i)
	for next := 0; ; next += 25 {
		queue <- next
		// Pause between submissions.
		time.Sleep(time.Millisecond)
	}
}
// receive prints every job ID that arrives on queue, wrapped in a Payload
// numbered from 1 in arrival order, and returns once 5ms have elapsed since
// it was called.
func receive(queue chan int) {
	// A single timer shared by all iterations bounds the whole loop.
	timeout := time.After(5 * time.Millisecond)
	seq := 1
	for {
		select {
		case job := <-queue:
			fmt.Println(Payload{msgID: seq, jobID: job})
			seq++
		case <-timeout:
			return
		}
	}
}
// main starts three submit goroutines that share one unbuffered queue and
// then runs receive on that queue until receive returns.
func main() {
	const submitters = 3

	queue := make(chan int)
	for id := 0; id < submitters; id++ {
		go submit(queue, id)
	}

	// The submit goroutines never return; they end when the program exits.
	receive(queue)
}
package main
import "fmt"
// main streams the integers 0 through 8 over nums and prints them. A second
// goroutine closes nums once the producer signals on done, which is what
// lets the range loop terminate.
func main() {
	nums := make(chan int)
	done := make(chan bool)

	// Closer: wait for the producer's signal, then close nums.
	go func() {
		<-done
		close(nums)
	}()

	// Producer: send every number to the range loop below, then signal.
	go func() {
		for n := 0; n < 9; n++ {
			nums <- n
		}
		done <- true
	}()

	// Consumer: runs until nums is closed.
	for n := range nums {
		fmt.Println(n)
	}
}
package main
import (
"fmt"
"sync"
)
// main wires up a pipeline: gen produces the integers below one million,
// twelve sq stages read from that shared input (fan-out), and merge combines
// their outputs into one stream (fan-in) that is printed.
func main() {
	const fanOut = 12

	in := gen(1000000)

	// Fan-out: every sq stage reads from the same input channel.
	cs := make([]<-chan int, 0, fanOut)
	for len(cs) < fanOut {
		cs = append(cs, sq(in))
	}

	// Fan-in: print results in whatever order the stages deliver them.
	for v := range merge(cs...) {
		fmt.Println(v)
	}
}
// gen returns a receive-only channel that yields 0, 1, ..., n-1 in order
// and is then closed. For n <= 0 the channel is closed without yielding
// anything.
func gen(n int) <-chan int {
	out := make(chan int)
	go func() {
		// Close on exit so downstream range loops terminate.
		defer close(out)
		for v := 0; v < n; v++ {
			out <- v
		}
	}()
	return out
}
// sq returns a receive-only channel that yields the square of every value
// received from in, in the order received. The returned channel is closed
// once in has been closed and drained.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			n, ok := <-in
			if !ok {
				// Input exhausted; the deferred close ends the stage.
				return
			}
			out <- n * n
		}
	}()
	return out
}
// merge fans in any number of input channels: every value received from any
// of cs is forwarded to the returned channel, which is closed once all
// inputs have been closed and drained. Ordering across inputs is not
// defined. With no inputs the returned channel is closed without yielding
// anything.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// forward copies one input into out and reports completion to wg.
	forward := func(src <-chan int) {
		defer wg.Done()
		for v := range src {
			out <- v
		}
	}

	// One forwarding goroutine per input; the channel is passed as an
	// argument so each goroutine is bound to its own input.
	for _, c := range cs {
		wg.Add(1)
		go forward(c)
	}

	// Close out only after every forwarder has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment