Skip to content

Instantly share code, notes, and snippets.

@jeffdeville
Created February 14, 2016 11:13
Show Gist options
  • Save jeffdeville/dba897b48f4d7a8b6e8f to your computer and use it in GitHub Desktop.
Save jeffdeville/dba897b48f4d7a8b6e8f to your computer and use it in GitHub Desktop.
Sender / Receiver
require 'angelo'
require 'bunny'
# Send Celluloid's log output to standard output.
Celluloid.logger = ::Logger.new(STDOUT)
# HTTP front end that accepts messages and publishes them to RabbitMQ.
module Receiver
  # Angelo application with a status route and a message intake route.
  class Server < Angelo::Base
    # Status route; currently answers with a fixed greeting.
    get '/status' do
      # TODO return number of incomplete tasks
      # Look at self.class.server
      'ohai Receiver::Server here'
    end

    # Hands the posted `data` param to the :publish_message task, then
    # answers with 202.
    post '/messages' do
      async :publish_message, params[:data]
      halt 202
    end

    # Serializes the message as JSON and publishes it to the shared exchange
    # with the 'NLG' routing key.
    task :publish_message do |message|
      Celluloid.logger.debug "Publishing message: #{message} to NLG"
      target = Receiver::Server.exchange
      target.publish(JSON.generate(message), routing_key: 'NLG')
    end

    # Returns the "Event" fanout exchange. The RabbitMQ connection and channel
    # are opened on the first call and the exchange is cached on the class.
    def self.exchange
      @_exchange ||= begin
        settings = {
          host: '127.0.0.1',
          port: 5672,
          ssl: false,
          vhost: '/',
          user: 'guest',
          pass: 'guest',
          heartbeat: :server, # will use RabbitMQ setting
          frame_max: 131072,
          auth_mechanism: 'PLAIN'
        }
        session = Bunny.new(settings)
        session.start
        session.create_channel.fanout("Event")
      end
    end
  end
end
require 'bunny'
require 'multi_json'
require 'time'
require 'http'
# Consumes messages from the "Chime" RabbitMQ queue and forwards each one over
# HTTP to the integration server named in the message's routing key.
module Sender
  class Server
    # Base URL of each integration server, keyed by the bot name taken from
    # the routing key. Ports are read from the environment when the file loads.
    FQDN_BY_INTEGRATION_SERVER_NAME = {
      'unknown' => "http://localhost:#{ENV['UNHANDLED_PORT']}",
      'forecastio' => "http://localhost:#{ENV['FORECASTIO_PORT']}",
      'smalltalk' => "http://localhost:#{ENV['SMALLTALK_SERVER_PORT']}",
      'demo' => "http://localhost:#{ENV['DEMO_SERVER_PORT']}",
      'lifx' => "http://localhost:#{ENV['LIFX_SERVER_PORT']}",
      'xfinitytv' => "http://localhost:#{ENV['XFINITYTV_SERVER_PORT']}"
    }.freeze

    # Endpoint used when the bot is not in the table above, or when the
    # bot's own server answers 404.
    DEFAULT_ENDPOINT = "#{FQDN_BY_INTEGRATION_SERVER_NAME['unknown']}/unknown"

    # Routing keys of interest look like "chime.<bot>.<action>".
    ROUTING_KEY_PATTERN = /chime\.(\w+)\.(\w+)/i

    # Convenience entry point: builds a server and runs it.
    def self.run!
      new.run
    end

    # Subscribes to the queue and keeps the calling thread alive while
    # deliveries are handled. Closes the RabbitMQ connection on the way out.
    def run
      queue.subscribe do |delivery_info, metadata, payload|
        handle_delivery(delivery_info, metadata, payload)
      end
      # subscribe is used in its non-blocking form, so this thread has to be
      # parked. Sleeping (rather than spinning on Thread.pass) keeps the
      # process from burning a CPU core while idle.
      loop do
        sleep 1
      end
    ensure
      # Only close a connection that was actually established; calling the
      # `connection` method here would try to open a new one just to close it.
      @_connection.close if @_connection
    end

    private

    # Logs one delivery and forwards it to the bot named in its routing key.
    # Deliveries whose routing key does not match ROUTING_KEY_PATTERN are
    # logged and dropped.
    def handle_delivery(delivery_info, metadata, payload)
      logger.debug({ message_received_from_queue: {
        delivery_info: delivery_info,
        metadata: metadata,
        payload: payload
      } })
      routing_key = delivery_info[:routing_key]
      bot, action = parse_routing_key(routing_key)
      if bot
        logger.debug("Routing key #{routing_key} tells #{bot} to #{action}")
        send_request(bot, action, MultiJson.decode(payload))
      else
        logger.debug("No matching bot found for routing key #{routing_key}")
      end
    end

    # Returns [bot, action] extracted from the routing key, or an empty array
    # (so both values destructure to nil) when the key does not match.
    def parse_routing_key(routing_key)
      match = ROUTING_KEY_PATTERN.match(routing_key.to_s)
      match ? match.captures : []
    end

    # POSTs the payload as JSON to the bot's endpoint. If that endpoint
    # answers 404, the same payload is re-sent to DEFAULT_ENDPOINT.
    def send_request(bot, action, payload)
      endpoint = endpoint_for(bot, action)
      logger.debug(endpoint: endpoint, payload: payload)
      response = HTTP.post(endpoint, json: payload)
      return unless response.status == 404
      # The fallback itself returned 404; posting to it again cannot help.
      return if endpoint == DEFAULT_ENDPOINT
      logger.debug("Endpoint: #{endpoint} returned status: #{response.status}. Routing to 'unknown' server for handling.")
      HTTP.post(DEFAULT_ENDPOINT, json: payload)
    end

    # Builds "<bot base URL>/<action>", or returns DEFAULT_ENDPOINT when the
    # bot has no entry in FQDN_BY_INTEGRATION_SERVER_NAME.
    def endpoint_for(bot, action)
      return DEFAULT_ENDPOINT unless FQDN_BY_INTEGRATION_SERVER_NAME.key?(bot)
      fqdn = FQDN_BY_INTEGRATION_SERVER_NAME[bot]
      "#{fqdn}/#{action}"
    end

    # Memoized channel on the shared connection.
    def channel
      @_channel ||= connection.create_channel
    end

    # Memoized "Chime" queue, declared and bound on first use.
    def queue
      return @_queue if @_queue
      exchange = channel.topic("messages-with-intents", auto_delete: true)
      @_queue = channel.queue("Chime", auto_delete: true)
      # Bind the "Chime" queue to the "messages-with-intents" topic exchange.
      # Only messages with a routing key matching "Chime.#" will be routed to
      # the "Chime" queue.
      @_queue.bind(exchange, routing_key: "Chime.#")
      @_queue
    end

    # Memoized, started RabbitMQ connection. The session is cached only after
    # it has started successfully, so a failed attempt is not reused later.
    def connection
      return @_connection if @_connection
      session = Bunny.new({
        host: 'localhost',
        port: ENV['RABBIT_PORT'],
        ssl: false,
        vhost: '/',
        user: 'guest',
        pass: 'guest',
        heartbeat: :server, # will use RabbitMQ setting
        frame_max: 131072,
        auth_mechanism: 'PLAIN'
      })
      start_with_retry(session)
      @_connection = session
    end

    # Starts the session, retrying every 0.5 s for up to 20 s. When time runs
    # out, raises the last connection error seen (or the timeout itself if no
    # attempt failed).
    def start_with_retry(session)
      last_exception = nil
      Timeout.timeout(20) do
        begin
          session.start
        rescue StandardError => e
          logger.error "RabbitMQ connection failed. Retrying."
          last_exception = e
          sleep 0.5
          retry
        end
      end
    rescue Timeout::Error => timeout
      raise(last_exception || timeout)
    end

    # Memoized logger writing to standard output.
    def logger
      @_logger ||= Logger.new(STDOUT)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment