Skip to content

Instantly share code, notes, and snippets.

@kellymclaughlin
Created June 20, 2011 15:31
Show Gist options
  • Save kellymclaughlin/1035827 to your computer and use it in GitHub Desktop.
Save kellymclaughlin/1035827 to your computer and use it in GitHub Desktop.
riak_kv_pb_socket:legacy_mapreduce using if statement to reduce nesting
%% @doc Start a streaming map/reduce job, picking the client call from the
%% shape of `Inputs':
%%
%%   * a binary, or a tuple whose first element is a binary and whose
%%     second element is a list: handed as-is to `C:mapred_bucket_stream/4';
%%   * a list: `C:mapred_stream/3' is started and the list is fed to the
%%     returned FSM with `luke_flow:add_inputs/2';
%%   * a 4-tuple tagged `modfun' whose 2nd and 3rd elements are atoms:
%%     `C:mapred_stream/3' is started and the tuple is fed through
%%     `C:mapred_dynamic_inputs_stream/3'.
%%
%% Returns `{pause, NewState}' (with `req' and `req_ctx' set) when the job
%% was started, the result of `send_error/3' when the client answers
%% `{stop, Error}', and `{error, bad_mapred_inputs}' when `Inputs' matches
%% none of the shapes above.
legacy_mapreduce(Req, #state{client=C}=State, Inputs, Query, Timeout) ->
    if
        is_binary(Inputs)
        orelse (is_tuple(Inputs)
                andalso is_binary(element(1, Inputs))
                andalso is_list(element(2, Inputs))) ->
            case C:mapred_bucket_stream(Inputs, Query, self(), Timeout) of
                {stop, Error} ->
                    send_error("~p", [Error], State);
                {ok, ReqId} ->
                    {pause, State#state{req = Req, req_ctx = ReqId}}
            end;
        is_list(Inputs) ->
            legacy_mapreduce_stream(
              Req, State, Query, Timeout,
              fun(FSM) -> luke_flow:add_inputs(FSM, Inputs) end);
        is_tuple(Inputs)
        andalso tuple_size(Inputs) =:= 4
        andalso element(1, Inputs) =:= modfun
        andalso is_atom(element(2, Inputs))
        andalso is_atom(element(3, Inputs)) ->
            legacy_mapreduce_stream(
              Req, State, Query, Timeout,
              fun(FSM) ->
                      C:mapred_dynamic_inputs_stream(FSM, Inputs, Timeout)
              end);
        true ->
            %% NOTE(review): unlike the other failure paths this branch does
            %% not go through send_error/3 - confirm the caller handles a
            %% bare error tuple.
            {error, bad_mapred_inputs}
    end.

%% Start `C:mapred_stream/3', feed the job's inputs by calling
%% `AddInputs(FSM)' (its return value is ignored), then mark the inputs as
%% finished. Shared by the list and `modfun' input shapes.
legacy_mapreduce_stream(Req, #state{client=C}=State, Query, Timeout, AddInputs) ->
    case C:mapred_stream(Query, self(), Timeout) of
        {stop, Error} ->
            send_error("~p", [Error], State);
        {ok, {ReqId, FSM}} ->
            AddInputs(FSM),
            luke_flow:finish_inputs(FSM),
            %% Pause incoming packets - map/reduce results
            %% will be processed by handle_info, it will
            %% set socket active again on completion of streaming.
            {pause, State#state{req = Req, req_ctx = ReqId}}
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment