Last active
March 14, 2017 17:57
-
-
Save cj-dimaggio/11a01fd95e0a8cf2adf5a187e8739300 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"net" | |
"os" | |
"time" | |
"github.com/Ssawa/accord/dataStructures" | |
zmq "github.com/pebbe/zmq4" | |
"fmt" | |
"github.com/fatih/color" | |
) | |
// A simple channel so that any of our goroutines can close our application | |
// with an error code | |
var quit = make(chan int) | |
// Holds a FIFO list of database operations to be shared with another broker | |
var queue dataStructures.Queue | |
// Holds a FILO list of database operations to be used for rectifying conflicts | |
var stack dataStructures.Stack | |
// An in-process 0MQ channel for sending operations to be performed on the database | |
var databaseExecutorURL = "inproc://databaseExecutor" | |
// CheckError checks if an error exists and if it does, logs it and exits | |
func CheckError(err error) { | |
if err != nil { | |
color.Red("Error: %s", err) | |
quit <- 1 | |
} | |
} | |
// CheckWarning checks if an error exists and if it does, logs it | |
func CheckWarning(err error) { | |
if err != nil { | |
color.Yellow("Warning: %s", err) | |
} | |
} | |
// databaseExecutor opens a REP socket onto "databaseExecutorURL"" and listens for | |
// operations. Upon receiving a message, the operation will be performed on the | |
// database and when successfully completed will respond with "ok" | |
func databaseExecutor() { | |
color.Cyan("databaseExecutor: Creating socket to receive operations") | |
receiver, _ := zmq.NewSocket(zmq.REP) | |
defer receiver.Close() | |
receiver.Bind(databaseExecutorURL) | |
color.Cyan("databaseExecutor: Listening for operations") | |
for { | |
// We wait for somebody to send us an operation to be done. We don't care where this operation | |
// comes from, it can be from a CMS operation or from a remote queue | |
msg, _ := receiver.Recv(0) | |
// Here we'll just simulate a database operation by logging to the screen and waiting an | |
// arbitrary amount of time | |
color.Green("Executing: %s", msg) | |
time.Sleep(100 * time.Millisecond) | |
// We send an "ok" in response signifying the operation was successfully performed. | |
// Obviously, in a real application, we'd need to establish a protocol where we can also | |
// signify errors in our response | |
receiver.Send("ok", 0) | |
} | |
} | |
// operationReceiver binds a UDP socket to the address passed in and listens for packets. | |
// | |
// We also connect to databaseExecutorURL with a REQ 0MQ socket. Upon receiving an operation from our | |
// UDP socket we relay this operation to the databaseExecutor and, if successfully performed, add the | |
// operation to our queue and stack | |
// Listens over TCP for database operations from the Flask application | |
func operationReceiver(addr string) { | |
// Will be used to relay requests to be performed onto our database | |
color.Cyan("operationReceiver: Creating socket to databaseExecutor") | |
database, _ := zmq.NewSocket(zmq.REQ) | |
defer database.Close() | |
database.Connect(databaseExecutorURL) | |
// Will be used to accept new operations from an external applications. This is meant to be used by applications | |
// without requiring a lot of knowledge of our inner workings, which is why we don't use 0MQ. Obviously UDP is | |
// terrible for something like because we can so easily lose operations, we only use it here because it gives us very | |
// simple message framing (for a real application it would perhaps be an HTTP endpoint) | |
color.Cyan("operationReceiver: Binding to UDP %s", addr) | |
listenUDP, err := net.ListenPacket("udp", addr) | |
defer listenUDP.Close() | |
CheckError(err) | |
buf := make([]byte, 1024) | |
color.Cyan("operationReceiver: Listening for UDP packets") | |
for { | |
n, _, err := listenUDP.ReadFrom(buf) | |
CheckWarning(err) | |
// We received an operation from our UDP socket | |
val := string(buf[:n]) | |
color.Cyan("operationReceiver: Received UDP packet: %s", val) | |
color.Cyan("operationReceiver: sending '%s' to databaseExecutor", val) | |
// We send it over to the databaseExecutor to be handled by them | |
database.Send(val, 0) | |
msg, _ := database.Recv(0) | |
if msg == "ok" { | |
// If we got an 'ok' response we can go ahead and add it to our queue and stack | |
color.Cyan("operationReceiver: received 'ok' from databaseExecutor, putting '%s' into stack and queue", val) | |
queue.Put(val) | |
stack.Put(val) | |
} | |
} | |
} | |
// dequeueListener binds a 0MQ REP socket to the passed in address. It is part of the "Poll" synchronization | |
// method, along with dequeuePoller, ad is responsible for listening for external requests from other brokers | |
// to relay information from our queue. | |
// | |
// If we receive a "send" request we send the next operation in our Queue and wait for an "ok" request to verify | |
// that it was successfully processed and safe to delete from our local queue | |
func dequeueListener(addr string) { | |
color.Cyan("dequeueListener: Creating socket to receive dequeue requests") | |
receiver, _ := zmq.NewSocket(zmq.REP) | |
defer receiver.Close() | |
color.Cyan("dequeueListener: Binding to %s", addr) | |
receiver.Bind(addr) | |
color.Cyan("dequeueListener: Listening for requests") | |
// This is to handle the theoretical case where we have multiple clients connected to our socket. | |
// We want to ensure that if we send out an operation to two clients we get two "ok"s before deleting it | |
refCount := 0 | |
for { | |
// Wait until we get a request | |
msg, _ := receiver.Recv(0) | |
color.Cyan("dequeueListener: Received request: '%s'", msg) | |
var reply string | |
// Having this "send" and "ok" process happening across two requests gives us a fair bit of reliability to | |
// ensure we're not responsible for losing information (it also gets around 0MQ's annoying behavior of not | |
// actually telling you when a client disconnects during a REP response). However we should be careful of | |
// any race conditions this might introduce. The main worry would probably be, "If a new operation is queued | |
// in between a 'send' and an 'ok' will we accidentally delete the wrong element?" In this case the answer | |
// luckily appears to be "no". Because the queue is FIFO we'll only ever be queuing to one end and dequeueing | |
// from the other. The other main thing to be worried about is, if we really do want this to work with multiple | |
// clients (we don't need to) we'll need to figure out a proper session system and operation sequencing to prevent | |
// annoying edgecases (see more below) | |
if msg == "send" { | |
color.Cyan("dequeueListener: Received request to send from queue") | |
// The remote want's the next element from our queue, so let's peek at what it is and send it | |
reply = queue.Peek() | |
// My janky ass implementation of a FIFO queue uses an empty string to signify that there's nothing left | |
// (which is stupid for so many reasons, but Go doesn't allow Nil as a substitute for a string so there it is) | |
// We only want to refCount actual data | |
if reply != "" { | |
refCount++ | |
} | |
} else if msg == "ok" { | |
// We got a confirmation that an operation was successfully processed remotely | |
color.Cyan("dequeueListener: Received confirmation that information was received and processed. Decreasing refcount") | |
// We don't want somebody to just send us a bunch of "ok"s without actually having received anything or else they can deplete our entire queue | |
if refCount > 0 { | |
// Decrement our refCount by one | |
refCount-- | |
if refCount == 0 { | |
// There's no more references to this operation, we can safely remove it | |
color.Cyan("dequeueListener: No more references. Deleting element") | |
queue.RemoveLast() | |
reply = "deleted" | |
} else { | |
// There's still somebody out there that has knowledge of this operation but hasn't sent in their | |
// "ok", so we don't want to delete it yet. Instead we just return the number of references left. | |
// | |
// Now this leads to an interesting/infuriating problem that might want us to reconsider this entire | |
// idea of handling multiple clients here (not multiple brokers completely, mind, we can extract the logic | |
// out to a router goroutine which is the only client to this socket). If two clients are connected and one dies | |
// everything will grind to a halt and the still working remote might end up receiving the same operation over and over | |
// again and (without proper sequencing to check) performing it multiple times. | |
reply = fmt.Sprintf("%d refs left", refCount) | |
} | |
} | |
} | |
// Send our reply, whatever it is | |
_, err := receiver.Send(reply, 0) | |
CheckWarning(err) | |
} | |
} | |
// dequeuePoller works on the other side of dequeueListener, it sends requests out for new operations from a | |
// remote process and then performs the operation on our own database (the intention of this prototype isn't | |
// the complicated database synchronization stuff, it's focus is on the IPC protocols so we just go ahead and | |
// perform the operation without thinking here) | |
func dequeuePoller(addr string) { | |
color.Cyan("dequeuePoller: Creating socket to retrieve from external queue") | |
request, _ := zmq.NewSocket(zmq.REQ) | |
defer request.Close() | |
color.Cyan("dequeuePoller: Creating socket to databaseExecutor") | |
database, _ := zmq.NewSocket(zmq.REQ) | |
defer database.Close() | |
database.Connect(databaseExecutorURL) | |
// Connect to our remote broker using a 0MQ REQ socket | |
color.Cyan("dequeuePoller: connecting to to %s", addr) | |
request.Connect(addr) | |
color.Cyan("dequeuePoller: Polling from remote") | |
for { | |
// Send out a request | |
color.Cyan("dequeuePoller: Requesting an operation") | |
request.Send("send", 0) | |
op, _ := request.Recv(0) | |
color.Cyan("dequeuePoller: Received '%s'", op) | |
if op != "" { | |
// We got a real operation | |
color.Cyan("dequeuePoller: Sending operation to be performed on our database") | |
// Send it out to the database | |
database.Send(op, 0) | |
msg, _ := database.Recv(0) | |
if msg == "ok" { | |
// If the database performed the operation successfully then we can send another request | |
// to the remote telling them that we handled the operation and they can go ahead and delete it | |
// from their memory | |
color.Cyan("dequeuePoller: Sending confirmation that op was handled '%s", op) | |
request.Send("ok", 0) | |
// We don't really care what the response is but we could theoretically go into another state | |
// if the queued operation wasn't deleted or something | |
request.Recv(0) | |
} | |
// Obviously here we should handle a case where we *don't* get an "ok" from the databaseExecutor, | |
// but this is a young protocol and that just never happens yet | |
} else { | |
// As mentioned above, a blank string indicates the remote queue is empty, so we wait a bit | |
// before sending another request so we don't floor the network for no reason | |
color.Cyan("dequeuePoller: Remote had nothing to give us. Let's wait a second") | |
time.Sleep(time.Second) | |
} | |
} | |
} | |
// dequeueReceiver is the main listener of the "Push" synchronization method. It waits and listens for a remote | |
// broker to send it operations from it's queue. Notice how much of it's logic is almost identical to | |
// dequeuePoller (almost like a lazy programmer should have just extracted the logic into functions) | |
func dequeueReceiver(addr string) { | |
color.Cyan("dequeueReceiver: Creating socket to receive external operations") | |
receiver, _ := zmq.NewSocket(zmq.REP) | |
defer receiver.Close() | |
// This listener will be responsible for replicating remote operations on our local database so we need | |
// a connection to the databaseExecutor | |
color.Cyan("dequeueReceiver: Creating socket to databaseExecutor") | |
database, _ := zmq.NewSocket(zmq.REQ) | |
defer database.Close() | |
database.Connect(databaseExecutorURL) | |
// We're a listener so we bind | |
color.Cyan("dequeueReceiver: Binding to %s", addr) | |
receiver.Bind(addr) | |
for { | |
op, _ := receiver.Recv(0) | |
color.Cyan("dequeueReceiver: Received '%s'", op) | |
color.Cyan("dequeueReceiver: Sending operation to be performed on our database") | |
// We send the operation we received to the databaseExecutor | |
database.Send(op, 0) | |
msg, _ := database.Recv(0) | |
if msg == "ok" { | |
// If it went up to the database okay we send an "ok" to indicate we handled it successfully. | |
// This works similar to dequeueListener, where the remote won't actually delete the message from | |
// it's queue until it's gotten explicit confirmation that it has been handled, hopefully reducing the | |
// chance of data loss from network instability | |
color.Cyan("dequeueReceiver: Sending confirmation that op was handled '%s", op) | |
receiver.Send("ok", 0) | |
} | |
} | |
} | |
// dequeuePusher works along with dequeueReceiver and is what actually sends the requests to the remote broker. | |
// It is in many ways a logical mashup of dequeueListener, much as dequeueReceiver is the logical inverse of dequeuePoller. | |
func dequeuePusher(addr string) { | |
color.Cyan("dequeuePusher: Creating socket to receive dequeue requests") | |
pusher, _ := zmq.NewSocket(zmq.REQ) | |
defer pusher.Close() | |
color.Cyan("dequeuePusher: Connecting to %s", addr) | |
pusher.Connect(addr) | |
color.Cyan("dequeuePusher: Listening for requests") | |
for { | |
// Peek at the next operation in our queue | |
op := queue.Peek() | |
if op != "" { | |
// If it's a real operation send it over the wire | |
pusher.Send(op, 0) | |
msg, _ := pusher.Recv(0) | |
if msg == "ok" { | |
// If the remote said it's handled it, so we're safe to delete the operation and move on | |
color.Cyan("dequeuePusher: op was handled. Dequeueing it") | |
queue.RemoveLast() | |
} | |
} else { | |
// Evokes memories of dequeuePoller, huh? In this case we're essentially "polling" from our own queue | |
// and don't want to burn CPU cycles unnecessarily. However this polling thing is a bit silly. We could | |
// create another inproc pipe from operationReceiver to be notified when there is a new operation but | |
// that would require more work than copying and pasting from dequeuePoller and dequeueListener | |
color.Cyan("dequeuePusher: We have nothing to give. Let's wait a second") | |
time.Sleep(time.Second) | |
} | |
} | |
} | |
func main() { | |
// Start the goroutine that will be responsible for database operations | |
go databaseExecutor() | |
// Start the goroutine that will be listening for new operations from an external application and pass in the | |
// address to bind to | |
go operationReceiver(os.Args[1]) | |
// These represent two ways of synchronizing with other instances of this application. | |
if os.Args[2] == "pull" { | |
// The poll method involves connecting to a remote and requesting information from it | |
// to synchronize it's queue with your own database. This requires a server having public | |
// IP address with a stable connection to the internet in order to synchronize data. | |
go dequeueListener(os.Args[3]) | |
go dequeuePoller(os.Args[4]) | |
} else if os.Args[2] == "push" { | |
// The push method involves opening a socket to listen for requests that can be pushed to you. | |
// This allows server's that are not publically accessible to still have their data synchronized | |
// remotely. | |
go dequeueReceiver(os.Args[3]) | |
go dequeuePusher(os.Args[4]) | |
} | |
// Wait until somebody writes to the quit channel and exit with that integer | |
os.Exit(<-quit) | |
} |
./accord 127.0.0.1:5050 pull tcp://127.0.0.1:5051 tcp://127.0.0.1:6061
dequeuePoller: Creating socket to retrieve from external queue
databaseExecutor: Creating socket to receive operations
dequeueListener: Creating socket to receive dequeue requests
operationReceiver: Creating socket to databaseExecutor
dequeuePoller: Creating socket to databaseExecutor
dequeueListener: Binding to tcp://127.0.0.1:5051
databaseExecutor: Listening for operations
operationReceiver: Binding to UDP 127.0.0.1:5050
dequeuePoller: connecting to to tcp://127.0.0.1:6061
dequeuePoller: Polling from remote
dequeuePoller: Requesting an operation
operationReceiver: Listening for UDP packets
dequeueListener: Listening for requests
operationReceiver: Received UDP packet: hello1
operationReceiver: sending 'hello1' to databaseExecutor
Executing: hello1
operationReceiver: received 'ok' from databaseExecutor, putting 'hello1' into stack and queue
operationReceiver: Received UDP packet: hello2
operationReceiver: sending 'hello2' to databaseExecutor
Executing: hello2
operationReceiver: received 'ok' from databaseExecutor, putting 'hello2' into stack and queue
operationReceiver: Received UDP packet: hello3
operationReceiver: sending 'hello3' to databaseExecutor
Executing: hello3
operationReceiver: received 'ok' from databaseExecutor, putting 'hello3' into stack and queue
operationReceiver: Received UDP packet: hello4
operationReceiver: sending 'hello4' to databaseExecutor
Executing: hello4
operationReceiver: received 'ok' from databaseExecutor, putting 'hello4' into stack and queue
operationReceiver: Received UDP packet: hello5
operationReceiver: sending 'hello5' to databaseExecutor
Executing: hello5
operationReceiver: received 'ok' from databaseExecutor, putting 'hello5' into stack and queue
operationReceiver: Received UDP packet: hello6
operationReceiver: sending 'hello6' to databaseExecutor
Executing: hello6
operationReceiver: received 'ok' from databaseExecutor, putting 'hello6' into stack and queue
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeueListener: Received request: 'ok'
dequeueListener: Received confirmation that information was received and processed. Decreasing refcount
dequeueListener: No more references. Deleting element
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Requesting an operation
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeuePoller: Requesting an operation
(meanwhile) ./accord 127.0.0.1:6060 pull tcp://127.0.0.1:6061 tcp://127.0.0.1:5051
dequeuePoller: Creating socket to retrieve from external queue
operationReceiver: Creating socket to databaseExecutor
dequeueListener: Creating socket to receive dequeue requests
databaseExecutor: Creating socket to receive operations
dequeuePoller: Creating socket to databaseExecutor
dequeueListener: Binding to tcp://127.0.0.1:6061
operationReceiver: Binding to UDP 127.0.0.1:6060
databaseExecutor: Listening for operations
dequeuePoller: connecting to to tcp://127.0.0.1:5051
operationReceiver: Listening for UDP packets
dequeuePoller: Polling from remote
dequeuePoller: Requesting an operation
dequeueListener: Listening for requests
dequeuePoller: Received 'hello1'
dequeuePoller: Sending operation to be performed on our database
Executing: hello1
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
dequeuePoller: Sending confirmation that op was handled 'hello1
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello2'
dequeuePoller: Sending operation to be performed on our database
Executing: hello2
dequeuePoller: Sending confirmation that op was handled 'hello2
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello3'
dequeuePoller: Sending operation to be performed on our database
Executing: hello3
dequeuePoller: Sending confirmation that op was handled 'hello3
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello4'
dequeuePoller: Sending operation to be performed on our database
Executing: hello4
dequeuePoller: Sending confirmation that op was handled 'hello4
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello5'
dequeuePoller: Sending operation to be performed on our database
Executing: hello5
dequeuePoller: Sending confirmation that op was handled 'hello5
dequeuePoller: Requesting an operation
dequeuePoller: Received 'hello6'
dequeuePoller: Sending operation to be performed on our database
Executing: hello6
dequeuePoller: Sending confirmation that op was handled 'hello6
dequeuePoller: Requesting an operation
dequeuePoller: Received ''
dequeuePoller: Remote had nothing to give us. Let's wait a second
dequeueListener: Received request: 'send'
dequeueListener: Received request to send from queue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
./accord 127.0.0.1:5050 push tcp://127.0.0.1:5051 tcp://127.0.0.1:6061
(meanwhile) ./accord 127.0.0.1:6060 push tcp://127.0.0.1:6061 tcp://127.0.0.1:5051