Skip to content

Instantly share code, notes, and snippets.

@eiri
Created March 24, 2020 15:07
Show Gist options
  • Save eiri/029a23cd37ca6f7398b5920e60c85cfd to your computer and use it in GitHub Desktop.
Save eiri/029a23cd37ca6f7398b5920e60c85cfd to your computer and use it in GitHub Desktop.
Demo on how foundationdb atomic counters update in one transaction cancels all the read transactions
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -s -pa src/erlfdb/ebin
-define(CLUSTER_FILE, <<"/usr/local/etc/foundationdb/fdb.cluster">>).
-define(DIR, <<"test">>).
-define(KEY, <<"counter">>).
-define(TIMEOUT, 1000).
-define(uint2bin(I), binary:encode_unsigned(I, little)).
-define(bin2uint(I), binary:decode_unsigned(I, little)).
main(_) ->
%% open fdb
Db = erlfdb:open(?CLUSTER_FILE),
%% create counter "counter" in dir "test"
erlfdb:transactional(Db, fun(Tx) ->
Key = get_key(Tx),
erlfdb:add(Tx, Key, ?uint2bin(0))
end),
%% spawn mutex
Mutex = spawn(fun() -> mutex({false, []}) end),
%% spawn all the workers
Ws = lists:foldl(fun(N, A) ->
{W, R} = spawn_monitor(fun() -> worker(Mutex, Db, N) end),
[{W, R} | A]
end, [], lists:seq(1, 3)),
%% wait for all workers to finish
lists:foreach(fun({W, R}) ->
receive {'DOWN', R, _Type, W, _Info} -> ok end
end, Ws),
%% shutdown the mutex
call(Mutex, done),
%% delete dir and counter
erlfdb:transactional(Db, fun(Tx) ->
remove_dir(Tx)
end).
create_dir(Tx) ->
Root = erlfdb_directory:root(),
Dir = erlfdb_directory:create_or_open(Tx, Root, ?DIR),
erlfdb_directory:get_name(Dir).
remove_dir(Tx) ->
Root = erlfdb_directory:root(),
ok = erlfdb_directory:remove_if_exists(Tx, Root, ?DIR).
get_key(Tx) ->
Dir = create_dir(Tx),
erlfdb_tuple:pack({?KEY}, Dir).
worker(Mutex, Db, N) ->
erlfdb:transactional(Db, fun(Tx) ->
Key = get_key(Tx),
C = erlfdb:wait(erlfdb:get(Tx, Key)),
io:format("tx ~p: counter = ~p\n", [N, ?bin2uint(C)]),
call(Mutex, lock, N),
erlfdb:add(Tx, Key, ?uint2bin(1)),
call(Mutex, unlock, N)
end).
mutex({Locked, Q}) ->
receive
{lock, From, N} when Locked ->
io:format("tx ~p: set in queue for lock\n", [N]),
mutex({Locked, [{From, N} | Q]});
{lock, From, N} ->
io:format("tx ~p: got lock\n", [N]),
From ! {ok, self(), ok},
mutex({true, []});
{unlock, From, N} ->
io:format("tx ~p: returned lock\n", [N]),
case Q of
[] ->
io:format("mutex: queue is empty - unlock\n", []),
From ! {ok, self(), ok},
mutex({false, Q});
[{From1, N1} | Rest] ->
io:format("tx ~p got lock\n", [N1]),
From ! {ok, self(), ok},
From1 ! {ok, self(), ok},
mutex({true, Rest})
end;
{done, From} ->
io:format("mutex: done\n", []),
From ! {ok, self(), ok}
end.
call(Pid, done) ->
Ref = erlang:monitor(process, Pid),
Pid ! {done, self()},
Res = wait(Pid),
receive {'DOWN', Ref, _Type, Pid, _Info} ->
Res
after ?TIMEOUT ->
throw(timeout_error)
end.
call(Pid, Tag, Payload) ->
Pid ! {Tag, self(), Payload},
wait(Pid).
wait(Pid) ->
receive {ok, Pid, Msg} ->
Msg
after ?TIMEOUT ->
throw(timeout_error)
end.
$ ./acounter.escript
tx 2: counter = 0
tx 3: counter = 0
tx 1: counter = 0
tx 2: got lock
tx 1: set in queue for lock
tx 3: set in queue for lock
# tx2 set counter to 1
tx 2: returned lock
tx 3 got lock
# nothing happened here, tx3 didn't updated the counter, but got cancelled because of tx2 update retried the transaction
tx 3: returned lock
tx 1 got lock
# nothing happened here, tx1 didn't updated the counter, but got cancelled because of tx2 update retried the transaction
tx 1: returned lock
mutex: queue is empty - unlock
tx 1: counter = 1
tx 1: got lock
# tx1 set counter to 2
tx 1: returned lock
mutex: queue is empty - unlock
# tx3 got old counter
tx 3: counter = 1
tx 3: got lock
# again, nothing happened here, tx3 got cancelled and retried
tx 3: returned lock
mutex: queue is empty - unlock
# tx3 finally got proper counter
tx 3: counter = 2
tx 3: got lock
# tx3 updated counter to 3
tx 3: returned lock
mutex: queue is empty - unlock
mutex: done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment