Skip to content

Instantly share code, notes, and snippets.

@cstar
Created December 22, 2009 10:55
Show Gist options
  • Save cstar/261670 to your computer and use it in GitHub Desktop.
Save cstar/261670 to your computer and use it in GitHub Desktop.
Run 1 :
Fill time for adding 1000 entry: 6.247 seconds
Usage per user : [{<<"martin">>,68200},
{<<"boulette">>,99700},
{<<"cstar">>,102100},
{<<"sacha">>,83400},
{<<"helene">>,101300}]
MapRed time=0.508 seconds
Usage : {<<"boulette">>,99700}
MapRed time=0.422 seconds
Run 2 :
Fill time for adding 1000 entry: 2.588 seconds
Usage per user : [{<<"martin">>,155700},
{<<"boulette">>,212500},
{<<"cstar">>,246000},
{<<"sacha">>,170100},
{<<"helene">>,221700}]
MapRed time=0.266 seconds
Usage : {<<"boulette">>,212500}
MapRed time=0.245 seconds
avg : 0.360
Run 1 :
Fill time for adding 1000 entry: 8.492 seconds
Usage per user : [{<<"martin">>,279600},
{<<"boulette">>,274900},
{<<"cstar">>,405500},
{<<"helene">>,273000},
{<<"sacha">>,320400}]
MapRed time=0.411 seconds
Usage : {<<"boulette">>,274900}
MapRed time=0.351 seconds
Run 2 :
Fill time for adding 1000 entry: 7.958 seconds
Usage per user : [{<<"martin">>,88400},
{<<"boulette">>,86400},
{<<"cstar">>,94600},
{<<"helene">>,108100},
{<<"sacha">>,69500}]
MapRed time=0.345 seconds
Usage : {<<"boulette">>,86400}
MapRed time=0.162 seconds
AVG : 0.317
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -sasl errlog_type error -name tester -noshell -setcookie riak -pa ../riak/apps/riak/ebin
%%-kernel error_logger silent
%% Script entry point: connects to a Riak node, then runs the step1/1 demo
%% followed by the step2/1 benchmark. Command-line arguments are ignored.
main(_)->
%% NOTE(review): the argument below is not a valid Erlang term as written; it
%% looks like a node name that was replaced by e-mail obfuscation when this
%% page was captured. Presumably a quoted node atom -- restore before running.
{ok, C} = riak:client_connect([email protected]),
step1(C),
step2(C),
%% Optional cleanup of the "entries" bucket, left disabled:
%empty_bucket(C, <<"entries">>).
ok.
%% @doc Deletes every key that the client C lists for bucket Name.
%% Returns the list of results of the individual C:delete/3 calls, in key
%% order. Crashes with badmatch if listing the keys does not return {ok, _}.
%% The trailing 1 passed to delete is presumably a quorum value -- TODO confirm.
empty_bucket(C, Name) ->
    {ok, BucketKeys} = C:list_keys(Name),
    [C:delete(Name, K, 1) || K <- BucketKeys].
%% @doc Benchmark step: fills Riak with 1000 randomly generated blog entries
%% through fill/5, then times the two map/reduce queries
%% (count_chars_per_user/1 and count_chars_for_user/2) and prints the elapsed
%% wall-clock seconds for each stage. Always returns ok.
step2(C) ->
    PickAuthor = make_rand_list_picker(
        [<<"cstar">>, <<"martin">>, <<"sacha">>, <<"helene">>, <<"boulette">>]
    ),
    PickEntryId = make_rand_fun(1000),
    PickBlogId = make_rand_fun(200),
    %% statistics(wall_clock) returns {Total, SinceLastCall}; a call whose
    %% result is discarded only resets the "since last call" counter.
    statistics(wall_clock),
    fill(1000, C, PickAuthor, PickEntryId, PickBlogId),
    {_, FillMs} = statistics(wall_clock),
    io:format("Fill time for adding 1000 entry: ~p seconds~n", [FillMs / 1000]),
    count_chars_per_user(C),
    {_, PerUserMs} = statistics(wall_clock),
    io:format("MapRed time=~p seconds~n", [PerUserMs / 1000]),
    statistics(wall_clock),
    count_chars_for_user(C, <<"boulette">>),
    {_, SingleUserMs} = statistics(wall_clock),
    io:format("MapRed time=~p seconds~n", [SingleUserMs / 1000]),
    ok.
%% @doc Adds N random entries through add_entry_to_blog/5.
%% Participant, Entry and Blog are zero-arity generator funs producing an
%% author, an entry id and a blog id. Each entry gets a size that is a
%% multiple of 100 derived from a hash of the current time. Prints "done !"
%% via d/1 once the counter reaches 0.
fill(0, _C, _Participant, _Entry, _Blog) ->
    d("done !");
fill(N, C, Participant, Entry, Blog) ->
    Size = erlang:phash(now(), 10) * 100,
    EntryId = Entry(),
    BlogId = Blog(),
    Author = Participant(),
    add_entry_to_blog(C, EntryId, BlogId, Size, Author),
    fill(N - 1, C, Participant, Entry, Blog).
%% @doc Records entry EntryId (of Size characters, written by Author) in blog
%% BlogId, updating both the <<"blogs">> and the <<"entries">> buckets.
%%
%% Blog side: if the blog cannot be fetched, a new blog object is created with
%% this entry and Author as its only owner; otherwise the entry id is appended
%% to the blog's entries and Size is added to its size.
%% Entry side: if the entry cannot be fetched it is created; otherwise BlogId
%% is appended to its blog list.
%%
%% Returns the result of the final C:put/2 on the entry object.
%%
%% NOTE(review): a newly created blog starts with {size, 0}, so the Size of
%% its first entry is not counted, and any {error, _} from C:get/3 is treated
%% as "does not exist" -- confirm both are intended.
add_entry_to_blog(C, EntryId, BlogId, Size, Author) ->
    BlogObj =
        case C:get(<<"blogs">>, BlogId, 1) of
            {error, _} ->
                FreshBlog = [{size, 0}, {entries, [EntryId]}, {owners, [Author]}],
                riak_object:new(<<"blogs">>, BlogId, FreshBlog);
            {ok, StoredBlog} ->
                BlogDict0 = dict:from_list(riak_object:get_value(StoredBlog)),
                BlogDict1 = dict:append(entries, EntryId, BlogDict0),
                BlogDict2 = dict:update(size, fun(Old) -> Old + Size end, BlogDict1),
                riak_object:update_value(StoredBlog, dict:to_list(BlogDict2))
        end,
    C:put(BlogObj, 1),
    EntryObj =
        case C:get(<<"entries">>, EntryId, 1) of
            {error, _} ->
                FreshEntry = [{blog, [BlogId]}, {size, Size}, {author, Author}],
                riak_object:new(<<"entries">>, EntryId, FreshEntry);
            {ok, StoredEntry} ->
                EntryDict0 = dict:from_list(riak_object:get_value(StoredEntry)),
                EntryDict1 = dict:append(blog, BlogId, EntryDict0),
                riak_object:update_value(StoredEntry, dict:to_list(EntryDict1))
        end,
    C:put(EntryObj, 1).
%% @doc Runs a map/reduce over the <<"blogs">> bucket that totals blog sizes
%% per owner. The map phase turns each blog into a dict of {Owner, Size}; the
%% reduce phase merges those dicts, adding the sizes of owners that appear
%% more than once. Prints the totals through d/2 and returns them as a list
%% of {Owner, TotalSize} tuples.
count_chars_per_user(C) ->
    MapFun = fun(Object, undefined, none) ->
        Props = riak_object:get_value(Object),
        Size = proplists:get_value(size, Props),
        Owners = proplists:get_value(owners, Props),
        [dict:from_list([{Owner, Size} || Owner <- Owners])]
    end,
    ReduceFun = fun(Dicts, none) ->
        Add = fun(_Key, X, Y) -> X + Y end,
        [lists:foldl(fun(D, Acc) -> dict:merge(Add, D, Acc) end, dict:new(), Dicts)]
    end,
    Phases = [
        {map, {qfun, MapFun}, none, false},
        {reduce, {qfun, ReduceFun}, none, true}
    ],
    {ok, [Totals]} = C:mapred_bucket(<<"blogs">>, Phases),
    Usage = dict:to_list(Totals),
    d("Usage per user", Usage),
    Usage.
%% @doc Runs a map/reduce over the <<"blogs">> bucket that totals the sizes of
%% the blogs owned by Name. The map phase emits the blog size when Name is
%% among the blog's owners (nothing otherwise); the reduce phase sums the
%% emitted sizes. Prints {Name, Result} through d/2 and returns Result.
%%
%% NOTE(review): unlike the reduce fun in count_chars_per_user/1, this one
%% returns a bare number rather than a list -- confirm that is what the Riak
%% version in use expects.
count_chars_for_user(C, Name) ->
    MapFun = fun(Object, undefined, none) ->
        Props = riak_object:get_value(Object),
        Size = proplists:get_value(size, Props),
        Owners = proplists:get_value(owners, Props),
        case lists:member(Name, Owners) of
            true -> [Size];
            _ -> []
        end
    end,
    ReduceFun = fun(Sizes, none) -> lists:sum(Sizes) end,
    Phases = [
        {map, {qfun, MapFun}, none, false},
        {reduce, {qfun, ReduceFun}, none, true}
    ],
    {ok, Result} = C:mapred_bucket(<<"blogs">>, Phases),
    d("Usage ", {Name, Result}),
    Result.
%% @doc Walk-through of the basic client operations on a <<"groceries">>
%% bucket: stores two shopping lists, lists the keys, fetches one list and
%% prints its value and metadata, prepends an item and stores it again, runs a
%% map/reduce that counts how often each item occurs across both lists, and
%% finally deletes both objects and prints the remaining keys.
%% Returns whatever the last d/1 call returns.
step1(C) ->
    Bucket = <<"groceries">>,
    MineObj = riak_object:new(Bucket, <<"mine">>, ["eggs", "bacon"]),
    C:put(MineObj, 1),
    YoursObj = riak_object:new(Bucket, <<"yours">>, ["eggs", "wine"]),
    C:put(YoursObj, 2),
    d(C:list_keys(Bucket)),
    {ok, Fetched} = C:get(Bucket, <<"mine">>, 2),
    d(riak_object:get_value(Fetched)),
    d(dict:to_list(riak_object:get_metadata(Fetched))),
    Extended = riak_object:update_value(
        Fetched, ["ice cream" | riak_object:get_value(Fetched)]
    ),
    C:put(Extended, 2),
    %% Map: one dict per object, each item counted once.
    MapFun = fun(Object, undefined, none) ->
        [dict:from_list([{Item, 1} || Item <- riak_object:get_value(Object)])]
    end,
    %% Reduce: merge the per-object dicts, adding the counts of shared items.
    ReduceFun = fun(Dicts, none) ->
        Add = fun(_Key, X, Y) -> X + Y end,
        [lists:foldl(fun(D, Acc) -> dict:merge(Add, D, Acc) end, dict:new(), Dicts)]
    end,
    Inputs = [{Bucket, <<"mine">>}, {Bucket, <<"yours">>}],
    Phases = [
        {map, {qfun, MapFun}, none, false},
        {reduce, {qfun, ReduceFun}, none, true}
    ],
    {ok, [Counts]} = C:mapred(Inputs, Phases),
    d(dict:to_list(Counts)),
    C:delete(Bucket, <<"mine">>, 1),
    C:delete(Bucket, <<"yours">>, 1),
    d(C:list_keys(Bucket)).
%% @doc Prints Term under the label "output" via d/2. An {ok, Value} wrapper
%% is stripped first (repeatedly, if nested) so only the payload is printed.
d(Term) ->
    case Term of
        {ok, Inner} -> d(Inner);
        _ -> d("output", Term)
    end.
%% @doc Prints "Label : Value" followed by a newline; V is pretty-printed
%% with ~p. Returns ok.
%%
%% The label T is passed to io:format/2 as data rather than being spliced into
%% the format string, so a label containing "~" is printed literally instead
%% of being parsed as a control sequence (which raised badarg).
d(T, V) ->
    io:format("~ts : ~p~n", [T, V]).
%% @doc Returns a zero-arity fun that, on each call, hashes the current time
%% into the range 1..Max and returns that number as a decimal binary
%% (for example <<"42">>).
make_rand_fun(Max) ->
    fun() ->
        Hashed = erlang:phash(now(), Max),
        list_to_binary(integer_to_list(Hashed))
    end.
%% @doc Returns a zero-arity fun that, on each call, picks one element of List
%% at a position obtained by hashing the current time into 1..length(List).
make_rand_list_picker(List) ->
    fun() ->
        Position = erlang:phash(now(), length(List)),
        lists:nth(Position, List)
    end.
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%[{ totals, 4239, 44.484, 22.362}].
%%[{ totals, 4951, 36.381, 26.965}].
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%% @doc riak_redis_backend is a Riak storage backend using erldis.
%% Blog post about the initial release :
%% http://cestari.wordpress.com/2009/12/22/a-redis-backend-for-riak/
-module(riak_redis_backend).
%% NOTE(review): the e-mail address in the author atom below appears to have
%% been replaced by an obfuscation placeholder when this page was captured,
%% and the closing '>' is missing -- restore from the original source.
-author('Eric Cestari <[email protected]').
%% Backend entry points: lifecycle (start/1, stop/1), single-object access
%% (get/2, put/3, delete/2) and enumeration (list/1, list_bucket/2).
-export([start/1,stop/1,get/2,put/3,list/1,list_bucket/2,delete/2]).
%% ?RSEND(V) wraps the expression V in a zero-arity fun and hands it to
%% redis_send/1, so V is evaluated when that fun is called, not at the place
%% where the macro is used.
-define(RSEND(V), redis_send(fun()-> V end)).
% @type state() = term().
%% Backend state as built by start/1: pid holds the connection returned by
%% erldis_sync_client:connect/0, partition holds the partition number
%% rendered as a decimal binary.
-record(state, {pid, partition}).
%% @doc Opens a Redis connection for the given partition and returns the
%% backend state.
%% Partition is an integer; it is kept in the state as a decimal binary and
%% used as a prefix for the Redis keys built elsewhere in this module.
%% @spec start(Partition :: integer()) -> {ok, state()}
%% Crashes with badmatch if erldis_sync_client:connect/0 does not return
%% {ok, Pid}; no error tuple is returned.
start(Partition) ->
    {ok, Conn} = erldis_sync_client:connect(),
    PartitionBin = list_to_binary(integer_to_list(Partition)),
    {ok, #state{partition = PartitionBin, pid = Conn}}.
%% @doc Stops the backend. The state is ignored and nothing is done with the
%% connection held in it; the function always returns ok.
%% @spec stop(state()) -> ok
stop(_BackendState) ->
    ok.
%% @doc Fetches and decodes the value stored under BK = {Bucket, Key}.
%% @spec get(state(), {Bucket :: binary(), Key :: binary()}) ->
%%          {ok, Val :: term()} | {error, notfound}
%%
%% Returns {error, notfound} when Redis answers nil. Otherwise the stored
%% binary is decoded with binary_to_term/1 and returned as {ok, Term}.
%% Throws {badterm, BK, Val} when the stored data cannot be decoded.
%%
%% The decode uses try ... of ... catch instead of the old-style `catch`
%% expression, so a legitimately stored term of the shape {'EXIT', _} is
%% returned as a value instead of being mistaken for a decode failure.
get(#state{partition = P, pid = Pid}, BK) ->
    case erldis:get(Pid, k2l(P, BK)) of
        nil ->
            {error, notfound};
        Val ->
            try binary_to_term(Val) of
                Term -> {ok, Term}
            catch
                error:badarg -> throw({badterm, BK, Val})
            end
    end.
%% @doc Stores Value under {Bucket, Key} and updates the bookkeeping sets.
%% @spec put(state(), {Bucket :: binary(), Key :: binary()}, Val :: term()) ->
%%          ok | {error, unable_to_put}
%%
%% Four Redis commands are issued, each from its own process via redis_send/1:
%%   1. add Bucket to the partition's "buckets:" set,
%%   2. set the encoded value (term_to_binary/1) under the key from k2l/2,
%%   3. add Key to the partition's per-bucket key set,
%%   4. add the encoded {Bucket, Key} pair to the partition's "world:" set.
%% The function then waits for four replies.
%%
%% NOTE(review): redis_recv/1 as defined in this module always yields one
%% element per requested reply, so the fallback clause looks unreachable and
%% the individual Redis results are not inspected -- confirm intended.
put(#state{partition = P, pid = Pid}, {Bucket, Key} = BK, Value) ->
    Requests = [
        fun() -> erldis:sadd(Pid, <<"buckets:", P/binary>>, Bucket) end,
        fun() -> erldis:set(Pid, k2l(P, BK), term_to_binary(Value)) end,
        fun() -> erldis:sadd(Pid, <<P/binary, Bucket/binary>>, Key) end,
        fun() -> erldis:sadd(Pid, <<"world:", P/binary>>, term_to_binary(BK)) end
    ],
    lists:foreach(fun redis_send/1, Requests),
    case redis_recv(4) of
        [_, _, _, _] -> ok;
        _ -> {error, unable_to_put}
    end.
%% @doc Removes the value stored under {Bucket, Key} and its bookkeeping.
%% @spec delete(state(), {Bucket :: binary(), Key :: binary()}) ->
%%          ok | {error, unable_to_delete}
%%
%% Four Redis commands are issued, each from its own process via redis_send/1:
%%   1. remove Bucket from the partition's "buckets:" set,
%%   2. delete the value key built by k2l/2,
%%   3. remove Key from the partition's per-bucket key set,
%%   4. remove the encoded {Bucket, Key} pair from the "world:" set.
%% The function then waits for four replies.
%%
%% NOTE(review): the bucket name is removed from the "buckets:" set on every
%% delete, even though other keys of that bucket may still exist, and the
%% individual Redis results are not inspected -- confirm intended.
delete(#state{partition = P, pid = Pid}, {Bucket, Key} = BK) ->
    Requests = [
        fun() -> erldis:srem(Pid, <<"buckets:", P/binary>>, Bucket) end,
        fun() -> erldis:del(Pid, k2l(P, BK)) end,
        fun() -> erldis:srem(Pid, <<P/binary, Bucket/binary>>, Key) end,
        fun() -> erldis:srem(Pid, <<"world:", P/binary>>, term_to_binary(BK)) end
    ],
    lists:foreach(fun redis_send/1, Requests),
    case redis_recv(4) of
        [_, _, _, _] -> ok;
        _ -> {error, unable_to_delete}
    end.
%% @doc Lists everything recorded in this partition's "world:" set.
%% @spec list(state()) -> [term()]
%% Each member is decoded with binary_to_term/1; put/3 stores the encoded
%% {Bucket, Key} pair there, so the result is a list of such pairs.
list(#state{partition = P, pid = Pid}) ->
    Members = erldis:smembers(Pid, <<"world:", P/binary>>),
    [binary_to_term(Encoded) || Encoded <- Members].
%% @doc Enumerates bucket contents for this partition. The second argument
%% selects what is returned:
%%   '_'                   - the members of the partition's "buckets:" set;
%%   {filter, Bucket, Fun} - the members of Bucket's key set for which Fun
%%                           returns true;
%%   Bucket                - all members of Bucket's key set.
list_bucket(#state{partition = P, pid = Pid}, Selector) ->
    case Selector of
        '_' ->
            erldis:smembers(Pid, <<"buckets:", P/binary>>);
        {filter, Bucket, Fun} ->
            lists:filter(Fun, erldis:smembers(Pid, <<P/binary, Bucket/binary>>));
        Bucket ->
            erldis:smembers(Pid, <<P/binary, Bucket/binary>>)
    end.
%% @doc Builds the Redis key under which the value for bucket B / key V of
%% partition P is stored. All three arguments are binaries; the result is a
%% binary.
%%
%% Layout: P ":" byte_size(B) as decimal ":" B V
%%
%% Plain concatenation of the three parts was ambiguous: for example
%% {<<"ab">>, <<"c">>} and {<<"a">>, <<"bc">>} produced the same key, and so
%% did partition "12" / bucket "3" and partition "1" / bucket "23". The
%% delimiter after P and the explicit bucket length make the mapping
%% one-to-one, provided P contains no ":" (start/1 builds it from the digits
%% of an integer).
%%
%% NOTE(review): this changes the key format, so values written with the
%% previous layout will not be found under the new keys.
k2l(P, {B, V}) ->
    BucketLen = list_to_binary(integer_to_list(byte_size(B))),
    <<P/binary, $:, BucketLen/binary, $:, B/binary, V/binary>>.
%% @doc Collects N replies of the form {redis, Result} from the calling
%% process's mailbox and returns the Results in the order they are received.
%% Blocks until all N replies have arrived; there is no timeout.
redis_recv(N) ->
    [receive {redis, Reply} -> Reply end || _ <- lists:seq(1, N)].
%% @doc Runs the zero-arity Fun in a new process and sends its result back to
%% the caller as {redis, Result}. Returns the pid of the spawned process.
%%
%% If Fun raises, the exception is caught and reported to the caller as
%% {redis, {error, {Class, Reason}}}. Without this the worker died without
%% replying and a caller waiting in redis_recv/1 blocked forever, because that
%% receive has no timeout.
redis_send(Fun) ->
    Caller = self(),
    spawn(fun() ->
        Reply =
            try
                Fun()
            catch
                Class:Reason -> {error, {Class, Reason}}
            end,
        Caller ! {redis, Reply}
    end).
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -boot start_sasl -name tester -setcookie riak -pa . -pa ../erldis/ebin -pa ../riak/apps/riak/ebin
%% Script entry point: profiles riak_test_util:standard_backend_test/1 against
%% the riak_redis_backend module with fprof. Command-line arguments are
%% ignored.
main(_)->
%% Run the backend test under fprof tracing.
fprof:apply(riak_test_util,standard_backend_test, [riak_redis_backend]),
%% Process the collected trace into profile data.
fprof:profile(),
%% Report only the totals, without caller or per-process detail.
fprof:analyse([no_callers, no_details, totals]).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment