Created
March 24, 2020 15:07
-
-
Save eiri/029a23cd37ca6f7398b5920e60c85cfd to your computer and use it in GitHub Desktop.
Demo on how foundationdb atomic counters update in one transaction cancels all the read transactions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env escript | |
%% -*- erlang -*- | |
%%! -s -pa src/erlfdb/ebin | |
-define(CLUSTER_FILE, <<"/usr/local/etc/foundationdb/fdb.cluster">>). | |
-define(DIR, <<"test">>). | |
-define(KEY, <<"counter">>). | |
-define(TIMEOUT, 1000). | |
-define(uint2bin(I), binary:encode_unsigned(I, little)). | |
-define(bin2uint(I), binary:decode_unsigned(I, little)). | |
main(_) -> | |
%% open fdb | |
Db = erlfdb:open(?CLUSTER_FILE), | |
%% create counter "counter" in dir "test" | |
erlfdb:transactional(Db, fun(Tx) -> | |
Key = get_key(Tx), | |
erlfdb:add(Tx, Key, ?uint2bin(0)) | |
end), | |
%% spawn mutex | |
Mutex = spawn(fun() -> mutex({false, []}) end), | |
%% spawn all the workers | |
Ws = lists:foldl(fun(N, A) -> | |
{W, R} = spawn_monitor(fun() -> worker(Mutex, Db, N) end), | |
[{W, R} | A] | |
end, [], lists:seq(1, 3)), | |
%% wait for all workers to finish | |
lists:foreach(fun({W, R}) -> | |
receive {'DOWN', R, _Type, W, _Info} -> ok end | |
end, Ws), | |
%% shutdown the mutex | |
call(Mutex, done), | |
%% delete dir and counter | |
erlfdb:transactional(Db, fun(Tx) -> | |
remove_dir(Tx) | |
end). | |
create_dir(Tx) -> | |
Root = erlfdb_directory:root(), | |
Dir = erlfdb_directory:create_or_open(Tx, Root, ?DIR), | |
erlfdb_directory:get_name(Dir). | |
remove_dir(Tx) -> | |
Root = erlfdb_directory:root(), | |
ok = erlfdb_directory:remove_if_exists(Tx, Root, ?DIR). | |
get_key(Tx) -> | |
Dir = create_dir(Tx), | |
erlfdb_tuple:pack({?KEY}, Dir). | |
worker(Mutex, Db, N) -> | |
erlfdb:transactional(Db, fun(Tx) -> | |
Key = get_key(Tx), | |
C = erlfdb:wait(erlfdb:get(Tx, Key)), | |
io:format("tx ~p: counter = ~p\n", [N, ?bin2uint(C)]), | |
call(Mutex, lock, N), | |
erlfdb:add(Tx, Key, ?uint2bin(1)), | |
call(Mutex, unlock, N) | |
end). | |
mutex({Locked, Q}) -> | |
receive | |
{lock, From, N} when Locked -> | |
io:format("tx ~p: set in queue for lock\n", [N]), | |
mutex({Locked, [{From, N} | Q]}); | |
{lock, From, N} -> | |
io:format("tx ~p: got lock\n", [N]), | |
From ! {ok, self(), ok}, | |
mutex({true, []}); | |
{unlock, From, N} -> | |
io:format("tx ~p: returned lock\n", [N]), | |
case Q of | |
[] -> | |
io:format("mutex: queue is empty - unlock\n", []), | |
From ! {ok, self(), ok}, | |
mutex({false, Q}); | |
[{From1, N1} | Rest] -> | |
io:format("tx ~p got lock\n", [N1]), | |
From ! {ok, self(), ok}, | |
From1 ! {ok, self(), ok}, | |
mutex({true, Rest}) | |
end; | |
{done, From} -> | |
io:format("mutex: done\n", []), | |
From ! {ok, self(), ok} | |
end. | |
call(Pid, done) -> | |
Ref = erlang:monitor(process, Pid), | |
Pid ! {done, self()}, | |
Res = wait(Pid), | |
receive {'DOWN', Ref, _Type, Pid, _Info} -> | |
Res | |
after ?TIMEOUT -> | |
throw(timeout_error) | |
end. | |
call(Pid, Tag, Payload) -> | |
Pid ! {Tag, self(), Payload}, | |
wait(Pid). | |
wait(Pid) -> | |
receive {ok, Pid, Msg} -> | |
Msg | |
after ?TIMEOUT -> | |
throw(timeout_error) | |
end. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ ./acounter.escript | |
tx 2: counter = 0 | |
tx 3: counter = 0 | |
tx 1: counter = 0 | |
tx 2: got lock | |
tx 1: set in queue for lock | |
tx 3: set in queue for lock | |
# tx2 set counter to 1 | |
tx 2: returned lock | |
tx 3 got lock | |
# nothing happened here, tx3 didn't updated the counter, but got cancelled because of tx2 update retried the transaction | |
tx 3: returned lock | |
tx 1 got lock | |
# nothing happened here, tx1 didn't updated the counter, but got cancelled because of tx2 update retried the transaction | |
tx 1: returned lock | |
mutex: queue is empty - unlock | |
tx 1: counter = 1 | |
tx 1: got lock | |
# tx1 set counter to 2 | |
tx 1: returned lock | |
mutex: queue is empty - unlock | |
# tx3 got old counter | |
tx 3: counter = 1 | |
tx 3: got lock | |
# again, nothing happened here, tx3 got cancelled and retried | |
tx 3: returned lock | |
mutex: queue is empty - unlock | |
# tx3 finally got proper counter | |
tx 3: counter = 2 | |
tx 3: got lock | |
# tx3 updated counter to 3 | |
tx 3: returned lock | |
mutex: queue is empty - unlock | |
mutex: done |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment