Skip to content

Instantly share code, notes, and snippets.

@amitbet
Last active June 4, 2019 08:03
Show Gist options
  • Save amitbet/9b14c4004d0f7b697f855bb9915e3d4a to your computer and use it in GitHub Desktop.
Save amitbet/9b14c4004d0f7b697f855bb9915e3d4a to your computer and use it in GitHub Desktop.
multi producer & consumer example in golang
package main
import (
"fmt"
"runtime"
"strconv"
"time"
)
// main runs 50 producers and 50 consumers for three seconds, asks them to
// stop, and then reports how many messages were consumed and how long the
// whole run took.
func main() {
	// Allow up to four OS threads to execute Go code simultaneously.
	runtime.GOMAXPROCS(4)
	start := time.Now()

	// Master kill channel: one value sent here stops the whole run.
	quit := make(chan bool)

	// finished is closed when prodConRun returns, i.e. after it has handed a
	// quit signal to every worker it started.
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		prodConRun("myrun", quit, 50, 50)
	}()

	// Let the workers exchange messages for a while.
	time.Sleep(3000 * time.Millisecond)

	fmt.Println("Quitting!")
	quit <- true

	// Wait for the quit fan-out to complete instead of guessing how long it
	// takes. Every worker has received its quit signal at this point, so no
	// further messages are counted and countConsume is safe to read.
	<-finished

	// The workers expose no completion signal of their own; this short grace
	// period only gives the last of them time to print its farewell line.
	time.Sleep(60 * time.Millisecond)

	elapsed := time.Since(start)
	fmt.Printf("#messages %d, time spent: %v\n", countConsume, elapsed)
}
// prodConRun starts numProducers producers and numConsumers consumers that
// share a single unbuffered message channel, then blocks until a value
// arrives on quit.
//
// When the received value is true, a quit signal is delivered to each worker
// in turn (producers first, in the order they were started) and the function
// returns once the last one has accepted it. When the value is false the
// function returns immediately and the workers are left running.
//
// name is used as the prefix of every consumer's display name.
func prodConRun(name string, quit chan bool, numProducers, numConsumers int) {
	// Channel on which producers hand messages to consumers.
	messages := make(chan string)

	// One private stop channel per worker, in start order.
	var workerQuits []chan bool

	for id := 1; id <= numProducers; id++ {
		stop := make(chan bool)
		workerQuits = append(workerQuits, stop)
		go produce(id, messages, stop)
	}

	for id := 1; id <= numConsumers; id++ {
		stop := make(chan bool)
		workerQuits = append(workerQuits, stop)
		go consume(name+" con"+strconv.Itoa(id), messages, stop)
	}

	// Block until the caller says something; anything but true means "do not
	// fan out".
	if shouldStop := <-quit; !shouldStop {
		return
	}

	fmt.Println("Quitting All!")
	total := len(workerQuits)
	for idx := range workerQuits {
		fmt.Printf("Sending Quit: %d/%d\n", idx+1, total)
		workerQuits[idx] <- true
	}
}
// produce sends the messages "hi from <i>.<j>" (j = 1, 2, 3, ...) on co until
// it is told to stop.
//
// The send and the quit check are offered in the same select, so the producer
// can be stopped even while no consumer is ready to receive; a message that
// was not delivered is simply not counted. Receiving true on quit, or finding
// quit closed, prints "prod<i> is done!" and returns. A false value is
// ignored.
//
// To slow producers down to a manageable speed, wait on a timer (for example
// time.After(5 * time.Millisecond)) after each successful send.
func produce(i int, co chan<- string, quit chan bool) {
	// The display name never changes, so build it once.
	name := fmt.Sprintf("prod%d", i)

	for j := 1; ; {
		select {
		case co <- fmt.Sprintf("hi from %d.%d", i, j):
			// Only advance the sequence number once the message was taken.
			j++
		case stop, open := <-quit:
			if stop || !open {
				fmt.Println(name + " is done!")
				return
			}
		}
	}
}
// countConsume is the total number of messages received by all consume
// goroutines. Writers must hold countConsumeLock.
var countConsume int

// countConsumeLock is a one-slot channel used as a lock around countConsume:
// putting a value in takes the lock, taking it out releases the lock.
var countConsumeLock = make(chan struct{}, 1)

// consume receives messages from c and counts each one in countConsume until
// it is told to stop.
//
// Receiving true on quit, or finding quit or c closed, prints
// "<name> is done!" and returns. A false value on quit is ignored.
//
// To trace traffic, bind the received message and print name + ":" + msg.
func consume(name string, c chan string, quit chan bool) {
	for {
		select {
		case _, open := <-c:
			if !open {
				// A closed channel yields zero values forever; stop instead
				// of counting messages that were never sent.
				fmt.Println(name + " is done!")
				return
			}
			// Many consumers run concurrently, so the increment must be
			// serialized or updates get lost.
			countConsumeLock <- struct{}{}
			countConsume++
			<-countConsumeLock
		case done, open := <-quit:
			if done || !open {
				fmt.Println(name + " is done!")
				return
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment