Skip to content

Instantly share code, notes, and snippets.

@Vagabond
Forked from IgorKarymov/gist:1224897
Created September 26, 2011 17:08
Show Gist options
  • Save Vagabond/1242755 to your computer and use it in GitHub Desktop.
Save Vagabond/1242755 to your computer and use it in GitHub Desktop.
riak if_not_modified
%% Exercises concurrent read-modify-write of a single Riak object
%% (bucket <<"counters">>, key <<"counter">>) using puts issued with the
%% if_not_modified option.
-module(test).
-export([
test/0
]).
%% Host and port handed to riakc_pb_socket:start_link/2 for every connection.
-define(RIAK_HOST, "127.0.0.1").
-define(RIAK_PORT, 8087).
%% @doc Seeds the counter object and runs the concurrent increment workers.
%%
%% Stores term_to_binary(1) under bucket <<"counters">>, key <<"counter">>,
%% then calls parmap/2 with get_autoincrement_counter/1 over a list of
%% twenty 3s, i.e. twenty workers that each get up to three attempts.
%% Returns whatever parmap/2 returns.
%%
%% Crashes with badmatch if the connection cannot be opened or the initial
%% put does not return ok.
test() ->
    {ok, Pid} = riakc_pb_socket:start_link(?RIAK_HOST, ?RIAK_PORT),
    StartValue = term_to_binary(1),
    NewObj = riakc_obj:new(<<"counters">>, <<"counter">>, StartValue),
    %% The seeding connection is only needed for this one put; every worker
    %% opens its own. Stop it explicitly (also when the put fails) so it
    %% does not stay open for the lifetime of the calling process.
    try
        ok = riakc_pb_socket:put(Pid, NewObj)
    after
        riakc_pb_socket:stop(Pid)
    end,
    parmap(fun get_autoincrement_counter/1, lists:duplicate(20, 3)).
%% @doc Opens a dedicated Riak connection and runs autoincrement_loop/2 on it.
%%
%% Repeats is passed through to autoincrement_loop/2 as the attempt budget.
%% Returns whatever autoincrement_loop/2 returns. If the connection cannot
%% be opened, sleeps 100 ms, logs, and tries again with no retry limit.
get_autoincrement_counter(Repeats) ->
    case riakc_pb_socket:start_link(?RIAK_HOST, ?RIAK_PORT) of
        {ok, Pid} ->
            %% The connection is a separate linked process; a normal exit
            %% of this worker does not take a non-trapping linked process
            %% down with it, so close the connection explicitly whether the
            %% loop returns or raises.
            try
                autoincrement_loop(Pid, Repeats)
            after
                riakc_pb_socket:stop(Pid)
            end;
        {error, _} ->
            %% try to get a socket again
            timer:sleep(100),
            io:format("failed to get socket~n"),
            get_autoincrement_counter(Repeats)
    end.
%% @doc Reads the counter, writes it back incremented, retrying on failure.
%%
%% Pid is a riakc_pb_socket connection; Repeats is the number of attempts
%% left. On a successful put the value that was READ (before the increment)
%% is returned as a bare term. When Repeats reaches 0 the result is
%% {error, repeats}. Each failed put is logged together with the remaining
%% attempt count and the object is fetched again on the next attempt.
%%
%% Crashes with badmatch if the get does not return {ok, Object}.
autoincrement_loop(_Pid, 0) ->
    {error, repeats};
autoincrement_loop(Pid, Repeats) ->
    {ok, Fetched} = riakc_pb_socket:get(Pid, <<"counters">>, <<"counter">>),
    {Current, Bumped} = bump_counter_object(Fetched),
    %% NOTE(review): if_not_modified presumably makes the put fail when the
    %% object changed since it was fetched - confirm against the riakc docs.
    PutResult = riakc_pb_socket:put(Pid, Bumped, [if_not_modified]),
    case PutResult of
        {error, Reason} ->
            io:format("reason ~p ~p~n", [Reason, Repeats]),
            autoincrement_loop(Pid, Repeats - 1);
        ok ->
            Current
    end.

%% Decodes the integer stored in the fetched object and returns it together
%% with a copy of the object whose value is that integer plus one.
bump_counter_object(Fetched) ->
    Current = binary_to_term(riakc_obj:get_value(Fetched)),
    Bumped = riakc_obj:update_value(Fetched, term_to_binary(Current + 1)),
    {Current, Bumped}.
%% @doc Applies F to every element of L, each in its own monitored process.
%%
%% Every worker sends {WorkerPid, F(Element)} back to the calling process.
%% The list of {Pid, MonitorRef} pairs is handed to collect_results/2 and
%% its return value is returned unchanged.
parmap(F, L) ->
    Caller = self(),
    StartWorker =
        fun(Item) ->
            spawn_monitor(fun() -> Caller ! {self(), F(Item)} end)
        end,
    Workers = lists:map(StartWorker, L),
    collect_results(Workers, []).
%% @doc Gathers one outcome per monitored worker, in the order given.
%%
%% The first argument is a list of {Pid, MonitorRef} pairs. Acc holds the
%% outcomes collected so far, most recent first; callers pass [].
%% For each worker the outcome is either the Results term from a
%% {Pid, Results} message, or the exit Reason from its 'DOWN' message if
%% the worker died without reporting. The two cases are not tagged, so a
%% caller cannot tell a crash reason from an ordinary result.
%%
%% Returns the outcomes in the same order as the worker list.
collect_results([], Acc) ->
    %% Outcomes were prepended, so reverse to restore the worker order.
    lists:reverse(Acc);
collect_results([{Pid, Ref} | T], Acc) ->
    %% No timeout/rotation is needed: a monitored worker always produces
    %% either its result message or a 'DOWN', and selective receive leaves
    %% the other workers' messages queued until their turn. Like the
    %% polling version, this waits indefinitely on a worker that never
    %% finishes.
    receive
        {Pid, Results} ->
            %% Drop the monitor and any 'DOWN' already delivered so no
            %% stray message is left in the caller's mailbox.
            erlang:demonitor(Ref, [flush]),
            collect_results(T, [Results | Acc]);
        {'DOWN', Ref, _, Pid, Reason} ->
            collect_results(T, [Reason | Acc])
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment