Created
August 13, 2012 12:28
-
-
Save gotar/3340194 to your computer and use it in GitHub Desktop.
ParallelSearch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module API
  # Queries several external search APIs at the same time and returns the
  # items of those that have answered, waiting only until at least one API
  # delivered something (or until every API has finished).
  class ParallelSearch
    API_FETCHING_TIMEOUT = 1 # seconds

    # config      - stored in @config; not read by any method in this class.
    # search_apis - collection of API objects responding to #find(keyword, limit)
    #               and #find_key(keyword, limit); must respond to #pmap.
    def initialize(config, search_apis)
      @config = config
      @search_apis = search_apis
    end

    # Returns results hash of {#ExternalSearchApi => [item, ...], ...}.
    # Only APIs that are finished and returned a non-empty list are included.
    def call(keyword, limit)
      search_all_in_parallel(keyword, limit).get_at_least_one_non_empty
    end

    private

    # Runs all api searches in parallel via Array#pmap. Returns ResultsSet
    # instance wrapping an {api => Result} hash.
    def search_all_in_parallel(keyword, limit)
      pairs = @search_apis.pmap { |api| [api, search_api(api, keyword, limit)] }
      ResultsSet.new(pairs.to_h)
    end

    # Returns Result instance:
    # * ReadyResult with the items when the lookup finished,
    # * FutureResult when the cache reported a timeout (the lookup goes on in
    #   the future carried by the exception),
    # * empty ReadyResult for any other StandardError, which is also reported
    #   to Airbrake.
    def search_api(api, keyword, limit)
      ReadyResult.new(find_with_cache(api, keyword, limit))
    rescue Cache::TimedOut => e
      FutureResult.new(e.future)
    rescue => e
      Airbrake.notify(e)
      ReadyResult.new([])
    end

    # Returns an array of items or raises Cache::TimedOut exception.
    def find_with_cache(api, keyword, limit)
      cache(api, keyword, limit) { api.find(keyword, limit) }
    end

    # Delegates to Cache.call with the search-specific key and the fetching
    # timeout; the block is what Cache.call is given to produce the value.
    def cache(api, keyword, limit, &block)
      Cache.call(unique_api_search_key(api, keyword, limit),
                 timeout: API_FETCHING_TIMEOUT,
                 &block)
    end

    # Builds the cache key: "items_query" and the API's own key joined by ':'.
    def unique_api_search_key(api, keyword, limit)
      ["items_query", api.find_key(keyword, limit)].join(':')
    end

    # Base class of a single API's search outcome.
    class Result
      # Only access #value if #ready? is true.
      attr_reader :value

      # A plain Result is always ready; subclasses may override.
      def ready?
        true
      end
    end

    # Result whose items arrive later, once the wrapped future resolves.
    class FutureResult < Result
      # Initialize with an instance of Celluloid::Future that will return
      # a Cache::Entry instance.
      # Notifies airbrake in case of errors.
      def initialize(future)
        Celluloid::Future.new do
          begin
            # #ready? is derived from @value being non-nil, so a nil payload
            # must become an empty list; otherwise this result would never
            # count as finished and ResultsSet would poll forever.
            @value = future.value.value || []
          rescue => e
            Airbrake.notify(e)
            @value = []
          end
        end
      end

      # True once the background block has assigned @value.
      def ready?
        !@value.nil?
      end
    end

    # Result that is available immediately.
    class ReadyResult < Result
      # Initialize with an Array.
      def initialize(value)
        @value = value
      end
    end

    # Collection of {api => Result} pairs that can be polled for completion.
    class ResultsSet
      # Seconds to sleep between two polls of the pending results.
      POLL_INTERVAL = 0.05

      def initialize(apis_with_results)
        @apis_with_results = apis_with_results
      end

      # Returns Hash of {#ExternalSearchApi => [item, ...], ...} of the
      # APIs that finished processing. Tries hard to return at least one
      # element. Will return empty hash only when all results from the input
      # finished and returned nothing.
      #
      # Blocks the calling thread, polling every POLL_INTERVAL seconds; there
      # is no upper bound on the waiting time.
      def get_at_least_one_non_empty
        sleep(POLL_INTERVAL) until at_least_one_non_empty? || all_finished?
        # Recomputed on purpose: results may have become ready since the
        # loop condition was last evaluated.
        results2values(ready_non_empty)
      end

      private

      def at_least_one_non_empty?
        !ready_non_empty.empty?
      end

      def all_finished?
        @apis_with_results.all? { |_api, result| result.ready? }
      end

      # Pairs whose result is ready and carries at least one item.
      def ready_non_empty
        @apis_with_results.select { |_api, result| result.ready? && !result.value.empty? }
      end

      # Maps {#ExternalSearchApi => Result} hash to {#ExternalSearchApi => [item, ...]} hash.
      def results2values(h)
        h.map { |api, result| [api, result.value] }.to_h
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment