Created
February 19, 2016 03:38
-
-
Save abhi-bit/b3c1fa83fc14993c181f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env escript | |
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | |
%%! -smp enable | |
% Licensed under the Apache License, Version 2.0 (the "License"); you may not | |
% use this file except in compliance with the License. You may obtain a copy of | |
% the License at | |
% | |
% http://www.apache.org/licenses/LICENSE-2.0 | |
% | |
% Unless required by applicable law or agreed to in writing, software | |
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
% License for the specific language governing permissions and limitations under | |
% the License. | |
-include_lib("couch_set_view/include/couch_set_view.hrl"). | |
-define(MAX_WAIT_TIME, 10 * 1000). | |
test_set_name() -> <<"couch_test_dcp_rollback">>. | |
num_set_partitions() -> 4. | |
ddoc_id() -> <<"_design/test">>. | |
num_docs() -> 1024. % keep it a multiple of num_set_partitions() | |
num_docs_pp() -> 1024 div num_set_partitions(). | |
main(_) -> | |
test_util:init_code_path(), | |
etap:plan(12), | |
case (catch test()) of | |
ok -> | |
etap:end_tests(); | |
Other -> | |
etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), | |
etap:bail(Other) | |
end, | |
%init:stop(), | |
%receive after infinity -> ok end, | |
ok. | |
test() -> | |
couch_set_view_test_util:start_server(test_set_name()), | |
etap:diag("Testing DCP in regards to view groups"), | |
test_partition_versions_update(), | |
couch_set_view_test_util:stop_server(), | |
ok. | |
test_partition_versions_update() -> | |
etap:diag("Testing whether the view partition versions are updated or not"), | |
setup_test(), | |
FailoverLog = [ | |
{1001, num_docs_pp() * 1}, | |
{1002, 0}], | |
FailoverLog1 = [ | |
{1101, num_docs_pp() * 2}, | |
{1102, 0}], | |
FailoverLog2 = [ | |
{1201, num_docs_pp() * 3}, | |
{1202, 0}], | |
FailoverLog3 = [ | |
{1301, num_docs_pp() * 4}, | |
{1302, 0}], | |
%% Set failover logs for 4 partitions | |
{_ViewResultNoRollback0, FLRollback0} = rollback_different_heads( | |
force_a_rollback, 0, FailoverLog), | |
{_ViewResultNoRollback1, FLRollback1} = rollback_different_heads( | |
force_a_rollback, 1, FailoverLog1), | |
{_ViewResultNoRollback2, FLRollback2} = rollback_different_heads( | |
force_a_rollback, 2, FailoverLog2), | |
{_ViewResultNoRollback3, FLRollback3} = rollback_different_heads( | |
force_a_rollback, 3, FailoverLog3), | |
etap:is(FLRollback0, tl(FailoverLog), "Failover consistent post rollback"), | |
etap:is(FLRollback1, tl(FailoverLog1), "Failover consistent post rollback"), | |
etap:is(FLRollback2, tl(FailoverLog2), "Failover consistent post rollback"), | |
etap:is(FLRollback3, tl(FailoverLog3), "Failover consistent post rollback"), | |
etap:diag(io_lib:format("ABHI: FLRollback0: ~p FLRollback1: ~p | |
FLRollback2: ~p FLRollback3: ~p", | |
[FLRollback0, FLRollback1, | |
FLRollback2, FLRollback3])), | |
trigger_initial_build(), | |
shutdown_mr_group(), | |
%init:stop(), | |
%receive after infinity -> ok end, | |
ok. | |
rollback_different_heads(DoRollback, PartId, FailoverLog) -> | |
Msg = case DoRollback of | |
dont_force_a_rollback -> | |
"Query data without rollback"; | |
force_a_rollback -> | |
"Query data with rollback" | |
end, | |
etap:diag(Msg), | |
couch_dcp_fake_server:set_failover_log(PartId, FailoverLog), | |
% Update index twice, so that there are header to roll back to | |
{ok, {_ViewResults1}} = couch_set_view_test_util:query_view( | |
test_set_name(), ddoc_id(), <<"test">>, []), | |
populate_set(1, 4 * num_docs()), | |
{ok, {_ViewResults2}} = couch_set_view_test_util:query_view( | |
test_set_name(), ddoc_id(), <<"test">>, []), | |
GroupFailoverLog = get_group_failover_log(PartId), | |
etap:diag(io_lib:format("ABHI: PartId: ~p GroupFailoverLog: ~p ~n", | |
[PartId, GroupFailoverLog])), | |
etap:is(GroupFailoverLog, FailoverLog, | |
"Group has initially the correct failover log"), | |
case DoRollback of | |
dont_force_a_rollback -> | |
FailoverLog2 = FailoverLog; | |
force_a_rollback -> | |
% Change the failover log on the server that is different from what | |
% The client has, so that a rollback is needed | |
% FailoverLog2 = [{1000 + PartId, 0}] ++ | |
% tl(FailoverLog), | |
FailoverLog2 = tl(FailoverLog), | |
couch_dcp_fake_server:set_failover_log(PartId, FailoverLog2) | |
end, | |
% Insert new docs so that the updater is run on the new query | |
populate_set((num_docs() * 2) + 1, 3 * num_docs()), | |
{ok, {ViewResults3}} = couch_set_view_test_util:query_view( | |
test_set_name(), ddoc_id(), <<"test">>, []), | |
GroupFailoverLog2 = get_group_failover_log(PartId), | |
etap:diag(io_lib:format("ABHI: PartId: ~p GroupFailoverLog2: ~p FailoverLog2: ~p~n", | |
[PartId, GroupFailoverLog2, FailoverLog2])), | |
etap:is(GroupFailoverLog2, FailoverLog2, | |
"Group has correct failover log after it might have changed"), | |
{ViewResults3, FailoverLog2}. | |
trigger_initial_build() -> | |
GroupPid = couch_set_view:get_group_pid( | |
mapreduce_view, test_set_name(), ddoc_id(), prod), | |
{ok, _, _} = gen_server:call( | |
GroupPid, #set_view_group_req{stale = false, debug = true}, ?MAX_WAIT_TIME). | |
random_binary(N) -> | |
random:seed({1, 2, 3}), | |
<< <<(random:uniform(20) + 100):8>> || _ <- lists:seq(1, N) >>. | |
configure_view_group() -> | |
etap:diag("Configuring view group"), | |
Params = #set_view_params{ | |
max_partitions = num_set_partitions(), | |
active_partitions = lists:seq(0, num_set_partitions()-1), | |
passive_partitions = [], | |
use_replica_index = false | |
}, | |
try | |
couch_set_view:define_group( | |
mapreduce_view, test_set_name(), ddoc_id(), Params) | |
catch _:Error -> | |
Error | |
end. | |
shutdown_mr_group() -> | |
GroupPid = couch_set_view:get_group_pid( | |
mapreduce_view, test_set_name(), ddoc_id(), prod), | |
couch_set_view_test_util:wait_for_updater_to_finish(GroupPid, ?MAX_WAIT_TIME), | |
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()), | |
MonRef = erlang:monitor(process, GroupPid), | |
receive | |
{'DOWN', MonRef, _, _, _} -> | |
ok | |
after 10000 -> | |
etap:bail("Timeout waiting for group shutdown") | |
end. | |
setup_test() -> | |
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()), | |
couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()), | |
populate_set(1, 4 * num_docs()), | |
DDoc = {[ | |
{<<"meta">>, {[{<<"id">>, ddoc_id()}]}}, | |
{<<"json">>, {[ | |
{<<"views">>, {[ | |
{<<"test">>, {[ | |
{<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>} | |
]}} | |
]}} | |
]}} | |
]}, | |
ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc), | |
ok = configure_view_group(). | |
shutdown_group() -> | |
couch_dcp_fake_server:reset(), | |
GroupPid = couch_set_view:get_group_pid( | |
mapreduce_view, test_set_name(), ddoc_id(), prod), | |
couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()), | |
MonRef = erlang:monitor(process, GroupPid), | |
receive | |
{'DOWN', MonRef, _, _, _} -> | |
ok | |
after 10000 -> | |
etap:bail("Timeout waiting for group shutdown") | |
end. | |
populate_set(From, To) -> | |
etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++ | |
" databases with " ++ integer_to_list(num_docs()) ++ " documents"), | |
DocList = create_docs(From, To), | |
ok = couch_set_view_test_util:populate_set_sequentially( | |
test_set_name(), | |
lists:seq(0, num_set_partitions() - 1), | |
DocList). | |
doc_id(I) -> | |
iolist_to_binary(io_lib:format("doc_~8..0b", [I])). | |
create_docs(From, To) -> | |
lists:map( | |
fun(I) -> | |
Cas = I, | |
ExpireTime = 0, | |
Flags = 0, | |
RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>, | |
RevMeta2 = [[io_lib:format("~2.16.0b",[X]) || <<X:8>> <= RevMeta1 ]], | |
RevMeta3 = iolist_to_binary(RevMeta2), | |
{[ | |
{<<"meta">>, {[ | |
{<<"id">>, doc_id(I)}, | |
{<<"rev">>, <<"1-", RevMeta3/binary>>} | |
]}}, | |
{<<"json">>, {[{<<"value">>, I}]}} | |
]} | |
end, | |
lists:seq(From, To)). | |
get_group_info() -> | |
GroupPid = couch_set_view:get_group_pid( | |
mapreduce_view, test_set_name(), ddoc_id(), prod), | |
{ok, GroupInfo} = couch_set_view_group:request_group_info(GroupPid), | |
GroupInfo. | |
get_group_failover_log(PartId) -> | |
GroupInfo = get_group_info(), | |
{partition_versions, {PartVersions0}} = lists:keyfind( | |
partition_versions, 1, GroupInfo), | |
PartVersions = lists:map(fun({PartId0, PartVersion}) -> | |
{list_to_integer(binary_to_list(PartId0)), | |
[list_to_tuple(V) || V <- PartVersion]} | |
end, PartVersions0), | |
{PartId, FailoverLog} = lists:keyfind(PartId, 1, PartVersions), | |
FailoverLog. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment