Last active
June 4, 2019 08:03
-
-
Save amitbet/9b14c4004d0f7b697f855bb9915e3d4a to your computer and use it in GitHub Desktop.
multi producer & consumer example in golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"time"
)
func main() { | |
//define more threads to assure we have parallelizm in the program | |
runtime.GOMAXPROCS(4) | |
start := time.Now() | |
//create a master kill channel | |
quit := make(chan bool) | |
//run the multi-producer + multi-consumer operation | |
go prodConRun("myrun", quit, 50, 50) | |
//let it have some time to play | |
time.Sleep(3000 * time.Millisecond) | |
//kill everything with a message | |
fmt.Println("Quitting!") | |
quit <- true | |
//wait for everything to end before exiting the process | |
time.Sleep(60 * time.Millisecond) | |
elapsed := time.Now().Sub(start) | |
fmt.Printf("#messages %d, time spent: %v", countConsume, elapsed) | |
} | |
func prodConRun(name string, quit chan bool, numProducers, numConsumers int) { | |
//create a message exchange channel | |
c := make(chan string) | |
//a slice to hold all inner quit channels | |
var quits []chan bool | |
//run producers | |
for i := 1; i <= numProducers; i++ { | |
quit := make(chan bool) | |
quits = append(quits, quit) | |
go produce(i, c, quit) | |
} | |
//run consumers | |
for i := 1; i <= numConsumers; i++ { | |
quit := make(chan bool) | |
quits = append(quits, quit) | |
go consume(name+" con"+strconv.Itoa(i), c, quit) | |
} | |
//listen to quit and fan it out to the workers | |
if <-quit { | |
fmt.Println("Quitting All!") | |
for i, q := range quits { | |
fmt.Printf("Sending Quit: %d/%d\n", i+1, len(quits)) | |
q <- true | |
} | |
} | |
} | |
// produce sends an endless stream of messages "hi from <i>.<j>" (j = 1, 2, ...)
// on co until it is told to stop.
//
// The producer stops, after printing "prod<i> is done!", when a true value is
// received on quit or when quit is closed. A false value is ignored. Sending
// and listening for quit happen in the same select, so the producer can be
// stopped even while no consumer is receiving from co.
func produce(i int, co chan<- string, quit chan bool) {
	name := fmt.Sprintf("prod%d", i)
	for j := 1; ; {
		select {
		case co <- fmt.Sprintf("hi from %d.%d", i, j):
			// Only advance the sequence number once the message is delivered.
			// To slow things down to a manageable speed, pause here, e.g.
			// with time.Sleep(5 * time.Millisecond).
			j++
		case stop, ok := <-quit:
			if stop || !ok {
				fmt.Println(name + " is done!")
				return
			}
		}
	}
}
var (
	// countConsume is the total number of messages received by all consumers.
	countConsume int
	// countMu guards countConsume, which many consumer goroutines update.
	countMu sync.Mutex
)

// consume receives messages from c, counting each one in countConsume, until
// it is told to stop.
//
// The consumer stops, after printing "<name> is done!", when a true value is
// received on quit or when quit is closed. A false value is ignored.
func consume(name string, c chan string, quit chan bool) {
	for {
		select {
		case <-c:
			// To trace traffic, bind the message (case msg := <-c) and print
			// name + ":" + msg here.
			countMu.Lock()
			countConsume++
			countMu.Unlock()
		case done, ok := <-quit:
			if done || !ok {
				fmt.Println(name + " is done!")
				return
			}
		}
	}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment