#!/usr/bin/env escript
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
%%! -smp enable
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
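
% Test: exercises DCP rollback handling in view groups. Distinct failover
% logs are installed on the fake DCP server for each partition, a rollback
% is forced, and the group's stored partition versions are checked against
% the truncated failover log.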
-include_lib("couch_set_view/include/couch_set_view.hrl").
-define(MAX_WAIT_TIME, 10 * 1000).
test_set_name() -> <<"couch_test_dcp_rollback">>.
num_set_partitions() -> 4.
ddoc_id() -> <<"_design/test">>.
num_docs() -> 1024. % keep it a multiple of num_set_partitions()
num_docs_pp() -> 1024 div num_set_partitions().
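
% Entry point. Plans 12 etap assertions; any exception from test/0 is
% reported via etap:diag/1 and the run is aborted with etap:bail/1.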
main(_) ->
    test_util:init_code_path(),

    etap:plan(12),
    case (catch test()) of
        ok ->
            etap:end_tests();
        Other ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
            etap:bail(Other)
    end,
    %init:stop(),
    %receive after infinity -> ok end,
    ok.
test() ->
    couch_set_view_test_util:start_server(test_set_name()),
    etap:diag("Testing DCP with regard to view groups"),
    test_partition_versions_update(),
    couch_set_view_test_util:stop_server(),
    ok.
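
% Runs the rollback scenario once per partition (0..3), each with its own
% two-entry failover log. After a forced rollback the group should only
% retain the older failover log entry (the tail of the original list).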
test_partition_versions_update() ->
    etap:diag("Testing whether the view partition versions are updated"),
    setup_test(),
    FailoverLog = [
        {1001, num_docs_pp() * 1},
        {1002, 0}],
    FailoverLog1 = [
        {1101, num_docs_pp() * 2},
        {1102, 0}],
    FailoverLog2 = [
        {1201, num_docs_pp() * 3},
        {1202, 0}],
    FailoverLog3 = [
        {1301, num_docs_pp() * 4},
        {1302, 0}],
    %% Set failover logs for all 4 partitions and force a rollback on each
    {_ViewResultNoRollback0, FLRollback0} = rollback_different_heads(
        force_a_rollback, 0, FailoverLog),
    {_ViewResultNoRollback1, FLRollback1} = rollback_different_heads(
        force_a_rollback, 1, FailoverLog1),
    {_ViewResultNoRollback2, FLRollback2} = rollback_different_heads(
        force_a_rollback, 2, FailoverLog2),
    {_ViewResultNoRollback3, FLRollback3} = rollback_different_heads(
        force_a_rollback, 3, FailoverLog3),
    etap:is(FLRollback0, tl(FailoverLog), "Failover log consistent post rollback"),
    etap:is(FLRollback1, tl(FailoverLog1), "Failover log consistent post rollback"),
    etap:is(FLRollback2, tl(FailoverLog2), "Failover log consistent post rollback"),
    etap:is(FLRollback3, tl(FailoverLog3), "Failover log consistent post rollback"),
    etap:diag(io_lib:format("FLRollback0: ~p FLRollback1: ~p "
        "FLRollback2: ~p FLRollback3: ~p",
        [FLRollback0, FLRollback1, FLRollback2, FLRollback3])),
    trigger_initial_build(),
    shutdown_mr_group(),
    %init:stop(),
    %receive after infinity -> ok end,
    ok.
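
% Queries the view before and after changing the failover log on the fake
% DCP server. When DoRollback is force_a_rollback, the server's log is
% replaced with the tail of FailoverLog so the client's newest entry no
% longer matches and the updater must roll back. Returns the final view
% results and the failover log the group is expected to hold.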
rollback_different_heads(DoRollback, PartId, FailoverLog) ->
    Msg = case DoRollback of
        dont_force_a_rollback ->
            "Query data without rollback";
        force_a_rollback ->
            "Query data with rollback"
    end,
    etap:diag(Msg),
    couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),
    % Update the index twice, so that there are headers to roll back to
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    populate_set(1, 4 * num_docs()),
    {ok, {_ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog = get_group_failover_log(PartId),
    etap:diag(io_lib:format("PartId: ~p GroupFailoverLog: ~p~n",
        [PartId, GroupFailoverLog])),
    etap:is(GroupFailoverLog, FailoverLog,
        "Group initially has the correct failover log"),
    case DoRollback of
        dont_force_a_rollback ->
            FailoverLog2 = FailoverLog;
        force_a_rollback ->
            % Change the failover log on the server so that it differs from
            % the one the client has, which makes a rollback necessary
            % FailoverLog2 = [{1000 + PartId, 0}] ++ tl(FailoverLog),
            FailoverLog2 = tl(FailoverLog),
            couch_dcp_fake_server:set_failover_log(PartId, FailoverLog2)
    end,
    % Insert new docs so that the updater runs on the next query
    populate_set((num_docs() * 2) + 1, 3 * num_docs()),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog2 = get_group_failover_log(PartId),
    etap:diag(io_lib:format("PartId: ~p GroupFailoverLog2: ~p FailoverLog2: ~p~n",
        [PartId, GroupFailoverLog2, FailoverLog2])),
    etap:is(GroupFailoverLog2, FailoverLog2,
        "Group has the correct failover log after it might have changed"),
    {ViewResults3, FailoverLog2}.
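
% Forces a full index build by issuing a stale=false request directly to
% the group process.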
trigger_initial_build() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, _, _} = gen_server:call(
        GroupPid, #set_view_group_req{stale = false, debug = true}, ?MAX_WAIT_TIME).
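
% Returns an N-byte binary of pseudo-random bytes in the range 101..120.
% The fixed seed makes the output deterministic across runs. (Unused by
% this test; presumably carried over from the test it was derived from.)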
random_binary(N) ->
    random:seed({1, 2, 3}),
    << <<(random:uniform(20) + 100):8>> || _ <- lists:seq(1, N) >>.
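
% Defines the view group with all partitions active and no replica index.
% Returns ok on success, or the caught error.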
configure_view_group() ->
    etap:diag("Configuring view group"),
    Params = #set_view_params{
        max_partitions = num_set_partitions(),
        active_partitions = lists:seq(0, num_set_partitions() - 1),
        passive_partitions = [],
        use_replica_index = false
    },
    try
        couch_set_view:define_group(
            mapreduce_view, test_set_name(), ddoc_id(), Params)
    catch _:Error ->
        Error
    end.
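
% Waits for any in-flight updater to finish, deletes the set databases and
% then blocks until the group process goes down (or bails after 10s).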
shutdown_mr_group() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    couch_set_view_test_util:wait_for_updater_to_finish(GroupPid, ?MAX_WAIT_TIME),
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    MonRef = erlang:monitor(process, GroupPid),
    receive
        {'DOWN', MonRef, _, _, _} ->
            ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.
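
% Recreates the set databases, loads the initial documents and installs a
% design document with a single map view emitting (meta.id, doc.value).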
setup_test() ->
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
    populate_set(1, 4 * num_docs()),
    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[
            {<<"views">>, {[
                {<<"test">>, {[
                    {<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>}
                ]}}
            ]}}
        ]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
    ok = configure_view_group().
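
% Variant of shutdown_mr_group/0 that also resets the fake DCP server
% before tearing the group down. (Not called by this test.)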
shutdown_group() ->
    couch_dcp_fake_server:reset(),
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    MonRef = erlang:monitor(process, GroupPid),
    receive
        {'DOWN', MonRef, _, _, _} ->
            ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.
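
% Creates docs From..To and distributes them sequentially across all
% partition databases.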
populate_set(From, To) ->
    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
        " databases with " ++ integer_to_list(To - From + 1) ++ " documents"),
    DocList = create_docs(From, To),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, num_set_partitions() - 1),
        DocList).
doc_id(I) ->
    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).
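
% Builds docs whose revision metadata packs CAS (64 bit), expiry (32 bit)
% and flags (32 bit) into a 16-byte native-endian binary, hex-encoded and
% prefixed with "1-" to form the rev string.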
create_docs(From, To) ->
    lists:map(
        fun(I) ->
            Cas = I,
            ExpireTime = 0,
            Flags = 0,
            RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>,
            RevMeta2 = [[io_lib:format("~2.16.0b", [X]) || <<X:8>> <= RevMeta1]],
            RevMeta3 = iolist_to_binary(RevMeta2),
            {[
                {<<"meta">>, {[
                    {<<"id">>, doc_id(I)},
                    {<<"rev">>, <<"1-", RevMeta3/binary>>}
                ]}},
                {<<"json">>, {[{<<"value">>, I}]}}
            ]}
        end,
        lists:seq(From, To)).
get_group_info() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, GroupInfo} = couch_set_view_group:request_group_info(GroupPid),
    GroupInfo.
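
% Extracts the failover log for PartId from the group info. Partition ids
% arrive as binaries and each version as a two-element list, so both are
% converted back to native terms before the lookup.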
get_group_failover_log(PartId) ->
    GroupInfo = get_group_info(),
    {partition_versions, {PartVersions0}} = lists:keyfind(
        partition_versions, 1, GroupInfo),
    PartVersions = lists:map(fun({PartId0, PartVersion}) ->
        {list_to_integer(binary_to_list(PartId0)),
         [list_to_tuple(V) || V <- PartVersion]}
    end, PartVersions0),
    {PartId, FailoverLog} = lists:keyfind(PartId, 1, PartVersions),
    FailoverLog.