Last active
April 9, 2017 16:01
-
-
Save blasterpal/81489f4307ba514156887c4eaf1c10a3 to your computer and use it in GitHub Desktop.
PoC idea for an async interface, similar to ActiveJob, but with ability add message queue features.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| =begin | |
| Problem: | |
| HTTP is the focus of this idea. For a small team with enough on their plate already, avoiding another component or dependency into stack and ops is costly. | |
| In this example, the cloud and PaaS services are not an option, the app lives on our hardware for business reasons. I can't use RabbitMQ as a service. | |
| Implementation: | |
| With service architecture, some type of message passing is required between applications (sharing a DB is an anti-pattern). A goal is implementation should be opaque. | |
| However the synchronous versus asynchronous nature of HTTPS and message queues respectively, makes the implementation being generic a little harder. | |
| One of the first problems when you consider using HTTP as a means of distributed messaging, what if the service is down? | |
| Then your calls either throw exceptions or you have to write code to account for service being down, which actually you should anyway. | |
| The wonderment is 'why can't you use HTTP but also have 'some' durability'? Even better, you can retry HTTP calls and even replay messages later. Smells similar to a queue. | |
| For the purposes of simplicity, I need to ignore other features of queues like fanout,etc. | |
| Record the message into a journal that can be replayed. The "messages" are domain specific to this app anyway. | |
| So why not let it keep the failures and present in admin for this app to replay? | |
| =end | |
| # much of the semantics of normal flow can be abstracted so user only needs | |
| # to worry about business specific implementations like validation and processing | |
| class MyAsyncWorker < ActiveQueue | |
| adapter :http | |
| publish resource: 'http://remote.dev/api/to-consume', retries: 3, backoff: 3 | |
| consume resource: 'http://myapp.dev/callbacks/this-context' | |
| #adapter :rabbit | |
| #publish resource: 'rabbit://server:port/queues/publish', retries: 3, backoff: 3, timeout: 20 | |
| #consume resource: 'rabbit://server:port/queues/consume' | |
| def publish(message, tries=0) | |
| if tries > self.retries | |
| write_failed_message(message) #a journal for possible reply | |
| return false | |
| end | |
| validate(message) | |
| # if error | |
| begin | |
| queue_send_transaction(message) | |
| rescue | |
| pause_message(message,tries) #this effectively puts into queue to be performed soonish | |
| end | |
| end | |
| def consume(message) | |
| validate(message) | |
| # your code when message is recieved | |
| end | |
| def pause_message(message,tries) | |
| delay = self.cool_method_that_determines_when_message_should_be_retried | |
| # a worker re-publishes this message at delay | |
| ActiveQueuePause.create(message,tries,delay) | |
| end | |
| # a table for erroed or delayed messages is cached when the external queue or resource is down | |
| # can be unwound and replayed or examined from dashboard | |
| def write_failed_message(message) | |
| ActiveQueueJournal.create(message) | |
| end | |
| def cool_method_that_determines_when_message_should_be_retried | |
| 10.minutes | |
| end | |
| end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment