require 'ruote/storage/base' module Ruote module ActiveRecord class Document < ::ActiveRecord::Base set_table_name :documents def self.before_fork ::ActiveRecord::Base.clear_all_connections! end def self.after_fork ::ActiveRecord::Base.establish_connection end end class Storage include Ruote::StorageBase def initialize end def put_msg(action, options) # put_msg is a unique action, no need for all the complexity of put do_insert(prepare_msg_doc(action, options), 1) nil end def put_schedule(flavour, owner_fei, s, msg) # put_schedule is a unique action, no need for all the complexity of put doc = prepare_schedule_doc(flavour, owner_fei, s, msg) return nil unless doc do_insert(doc, 1) doc['_id'] end def put(doc, opts={}) if doc['_rev'] d = get(doc['type'], doc['_id']) return true unless d return d if d['_rev'] != doc['_rev'] # failures end nrev = doc['_rev'].to_i + 1 begin do_insert(doc, nrev) rescue Exception => de return (get(doc['type'], doc['_id']) || true) # failure end Document.delete(:typ => doc['type'], :ide => doc['_id'], :rev.lt => nrev) # @sequel[@table].where( # :typ => doc['type'], :ide => doc['_id'] # ).filter { rev < nrev }.delete doc['_rev'] = nrev if opts[:update_rev] nil # success end def get(type, key) d = do_get(type, key) d ? Rufus::Json.decode(d[:doc]) : nil end def delete(doc) raise ArgumentError.new('no _rev for doc') unless doc['_rev'] count = do_delete(doc) return (get(doc['type'], doc['_id']) || true) if count < 1 # failure nil # success end def get_many(type, key=nil, opts={}) ds = Document.where(:typ => type) keys = key ? Array(key) : nil ds = ds.where(:wfid => keys) if keys && keys.first.is_a?(String) return ds.all.size if opts[:count] ds = ds.order( *(opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ]) ) ds = ds.limit(opts[:limit]).offset(opts[:skip]) docs = ds.all docs = select_last_revs(docs, opts[:descending]) docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) } keys && keys.first.is_a?(Regexp) ? docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } : docs # (pass on the dataset.filter(:wfid => /regexp/) for now # since we have potentially multiple keys) end # Returns all the ids of the documents of a given type. # def ids(type) Document.where(:typ => type).select(:ide).order(:ide.asc).collect { |d| d[:ide] }.uniq end # Nukes all the documents in this storage. # def purge! Document.delete_all end # Returns a string representation the current content of the storage for # a given type. # def dump(type) "=== #{type} ===\n" + get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n") end # Calls #disconnect on the db. According to Sequel's doc, it closes # all the idle connections in the pool (not the active ones). # def shutdown #Ruote::ActiveRecord::Document.before_fork end # Grrr... I should sort the mess between close and shutdown... # Tests vs production :-( # def close #Ruote::ActiveRecord::Document.before_fork end # Mainly used by ruote's test/unit/ut_17_storage.rb # def add_type(type) # does nothing, types are differentiated by the 'typ' column end # Nukes a db type and reputs it (losing all the documents that were in it). # def purge_type!(type) Document.delete(:typ => type) end # A provision made for workitems, allow to query them directly by # participant name. # def by_participant(type, participant_name, opts) raise NotImplementedError if type != 'workitems' docs = Document.where( :typ => type, :participant_name => participant_name ).limit(opts[:limit]).offset(opts[:offset] || opts[:skip]) docs = select_last_revs(docs) opts[:count] ? docs.size : docs.collect { |d| Ruote::Workitem.from_json(d[:doc]) } end # Querying workitems by field (warning, goes deep into the JSON structure) # def by_field(type, field, value, opts) raise NotImplementedError if type != 'workitems' lk = [ '%"', field, '":' ] lk.push(Rufus::Json.encode(value)) if value lk.push('%') docs = Document.where(:typ => type, :doc.matches => lk.join) docs = docs.limit(opts[:limit]).offset(opts[:skip] || opts[:offset]) docs = select_last_revs(docs) opts[:count] ? docs.size : docs.map { |d| Ruote::Workitem.from_json(d[:doc]) } end def query_workitems(criteria) ds = Document.where(:typ => 'workitems') count = criteria.delete('count') limit = criteria.delete('limit') offset = criteria.delete('offset') || criteria.delete('skip') ds = ds.limit(limit).offset(offset) wfid = criteria.delete('wfid') pname = criteria.delete('participant_name') || criteria.delete('participant') ds = ds.where(:ide.matches => "%!#{wfid}") if wfid ds = ds.where(:participant_name => pname) if pname criteria.collect do |k, v| ds = ds.where(:doc.matches => "%\"#{k}\":#{Rufus::Json.encode(v)}%") end ds = select_last_revs(ds.all) count ? ds.size : ds.collect { |d| Ruote::Workitem.new(Rufus::Json.decode(d[:doc])) } end protected def do_delete(doc) Document.delete( :ide => doc['_id'], :typ => doc['type'], :rev => doc['_rev'].to_i ) end def do_insert(doc, rev) Document.create!( :ide => doc['_id'], :rev => rev, :typ => doc['type'], :doc => Rufus::Json.encode(doc.merge( '_rev' => rev, 'put_at' => Ruote.now_to_utc_s)), :wfid => extract_wfid(doc), :participant_name => doc['participant_name'] ) end def extract_wfid(doc) doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil) end def do_get(type, key) Document.where( :typ => type, :ide => key ).order(:rev.desc).first end # Don't put configuration if it's already in # # (avoid storages from trashing configuration...) # def put_configuration return if get('configurations', 'engine') conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options) put(conf) end def select_last_revs(docs, reverse=false) docs = docs.inject({}) { |h, doc| h[doc[:ide]] = doc h }.values.sort_by { |h| h[:ide] } reverse ? docs.reverse : docs end end end end