Created April 25, 2011 22:08
-
-
Save grantr/941359 to your computer and use it in GitHub Desktop.
ActiveModel module for Elasticsearch indexing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Elasticsearch indexing for ActiveModel classes.
#
#   class Person
#     include SearchableModel
#     extend SearchableModel::SearchMethods
#     extend SearchableModel::FinderMethods  # optional; depends on SearchMethods
#
#     # ...
#
#   end
module SearchableModel
  extend ActiveSupport::Concern

  included do
    # Per-class index/type names plus a switch that turns automatic indexing off.
    class_attribute :search_index, :search_type, :indexing_disabled
    self.search_index = model_name.collection
    self.search_type = model_name.element
    self.indexing_disabled = false

    # Keep the index in sync with persistence.
    after_save :add_to_index
    after_destroy :delete_from_index

    include IndexConfiguration
    extend IndexCreation
  end

  module ClassMethods
    # Make add_to_index / delete_from_index no-ops for this class.
    def disable_indexing!
      self.indexing_disabled = true
    end

    # Re-enable automatic indexing for this class.
    def enable_indexing!
      self.indexing_disabled = false
    end

    # Memoized client bound to this class's index and type.
    # NOTE(review): memoized per class, so later changes to search_index or
    # search_type are not seen by an already-built client.
    def search_client
      @search_client ||= ElasticSearch.new(ES_CONFIG["servers"], ES_CONFIG["options"].merge(index: search_index, type: search_type))
    end

    # Refresh the index so recent writes become visible to searches.
    def refresh_index!
      ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'refresh') do
        search_client.refresh
      end
    end

    # Delete all records through the parent implementation, then drop the
    # search index. An index that is already missing is not treated as an error.
    def delete_all
      super
      begin
        search_client.delete_index(search_index)
      rescue ElasticSearch::RequestError => e
        raise unless e.message =~ /missing/
      end
    end
  end

  # Instance-level shortcut to the class's client.
  def search_client
    self.class.search_client
  end

  # after_save callback: index this record using #to_search when defined,
  # otherwise #default_to_search. Returns false (without indexing) when
  # indexing is disabled.
  def add_to_index
    return false if indexing_disabled
    document = respond_to?(:to_search) ? to_search : default_to_search
    # NOTE(review): the notification payload uses key.to_s while the document
    # is indexed under `id`; presumably these agree for the host model — confirm.
    ActiveSupport::Notifications.instrument('index.elastic_search', :id => key.to_s, :document => document) do
      search_client.index(document, :id => id, :type => search_type)
    end
  end

  # after_destroy callback: remove this record's document from the index.
  # Returns false (without deleting) when indexing is disabled.
  def delete_from_index
    return false if indexing_disabled
    ActiveSupport::Notifications.instrument('delete.elastic_search', :id => key.to_s) do
      search_client.delete(key.to_s)
    end
  end

  # Fallback document body used when the model does not define #to_search.
  def default_to_search
    as_json
  end

  # Extend FinderMethods to get ActiveRecord-style find, count, paginate, etc.
  # Depends on SearchMethods (count_every, paginate_every, search). #find also
  # relies on find_initial / find_last / find_with_ids, which are not defined
  # in this module.
  module FinderMethods
    # find(:first | :last | :all, options) or find(*ids, options).
    def find(*args)
      options = args.extract_options!
      # TODO should do activerecord style logging/benchmark
      case args.first
      when :first
        find_initial(options)
      when :last
        find_last(options)
      when :all
        find_every(options)
      else
        find_with_ids(args, options)
      end
    end

    def find_every(options)
      search(options)
    end

    def count(options={})
      count_every(options)
    end

    def paginate(*args)
      paginate_every(*args)
    end

    # Yield every matching record, fetching one page at a time (default
    # :limit 100, starting at :page 1). Returns the number of records yielded.
    # TODO: use scan_every(options, &block) when no :order is given.
    def paginated_each(options = {}, &block)
      options = { :page => 1, limit: 100 }.merge options
      options[:page] = options[:page].to_i
      total = 0
      loop do
        collection = paginate(options)
        total += collection.each { |item| yield item }.size
        per_page = collection.respond_to?(:per_page) ? collection.per_page.to_i : 0
        # A non-paginated result (per_page nil, e.g. missing index) or a
        # per_page of 0 used to raise on `size < nil` or loop forever.
        break if per_page <= 0 || collection.size < per_page
        options[:page] += 1
      end
      total
    end
  end

  # Mixed into result arrays to give them will_paginate/kaminari-style readers.
  module Paginatable
    # will_paginate methods
    attr_reader :current_page, :per_page, :total_entries

    def total_pages
      # A per_page of 0 (or unset) would divide by zero, and Float#ceil
      # raises FloatDomainError on Infinity/NaN.
      return 0 unless per_page.to_i > 0
      (total_entries / per_page.to_f).ceil
    end

    def offset
      (current_page - 1) * per_page
    end
    alias_method :offset_value, :offset

    def previous_page
      current_page > 1 ? (current_page - 1) : nil
    end

    def next_page
      current_page < total_pages ? (current_page + 1) : nil
    end

    # kaminari methods
    alias_method :num_pages, :total_pages
    alias_method :limit_value, :per_page
    alias_method :total_count, :total_entries

    # Store the pagination state (coerced with to_i) and return self.
    def paginated(current_page, per_page, total_entries)
      @current_page = current_page.to_i
      @per_page = per_page.to_i
      @total_entries = total_entries.to_i
      self
    end
    # TODO implement will_paginate methods if needed (but probably not)
  end

  # Extend SearchMethods to get count_every, paginate_every, scan_every and search.
  module SearchMethods
    # Count documents matching the same options #search accepts.
    # Returns 0 when elasticsearch reports IndexMissingException.
    def count_every(options={})
      options = options.with_indifferent_access
      query = compose_query(options)
      Rails.logger.debug("query: #{query.inspect}")
      Rails.logger.debug("options: #{options.inspect}")
      ActiveSupport::Notifications.instrument('count.elastic_search', :query => query) do
        rescuing_missing_index(0) { search_client.count(query, options) }
      end
    end

    # Like #search, but the returned array is extended with Paginatable and
    # carries the hits' pagination state when the hits provide it.
    def paginate_every(*args)
      options = args.extract_options!
      hits = get_hits_from_index(options)
      # TODO either this should handle non-paginated hits, or get_hits_from_index should not return [] when the index is missing
      collection = records_for(hits, options[:ids_only])
      # a frozen collection can't be extended
      collection = collection.dup if collection.frozen?
      collection.extend(Paginatable)
      collection.paginated(hits.current_page, hits.per_page, hits.total_entries) if hits.respond_to?(:current_page)
      collection
    end

    # Yield every matching record like FinderMethods#paginated_each, but page
    # through results with a scroll cursor. Returns the number of records yielded.
    # TODO use scan when 0.16.0 is released:
    #   options = {:scroll => '5m', :limit => 100, :search_type => 'scan'}.merge options
    def scan_every(options={})
      options = {:scroll => '5m', :limit => 100}.merge options
      total = 0
      # first batch
      hits = get_hits_from_index(options)
      total += records_for(hits, false).each { |item| yield item }.size
      # A missing index comes back as a plain [] with no scroll id.
      return total unless hits.respond_to?(:scroll_id)
      # scroll through the rest
      search_client.scroll(hits.scroll_id, :scroll => options[:scroll], :ids_only => true) do |batch|
        total += records_for(batch, false).each { |item| yield item }.size
      end
      total
    end

    # Options:
    #   :query      => lucene query string, or ES query-DSL hash (a hash is
    #                  passed through and the options below are ignored)
    #   :conditions => hash of attributes to query on (value, array, range or hash)
    #   :with       => filters (no analysis); :without => exclusion filters
    #   :prefix     => hash of field => prefix
    #   :limit, :offset
    #   :order      => sort strings ("field", "field:reverse", "field asc",
    #                  "field desc") or sort hashes, singly or in an array
    #   :ids_only   => return hit ids instead of loading records
    def search(options={})
      records_for(get_hits_from_index(options), options[:ids_only])
    end

    # Lucene special characters:
    # + - && || ! ( ) { } [ ] ^ " ~ * ? : \
    LUCENE_ESCAPE_REGEX =
      /(\+|-|&&|\|\||!|\(|\)|\{|\}|\[|\]|\^|"|~|\*|\?|:|\\)/

    # Backslash-escape every Lucene special character in +query+.
    def lucene_escape(query)
      query.gsub(LUCENE_ESCAPE_REGEX) { "\\#{$1}" }
    end

    private

    # Run the block; when elasticsearch reports IndexMissingException return
    # +fallback+ instead. Any other request error is re-raised.
    def rescuing_missing_index(fallback)
      yield
    rescue ElasticSearch::RequestError => e
      raise unless e.message =~ /IndexMissingException/
      fallback
    end

    # Hits as raw ids, or as records loaded through find_with_ids.
    def records_for(hits, ids_only)
      ids_only ? hits.to_a : find_with_ids([hits.to_a])
    end

    # Run the composed query with :ids_only => true. Returns the client's hits,
    # or [] when the index is missing.
    def get_hits_from_index(options={})
      options = options.with_indifferent_access
      query = { :query => compose_query(options) }
      query[:sort] = compose_sorts(options[:order]) if options[:order]
      options[:from] = options[:offset] if options[:offset]
      options[:size] = options[:limit] if options[:limit]
      Rails.logger.debug("query: #{query.inspect}")
      Rails.logger.debug("options: #{options.inspect}")
      # TODO could also change search_type to query_and_fetch and remove dupes locally (not a big difference in wire size, since only ids are returned)
      ActiveSupport::Notifications.instrument('search.elastic_search', :query => query) do
        # TODO the [] fallback causes paginate to fail if the index is missing
        rescuing_missing_index([]) { search_client.search(query, options.merge(:ids_only => true)) }
      end
    end

    # Build the ES query from options. Removes :query from +options+.
    # A String/nil :query is combined with :conditions/:prefix (inner bool
    # query) and :with/:without (bool filter), all wrapped in constant_score.
    # Any other :query value (e.g. a DSL hash) is returned unchanged.
    # TODO inconsistent results when using :with for certain queries
    def compose_query(options)
      query = options.delete(:query)
      case query
      when String, nil
        # A blank query string counts as "no query string"; otherwise it
        # would open an inner bool query with an empty must list.
        query_string = query.presence
        with_filters = compose_filters(options[:with]) unless options[:with].blank?
        without_filters = compose_filters(options[:without]) unless options[:without].blank?
        conditions = compose_conditions(options[:conditions]) unless options[:conditions].blank?
        prefix = compose_prefix(options[:prefix]) unless options[:prefix].blank?

        inner_query = nil
        if conditions || query_string || prefix
          must = []
          must << { :bool => { :must => conditions }} if conditions
          must << { :query_string => { :query => query_string }} if query_string
          must << { :bool => { :must => prefix }} if prefix
          inner_query = { :bool => { :must => must }}
        end

        inner_filter = nil
        if with_filters || without_filters
          inner_filter = { :bool => {}}
          inner_filter[:bool][:must] = with_filters if with_filters
          inner_filter[:bool][:must_not] = without_filters if without_filters
        end

        outer_filter =
          if inner_query && inner_filter
            # both a query and filters: AND them together
            { :and => [{ :query => inner_query }, inner_filter] }
          elsif inner_query
            { :query => inner_query }
          elsif inner_filter
            inner_filter
          else
            { :match_all => {}}
          end

        # the outer query is always constant_score
        { :constant_score => { :filter => outer_filter } }
      else
        # query hash (ES query DSL) or other value: pass through untouched
        query
      end
    end

    # Convert a value to the string form sent to elasticsearch:
    # true => "T", false => "F", Time => ISO 8601, anything else => to_s.
    def normalize_value(value)
      case value
      when true
        "T"
      when false
        "F"
      when Time
        value.iso8601
      else
        value.to_s
      end
    end

    # Range clause for +field+. An exclusive range (a...b) excludes its upper bound.
    def compose_range(field, range)
      bounds = { :from => normalize_value(range.begin), :to => normalize_value(range.end) }
      bounds[:include_upper] = false if range.exclude_end?
      { :range => { field => bounds }}
    end

    # with: {field: nil} is all records missing field
    # with: {field: :empty} is all records where field is the empty string
    # with: {field: :blank} is all records where field is missing or empty
    # with: {field: :nonblank} is all records where field is not missing or empty
    def compose_filters(withs)
      withs.map do |field, value|
        case value
        when Array
          { :terms => { field => value.map { |v| normalize_value(v) } }}
        when Range
          compose_range(field, value)
        when Hash
          { field => value }
        when nil
          { :missing => { :field => field }}
        when :blank
          blank_filter(field)
        when :nonblank
          { :not => { :filter => blank_filter(field) }}
        when :empty
          { :term => { field => "" }}
        else
          { :term => { field => normalize_value(value) }}
        end
      end
    end

    # Matches records where +field+ is missing or the empty string.
    def blank_filter(field)
      { :or => [
        { :missing => { :field => field }},
        { :term => { field => "" }}]
      }
    end

    # Arrays become a dis_max of term queries, ranges a range query, hashes
    # pass through, and scalars a lucene-escaped field query.
    def compose_conditions(conditions)
      conditions.map do |field, value|
        case value
        when Array
          { :dis_max => { :queries => value.map { |term| { :term => { field => normalize_value(term) }} } }}
        when Range
          compose_range(field, value)
        when Hash
          { field => value }
        else
          { :field => { field => lucene_escape(normalize_value(value)) }}
        end
      end
    end

    def compose_prefix(prefixes)
      prefixes.map do |field, value|
        { :prefix => { field => value } }
      end
    end

    # "field asc"/"field desc" => { field => dir }, "field:reverse" =>
    # { field => { :reverse => true } }; hashes and other values pass through.
    def compose_sorts(orders)
      # Array() would split a lone Hash into [key, value] pairs.
      orders = [orders] if orders.is_a?(Hash)
      Array(orders).map do |order|
        case order
        when Hash
          order
        when /\A(.+) (desc|asc)\z/i
          { $1 => $2.downcase }
        when /\A(.+):reverse\z/i
          { $1 => { :reverse => true }}
        else
          order
        end
      end
    end
  end

  # Class-level settings used by IndexCreation#create_index!.
  module IndexConfiguration
    extend ActiveSupport::Concern

    included do
      class_attribute :index_creation_options
      class_attribute :index_mapping
      self.index_creation_options = {}
      self.index_mapping = {}
    end
  end

  # Index lifecycle: create a timestamped pending index (aliased
  # "<search_index>-pending"), fill it, deploy it by moving the search_index
  # alias onto it, then delete stale indices.
  module IndexCreation
    # Create +index_name+ with index_creation_options and apply index_mapping
    # when it is non-empty. Returns true.
    def create_index!(index_name=search_index)
      ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'create index', :description => index_name) do
        search_client.create_index(index_name, index_creation_options)
      end
      # update mapping if necessary
      unless index_mapping.empty?
        ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'update mapping', :description => index_name) do
          search_client.update_mapping(index_mapping, :index => index_name)
        end
      end
      true
    end

    # Create a new index and alias it as the pending index.
    # Raises if a pending index already exists.
    def create_pending_index!(index_name=generate_pending_index_name)
      raise "An index is already pending. Delete or unalias that index first." if current_pending_index
      create_index!(index_name)
      # alias to search_index-pending
      alias_actions = {:add => {index_name => pending_index_alias}}
      ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'alias index', :description => alias_actions.inspect) do
        search_client.alias_index(alias_actions)
      end
    end

    # Point the search_index alias at +index_to_deploy+ (default: the pending
    # index), dropping its pending alias and the previously deployed alias.
    def deploy_index!(index_to_deploy=current_pending_index)
      raise "No index to deploy. Create a pending index first." unless index_to_deploy
      alias_actions = { :add => {index_to_deploy => search_index}, :remove => {index_to_deploy => pending_index_alias}}
      deployed_index = current_deployed_index
      # NOTE(review): if deployed_index == index_to_deploy this overwrites the
      # pending-alias removal above, since :remove is keyed by index name.
      alias_actions[:remove][deployed_index] = search_index if deployed_index
      ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'alias index', :description => alias_actions.inspect) do
        search_client.alias_index(alias_actions)
      end
    end

    # Delete every "<search_index>-*" index except the deployed one.
    # NOTE(review): this includes a pending (not yet deployed) index, if any.
    def delete_stale_indices!
      deployed_index = current_deployed_index
      raise "No index is deployed. Deploy an index first." unless deployed_index
      # plain prefix match: search_index may contain regex metacharacters
      name_prefix = "#{search_index}-"
      all_indices = search_client.index_status(:all)["indices"].keys.select { |name| name.start_with?(name_prefix) }
      stale_indices = all_indices - [deployed_index]
      stale_indices.each do |index|
        ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'delete index', :description => index) do
          search_client.delete_index(index)
        end
      end
    end

    # Bulk-index every record (via find_in_batches), or just options[:batch].
    # Target: options[:index], else the pending index when options[:pending],
    # else search_index. Remaining options go to find_in_batches.
    #
    # Takes a block with parameters batch and error. If error is not nil,
    # there was an exception indexing; there is no way to tell which object
    # in the batch caused it.
    #   reindex_all! do |batch, err|
    #     if err
    #       puts "error indexing batch #{err.class} #{err.message} #{batch.collect(&:id).inspect}"
    #     else
    #       puts "indexed #{batch.collect(&:id).inspect}"
    #     end
    #   end
    # TODO for 0.16.0 update refresh and merge settings to increase indexing throughput, change back after
    def reindex_all!(options={}, &block)
      options = options.dup
      # Remove our own keys up front so none of them leak into find_in_batches.
      index = options.delete(:index)
      pending = options.delete(:pending)
      batch = options.delete(:batch)
      index ||= pending ? current_pending_index : search_index
      # Fail fast instead of sending :index => nil with every document.
      raise "No pending index to reindex into. Create a pending index first." unless index

      batch_reindex = ->(objects) do
        begin
          ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'bulk index', :description => "(#{objects.size})") do
            search_client.bulk do |c|
              objects.each do |object|
                document = object.respond_to?(:to_search) ? object.to_search : object.default_to_search
                c.index(document, :id => object.id, :index => index, :type => object.search_type)
              end
            end
          end
          yield(objects, nil) if block_given?
        rescue => e
          Rails.logger.warn("Error reindexing batch: #{e.class} #{e.message}")
          yield(objects, e) if block_given?
        end
      end

      if batch
        batch_reindex.call(batch)
      else
        find_in_batches(options) { |records| batch_reindex.call(records) }
      end
    end

    # Name of the index carrying the pending alias, or nil.
    def current_pending_index
      index_aliased_as(pending_index_alias)
    end

    # Name of the index carrying the search_index alias, or nil.
    def current_deployed_index
      index_aliased_as(search_index)
    end

    private

    # First index whose status lists +alias_name+ among its aliases, or nil.
    # An index without an "aliases" entry is treated as having none.
    def index_aliased_as(alias_name)
      index_name, _status = search_client.index_status(:all)["indices"].detect { |_name, status| Array(status["aliases"]).include?(alias_name) }
      index_name
    end

    # index name with a UTC timestamp suffix
    def generate_pending_index_name
      "#{search_index}-#{Time.current.utc.strftime("%Y%m%d%H%M%S")}"
    end

    def pending_index_alias
      "#{search_index}-pending"
    end
  end

  # Writes the *.elastic_search notifications instrumented above as debug lines.
  class LogSubscriber < ActiveSupport::LogSubscriber
    def index(event)
      name = 'ElasticSearch index (%.1fms)' % event.duration
      record = "#{event.payload[:id]} #{event.payload[:document].inspect}"
      debug " #{name} #{record}"
    end

    def delete(event)
      name = 'ElasticSearch delete (%.1fms)' % event.duration
      debug " #{name} #{event.payload[:id]}"
    end

    def search(event)
      name = 'ElasticSearch search (%.1fms)' % event.duration
      debug " #{name} #{event.payload[:query].inspect}"
    end

    def count(event)
      name = 'ElasticSearch count (%.1fms)' % event.duration
      debug " #{name} #{event.payload[:query].inspect}"
    end

    def operation(event)
      name = 'ElasticSearch %s (%.1fms)' % [event.payload[:name], event.duration]
      debug " #{name} #{event.payload[:description]}"
    end
  end
end
# Route the *.elastic_search notifications to SearchableModel::LogSubscriber.
# TODO: this is the wrong place to call this - it causes issues with reload in dev mode.
SearchableModel::LogSubscriber.attach_to :elastic_search
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment