Skip to content

Instantly share code, notes, and snippets.

@meetme2meat
Last active May 20, 2016 08:25
Show Gist options
  • Save meetme2meat/fd192a50a27c58ec60f9e0e56f5be592 to your computer and use it in GitHub Desktop.
Save meetme2meat/fd192a50a27c58ec60f9e0e56f5be592 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
require 'celluloid/zmq'
require 'logger'
require 'airbrake-ruby'
#require 'logstash-logger'
Airbrake.configure do |c|
c.project_id = 123018
c.project_key = '552470f432b338efb3c4aab31412f93a'
c.environment = 'production'
c.ignore_environments = ['development']
end
Airbrake.add_filter do |notice|
if notice[:errors].any? { |error| error[:type] == 'SignalException' }
notice.ignore!
end
end
puts Celluloid::ZMQ::VERSION
#$logger = LogStashLogger.new(type: :udp, host: 'elk.tollfree.exchange', port: 5300)
$logger = Logger.new(STDOUT)
$last_rate = 0
Celluloid::ZMQ.init
def _time_
Time.now.strftime('%d-%m-%Y %T %z')
end
class MiddleWare
include Celluloid::ZMQ
attr_reader :socket_qone,:socket_qtwo,:socket_qone_delegator,:socket_qtwo_delegator
def initialize
init!
qs_one_init!
qs_two_init!
end
## Correct
def init!
reset_time!
reset_rate!
end
## correct
def reset_rate!
$one = $two = 0
end
## correct
## setting the current time
def reset_time!
$start_time = current_time
end
## correct
def qs_one_init!
puts "[#{_time_}] Initializing QS1 socket"
@socket_qone = Socket::Pull.new
@socket_qone.bind("ipc:///tmp/qs1-cmsdb-messages")
@socket_qone_delegator = Socket::Push.new
@socket_qone_delegator.bind('ipc:///tmp/qs1-cmsdb-puller-messages')
puts "[#{_time_}] QS1 done."
end
## correct
def qs_two_init!
puts "[#{_time_}] Initalizating QS2 socket"
@socket_qtwo = Socket::Pull.new
@socket_qtwo.bind("ipc:///tmp/qs2-cmsdb-messages")
@socket_qtwo_delegator = Socket::Push.new
@socket_qtwo_delegator.bind('ipc:///tmp/qs2-cmsdb-puller-messages')
puts "[#{_time_}] QS2 Done."
end
## correct
def monitor
#async.monitor_qs_one and async.display
async.monitor_qs_one
async.monitor_qs_two
async.display
end
def monitor_qs_one
loop do
data = socket_qone.read_multipart
$one += 1
evaluate(data,socket_qone_delegator)
data = nil
end
end
def monitor_qs_two
loop do
data = socket_qtwo.read_multipart
$two += 1
evaluate(data,socket_qtwo_delegator)
data = nil
end
end
def evaluate(data,socket)
begin
## sending is a kind of block so ideally it has to go in async
socket.send(data)
rescue Exception => exception
$logger.error "#{_time_}--- Exception is - #{exception.message} \n #{exception.backtrace}"
Airbrake.notify(exception)
end
end
def display
loop do
begin
sleep 60
puts "[#{_time_}] [Rate is (#{find_rate}[QS1(#{$one}), QS2(#{$two})] - #{$last_rate} / #{total_time})] Current Rate of message flow is #{current_rate}"
reset_time!
last_rate!
rescue Exception => exception
$logger.error "[#{_time_}] Exception is #{exception.message} -> Backtrace \n #{exception.backtrace}"
Airbrake.notify(exception)
end
end
end
def last_rate!
$last_rate = find_rate
end
def current_rate
## total time ideally should be 60 second
calculate_rate / total_time
end
def find_rate
$one + $two
end
def calculate_rate
[find_rate - $last_rate,0].max
end
def total_time
(current_time - $start_time)
end
def current_time
Time.now.to_i
end
end
middleware = MiddleWare.new
middleware.monitor
sleep
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment