Last active
May 20, 2016 08:25
-
-
Save meetme2meat/fd192a50a27c58ec60f9e0e56f5be592 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'celluloid/zmq' | |
require 'logger' | |
require 'airbrake-ruby' | |
#require 'logstash-logger' | |
Airbrake.configure do |c| | |
c.project_id = 123018 | |
c.project_key = '552470f432b338efb3c4aab31412f93a' | |
c.environment = 'production' | |
c.ignore_environments = ['development'] | |
end | |
Airbrake.add_filter do |notice| | |
if notice[:errors].any? { |error| error[:type] == 'SignalException' } | |
notice.ignore! | |
end | |
end | |
puts Celluloid::ZMQ::VERSION | |
#$logger = LogStashLogger.new(type: :udp, host: 'elk.tollfree.exchange', port: 5300) | |
$logger = Logger.new(STDOUT) | |
$last_rate = 0 | |
Celluloid::ZMQ.init | |
def _time_ | |
Time.now.strftime('%d-%m-%Y %T %z') | |
end | |
class MiddleWare | |
include Celluloid::ZMQ | |
attr_reader :socket_qone,:socket_qtwo,:socket_qone_delegator,:socket_qtwo_delegator | |
def initialize | |
init! | |
qs_one_init! | |
qs_two_init! | |
end | |
## Correct | |
def init! | |
reset_time! | |
reset_rate! | |
end | |
## correct | |
def reset_rate! | |
$one = $two = 0 | |
end | |
## correct | |
## setting the current time | |
def reset_time! | |
$start_time = current_time | |
end | |
## correct | |
def qs_one_init! | |
puts "[#{_time_}] Initializing QS1 socket" | |
@socket_qone = Socket::Pull.new | |
@socket_qone.bind("ipc:///tmp/qs1-cmsdb-messages") | |
@socket_qone_delegator = Socket::Push.new | |
@socket_qone_delegator.bind('ipc:///tmp/qs1-cmsdb-puller-messages') | |
puts "[#{_time_}] QS1 done." | |
end | |
## correct | |
def qs_two_init! | |
puts "[#{_time_}] Initalizating QS2 socket" | |
@socket_qtwo = Socket::Pull.new | |
@socket_qtwo.bind("ipc:///tmp/qs2-cmsdb-messages") | |
@socket_qtwo_delegator = Socket::Push.new | |
@socket_qtwo_delegator.bind('ipc:///tmp/qs2-cmsdb-puller-messages') | |
puts "[#{_time_}] QS2 Done." | |
end | |
## correct | |
def monitor | |
#async.monitor_qs_one and async.display | |
async.monitor_qs_one | |
async.monitor_qs_two | |
async.display | |
end | |
def monitor_qs_one | |
loop do | |
data = socket_qone.read_multipart | |
$one += 1 | |
evaluate(data,socket_qone_delegator) | |
data = nil | |
end | |
end | |
def monitor_qs_two | |
loop do | |
data = socket_qtwo.read_multipart | |
$two += 1 | |
evaluate(data,socket_qtwo_delegator) | |
data = nil | |
end | |
end | |
def evaluate(data,socket) | |
begin | |
## sending is a kind of block so ideally it has to go in async | |
socket.send(data) | |
rescue Exception => exception | |
$logger.error "#{_time_}--- Exception is - #{exception.message} \n #{exception.backtrace}" | |
Airbrake.notify(exception) | |
end | |
end | |
def display | |
loop do | |
begin | |
sleep 60 | |
puts "[#{_time_}] [Rate is (#{find_rate}[QS1(#{$one}), QS2(#{$two})] - #{$last_rate} / #{total_time})] Current Rate of message flow is #{current_rate}" | |
reset_time! | |
last_rate! | |
rescue Exception => exception | |
$logger.error "[#{_time_}] Exception is #{exception.message} -> Backtrace \n #{exception.backtrace}" | |
Airbrake.notify(exception) | |
end | |
end | |
end | |
def last_rate! | |
$last_rate = find_rate | |
end | |
def current_rate | |
## total time ideally should be 60 second | |
calculate_rate / total_time | |
end | |
def find_rate | |
$one + $two | |
end | |
def calculate_rate | |
[find_rate - $last_rate,0].max | |
end | |
def total_time | |
(current_time - $start_time) | |
end | |
def current_time | |
Time.now.to_i | |
end | |
end | |
middleware = MiddleWare.new | |
middleware.monitor | |
sleep |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment