Optionally queueing Ruby CouchDB/CouchRest client
# encoding: utf-8
require 'quantum_lead/application'
require 'singleton'
require 'logger'
require 'version'
require 'couchrest'
require 'java'
# JRuby only: pull in a lock-free, thread-safe queue from java.util.concurrent
# to spool documents between the calling threads and the unspooler workers.
java_import 'java.util.concurrent.ConcurrentLinkedQueue'
class Database
  include Singleton

  # On large bulk store operations, we still need to keep some kind of
  # limit on how many documents we'll send up to CouchDB at once. This
  # will be that limit.
  MAX_DOCS_PER_SEND = 2000

  # This will be sent to the unspoolers to tell them to stop what they
  # are doing and shut down.
  QUEUE_SHUTDOWN = '__queue_shutdown__'
  # Tag the supplied document (or array of documents) with metadata about
  # this process and spool it off to CouchDB. If a block is given, its
  # return value is used as the document; otherwise an empty one is made.
  def self.store(label, message = '')
    return nil unless use_db?
    doc = block_given? ? yield : {}
    # make the tagging metadata for this save operation
    metadata = {
      :created => Time.now,
      :label => label,
      :message => message,
      :environment => QuantumLead::Application.environment,
      :division => QuantumLead::Application.division,
      :execution_tag => QuantumLead::Application.execution_tag,
      :jar_name => QuantumLead::Application.jar_name,
      :pid => Process.pid,
      :version => { :version => Version.version,
                    :branch => Version.branch }
    }
    if doc.is_a?(Array)
      spool(doc.map { |elem| elem.to_hash.merge(:meta => metadata) })
    else
      # make it a hash - a document - if it's not already
      doc = { :data => doc } unless doc.is_a?(Hash)
      spool(doc.to_hash.merge(:meta => metadata))
    end
  end
  # Run a CouchDB view at the given design-document path, retrying on
  # intermittent failures.
  def self.view(path, opts = {})
    return nil unless use_db?
    retryable { database.view(path, opts) }
  end

  # Fetch a single document by its id, retrying on intermittent failures.
  def self.get(id, opts = {})
    return nil unless use_db?
    retryable { database.get(id, opts) }
  end

  # Save an existing document back to CouchDB, stamping its metadata with
  # an 'updated' time so we can see when it was last touched.
  def self.save_doc(doc, opts = {})
    return nil unless use_db?
    meta = doc["meta"] || {}
    payload = doc.merge("meta" => meta.merge("updated" => Time.now))
    retryable { database.save_doc(payload, opts) }
  end

  # Returns true if the application is configured to send data to the
  # database at all.
  def self.use_db?
    QuantumLead::Application.config.application.data_to == 'database'
  end
  # Start up the background unspooler threads that drain the queue and
  # send documents to CouchDB.
  def self.start(thread_count = 2)
    thread_count.times do
      workers << Thread.new { self.unspool() }
    end
  end

  # Shut down the unspoolers cleanly, letting them drain the queue first.
  def self.flush
    unless workers.empty?
      # tell all the workers it's time to stop, and then wait until they are done
      workers.size.times { spool(QUEUE_SHUTDOWN) }
      QuantumLead::Application.logger.info("Database.flush[#{Thread.current.object_id}]") { "waiting for queue to empty..." }
      workers.each { |t| t.join unless t.nil? }
    end
  end
  private

  # The shared, thread-safe queue of documents waiting to be sent.
  def self.queue
    @queue ||= ConcurrentLinkedQueue.new
  end

  # The list of background unspooler threads.
  def self.workers
    @workers ||= []
  end

  # Queue up a document for the unspoolers - or, if no workers have been
  # started, send it to CouchDB directly on the calling thread.
  def self.spool(doc)
    return nil unless use_db? && !doc.nil?
    workers.empty? ? shoot(doc) : queue.offer(doc)
  end
  # The body of each worker thread: poll the queue for documents and send
  # them to CouchDB until a shutdown sentinel is received.
  def self.unspool
    keep_going = true
    QuantumLead::Application.logger.info("Database.unspool[#{Thread.current.object_id}]") { "starting unspooler" }
    begin
      doc = queue.poll
      if doc.nil?
        sleep(0.5)
      elsif doc.is_a?(String)
        # this is a control message to this processing code
        keep_going = false if doc == QUEUE_SHUTDOWN
      else
        shoot(doc)
      end
    end while keep_going
    QuantumLead::Application.logger.info("Database.unspool[#{Thread.current.object_id}]") { "shutting down unspooler" }
  end
  # Actually send a document - or an array of documents - to CouchDB,
  # chunking bulk saves so no single request gets too large.
  def self.shoot(doc)
    return nil if doc.nil?
    if doc.is_a?(Array)
      # if it's an array - handle it as a bulk_save
      doc.each_slice(MAX_DOCS_PER_SEND) do |bunch|
        retryable { database.bulk_save(bunch) }
      end
    else
      retryable { database.save_doc(doc) }
    end
  end

  # A per-thread CouchRest database handle, set up with the configured
  # proxy if one is in use.
  def self.database(config = QuantumLead::Application.config)
    CouchRest.proxy(config.database.proxy_uri) if config.database.use_proxy?
    Thread.current[:couch_db] ||= CouchRest.database(config.database.uri)
  end
  # We need to have a general retry processor for all the database calls,
  # and this is it. We take a block, and this will retry it some number of
  # times - logging problems, and giving up if it can't eventually get it
  # right. This makes it a lot easier to deal with intermittent issues with
  # the database.
  def self.retryable(retries = 3)
    return (block_given? ? yield : nil)
  rescue Errno::EADDRNOTAVAIL => se
    if retries > 0
      QuantumLead::Application.logger.warn('Database.retryable') { se }
      QuantumLead::Application.logger.warn('Database.retryable') { "problem sending data to CouchDB... retrying..." }
      sleep(0.5)
      retries -= 1
      retry
    else
      QuantumLead::Application.logger.error('Database.retryable') { se }
      QuantumLead::Application.logger.error('Database.retryable') { "problem sending data to CouchDB... giving up..." }
      raise
    end
  rescue Exception, RestClient::RequestTimeout => e
    if retries > 0
      QuantumLead::Application.logger.warn('Database.retryable') { e }
      QuantumLead::Application.logger.warn('Database.retryable') { "problem dealing with CouchDB... retrying..." }
      retries -= 1
      retry
    else
      QuantumLead::Application.logger.error('Database.retryable') { e }
      QuantumLead::Application.logger.error('Database.retryable') { "problem dealing with CouchDB... giving up..." }
      raise
    end
  end
end
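
A minimal usage sketch, for illustration only: the labels and documents below are made up, and it assumes the QuantumLead application is configured with data_to set to 'database' and is running under JRuby, as the class requires.

# Illustrative sketch - not part of the gist. Labels and documents are invented.
Database.start(4)                       # spin up four unspooler threads

# store a single document built by the block
Database.store('trade-ticks', 'end-of-day summary') do
  { :symbol => 'AAPL', :count => 1_024 }
end

# store a batch of documents - these will be bulk-saved in chunks
Database.store('trade-ticks') do
  (1..10_000).map { |i| { :tick => i } }
end

Database.flush                          # drain the queue and stop the workers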