Skip to content

Instantly share code, notes, and snippets.

@Vagabond
Created May 17, 2011 03:00
Show Gist options
  • Save Vagabond/975828 to your computer and use it in GitHub Desktop.
Save Vagabond/975828 to your computer and use it in GitHub Desktop.
%% -------------------------------------------------------------------
%%
%% riak_fail_backend: a fake storage engine for emulating failures.
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
% @doc riak_kv_fail_backend is a Riak storage backend that passes through
% requests to a real backend but can emulate failures.
-module(riak_kv_fail_backend).
-behavior(riak_kv_backend).
-behavior(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-export([start/2,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2,
is_empty/1, drop/1, fold/3, fold_bucket_keys/4, callback/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
% @type state() = term().
-record(state, {
backend_mod :: atom(),
backend_pid :: pid(),
failfun :: function()
}).
% @spec start(Partition :: integer(), Config :: proplist()) ->
% {ok, state()} | {{error, Reason :: term()}, state()}
start(Partition, Config) ->
gen_server:start_link(?MODULE, [Partition, Config], []).
%% @private
init([Partition, Config]) ->
io:format("starting fail backend~n"),
case proplists:get_value(passthrough_backend, Config) of
undefined ->
riak:stop("passthrough_backend not set, failing");
Backend ->
PassthruConfig = proplists:get_value(passthrough_config, Config, Config),
try ets:new(fail_backend, [public, named_table]) of
_ ->
ets:insert(fail_backend, {counter, 0})
catch
_:_ -> ok
end,
Count = ets:update_counter(fail_backend, counter, 1),
io:format("count ~p~n", [Count]),
FailFun = proplists:get_value(failfun, Config, fun({get, Key}) ->
Counter = case get(keycount) of
undefined ->
ets:insert_new(fail_backend, {{get, Key}, 0}),
KeyCount = ets:update_counter(fail_backend, {get, Key}, 1),
put(keycount, KeyCount),
KeyCount;
Val -> Val
end,
io:format("Counter ~p~n", [Counter]),
case 3 of
1 ->
io:format("notfound read~n"),
%timer:sleep(100),
{error, notfound};
X when X < 3 ->
io:format("error read~n"),
timer:sleep(100),
{error, error_code_of_doom};
_ ->
io:format("succeeding read~n"),
ok
end;
({put, Key, _Value}) ->
Counter = case get(keycount) of
undefined ->
ets:insert_new(fail_backend, {{get, Key}, 0}),
KeyCount = ets:update_counter(fail_backend, {get, Key}, 1),
put(keycount, KeyCount),
KeyCount;
Val -> Val
end,
io:format("Counter ~p~n", [Counter]),
case 3 of
1 ->
io:format("failing write~n"),
{error, error_code_of_doom};
_ ->
io:format("succeding write~n"),
ok
end;
(_) ->
ok
end),
case Backend:start(Partition, PassthruConfig) of
{ok, Pid} ->
{ok, #state{backend_mod = Backend, backend_pid = Pid,
failfun = FailFun}};
_ ->
riak:stop("passthrough backend failed to start")
end
end.
%% @private
handle_cast(_, State) -> {noreply, State}.
%% @private
handle_call(stop,_From,State) -> {reply, srv_stop(State), State};
handle_call({get,BKey},_From,State) -> {reply, srv_get(State,BKey), State};
handle_call({put,BKey,Val},_From,State) ->
{reply, srv_put(State,BKey,Val),State};
handle_call({delete,BKey},_From,State) -> {reply, srv_delete(State,BKey),State};
handle_call(list,_From,State) -> {reply, srv_list(State), State};
handle_call({list_bucket,Bucket},_From,State) ->
{reply, srv_list_bucket(State, Bucket), State};
handle_call(is_empty, _From, #state{backend_mod=Mod,backend_pid=Pid} = State) ->
{reply, Mod:is_empty(Pid), State};
handle_call(drop, _From, #state{backend_mod=Mod,backend_pid=Pid} = State) ->
{reply, Mod:drop(Pid), State};
handle_call({fold, Fun0, Acc}, _From,
#state{backend_mod=Mod,backend_pid=Pid} = State) ->
{reply, Mod:fold(Pid, Fun0, Acc), State};
handle_call({fold_bucket_keys, Bucket, Fun0, Acc}, _From,
#state{backend_mod=Mod,backend_pid=Pid} = State) ->
{reply, Mod:fold_bucket_keys(Pid, Bucket, Fun0, Acc), State}.
% @spec stop(state()) -> ok | {error, Reason :: term()}
stop(SrvRef) -> gen_server:call(SrvRef,stop).
srv_stop(#state{backend_mod=Mod,backend_pid=Pid}) ->
Mod:stop(Pid).
% get(state(), riak_object:bkey()) ->
% {ok, Val :: binary()} | {error, Reason :: term()}
% key must be 160b
get(SrvRef, BKey) -> gen_server:call(SrvRef,{get,BKey}).
srv_get(#state{backend_mod=Mod,backend_pid=Pid,failfun=FailFun}, BKey) ->
case FailFun({get, BKey}) of
ok ->
Mod:get(Pid, BKey);
Result ->
Result
end.
% put(state(), riak_object:bkey(), Val :: binary()) ->
% ok | {error, Reason :: term()}
% key must be 160b
put(SrvRef, BKey, Val) -> gen_server:call(SrvRef,{put,BKey,Val}).
srv_put(#state{backend_mod=Mod,backend_pid=Pid,failfun=FailFun},BKey,Val) ->
case FailFun({put, BKey, Val}) of
ok ->
Mod:put(Pid, BKey, Val);
Result ->
Result
end.
% delete(state(), riak_object:bkey()) ->
% ok | {error, Reason :: term()}
% key must be 160b
delete(SrvRef, BKey) -> gen_server:call(SrvRef,{delete,BKey}).
srv_delete(#state{backend_mod=Mod,backend_pid=Pid}, BKey) ->
Mod:delete(Pid, BKey).
% list(state()) -> [riak_object:bkey()]
list(SrvRef) -> gen_server:call(SrvRef,list).
srv_list(#state{backend_mod=Mod,backend_pid=Pid}) ->
Mod:list(Pid).
% list_bucket(term(), Bucket :: riak_object:bucket()) -> [Key :: binary()]
list_bucket(SrvRef, Bucket) ->
gen_server:call(SrvRef,{list_bucket, Bucket}).
srv_list_bucket(#state{backend_pid=Pid}, {filter, Bucket, Fun}) ->
gen_server:call(Pid, {list_bucket, {filter, Bucket, Fun}});
srv_list_bucket(#state{backend_mod=Mod,backend_pid=Pid}, Bucket) ->
Mod:list_bucket(Pid, Bucket).
is_empty(SrvRef) -> gen_server:call(SrvRef, is_empty).
drop(SrvRef) -> gen_server:call(SrvRef, drop).
fold(SrvRef, Fun, Acc0) -> gen_server:call(SrvRef, {fold, Fun, Acc0}, infinity).
fold_bucket_keys(SrvRef, Bucket, Fun, Acc0) ->
gen_server:call(SrvRef, {fold_bucket_keys, Bucket, Fun, Acc0}, infinity).
%% Ignore callbacks for other backends so multi backend works
callback(_State, _Ref, _Msg) ->
ok.
%% @private
handle_info(_Msg, State) -> {noreply, State}.
%% @private
terminate(_Reason, _State) -> ok.
%% @private
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%
%% Test
%%
-ifdef(TEST).
simple_test() ->
riak_kv_backend:standard_test(?MODULE, [{passthrough_backend,
riak_kv_ets_backend}, {failfun,
fun(_) ->
ok
end}]).
-ifdef(EQC).
eqc_test() ->
?assertEqual(true, backend_eqc:test(?MODULE, true)).
-endif. % EQC
-endif. % TEST
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment