Last active
August 29, 2015 14:04
-
-
Save davidhq/7420f31f2e6653857ccf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ENV['RAILS_ENV'] = ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development" | |
require ::File.expand_path('../config/environment', __FILE__) | |
require 'faye' | |
require 'eventmachine' | |
require 'redis' | |
class NumberHelper | |
extend ActionView::Helpers::NumberHelper | |
def self.format(number) | |
number_with_delimiter(number) | |
end | |
end | |
Faye::WebSocket.load_adapter('thin') | |
faye_server = Faye::RackAdapter.new(:mount => '/faye', :timeout => 45) | |
#faye_server.add_extension(ServerAuth.new) | |
client = Faye::Client.new(BacklinkHealth::FAYE_SERVER[:development]) # yes, even in production we want localhost here! | |
# couldn't get client authentication to work!! | |
#client.add_extension(ClientAuth.new) | |
app = ActionDispatch::Integration::Session.new(Rails.application) | |
subscribed_channels = [] | |
# when parameters (pagerank, trust flow, citation flow etc.) are processed on the website | |
Thread.new do | |
redis = Redis.new | |
redis.subscribe('website_processed') do |on| | |
on.message do |event, data| | |
data = eval(data) | |
for channel in subscribed_channels | |
next unless channel.starts_with?('/website/') | |
begin | |
website_id = channel.split('/')[2].to_i | |
if(data[:website_id] == website_id && Website.exists?(website_id)) | |
website = Website.find(website_id) | |
EM.run { | |
client.publish("/website/#{website_id}", data) | |
} | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
end | |
end | |
end | |
Thread.new do | |
redis = Redis.new | |
redis.subscribe('backlinks_fetched') do |on| | |
on.message do |event, data| | |
data = eval(data) | |
for channel in subscribed_channels | |
next unless channel.starts_with?('/backlinks_fetched/') | |
begin | |
website_id = channel.split('/')[2].to_i | |
if(data[:website_id] == website_id && Website.exists?(website_id)) | |
website = Website.find(website_id) | |
num = website.backlinks.count | |
EM.run { | |
client.publish("/backlinks_fetched/#{website_id}", 'count' => NumberHelper.format(num), :description => (num == 1 ? "backlink" : "backlinks")) | |
} | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
end | |
end | |
end | |
Thread.new do | |
redis = Redis.new | |
redis.subscribe('finished_processing') do |on| | |
on.message do |event, data| | |
data = eval(data) | |
#puts "Event #{event}, Data #{data}" | |
for channel in subscribed_channels | |
next unless channel.starts_with?('/charts_ready/') | |
begin | |
website_id = channel.split('/')[2].to_i | |
if(data[:website_id] == website_id && Website.exists?(website_id)) | |
website = Website.find(website_id) | |
app.get app.charts_website_path(website) | |
EM.run { | |
client.publish("/charts_ready/#{website_id}", 'charts_html' => app.response.body) | |
} | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
end | |
end | |
end | |
# when backlinks are being processed (pagerank, link counts etc.) an redis event is being triggered | |
# we subscribe to these events and push new data to the frontend - when received the table row is updated and highlight effect is used | |
Thread.new do | |
redis = Redis.new | |
redis.subscribe('backlink_processed') do |on| | |
on.message do |event, data| | |
data = eval(data) | |
#puts "Event #{event}, Data #{data}" | |
for channel in subscribed_channels | |
next unless channel.starts_with?('/backlink_rows/') | |
begin | |
website_id = channel.split('/')[2].to_i | |
page = channel.split('/')[3].to_i | |
if(data[:website_id] == website_id && Website.exists?(website_id)) | |
website = Website.find(website_id) | |
all_backlinks = website.backlinks | |
backlinks = all_backlinks.page(page).per(BacklinksController::RESULTS_PER_PAGE).select('id').map { |b| b.id } | |
#puts "website #{website.id}" | |
#p backlinks | |
if(backlinks.include?(data[:backlink_id])) | |
app.get app.table_rows_website_backlinks_path(website, :backlink_id => data[:backlink_id]) | |
EM.run { | |
#puts "starting to publish #{data[:backlink_id]} to /backlink_rows/#{website_id}/#{page}" | |
client.publish("/backlink_rows/#{website_id}/#{page}", 'table_row' => app.response.body, 'backlink_id' => data[:backlink_id]) | |
#puts "published backlink row for website #{website_id} - backlink id #{data[:backlink_id]}" | |
} | |
end | |
# for the "charts are not yet accurate because backlinks are still processing ( 267 / 1000 )" notice... | |
EM.run { | |
client.publish("/backlink_count/#{website_id}", :count => all_backlinks.count - all_backlinks.fresh.count) | |
} | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
end | |
end | |
end | |
Thread.new do | |
redis = Redis.new | |
redis.subscribe('domain_processed') do |on| | |
on.message do |event, data| | |
data = eval(data) | |
#puts "Event #{event}, Data #{data}" | |
for channel in subscribed_channels | |
next unless channel.starts_with?('/domain_rows/') | |
begin | |
website_id = channel.split('/')[2].to_i | |
page = channel.split('/')[3].to_i | |
if(data[:website_id] == website_id && Website.exists?(website_id)) | |
website = Website.find(website_id) | |
domains = website.domains.page(page).per(BacklinksController::RESULTS_PER_PAGE).select('id').map { |d| d.id } | |
#puts "website #{website.id}" | |
#p backlinks | |
if(domains.include?(data[:domain_id])) | |
app.get app.table_rows_website_domains_path(website, :domain_id => data[:domain_id]) | |
EM.run { | |
#puts "starting to publish #{data[:backlink_id]} to /backlink_rows/#{website_id}/#{page}" | |
client.publish("/domain_rows/#{website_id}/#{page}", 'table_row' => app.response.body, 'domain_id' => data[:domain_id]) | |
#puts "published backlink row for website #{website_id} - backlink id #{data[:backlink_id]}" | |
} | |
end | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
end | |
end | |
end | |
# we monitor the database for new backlinks (for domains that are being processed) and send new pagination and table rows to frontend | |
Thread.new do | |
app = ActionDispatch::Integration::Session.new(Rails.application) | |
loop do | |
for channel in subscribed_channels | |
begin | |
website_id = channel.split('/')[2] | |
next unless Website.exists?(website_id) | |
page = channel.split('/')[3] | |
website = Website.find(website_id) | |
next unless website.processing | |
# backlinks | |
if channel.starts_with?('/backlinks/') | |
app.get app.pagination_website_backlinks_path(website, :page => page) | |
pagination = app.response.body | |
app.get app.table_rows_website_backlinks_path(website, :page => page) | |
table_rows = app.response.body | |
count = website.backlinks.count | |
unless table_rows.empty? && pagination.strip.empty? | |
EM.run { | |
client.publish(channel, 'pagination' => pagination, 'table_rows' => table_rows, 'backlinks_count' => count) | |
} | |
end | |
end | |
# domains | |
if channel.starts_with?('/domains/') | |
app.get app.pagination_website_domains_path(website, :page => page) | |
pagination = app.response.body | |
app.get app.table_rows_website_domains_path(website, :page => page) | |
table_rows = app.response.body | |
count = website.domains.count | |
unless table_rows.empty? && pagination.strip.empty? | |
EM.run { | |
client.publish(channel, 'pagination' => pagination, 'table_rows' => table_rows, 'domains_count' => count) | |
} | |
end | |
end | |
rescue | |
puts $!.message | |
puts $!.backtrace | |
end | |
end | |
#p subscribed_channels | |
sleep 1 | |
end | |
end | |
# frontend subscribes to channels of type /backlinks/30/2 - this means they are interested for the page 2 of backlinks for website with id=30 | |
faye_server.on(:subscribe) do |client_id, channel| | |
puts "#{client_id} subscribed to #{channel}" | |
if channel =~ /^\/(backlinks|domains|backlink_rows|domain_rows|website|backlinks_fetched|charts_ready)/ | |
subscribed_channels << channel # two separate threads monitor this variable | |
end | |
end | |
faye_server.on(:unsubscribe) do |client_id, channel| | |
puts "#{client_id} unsubscribed #{channel}" | |
# delete the first occurence of the channel | |
subscribed_channels.delete_at(subscribed_channels.index(channel) || subscribed_channels.length) | |
end | |
run faye_server |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment