Skip to content

Instantly share code, notes, and snippets.

@cj-dimaggio
Last active March 14, 2017 17:57
Show Gist options
  • Save cj-dimaggio/11a01fd95e0a8cf2adf5a187e8739300 to your computer and use it in GitHub Desktop.
Save cj-dimaggio/11a01fd95e0a8cf2adf5a187e8739300 to your computer and use it in GitHub Desktop.
package main
import (
"net"
"os"
"time"
"github.com/Ssawa/accord/dataStructures"
zmq "github.com/pebbe/zmq4"
"fmt"
"github.com/fatih/color"
)
// quit is an unbuffered channel any goroutine can send an exit code on to
// shut the application down. main blocks receiving from it and passes the
// received value to os.Exit.
var quit = make(chan int)
// queue holds a FIFO list of database operations that are waiting to be
// shared with another broker.
// NOTE(review): queue is used from several goroutines (operationReceiver puts,
// dequeueListener/dequeuePusher peek and remove); whether dataStructures.Queue
// is safe for concurrent use is not visible from this file - confirm.
var queue dataStructures.Queue
// stack holds a LIFO (last-in, first-out) list of database operations,
// intended to be used for rectifying conflicts. In this file it is only
// ever written to.
var stack dataStructures.Stack
// databaseExecutorURL is the in-process 0MQ endpoint used for sending
// operations to be performed on the database. databaseExecutor binds to it
// and every other goroutine that needs the database connects to it.
var databaseExecutorURL = "inproc://databaseExecutor"
// CheckError treats a non-nil err as fatal: it prints the error in red and
// sends exit code 1 on the quit channel. A nil err is a no-op.
//
// Because quit is unbuffered, the send blocks the calling goroutine until
// main picks the code up and exits the process.
func CheckError(err error) {
	if err == nil {
		return
	}
	color.Red("Error: %s", err)
	quit <- 1
}
// CheckWarning reports a non-fatal problem: a non-nil err is printed in
// yellow and execution continues. A nil err is a no-op.
func CheckWarning(err error) {
	if err == nil {
		return
	}
	color.Yellow("Warning: %s", err)
}
// databaseExecutor binds a 0MQ REP socket to databaseExecutorURL and serves
// operations sent to it forever. Each received message is "executed" on the
// database (simulated here by logging it and sleeping) and answered with "ok".
//
// Failing to create or bind the socket is fatal (reported through CheckError).
// Failures while receiving or replying are reported as warnings and the loop
// carries on with the next request.
func databaseExecutor() {
	color.Cyan("databaseExecutor: Creating socket to receive operations")
	receiver, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		CheckError(err)
		return
	}
	defer receiver.Close()

	if err := receiver.Bind(databaseExecutorURL); err != nil {
		CheckError(err)
		return
	}

	color.Cyan("databaseExecutor: Listening for operations")
	for {
		// We wait for somebody to send us an operation to be done. We don't care where this
		// operation comes from, it can be from a CMS operation or from a remote queue
		msg, err := receiver.Recv(0)
		if err != nil {
			// Nothing was received, so there is nothing to execute and nobody to reply to
			CheckWarning(err)
			continue
		}

		// Here we'll just simulate a database operation by logging to the screen and waiting
		// an arbitrary amount of time
		color.Green("Executing: %s", msg)
		time.Sleep(100 * time.Millisecond)

		// We send an "ok" in response signifying the operation was successfully performed.
		// Obviously, in a real application, we'd need to establish a protocol where we can
		// also signify errors in our response
		_, err = receiver.Send("ok", 0)
		CheckWarning(err)
	}
}
// operationReceiver binds a UDP socket to addr and listens for packets, each
// of which is treated as one database operation coming from an external
// application.
//
// It also connects to databaseExecutorURL with a 0MQ REQ socket. Every
// operation received over UDP is relayed to the databaseExecutor and, if the
// executor answers "ok", added to the shared queue and stack.
//
// Failing to set up either socket is fatal (reported through CheckError).
// Failures on individual packets are reported as warnings and that packet is
// dropped. Packets are read into a 1024 byte buffer, so anything longer than
// that is truncated.
func operationReceiver(addr string) {
	// Will be used to relay requests to be performed onto our database
	color.Cyan("operationReceiver: Creating socket to databaseExecutor")
	database, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		CheckError(err)
		return
	}
	defer database.Close()

	if err := database.Connect(databaseExecutorURL); err != nil {
		CheckError(err)
		return
	}

	// Will be used to accept new operations from external applications. This is meant to be used
	// by applications without requiring a lot of knowledge of our inner workings, which is why we
	// don't use 0MQ. Obviously UDP is terrible for something like this because we can so easily
	// lose operations, we only use it here because it gives us very simple message framing (for a
	// real application it would perhaps be an HTTP endpoint)
	color.Cyan("operationReceiver: Binding to UDP %s", addr)
	listenUDP, err := net.ListenPacket("udp", addr)
	if err != nil {
		// Check before deferring Close: listenUDP is nil when binding failed
		CheckError(err)
		return
	}
	defer listenUDP.Close()

	buf := make([]byte, 1024)
	color.Cyan("operationReceiver: Listening for UDP packets")
	for {
		n, _, err := listenUDP.ReadFrom(buf)
		if err != nil {
			// Nothing usable was read, don't relay garbage to the database
			CheckWarning(err)
			continue
		}

		// We received an operation from our UDP socket
		val := string(buf[:n])
		color.Cyan("operationReceiver: Received UDP packet: %s", val)
		if val == "" {
			// The queue consumers treat an empty string as "queue is empty", so an empty
			// operation would never be dequeued and would block everything queued behind it
			color.Yellow("Warning: %s", "operationReceiver: ignoring empty UDP packet")
			continue
		}

		// We send it over to the databaseExecutor to be handled by them
		color.Cyan("operationReceiver: sending '%s' to databaseExecutor", val)
		if _, err := database.Send(val, 0); err != nil {
			CheckWarning(err)
			continue
		}
		msg, err := database.Recv(0)
		if err != nil {
			CheckWarning(err)
			continue
		}

		if msg == "ok" {
			// If we got an 'ok' response we can go ahead and add it to our queue and stack
			color.Cyan("operationReceiver: received 'ok' from databaseExecutor, putting '%s' into stack and queue", val)
			queue.Put(val)
			stack.Put(val)
		}
	}
}
// dequeueListener binds a 0MQ REP socket to addr. It is part of the "pull"
// synchronization method, along with dequeuePoller, and is responsible for
// answering requests from other brokers that want the contents of our queue.
//
// The protocol spans two requests:
//   - "send": reply with the next operation in the queue (an empty string if
//     there is none) without removing it.
//   - "ok":   the remote confirms it processed the operation it was given.
//     Once every outstanding "send" has been confirmed the operation is
//     removed and the reply is "deleted"; otherwise the reply is
//     "<n> refs left".
//
// Any other request, or an "ok" with nothing outstanding, gets an empty reply.
//
// Failing to create or bind the socket is fatal (reported through CheckError).
// Failures on individual requests are reported as warnings.
func dequeueListener(addr string) {
	color.Cyan("dequeueListener: Creating socket to receive dequeue requests")
	receiver, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		CheckError(err)
		return
	}
	defer receiver.Close()

	color.Cyan("dequeueListener: Binding to %s", addr)
	if err := receiver.Bind(addr); err != nil {
		CheckError(err)
		return
	}

	color.Cyan("dequeueListener: Listening for requests")

	// This is to handle the theoretical case where we have multiple clients connected to our
	// socket. We want to ensure that if we send out an operation to two clients we get two "ok"s
	// before deleting it
	refCount := 0
	for {
		// Wait until we get a request
		msg, err := receiver.Recv(0)
		if err != nil {
			// Nothing was received, so there is nobody to reply to
			CheckWarning(err)
			continue
		}
		color.Cyan("dequeueListener: Received request: '%s'", msg)

		// Having this "send" and "ok" process happening across two requests gives us a fair bit
		// of reliability to ensure we're not responsible for losing information (it also gets
		// around 0MQ's annoying behavior of not actually telling you when a client disconnects
		// during a REP response). However we should be careful of any race conditions this might
		// introduce. The main worry would probably be, "If a new operation is queued in between a
		// 'send' and an 'ok' will we accidentally delete the wrong element?" In this case the
		// answer luckily appears to be "no". Because the queue is FIFO we'll only ever be queuing
		// to one end and dequeueing from the other. The other main thing to be worried about is,
		// if we really do want this to work with multiple clients (we don't need to) we'll need to
		// figure out a proper session system and operation sequencing to prevent annoying edge
		// cases (see more below)
		var reply string
		switch msg {
		case "send":
			color.Cyan("dequeueListener: Received request to send from queue")
			// The remote wants the next element from our queue, so let's peek at what it is and
			// send it
			reply = queue.Peek()
			// The queue uses an empty string to signify that there's nothing left (Go doesn't
			// allow nil as a substitute for a string). We only want to refCount actual data
			if reply != "" {
				refCount++
			}
		case "ok":
			// We got a confirmation that an operation was successfully processed remotely
			color.Cyan("dequeueListener: Received confirmation that information was received and processed. Decreasing refcount")
			// We don't want somebody to just send us a bunch of "ok"s without actually having
			// received anything or else they can deplete our entire queue
			if refCount > 0 {
				refCount--
				if refCount == 0 {
					// There's no more references to this operation, we can safely remove it
					color.Cyan("dequeueListener: No more references. Deleting element")
					queue.RemoveLast()
					reply = "deleted"
				} else {
					// There's still somebody out there that has knowledge of this operation but
					// hasn't sent in their "ok", so we don't want to delete it yet. Instead we
					// just return the number of references left.
					//
					// Now this leads to an interesting/infuriating problem that might want us to
					// reconsider this entire idea of handling multiple clients here (not multiple
					// brokers completely, mind, we can extract the logic out to a router goroutine
					// which is the only client to this socket). If two clients are connected and
					// one dies everything will grind to a halt and the still working remote might
					// end up receiving the same operation over and over again and (without proper
					// sequencing to check) performing it multiple times.
					reply = fmt.Sprintf("%d refs left", refCount)
				}
			}
		}

		// Send our reply, whatever it is
		_, err = receiver.Send(reply, 0)
		CheckWarning(err)
	}
}
// dequeuePoller works on the other side of dequeueListener: it connects a 0MQ
// REQ socket to the remote broker at addr, repeatedly asks it for the next
// operation ("send"), performs that operation on our own database through the
// databaseExecutor and, once the executor answers "ok", confirms it to the
// remote ("ok") so the remote can delete it. (The intention of this prototype
// isn't the complicated database synchronization stuff, its focus is on the
// IPC protocols, so we just go ahead and perform the operation without
// thinking here.)
//
// An empty reply from the remote means its queue is empty; the poller then
// waits a second before asking again.
//
// Failing to set up either socket is fatal (reported through CheckError).
// Failures while talking to the remote or the database are reported as
// warnings, followed by a one second pause before the next attempt.
func dequeuePoller(addr string) {
	color.Cyan("dequeuePoller: Creating socket to retrieve from external queue")
	request, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		CheckError(err)
		return
	}
	defer request.Close()

	color.Cyan("dequeuePoller: Creating socket to databaseExecutor")
	database, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		CheckError(err)
		return
	}
	defer database.Close()

	if err := database.Connect(databaseExecutorURL); err != nil {
		CheckError(err)
		return
	}

	// Connect to our remote broker using a 0MQ REQ socket
	color.Cyan("dequeuePoller: connecting to %s", addr)
	if err := request.Connect(addr); err != nil {
		CheckError(err)
		return
	}

	color.Cyan("dequeuePoller: Polling from remote")
	for {
		// Send out a request
		color.Cyan("dequeuePoller: Requesting an operation")
		if _, err := request.Send("send", 0); err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		op, err := request.Recv(0)
		if err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		color.Cyan("dequeuePoller: Received '%s'", op)

		if op == "" {
			// A blank string indicates the remote queue is empty, so we wait a bit before
			// sending another request so we don't flood the network for no reason
			color.Cyan("dequeuePoller: Remote had nothing to give us. Let's wait a second")
			time.Sleep(time.Second)
			continue
		}

		// We got a real operation, send it out to the database
		color.Cyan("dequeuePoller: Sending operation to be performed on our database")
		if _, err := database.Send(op, 0); err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		msg, err := database.Recv(0)
		if err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		if msg != "ok" {
			// Obviously here we should handle a case where we *don't* get an "ok" from the
			// databaseExecutor, but this is a young protocol and that just never happens yet.
			// We simply ask the remote again without confirming anything.
			continue
		}

		// The database performed the operation successfully, so we can send another request to
		// the remote telling them that we handled the operation and they can go ahead and delete
		// it from their memory
		color.Cyan("dequeuePoller: Sending confirmation that op was handled '%s'", op)
		if _, err := request.Send("ok", 0); err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		// We don't really care what the response is but we could theoretically go into another
		// state if the queued operation wasn't deleted or something. The reply still has to be
		// read before the REQ socket lets us send the next request.
		_, err = request.Recv(0)
		CheckWarning(err)
	}
}
// dequeueReceiver is the main listener of the "push" synchronization method.
// It binds a 0MQ REP socket to addr and waits for a remote broker to send it
// operations from its queue. Each operation is relayed to the databaseExecutor
// and the remote is answered with "ok" once the database handled it, or with
// "error" when it did not. The remote only deletes an operation from its queue
// after an explicit "ok", which hopefully reduces the chance of data loss from
// network instability.
//
// A reply is sent for every request that was received: a REP socket must
// answer a request before it may receive the next one, so skipping the reply
// would leave the socket unable to receive and would let a later "ok" be
// delivered as the answer to the earlier, unhandled request.
//
// Failing to set up either socket is fatal (reported through CheckError).
// Failures on individual requests are reported as warnings.
func dequeueReceiver(addr string) {
	color.Cyan("dequeueReceiver: Creating socket to receive external operations")
	receiver, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		CheckError(err)
		return
	}
	defer receiver.Close()

	// This listener will be responsible for replicating remote operations on our local database
	// so we need a connection to the databaseExecutor
	color.Cyan("dequeueReceiver: Creating socket to databaseExecutor")
	database, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		CheckError(err)
		return
	}
	defer database.Close()

	if err := database.Connect(databaseExecutorURL); err != nil {
		CheckError(err)
		return
	}

	// We're a listener so we bind
	color.Cyan("dequeueReceiver: Binding to %s", addr)
	if err := receiver.Bind(addr); err != nil {
		CheckError(err)
		return
	}

	for {
		op, err := receiver.Recv(0)
		if err != nil {
			// Nothing was received, so there is nobody to reply to
			CheckWarning(err)
			continue
		}
		color.Cyan("dequeueReceiver: Received '%s'", op)
		color.Cyan("dequeueReceiver: Sending operation to be performed on our database")

		// We send the operation we received to the databaseExecutor. Anything other than an
		// "ok" from it means the remote must keep the operation in its queue.
		reply := "error"
		if _, err := database.Send(op, 0); err != nil {
			CheckWarning(err)
		} else if msg, err := database.Recv(0); err != nil {
			CheckWarning(err)
		} else if msg == "ok" {
			color.Cyan("dequeueReceiver: Sending confirmation that op was handled '%s'", op)
			reply = "ok"
		}

		_, err = receiver.Send(reply, 0)
		CheckWarning(err)
	}
}
// dequeuePusher works along with dequeueReceiver and is what actually sends
// our queued operations to the remote broker at addr over a 0MQ REQ socket.
// It peeks at the next operation in the queue, sends it, and only removes it
// from the queue once the remote has answered "ok".
//
// When the queue is empty, the remote answers anything other than "ok", or
// sending/receiving fails, the pusher waits a second before trying again so
// it neither burns CPU nor hammers the remote. Failing to create or connect
// the socket is fatal (reported through CheckError).
func dequeuePusher(addr string) {
	color.Cyan("dequeuePusher: Creating socket to receive dequeue requests")
	pusher, err := zmq.NewSocket(zmq.REQ)
	if err != nil {
		CheckError(err)
		return
	}
	defer pusher.Close()

	color.Cyan("dequeuePusher: Connecting to %s", addr)
	if err := pusher.Connect(addr); err != nil {
		CheckError(err)
		return
	}

	color.Cyan("dequeuePusher: Listening for requests")
	for {
		// Peek at the next operation in our queue
		op := queue.Peek()
		if op == "" {
			// In this case we're essentially "polling" from our own queue and don't want to burn
			// CPU cycles unnecessarily. However this polling thing is a bit silly. We could create
			// another inproc pipe from operationReceiver to be notified when there is a new
			// operation but that would require more work
			color.Cyan("dequeuePusher: We have nothing to give. Let's wait a second")
			time.Sleep(time.Second)
			continue
		}

		// It's a real operation, send it over the wire
		if _, err := pusher.Send(op, 0); err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		msg, err := pusher.Recv(0)
		if err != nil {
			CheckWarning(err)
			time.Sleep(time.Second)
			continue
		}
		if msg != "ok" {
			// The remote did not handle the operation. Keep it queued and retry after a pause
			// instead of resending it in a tight loop
			time.Sleep(time.Second)
			continue
		}

		// The remote said it's handled it, so we're safe to delete the operation and move on
		color.Cyan("dequeuePusher: op was handled. Dequeueing it")
		queue.RemoveLast()
	}
}
// main wires the broker together. Command line:
//
//	accord <udp-addr> <mode> [<bind-endpoint> <remote-endpoint>]
//
// udp-addr is where operationReceiver listens for operations from an external
// application. mode selects how this instance synchronizes with another one:
//
//   - "pull": dequeueListener binds bind-endpoint and hands out our queue on
//     request, while dequeuePoller connects to remote-endpoint and requests
//     the remote's queue.
//   - "push": dequeueReceiver binds bind-endpoint and accepts operations
//     pushed to us, while dequeuePusher connects to remote-endpoint and
//     pushes our queue to it.
//
// Any other mode runs without synchronization. Missing arguments print a
// usage message and exit with status 2 instead of panicking on an
// out-of-range index. Otherwise main blocks until a goroutine writes an exit
// code to the quit channel and exits with that code.
func main() {
	const usage = "usage: accord <udp-addr> <pull|push> <bind-endpoint> <remote-endpoint>"

	if len(os.Args) < 3 {
		fmt.Fprintln(os.Stderr, usage)
		os.Exit(2)
	}
	udpAddr, mode := os.Args[1], os.Args[2]
	if (mode == "pull" || mode == "push") && len(os.Args) < 5 {
		fmt.Fprintln(os.Stderr, usage)
		os.Exit(2)
	}

	// Start the goroutine that will be responsible for database operations
	go databaseExecutor()

	// Start the goroutine that will be listening for new operations from an external application
	// and pass in the address to bind to
	go operationReceiver(udpAddr)

	// These represent two ways of synchronizing with other instances of this application.
	switch mode {
	case "pull":
		// The pull method involves connecting to a remote and requesting information from it
		// to synchronize its queue with your own database. This requires a server having a public
		// IP address with a stable connection to the internet in order to synchronize data.
		go dequeueListener(os.Args[3])
		go dequeuePoller(os.Args[4])
	case "push":
		// The push method involves opening a socket to listen for requests that can be pushed to
		// you. This allows servers that are not publicly accessible to still have their data
		// synchronized remotely.
		go dequeueReceiver(os.Args[3])
		go dequeuePusher(os.Args[4])
	default:
		color.Yellow("Warning: unknown synchronization mode '%s', running without synchronization", mode)
	}

	// Wait until somebody writes to the quit channel and exit with that integer
	os.Exit(<-quit)
}
@cj-dimaggio
Copy link
Author

./accord 127.0.0.1:5050 push tcp://127.0.0.1:5051 tcp://127.0.0.1:6061

dequeuePusher: Creating socket to receive dequeue requests
operationReceiver: Creating socket to databaseExecutor
dequeueReceiver: Creating socket to receive external operations
databaseExecutor: Creating socket to receive operations
dequeuePusher: Connecting to tcp://127.0.0.1:6061
operationReceiver: Binding to UDP 127.0.0.1:5050
dequeueReceiver: Creating socket to databaseExecutor
dequeuePusher: Listening for requests
dequeuePusher: We have nothing to give. Let's wait a second
databaseExecutor: Listening for operations
operationReceiver: Listening for UDP packets
dequeueReceiver: Binding to tcp://127.0.0.1:5051
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
operationReceiver: Received UDP packet: hello1
operationReceiver: sending 'hello1' to databaseExecutor
Executing: hello1
operationReceiver: received 'ok' from databaseExecutor, putting 'hello1' into stack and queue
operationReceiver: Received UDP packet: hello2
operationReceiver: sending 'hello2' to databaseExecutor
Executing: hello2
operationReceiver: received 'ok' from databaseExecutor, putting 'hello2' into stack and queue
operationReceiver: Received UDP packet: hello3
operationReceiver: sending 'hello3' to databaseExecutor
Executing: hello3
operationReceiver: received 'ok' from databaseExecutor, putting 'hello3' into stack and queue
operationReceiver: Received UDP packet: hello4
operationReceiver: sending 'hello4' to databaseExecutor
Executing: hello4
operationReceiver: received 'ok' from databaseExecutor, putting 'hello4' into stack and queue
operationReceiver: Received UDP packet: hello5
operationReceiver: sending 'hello5' to databaseExecutor
Executing: hello5
operationReceiver: received 'ok' from databaseExecutor, putting 'hello5' into stack and queue
operationReceiver: Received UDP packet: hello6
operationReceiver: sending 'hello6' to databaseExecutor
Executing: hello6
operationReceiver: received 'ok' from databaseExecutor, putting 'hello6' into stack and queue
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: op was handled. Dequeueing it
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second
dequeuePusher: We have nothing to give. Let's wait a second

(meanwhile) ./accord 127.0.0.1:6060 push tcp://127.0.0.1:6061 tcp://127.0.0.1:5051

dequeuePusher: Creating socket to receive dequeue requests
operationReceiver: Creating socket to databaseExecutor
dequeueReceiver: Creating socket to receive external operations
databaseExecutor: Creating socket to receive operations
dequeuePusher: Connecting to tcp://127.0.0.1:5051
dequeueReceiver: Creating socket to databaseExecutor
dequeuePusher: Listening for requests
dequeuePusher: We have nothing to give. Let's wait a second
operationReceiver: Binding to UDP 127.0.0.1:6060
databaseExecutor: Listening for operations
dequeueReceiver: Binding to tcp://127.0.0.1:6061
operationReceiver: Listening for UDP packets
dequeueReceiver: Received 'hello1'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello1
dequeueReceiver: Sending confirmation that op was handled 'hello1
dequeueReceiver: Received 'hello2'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello2
dequeueReceiver: Sending confirmation that op was handled 'hello2
dequeueReceiver: Received 'hello3'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello3
dequeueReceiver: Sending confirmation that op was handled 'hello3
dequeueReceiver: Received 'hello4'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello4
dequeueReceiver: Sending confirmation that op was handled 'hello4
dequeueReceiver: Received 'hello5'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello5
dequeueReceiver: Sending confirmation that op was handled 'hello5
dequeueReceiver: Received 'hello6'
dequeueReceiver: Sending operation to be performed on our database
Executing: hello6
dequeueReceiver: Sending confirmation that op was handled 'hello6
dequeuePusher: We have nothing to give. Let's wait a second

@cj-dimaggio
Copy link
Author

./accord 127.0.0.1:5050 pull tcp://127.0.0.1:5051 tcp://127.0.0.1:6061

dequeuePoller: Creating socket to retrieve from external queue
databaseExecutor: Creating socket to receive operations
dequeueListener: Creating socket to receive dequeue requests
operationReceiver: Creating socket to databaseExecutor
dequeuePoller: Creating socket to databaseExecutor
dequeueListener: Binding to tcp://127.0.0.1:5051
databaseExecutor: Listening for operations
operationReceiver: Binding to UDP 127.0.0.1:5050
dequeuePoller: connecting to to tcp://127.0.0.1:6061
dequeuePoller: Polling from remote
dequeuePoller: Requesting an operation
operationReceiver: Listening for UDP packets
dequeueListener: Listening for requests
operationReceiver: Received UDP packet: hello1
operationReceiver: sending 'hello1' to databaseExecutor
Executing: hello1
operationReceiver: received 'ok' from databaseExecutor, putting 'hello1' into stack and queue
operationReceiver: Received UDP packet: hello2
operationReceiver: sending 'hello2' to databaseExecutor
Executing: hello2
operationReceiver: received 'ok' from databaseExecutor, putting 'hello2' into stack and queue
operationReceiver: Received UDP packet: hello3
operationReceiver: sending 'hello3' to databaseExecutor
Executing: hello3
operationReceiver: received 'ok' from databaseExecutor, putting 'hello3' into stack and queue
operationReceiver: Received UDP packet: hello4
operationReceiver: sending 'hello4' to databaseExecutor
Executing: hello4
operationReceiver: received 'ok' from databaseExecutor, putting 'hello4' into stack and queue
operationReceiver: Received UDP packet: hello5
operationReceiver: sending 'hello5' to databaseExecutor
Executing: hello5
operationReceiver: received 'ok' from databaseExecutor, putting 'hello5' into stack and queue
operationReceiver: Received UDP packet: hello6
operationReceiver: sending 'hello6' to databaseExecutor
Executing: hello6
operationReceiver: received 'ok' from databaseExecutor, putting 'hello6' into stack and queue
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Requesting an operation
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeuePoller: Requesting an operation

(meanwhile) ./accord 127.0.0.1:6060 pull tcp://127.0.0.1:6061 tcp://127.0.0.1:5051

dequeuePoller: Creating socket to retrieve from external queue
operationReceiver: Creating socket to databaseExecutor
dequeueListener: Creating socket to receive dequeue requests
databaseExecutor: Creating socket to receive operations
dequeuePoller: Creating socket to databaseExecutor
dequeueListener: Binding to tcp://127.0.0.1:6061
operationReceiver: Binding to UDP 127.0.0.1:6060
databaseExecutor: Listening for operations
dequeuePoller: connecting to to tcp://127.0.0.1:5051
operationReceiver: Listening for UDP packets
dequeuePoller: Polling from remote
dequeuePoller: Requesting an operation
dequeueListener: Listening for requests
dequeuePoller: Received 'hello1'
dequeuePoller: Sending operation to be performed on our database
Executing: hello1
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Sending confirmation that op was handled 'hello1
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello2'
dequeuePoller: Sending operation to be performed on our database
Executing: hello2
dequeuePoller: Sending confirmation that op was handled 'hello2
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello3'
dequeuePoller: Sending operation to be performed on our database
Executing: hello3
dequeuePoller: Sending confirmation that op was handled 'hello3
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello4'
dequeuePoller: Sending operation to be performed on our database
Executing: hello4
dequeuePoller: Sending confirmation that op was handled 'hello4
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello5'
dequeuePoller: Sending operation to be performed on our database
Executing: hello5
dequeuePoller: Sending confirmation that op was handled 'hello5
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello6'
dequeuePoller: Sending operation to be performed on our database
Executing: hello6
dequeuePoller: Sending confirmation that op was handled 'hello6
dequeuePoller: Requesting an operation
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment