Skip to content

Instantly share code, notes, and snippets.

@blasterpal
Last active April 9, 2017 16:01
Show Gist options
  • Select an option

  • Save blasterpal/81489f4307ba514156887c4eaf1c10a3 to your computer and use it in GitHub Desktop.

Select an option

Save blasterpal/81489f4307ba514156887c4eaf1c10a3 to your computer and use it in GitHub Desktop.
PoC idea for an async interface, similar to ActiveJob, but with ability add message queue features.
=begin
Problem:
HTTP is the focus of this idea. For a small team with enough on their plate already, avoiding another component or dependency into stack and ops is costly.
In this example, the cloud and PaaS services are not an option, the app lives on our hardware for business reasons. I can't use RabbitMQ as a service.
Implementation:
With service architecture, some type of message passing is required between applications (sharing a DB is an anti-pattern). A goal is implementation should be opaque.
However the synchronous versus asynchronous nature of HTTPS and message queues respectively, makes the implementation being generic a little harder.
One of the first problems when you consider using HTTP as a means of distributed messaging, what if the service is down?
Then your calls either throw exceptions or you have to write code to account for service being down, which actually you should anyway.
The wonderment is 'why can't you use HTTP but also have 'some' durability'? Even better, you can retry HTTP calls and even replay messages later. Smells similar to a queue.
For the purposes of simplicity, I need to ignore other features of queues like fanout,etc.
Record the message into a journal that can be replayed. The "messages" are domain specific to this app anyway.
So why not let it keep the failures and present in admin for this app to replay?
=end
# much of the semantics of normal flow can be abstracted so user only needs
# to worry about business specific implementations like validation and processing
class MyAsyncWorker < ActiveQueue
adapter :http
publish resource: 'http://remote.dev/api/to-consume', retries: 3, backoff: 3
consume resource: 'http://myapp.dev/callbacks/this-context'
#adapter :rabbit
#publish resource: 'rabbit://server:port/queues/publish', retries: 3, backoff: 3, timeout: 20
#consume resource: 'rabbit://server:port/queues/consume'
def publish(message, tries=0)
if tries > self.retries
write_failed_message(message) #a journal for possible reply
return false
end
validate(message)
# if error
begin
queue_send_transaction(message)
rescue
pause_message(message,tries) #this effectively puts into queue to be performed soonish
end
end
def consume(message)
validate(message)
# your code when message is recieved
end
def pause_message(message,tries)
delay = self.cool_method_that_determines_when_message_should_be_retried
# a worker re-publishes this message at delay
ActiveQueuePause.create(message,tries,delay)
end
# a table for erroed or delayed messages is cached when the external queue or resource is down
# can be unwound and replayed or examined from dashboard
def write_failed_message(message)
ActiveQueueJournal.create(message)
end
def cool_method_that_determines_when_message_should_be_retried
10.minutes
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment