Skip to content

Instantly share code, notes, and snippets.

@bdionne
Last active December 14, 2015 00:08
Show Gist options
  • Save bdionne/4996271 to your computer and use it in GitHub Desktop.
Save bdionne/4996271 to your computer and use it in GitHub Desktop.
%% Handles a `{complete, EndSeq}' message sent by Worker.
%%
%% A worker that has no entry in the collector's counters is ignored and the
%% state is returned untouched. Otherwise EndSeq is stored for that worker,
%% fabric_view:remove_overlapping_shards/2 is applied to the counters, and the
%% completion count is incremented by one. While that count differs from the
%% number of counter entries the reply is `{ok, UpdatedState}'; once they are
%% equal the decision is handed to reply_all_complete/3.
handle_message({complete, EndSeq}, Worker, State) ->
    #collector{
        counters = Counters0,
        % total_rows is reused here as the number of completions seen so far
        total_rows = Finished0
    } = State,
    case fabric_dict:lookup_element(Worker, Counters0) of
        undefined ->
            {ok, State};
        _Tracked ->
            Stored = fabric_dict:store(Worker, EndSeq, Counters0),
            % unlikely to have overlaps here, but possible w/ filters
            Counters = fabric_view:remove_overlapping_shards(Worker, Stored),
            Finished = Finished0 + 1,
            Updated = State#collector{counters = Counters, total_rows = Finished},
            case fabric_dict:size(Counters) =:= Finished of
                true ->
                    reply_all_complete(Counters, Updated, State);
                false ->
                    {ok, Updated}
            end
    end.

%% Builds the reply once the completion count matches the number of counter
%% entries. Returns `{stop, Updated}' when fabric_view:is_progress_possible/1
%% holds for Counters; otherwise passes a `rangenotcovered' error to the user
%% callback and replies with whatever `Go' value the callback returned.
reply_all_complete(Counters, Updated, State) ->
    case fabric_view:is_progress_possible(Counters) of
        true ->
            {stop, Updated};
        false ->
            #collector{callback = Callback, user_acc = UserAcc0} = State,
            Reason = {rangenotcovered, <<"invalid sequence">>},
            {Go, UserAcc1} = Callback({error, Reason}, UserAcc0),
            % NOTE(review): this reply is built from the state as it was
            % before the counters/total_rows update, not from Updated --
            % confirm that discarding the update on this path is intended.
            {Go, State#collector{user_acc = UserAcc1}}
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment