Created
February 14, 2016 11:13
-
-
Save jeffdeville/dba897b48f4d7a8b6e8f to your computer and use it in GitHub Desktop.
Sender / Receiver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'angelo' | |
require 'bunny' | |
Celluloid.logger = ::Logger.new(STDOUT) | |
module Receiver | |
class Server < Angelo::Base | |
get '/status' do | |
# TODO return number of incomplete tasks | |
# Look at self.class.server | |
'ohai Receiver::Server here' | |
end | |
post '/messages' do | |
async :publish_message, params[:data] | |
halt 202 | |
end | |
task :publish_message do |message| | |
Celluloid.logger.debug "Publishing message: #{message} to NLG" | |
Receiver::Server.exchange.publish( | |
JSON.generate(message), | |
routing_key: 'NLG' | |
) | |
end | |
def self.exchange | |
return @_exchange if @_exchange | |
connection = Bunny.new({ | |
host: '127.0.0.1', | |
port: 5672, | |
ssl: false, | |
vhost: '/', | |
user: 'guest', | |
pass: 'guest', | |
heartbeat: :server, # will use RabbitMQ setting | |
frame_max: 131072, | |
auth_mechanism: 'PLAIN' | |
}) | |
connection.start | |
channel = connection.create_channel | |
@_exchange = channel.fanout("Event") | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
require 'multi_json' | |
require 'time' | |
require 'http' | |
module Sender | |
class Server | |
FQDN_BY_INTEGRATION_SERVER_NAME = { | |
'unknown' => "http://localhost:#{ENV['UNHANDLED_PORT']}", | |
'forecastio' => "http://localhost:#{ENV['FORECASTIO_PORT']}", | |
'smalltalk' => "http://localhost:#{ENV['SMALLTALK_SERVER_PORT']}", | |
'demo' => "http://localhost:#{ENV['DEMO_SERVER_PORT']}", | |
'lifx' => "http://localhost:#{ENV['LIFX_SERVER_PORT']}", | |
'xfinitytv' => "http://localhost:#{ENV['XFINITYTV_SERVER_PORT']}" | |
} | |
DEFAULT_ENDPOINT = "#{FQDN_BY_INTEGRATION_SERVER_NAME['unknown']}/unknown" | |
def self.run! | |
new.run | |
end | |
def run | |
queue.subscribe do |delivery_info, metadata, payload| | |
logger.debug({ message_received_from_queue: { | |
delivery_info: delivery_info, | |
metadata: metadata, | |
payload: payload | |
} }) | |
routing_key = delivery_info[:routing_key] | |
captures = routing_key.match(/chime\.(\w+)\.(\w+)/i).captures | |
bot, action = captures[0..1] | |
if bot | |
logger.debug("Routing key #{routing_key} tells #{bot} to #{action}") | |
send_request(bot, action, MultiJson.decode(payload)) | |
else | |
logger.debug("No matching bot found for routing key #{routing_key}") | |
end | |
end | |
loop do | |
Thread.pass | |
end | |
ensure | |
connection.close if connection | |
end | |
private | |
def send_request(bot, action, payload) | |
endpoint = endpoint_for(bot, action) | |
logger.debug(endpoint: endpoint, payload: payload) | |
response = HTTP.post(endpoint, json: payload) | |
return unless response.status == 404 | |
logger.debug("Endpoint: #{endpoint} returned status: #{response.status}. Routing to 'unknown' server for handling.") | |
HTTP.post(DEFAULT_ENDPOINT) | |
end | |
def endpoint_for(bot, action) | |
return DEFAULT_ENDPOINT unless FQDN_BY_INTEGRATION_SERVER_NAME.key?(bot) | |
fqdn = FQDN_BY_INTEGRATION_SERVER_NAME[bot] | |
"#{fqdn}/#{action}" | |
end | |
def channel | |
@_channel ||= connection.create_channel | |
end | |
def queue | |
return @_queue if @_queue | |
exchange = channel.topic("messages-with-intents", auto_delete: true) | |
@_queue = channel.queue("Chime", auto_delete: true) | |
# Bind Queue with the name of "Chime" to the topic exchange named "Chime". | |
# Only messages with a routing key matching "Chime.#" will be routed to the "Chime" queue. | |
@_queue.bind(exchange, routing_key: "Chime.#") | |
@_queue | |
end | |
def connection | |
return @_connection if @_connection | |
@_connection = Bunny.new({ | |
host: 'localhost', | |
port: ENV['RABBIT_PORT'], | |
ssl: false, | |
vhost: '/', | |
user: 'guest', | |
pass: 'guest', | |
heartbeat: :server, # will use RabbitMQ setting | |
frame_max: 131072, | |
auth_mechanism: 'PLAIN' | |
}) | |
begin | |
last_exception = nil | |
Timeout::timeout(20) do | |
begin | |
@_connection.start | |
rescue StandardError => e | |
logger.error "RabbitMQ connection failed. Retrying." | |
last_exception = e | |
sleep 0.5 | |
retry | |
end | |
end | |
rescue Timeout::Error => timeout | |
raise(last_exception || timeout) | |
end | |
@_connection | |
end | |
def logger | |
@_logger ||= Logger.new(STDOUT) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment