Skip to content

Instantly share code, notes, and snippets.

@EyalAr
Last active August 29, 2015 14:05
Show Gist options
  • Save EyalAr/117125b0e72a69584dee to your computer and use it in GitHub Desktop.
Save EyalAr/117125b0e72a69584dee to your computer and use it in GitHub Desktop.
Go + ZeroMQ ROUTER socket concurency problem demo
// This is a demonstration for a ROUTER socket which is used concurrently
// by two different threads. One to receive messages and one to send.
// Running this program will cause a panic because ZeroMQ sockets are not
// threads-safe.
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"math/rand"
)
type Work struct {
id []string
payload int
}
type Product struct {
id []string
result int
}
func main() {
addr := "inproc://demo"
context, err := zmq.NewContext()
if err != nil {
panic(err)
}
// create server frontend socket:
frontend, err := context.NewSocket(zmq.ROUTER)
if err != nil {
panic(err)
}
defer frontend.Close()
if err := frontend.Bind(addr); err != nil {
panic(err)
}
// queues:
incoming := make(chan Work, 100)
outgoing := make(chan Product, 100)
// pool of 5 worker goroutines
for i := 0; i < 5; i++ {
go func(id int) {
for {
work := <-incoming
fmt.Printf("worker %d: working...\n", id)
outgoing <- Product{
id: work.id,
result: rand.Int(),
}
}
}(i)
}
// receiver:
go func() {
fmt.Printf("server: pending...\n")
for {
if msg, err := frontend.RecvMessage(0); err == nil {
incoming <- Work{
id: msg[:2],
payload: rand.Int(),
}
fmt.Printf("server: queued new work\n")
} else {
panic(err)
}
}
}()
// replier:
go func() {
for {
prod := <-outgoing
if _, err := frontend.SendMessage(prod.id, prod.result); err != nil {
panic(err)
}
fmt.Printf("server: product sent\n")
}
}()
// simulate 10 clients:
for i := 0; i < 10; i++ {
go func(id int) {
sock, err := context.NewSocket(zmq.REQ)
if err != nil {
panic(err)
}
if err := sock.Connect(addr); err != nil {
panic(err)
}
fmt.Printf("client %d: connected\n", id)
for {
if _, err := sock.SendMessage("ping"); err != nil {
panic(err)
}
fmt.Printf("client %d: sent work\n", id)
if _, err := sock.RecvMessage(0); err != nil {
panic(err)
}
fmt.Printf("client %d: received product\n", id)
}
}(i)
}
// let the goroutines run indefinitely
<-make(chan bool)
return
}
// This is a demonstration for a ROUTER socket which is used concurrently
// by two different threads. One to receive messages and one to send.
// Since ZeroMQ sockets are not threads-safe, a mutex lock is needed in
// order to make sure the socket is not accessed concurrently by different
// threads.
// The socket is polled, with a short timeout, to check if there are new
// messages pending. If not, the lock is released and the socket can be
// used to send messages.
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"math/rand"
"sync"
"time"
)
type Work struct {
id []string
payload int
}
type Product struct {
id []string
result int
}
func main() {
addr := "inproc://demo"
context, err := zmq.NewContext()
if err != nil {
panic(err)
}
// create server frontend socket:
frontend, err := context.NewSocket(zmq.ROUTER)
if err != nil {
panic(err)
}
defer frontend.Close()
if err := frontend.Bind(addr); err != nil {
panic(err)
}
// socket access mutex
felock := sync.Mutex{}
// queues:
incoming := make(chan Work, 100)
outgoing := make(chan Product, 100)
// pool of 5 worker goroutines
for i := 0; i < 5; i++ {
go func(id int) {
for {
work := <-incoming
fmt.Printf("worker %d: working...\n", id)
outgoing <- Product{
id: work.id,
result: rand.Int(),
}
}
}(i)
}
// receiver:
go func() {
poller := zmq.NewPoller()
poller.Add(frontend, zmq.POLLIN)
fmt.Printf("server: pending...\n")
for {
// wait up to 5 milliseconds
felock.Lock()
polled, err := poller.Poll(5 * time.Millisecond)
if err != nil {
panic(err)
}
if len(polled) > 0 {
if msg, err := polled[0].Socket.RecvMessage(zmq.DONTWAIT); err == nil {
incoming <- Work{
id: msg[:2],
payload: rand.Int(),
}
fmt.Printf("server: queued new work\n")
} else {
panic(err)
}
}
felock.Unlock()
// give a chance to other threads to obtain the lock:
time.Sleep(5 * time.Millisecond)
}
}()
// replier:
go func() {
for {
prod := <-outgoing
felock.Lock()
if _, err := frontend.SendMessage(prod.id, prod.result); err != nil {
panic(err)
}
felock.Unlock()
fmt.Printf("server: product sent\n")
// give a chance to other threads to obtain the lock:
time.Sleep(5 * time.Millisecond)
}
}()
// simulate 10 clients:
for i := 0; i < 10; i++ {
go func(id int) {
sock, err := context.NewSocket(zmq.REQ)
if err != nil {
panic(err)
}
defer sock.Close()
if err := sock.Connect(addr); err != nil {
panic(err)
}
fmt.Printf("client %d: connected\n", id)
for {
if _, err := sock.SendMessage("ping"); err != nil {
panic(err)
}
fmt.Printf("client %d: sent work\n", id)
if _, err := sock.RecvMessage(0); err != nil {
panic(err)
}
fmt.Printf("client %d: received product\n", id)
}
}(i)
}
// let the goroutines run indefinitely
<-make(chan bool)
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment