Skip to content

Instantly share code, notes, and snippets.

@compleatang
Created April 13, 2014 12:56
Show Gist options
  • Select an option

  • Save compleatang/10583041 to your computer and use it in GitHub Desktop.

Select an option

Save compleatang/10583041 to your computer and use it in GitHub Desktop.
ZMQ Lazy Pirate Pattern in Go
// Lazy Pirate client
// Use zmq_poll to do a safe request-reply
// To run, start lpserver and then randomly kill/restart it
//
// Author: iano <[email protected]>
// Based on C example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"strconv"
)
const (
REQUEST_TIMEOUT = 2500 * 1000
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"
)
func main() {
context, _ := zmq.NewContext()
defer context.Close()
fmt.Println("I: Connecting to server…")
client, _ := context.NewSocket(zmq.REQ)
client.Connect(SERVER_ENDPOINT)
for sequence, retriesLeft := 1, REQUEST_RETRIES; retriesLeft > 0; sequence++ {
fmt.Printf("I: Sending (%d)\n", sequence)
client.Send([]byte(strconv.Itoa(sequence)), 0)
for expectReply := true; expectReply; {
// Poll socket for a reply, with timeout
items := zmq.PollItems{
zmq.PollItem{Socket: client, Events: zmq.POLLIN},
}
if _, err := zmq.Poll(items, REQUEST_TIMEOUT); err != nil {
panic(err) // Interrupted
}
// Here we process a server reply and exit our loop if the
// reply is valid. If we didn't a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:
if item := items[0]; item.REvents&zmq.POLLIN != 0 {
// We got a reply from the server, must match sequence
reply, err := item.Socket.Recv(0)
if err != nil {
panic(err) // Interrupted
}
if replyInt, err := strconv.Atoi(string(reply)); replyInt == sequence && err == nil {
fmt.Printf("I: Server replied OK (%s)\n", reply)
retriesLeft = REQUEST_RETRIES
expectReply = false
} else {
fmt.Printf("E: Malformed reply from server: %s", reply)
}
} else if retriesLeft--; retriesLeft == 0 {
fmt.Println("E: Server seems to be offline, abandoning")
client.SetLinger(0)
client.Close()
break
} else {
fmt.Println("W: No response from server, retrying…")
// Old socket is confused; close it and open a new one
client.SetLinger(0)
client.Close()
client, _ = context.NewSocket(zmq.REQ)
client.Connect(SERVER_ENDPOINT)
fmt.Printf("I: Resending (%d)\n", sequence)
// Send request again, on new socket
client.Send([]byte(strconv.Itoa(sequence)), 0)
}
}
}
}
// Lazy Pirate server
// Binds REQ socket to tcp://*:5555
// Like hwserver except:
// - echoes request as-is
// - randomly runs slowly, or exits to simulate a crash.
//
// Author: iano <[email protected]>
// Based on C example
package main
import (
"fmt"
zmq "github.com/alecthomas/gozmq"
"math/rand"
"time"
)
const (
SERVER_ENDPOINT = "tcp://*:5555"
)
func main() {
src := rand.NewSource(time.Now().UnixNano())
random := rand.New(src)
context, _ := zmq.NewContext()
defer context.Close()
server, _ := context.NewSocket(zmq.REP)
defer server.Close()
server.Bind(SERVER_ENDPOINT)
for cycles := 1; ; cycles++ {
request, _ := server.Recv(0)
// Simulate various problems, after a few cycles
if cycles > 3 {
switch r := random.Intn(3); r {
case 0:
fmt.Println("I: Simulating a crash")
return
case 1:
fmt.Println("I: simulating CPU overload")
time.Sleep(2 * time.Second)
}
}
fmt.Printf("I: normal request (%s)\n", request)
time.Sleep(1 * time.Second) // Do some heavy work
server.Send(request, 0)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment