Skip to content

Instantly share code, notes, and snippets.

@dallasmarlow
Created December 19, 2012 20:25
Show Gist options
  • Save dallasmarlow/4340152 to your computer and use it in GitHub Desktop.
Save dallasmarlow/4340152 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
%w[logger thread json collins_client highline/import file-tail zk].each {|l| require l}
# Simple daemon that tails the act_post giant octopus log and creates znodes
# for the blogs and posts appearing in it.
#
# Each log line is a tab-separated record whose columns are named, in order,
# by options[:act_post_schema]. For every record with a non-empty blog a
# znode /<blog> is ensured; when the record also carries a post id, an
# ephemeral znode /<blog>/<post_id> is created whose data is the record's
# :time column.
class ZookeeperActPost
  attr_reader :options, :collins, :log, :zk

  # Builds the daemon: merges +options+ over the defaults, prompts for collins
  # credentials, discovers the zookeeper quorum through collins and connects.
  #
  # @param options [Hash] overrides for the defaults below. Nested hashes
  #   (:collins, :zookeeper) are merged one level deep, so e.g.
  #   zookeeper: {pool: :production} keeps the default :port.
  def initialize(options = {})
    @options = merge_options({
      log: STDOUT,
      act_post_log: '/var/log/scribe/act_post/act_post_current',
      act_post_schema: [:time, :act_id, :source, :identifier, :geo, :language, :post_id, :root_post_id, :post_type, :blog],
      tail_interval: 10,
      collins: {
        host: 'https://collins.ewr01.tumblr.net',
        timeout: 300,
        strict: true,
      },
      zookeeper: {
        pool: :testing,
        port: 2181,
        chroot: '/tumblelogs',
      },
    }, options)
    @collins = Collins::Client.new @options[:collins].merge(CollinsAuthenticator.authenticate)
    @log = Logger.new @options[:log]
    @log.info 'setting up zookeeper client'
    # NOTE(review): options[:zookeeper][:chroot] is never applied to this
    # connection, so znodes are created relative to the zookeeper root.
    # Confirm whether the chroot was meant to be appended to the quorum string.
    @zk = ZK::Client.new quorum.join(',')
  end

  # Interactive collins credential prompt (uses highline's +ask+).
  module CollinsAuthenticator
    # Asks for a username (defaulting to $USER) and a password read without
    # echo.
    #
    # @return [Hash] {username: ..., password: ...}, merged into the
    #   Collins::Client options by the caller
    def self.authenticate
      puts 'collins authentication:'
      username = ask('username: ') { |question| question.default = ENV['USER'] }
      password = ask('password: ') { |question| question.echo = false }
      {username: username, password: password}
    end
  end

  # Zookeeper quorum as "host:port" strings, looked up once in collins (assets
  # with primary_role :zookeeper in options[:zookeeper][:pool]) and memoized.
  #
  # @return [Array<String>]
  # @raise [RuntimeError] when collins returns no zookeeper nodes, rather than
  #   connecting with an empty connection string
  def quorum
    @zookeeper_quorum ||= begin
      zk_options = options[:zookeeper]
      log.info "querying collins for zookeeper nodes in pool: #{zk_options[:pool]}"
      nodes = collins.find primary_role: :zookeeper,
                           pool: zk_options[:pool],
                           details: true,
                           size: 100
      log.info "found #{nodes.size} nodes: #{nodes.map(&:hostname).join(', ')}"
      raise "no zookeeper nodes found in collins pool: #{zk_options[:pool]}" if nodes.empty?
      nodes.map { |node| "#{node.hostname}:#{zk_options[:port]}" }
    end
  end

  # Opens options[:act_post_log], tails it with File::Tail (using
  # options[:tail_interval] as the tail interval), and publishes every parsed
  # record that names a non-empty blog. Blocks while tailing.
  def process
    path = options[:act_post_log]
    log.info "opening act post log: #{path}"
    File.open(path) do |post_log|
      post_log.extend File::Tail
      post_log.interval = options[:tail_interval]
      log.info "tailing act post log: #{path}"
      post_log.tail do |entry|
        post = parse_entry(entry)
        publish post if post && post[:blog] && !post[:blog].empty?
      end
    end
  end

  # Ensures the blog znode exists and, when the post has an id, creates the
  # ephemeral post znode carrying post[:time] as data.
  #
  # Znodes that already exist (e.g. created concurrently, or a repeated log
  # line) are logged and skipped. Any other error is logged with the record
  # and the process aborts.
  #
  # @param post [Hash] a record keyed by options[:act_post_schema]
  def publish(post)
    blog_path = znode(post[:blog])
    unless zk.exists?(blog_path)
      log.info "creating znode for blog: #{post[:blog]}"
      ignoring_existing(blog_path) { zk.create blog_path }
    end

    post_id = post[:post_id]
    return if post_id.nil? || post_id.empty?

    log.info "creating znode for blog: #{post[:blog]} post: #{post_id}"
    post_path = znode(post[:blog], post_id)
    ignoring_existing(post_path) { zk.create post_path, post[:time], ephemeral: true }
  rescue StandardError => error
    log.error "unable to create znode for post: #{post.to_json}"
    log.error error
    abort
  end

  private

  # Splits one tab-separated log line (trailing newline removed, so the last
  # column is clean) into a Hash keyed by options[:act_post_schema].
  # Missing trailing columns map to nil; extra columns are ignored.
  #
  # @return [Hash, nil] nil (after a warning) when the line cannot be split,
  #   e.g. because it contains invalid bytes
  def parse_entry(entry)
    fields = entry.chomp.split("\t")
    Hash[options[:act_post_schema].zip(fields)]
  rescue StandardError => error
    log.warn "skipping unparseable act post entry (#{error.class}: #{error.message})"
    nil
  end

  # Runs the given znode-creating block, treating "node already exists" as
  # success so duplicates or races do not kill the daemon.
  def ignoring_existing(path)
    yield
  rescue ZK::Exceptions::NodeExists
    log.info "znode already exists, skipping: #{path}"
    nil
  end

  # Merges +overrides+ over +defaults+, merging nested Hash values one level
  # deep instead of replacing them wholesale.
  def merge_options(defaults, overrides)
    defaults.merge(overrides) do |_key, default, override|
      default.is_a?(Hash) && override.is_a?(Hash) ? default.merge(override) : override
    end
  end

  # Absolute znode path built from the given path segments.
  def znode(*path)
    File.join '/', *path
  end
end
# Entry point: build the daemon (this prompts for collins credentials and
# connects to zookeeper), then start tailing the act post log.
daemon = ZookeeperActPost.new
daemon.process
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment