Skip to content

Instantly share code, notes, and snippets.

@jebu
Created August 5, 2010 17:34
Show Gist options
  • Save jebu/510070 to your computer and use it in GitHub Desktop.
Save jebu/510070 to your computer and use it in GitHub Desktop.
handle_call({lookup_popular_replies, TweetId, AuthorId}, _From, #state{client=Client} = State) ->
% reduce fun to get valid tweet keys in tweet response bucket phase 3
Fun = make_local_fun("fun(O, _) ->
lists:foldl(fun
([LinkedTweet = <<Id:128>>, <<Val:128>>, <<\"reply\">>], Acc) ->
[{{<<\"tweet_responses_bucket\">>, LinkedTweet}, {Id, Val}} | Acc];
({{<<\"tweet_responses_bucket\">>, _}, _} = PreviousPass, Acc) ->
[PreviousPass | Acc];
(_, Acc) ->
Acc
end, [], O)
end.", "lookup_popular_replies_r1"),
% map over replies to filter any reply which has further links phase 4
Fun1 = make_local_fun("fun
(_, KeyData = {_, AuthorId}, AuthorId) ->
[KeyData];
(Object, KeyData, _) ->
MD = riak_object:get_metadata(Object),
case dict:find(<<\"Links\">>, MD) of
{ok, _} -> [KeyData];
_ -> []
end
end.", "lookup_popular_replies_r2"),
% reduce fun to sort and filter not_found responses. phase 5
Fun2 = make_local_fun("fun(Replies,_) ->
lists:keysort(1,
lists:filter(fun
({not_found, _, _}) -> false;
(_) -> true
end, Replies))
end.", "lookup_popular_replies_r3"),
%
% we start with the Tweet and get all subkeys phase 1
% we extract all the tweets tagged reply across these buckets phase 2
% reduce this to get valid tweet keys in tweet response bucket phase 3
% map over these replies to filter any reply which has further links phase 4
% reduce this to sort and filter not_found responses. phase 5
{ok, L} = Client:mapred([{<<"tweet_responses_bucket">>, <<TweetId:128>>}],
[
{link, <<"tweet_responses_subkeys_bucket">>, <<"tweet_response">>, false},
{link, '_', <<"reply">>, false},
{reduce, {qfun, Fun}, none, false},
{map, {qfun, Fun1}, AuthorId, false},
{reduce, {qfun, Fun2}, none, true}
]),
{reply, L, State};
%
% this makes a function reference which can be passed onto a remote node for execution
% optionally the code can be made available in the code path for riak server
% the parsed form is cached in the process dictionary to avoid reparsing the code
%
make_local_fun(String, Name) ->
make_local_fun(String, Name, erl_eval:new_bindings()).
make_local_fun(String, Name, Bindings) ->
Hash = erlang:md5(String),
Form1 =
case get(Name) of
undefined ->
Form = parse_erl_string(String),
put(Name, Hash),
put(Hash, Form),
Form;
Hash ->
get(Hash);
OldHash ->
erase(OldHash),
Form = parse_erl_string(String),
put(Name, Hash),
put(Hash, Form),
Form
end,
{value, Fun, _} = erl_eval:expr(Form1, Bindings),
Fun.
%
parse_erl_string(String) ->
{ok, Tokens, _} = erl_scan:string(String),
{ok, [Form]} = erl_parse:parse_exprs(Tokens),
Form.
%
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment