Skip to content

Instantly share code, notes, and snippets.

@gglanzani
Created January 13, 2015 11:16
Show Gist options
  • Save gglanzani/de14420a609dfbea04bd to your computer and use it in GitHub Desktop.
Save gglanzani/de14420a609dfbea04bd to your computer and use it in GitHub Desktop.
riak-python-client#396
%% Run the top_hubs:filterCountriesMapper map phase, through the Erlang
%% protocol-buffers client, over twelve keys of the <<"SCF">> bucket.
%% The keys share one prefix and differ only in the trailing month number;
%% the month list below keeps the order of the original call (2 comes last).
%% The phase argument is the list [<<"IT">>] and the phase result is kept.
riakc_pb_socket:mapred(
    Client,
    [{<<"SCF">>,
      list_to_binary(
          "satellite_00BCD806247A70C80E9780BBC274C42CB06F3047E03CABC0652F72FB0A2C55A0_False_2013-"
          ++ integer_to_list(Month))}
     || Month <- [1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 2]],
    [{map, {modfun, top_hubs, filterCountriesMapper}, [<<"IT">>], true}]).
% This call returns the expected results, both when passing a list such as [<<"IT">>] and when passing `none`.
---------------------------------------------------------------------------
RiakError Traceback (most recent call last)
<ipython-input-13-f971b67bd19e> in <module>()
5 #mr.reduce(['top_hubs', 'sortByHub'], options={"keep": False, "args": None})
6 #mr.reduce(['top_hubs', 'aggregateHubReducer'], options={"keep": True, "args": None})
----> 7 for result in mr.run():
8 print "%s" % result
/Users/gio/.virtualenvs/numpy/lib/python2.7/site-packages/riak/mapreduce.pyc in run(self, timeout)
317 'when not allowed\n'
318 'original error: ' + e.value)
--> 319 raise e
320
321 # If the last phase is NOT a link phase, then return the result.
RiakError: '{"phase":0,"error":"badarg","input":"{ok,{r_object,<<\\"SCF\\">>,<<\\"satellite_00BCD806247A70C80E9780BBC274C42CB06F3047E03CABC0652F72FB0A2C55A0_False_2013-5\\">>,[{r_content,{dict,4,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\"content-type\\">>,97,112,112,108,105,99,97,116,105,111,110,47,106,115,111,110],[<<\\"X-Riak-VTag\\">>,54,83,98,68,110,83,53,109,55,70,89,97,98,76,107,97,98,66,55,97,105,111]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1421,68851,210161}]],[],[]}}},<<\\"[{\\\\\\"rati...\\">>}],...},...}","type":"error","stack":"[{lists,member,[null,null],[]},{top_hubs,\'-filterCountries/2-lc$^0/1-0-\',2,[{file,\\"top_hubs.erl\\"},{line,17}]},{top_hubs,filterCountriesMapper,3,[{file,\\"top_hubs.erl\\"},{line,40}]},{riak_kv_mrc_map,map,3,[{file,\\"src/riak_kv_mrc_map.erl\\"},{line,168}]},{riak_kv_mrc_map,process,3,[{file,\\"src/riak_kv_mrc_map.erl\\"},{line,144}]},{riak_pipe_vnode_worker,process_input,3,[{file,\\"src/riak_pipe_vnode_worker.erl\\"},{line,446}]},{riak_pipe_vnode_worker,wait_for_input,2,[{file,\\"src/riak_pipe_vnode_worker.er...\\"},...]},...]"}'
# Run the Erlang map function top_hubs:filterCountriesMapper over the twelve
# monthly keys of the "SCF" bucket, keeping only satellites whose country of
# residence is "IT", and print every result.
mr = RiakMapReduce(riak)
keys = ["satellite_00BCD806247A70C80E9780BBC274C42CB06F3047E03CABC0652F72FB0A2C55A0_False_2013-%d" % d for d in range(1, 13)]
mr.add("SCF", keys)
# The phase argument must be given under the "arg" option; an "args" key is
# not read by the client, so the mapper ends up with a null argument (this is
# the lists:member(null, null) badarg in the traceback). Pass a real Python
# list rather than Erlang source text in a string -- presumably it reaches the
# mapper as [<<"IT">>] after JSON decoding; confirm against the client docs.
mr.map(['top_hubs', 'filterCountriesMapper'], options={"keep": True, "arg": ["IT"]})
for result in mr.run():
    print("%s" % result)
# Same job as the filtered run, but without a phase argument: every satellite
# of every key should be returned.
mr = RiakMapReduce(riak)
keys = ["satellite_00BCD806247A70C80E9780BBC274C42CB06F3047E03CABC0652F72FB0A2C55A0_False_2013-%d" % d for d in range(1, 13)]
mr.add("SCF", keys)
# With no argument the mapper works when it is invoked from Erlang, but
# through the Python client it fails in the same way as the first Python
# attempt (badarg inside the mapper).
mr.map(['top_hubs', 'filterCountriesMapper'], options={"keep": True})
for result in mr.run():
    print("%s" % result)
-module(top_hubs).
-export([filterCountriesMapper/3
]).
%% Return the property lists of the `{struct, Props}' entries in Sats whose
%% <<"country_of_residence">> value is a member of Countries.
%%
%% Sats      - list of decoded JSON terms; entries that are not shaped like
%%             `{struct, Props}' are skipped by the generator pattern.
%% Countries - a list of country values (e.g. [<<"IT">>]), or one of the atoms
%%             `none' / `null' meaning "do not filter". `null' is accepted
%%             because that is how a missing phase argument arrives when the
%%             job is submitted from the Python client: the reported crash is
%%             lists:member(null, null).
%%
%% An entry without a <<"country_of_residence">> property is looked up as the
%% atom `none', so it is only kept when no filtering is requested (or when
%% `none' itself is listed in Countries).
filterCountries(Sats, Countries) when Countries =:= none; Countries =:= null ->
    %% No filter requested: lists:member/2 must not be called here, because
    %% its second argument would not be a list.
    [Props || {struct, Props} <- Sats];
filterCountries(Sats, Countries) ->
    [Props || {struct, Props} <- Sats,
              lists:member(
                  proplists:get_value(<<"country_of_residence">>, Props, none),
                  Countries)].
%% Map-phase function: decode the JSON value of a Riak object and return the
%% satellites it contains, filtered by country, tagged with the hub taken
%% from the object's key.
%%
%% O         - the riak_object handed to the map phase, or `{error, notfound}'
%%             when the input key does not exist.
%% _KeyData  - ignored.
%% Countries - the phase argument, passed unchanged to filterCountries/2.
%%
%% Returns `[{Hub, Satellites}]', where Hub is a string (character list), or
%% `[]' for a missing object.
%% Fails with badmatch when the key does not split on "_" into exactly four
%% tokens.
filterCountriesMapper({error, notfound}, _KeyData, _Countries) ->
    %% A missing key contributes nothing instead of crashing the whole job.
    [];
filterCountriesMapper(O, _KeyData, Countries) ->
    %% mochijson2:decode/1 accepts the binary value directly; no list
    %% conversion is needed first.
    Sats = mochijson2:decode(riak_object:get_value(O)),
    %% The key has the form <prefix>_<hub>_<flag>_<period>; the hub is the
    %% second token.
    [_, Hub, _, _] = string:tokens(binary_to_list(riak_object:key(O)), "_"),
    [{Hub, filterCountries(Sats, Countries)}].
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment