Skip to content

Instantly share code, notes, and snippets.

@thecarlhall
Last active December 12, 2015 09:28
Show Gist options
  • Save thecarlhall/4751334 to your computer and use it in GitHub Desktop.
Save thecarlhall/4751334 to your computer and use it in GitHub Desktop.
Ad-hoc Erlang Map/Reduce in Riak using Ruby
module Riak
  # Exploratory helpers added to Riak::MapReduce for running an ad-hoc Erlang
  # map/reduce job over a list of keys from a single bucket.
  class MapReduce
    # Name of the bucket whose keys are fed into the job.
    #
    # @return [String]
    def self.bucket_name
      'super_awesome_bucket'
    end

    # Keys to feed into the map/reduce job.
    #
    # @return [Array] currently always empty (placeholder implementation)
    def self.get_keys
      # TODO: get the keys to process by querying an index or listing all keys in a bucket
      []
    end

    # Finds the keys of objects whose JSON `region_id` equals one of +regions+.
    #
    # The keys returned by {get_keys} are processed in slices of +groups_of+;
    # one map/reduce job is built and run per slice. The map phase emits the
    # object's key when its `region_id` matches, and the reduce phase truncates
    # the collected values with `lists:sublist(Values, batch_limit)`.
    #
    # Progress messages are printed with +puts+ and a warning with +warn+.
    #
    # @param regions [Object, Array] a single region id or a list of them; passed
    #   to the map phase as its argument
    # @param groups_of [Integer] number of keys added to each job
    # @param batch_limit [Integer] argument handed to the reduce phase
    # @yield [doc_id] each match, as soon as the job streams it (optional)
    # @return [Array, nil] the collected matches, or nil when a block is given
    def self.find_keys_by_region(regions = [], groups_of = 10000, batch_limit = 100)
      warn "WARNING: This operation should only be used for exploratory purposes."

      regions = Array(regions)

      # The Erlang sources below are sent to Riak verbatim, hence the flush-left
      # layout inside the %q{} literals.
      map_lang = 'erlang'
      map_function = %q{
fun(Obj, _KeyData, _Arg) ->
Struct = mochijson2:decode(riak_object:get_value(Obj)),
{struct, JsonData} = Struct,
IsInDesiredRegion = fun(RegionToMatch) ->
case proplists:get_value(<<"region_id">>, JsonData) of
RegionToMatch -> true;
_ -> false
end
end,
case lists:any(IsInDesiredRegion, _Arg) of
%true -> [{riak_object:key(Obj), JsonData}];
true -> [riak_object:key(Obj)];
_ -> []
end
end.
}

      reduce_lang = 'erlang'
      reduce_function = %q{
fun(Values, _Arg) ->
lists:sublist(Values, _Arg)
end.
}

      client = Ripple.client
      bucket_name = self.bucket_name
      streaming = block_given?
      doc_ids = []

      all_keys = self.get_keys
      puts "Found: #{all_keys.count} keys."

      # each_slice never pads the final slice, so no nil keys are added to the
      # job when the key count is not a multiple of groups_of.
      all_keys.each_slice(groups_of) do |keys|
        job = Riak::MapReduce.new(client)
        job = job.map(map_function, language: map_lang, arg: regions, keep: false)
        job = job.reduce(reduce_function, language: reduce_lang, arg: batch_limit, keep: true)

        keys.each { |key| job.add(bucket_name, key) }

        counter = 0
        # NOTE(review): riak-client documents the streaming block arguments as
        # (phase, data); only the first element of the second argument is used
        # here -- confirm each streamed chunk carries exactly one result.
        job.run do |_phase, chunk|
          counter += 1
          if streaming
            yield(chunk[0])
          else
            doc_ids << chunk[0]
          end
        end

        puts "Batch contained #{counter} matches."
      end

      doc_ids unless streaming
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment