Last active
August 29, 2015 14:05
-
-
Save EyalAr/117125b0e72a69584dee to your computer and use it in GitHub Desktop.
Go + ZeroMQ ROUTER socket concurency problem demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is a demonstration for a ROUTER socket which is used concurrently | |
// by two different threads. One to receive messages and one to send. | |
// Running this program will cause a panic because ZeroMQ sockets are not | |
// threads-safe. | |
package main | |
import ( | |
"fmt" | |
zmq "github.com/pebbe/zmq4" | |
"math/rand" | |
) | |
type Work struct { | |
id []string | |
payload int | |
} | |
type Product struct { | |
id []string | |
result int | |
} | |
func main() { | |
addr := "inproc://demo" | |
context, err := zmq.NewContext() | |
if err != nil { | |
panic(err) | |
} | |
// create server frontend socket: | |
frontend, err := context.NewSocket(zmq.ROUTER) | |
if err != nil { | |
panic(err) | |
} | |
defer frontend.Close() | |
if err := frontend.Bind(addr); err != nil { | |
panic(err) | |
} | |
// queues: | |
incoming := make(chan Work, 100) | |
outgoing := make(chan Product, 100) | |
// pool of 5 worker goroutines | |
for i := 0; i < 5; i++ { | |
go func(id int) { | |
for { | |
work := <-incoming | |
fmt.Printf("worker %d: working...\n", id) | |
outgoing <- Product{ | |
id: work.id, | |
result: rand.Int(), | |
} | |
} | |
}(i) | |
} | |
// receiver: | |
go func() { | |
fmt.Printf("server: pending...\n") | |
for { | |
if msg, err := frontend.RecvMessage(0); err == nil { | |
incoming <- Work{ | |
id: msg[:2], | |
payload: rand.Int(), | |
} | |
fmt.Printf("server: queued new work\n") | |
} else { | |
panic(err) | |
} | |
} | |
}() | |
// replier: | |
go func() { | |
for { | |
prod := <-outgoing | |
if _, err := frontend.SendMessage(prod.id, prod.result); err != nil { | |
panic(err) | |
} | |
fmt.Printf("server: product sent\n") | |
} | |
}() | |
// simulate 10 clients: | |
for i := 0; i < 10; i++ { | |
go func(id int) { | |
sock, err := context.NewSocket(zmq.REQ) | |
if err != nil { | |
panic(err) | |
} | |
if err := sock.Connect(addr); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: connected\n", id) | |
for { | |
if _, err := sock.SendMessage("ping"); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: sent work\n", id) | |
if _, err := sock.RecvMessage(0); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: received product\n", id) | |
} | |
}(i) | |
} | |
// let the goroutines run indefinitely | |
<-make(chan bool) | |
return | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is a demonstration for a ROUTER socket which is used concurrently | |
// by two different threads. One to receive messages and one to send. | |
// Since ZeroMQ sockets are not threads-safe, a mutex lock is needed in | |
// order to make sure the socket is not accessed concurrently by different | |
// threads. | |
// The socket is polled, with a short timeout, to check if there are new | |
// messages pending. If not, the lock is released and the socket can be | |
// used to send messages. | |
package main | |
import ( | |
"fmt" | |
zmq "github.com/pebbe/zmq4" | |
"math/rand" | |
"sync" | |
"time" | |
) | |
type Work struct { | |
id []string | |
payload int | |
} | |
type Product struct { | |
id []string | |
result int | |
} | |
func main() { | |
addr := "inproc://demo" | |
context, err := zmq.NewContext() | |
if err != nil { | |
panic(err) | |
} | |
// create server frontend socket: | |
frontend, err := context.NewSocket(zmq.ROUTER) | |
if err != nil { | |
panic(err) | |
} | |
defer frontend.Close() | |
if err := frontend.Bind(addr); err != nil { | |
panic(err) | |
} | |
// socket access mutex | |
felock := sync.Mutex{} | |
// queues: | |
incoming := make(chan Work, 100) | |
outgoing := make(chan Product, 100) | |
// pool of 5 worker goroutines | |
for i := 0; i < 5; i++ { | |
go func(id int) { | |
for { | |
work := <-incoming | |
fmt.Printf("worker %d: working...\n", id) | |
outgoing <- Product{ | |
id: work.id, | |
result: rand.Int(), | |
} | |
} | |
}(i) | |
} | |
// receiver: | |
go func() { | |
poller := zmq.NewPoller() | |
poller.Add(frontend, zmq.POLLIN) | |
fmt.Printf("server: pending...\n") | |
for { | |
// wait up to 5 milliseconds | |
felock.Lock() | |
polled, err := poller.Poll(5 * time.Millisecond) | |
if err != nil { | |
panic(err) | |
} | |
if len(polled) > 0 { | |
if msg, err := polled[0].Socket.RecvMessage(zmq.DONTWAIT); err == nil { | |
incoming <- Work{ | |
id: msg[:2], | |
payload: rand.Int(), | |
} | |
fmt.Printf("server: queued new work\n") | |
} else { | |
panic(err) | |
} | |
} | |
felock.Unlock() | |
// give a chance to other threads to obtain the lock: | |
time.Sleep(5 * time.Millisecond) | |
} | |
}() | |
// replier: | |
go func() { | |
for { | |
prod := <-outgoing | |
felock.Lock() | |
if _, err := frontend.SendMessage(prod.id, prod.result); err != nil { | |
panic(err) | |
} | |
felock.Unlock() | |
fmt.Printf("server: product sent\n") | |
// give a chance to other threads to obtain the lock: | |
time.Sleep(5 * time.Millisecond) | |
} | |
}() | |
// simulate 10 clients: | |
for i := 0; i < 10; i++ { | |
go func(id int) { | |
sock, err := context.NewSocket(zmq.REQ) | |
if err != nil { | |
panic(err) | |
} | |
defer sock.Close() | |
if err := sock.Connect(addr); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: connected\n", id) | |
for { | |
if _, err := sock.SendMessage("ping"); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: sent work\n", id) | |
if _, err := sock.RecvMessage(0); err != nil { | |
panic(err) | |
} | |
fmt.Printf("client %d: received product\n", id) | |
} | |
}(i) | |
} | |
// let the goroutines run indefinitely | |
<-make(chan bool) | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment