-
-
Save vjache/895329369b2da1054ac2 to your computer and use it in GitHub Desktop.
Dead-simple shared ring buffer in Erlang.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%% Shared Ring Buffer | |
-module(srb). | |
-export([new/2, push/2, pop/2, pop/1]). | |
-compile(export_all). | |
%% Bookkeeping row of the buffer table. As a tuple it is
%% {counters, Clock, MaxSize}, so with the default key position the
%% record tag 'counters' is the row's ETS key.
-record(counters, {
    %% Sequence counter; push/2 increments it and uses the new value as
    %% the key of the inserted entry.
    clock :: non_neg_integer(),
    %% Capacity of the buffer; push/2 deletes the entry whose key is
    %% Clock - MaxSize after each insert.
    max_size :: non_neg_integer()
}).
%% Creates the ring buffer: a public, named, ordered_set ETS table called
%% Id, seeded with the bookkeeping row (clock 0, capacity MaxSize).
%% Returns Id, which is the handle expected by push/2 and pop/1,2.
new(Id, MaxSize) ->
    TableOpts = [named_table, ordered_set, public, {write_concurrency, true}],
    Bookkeeping = #counters{clock = 0, max_size = MaxSize},
    ets:new(Id, TableOpts),
    ets:insert(Id, Bookkeeping),
    Id.
%% Appends Object to the buffer Id and returns ok.
%% The clock is bumped and the capacity read in a single update_counter
%% call (an increment of 0 just reads the field). The new clock value
%% becomes the entry's key; the third tuple element starts at 0 and is
%% the counter pop/2 increments to claim the entry.
push(Id, Object) ->
    CounterOps = [{#counters.clock, 1}, {#counters.max_size, 0}],
    [Seq, Capacity] = ets:update_counter(Id, 'counters', CounterOps),
    ets:insert(Id, {Seq, Object, 0}),
    %% Drop the entry that has just fallen out of the window.
    Expired = Seq - Capacity,
    ets:delete(Id, Expired),
    ok.
%% Pops a single object from the buffer Id.
%% Returns {ok, Object}, or the atom empty when pop/2 yields nothing.
pop(Id) ->
    Popped = pop(Id, 1),
    case Popped of
        [Item] -> {ok, Item};
        [] -> empty
    end.
%% Pops up to N of the oldest objects from the buffer Id.
%%
%% Returns the popped objects, oldest first; the list is shorter than N
%% (possibly empty) when the buffer runs out of data.
%%
%% Each entry is claimed with ets:take/2 (OTP 18+), which removes and
%% returns the row in one atomic step. The earlier lock-counter protocol
%% (bump field 3, read, delete) made every other consumer busy-spin on
%% the oldest key while it was locked, and spin indefinitely if the
%% locking process died before deleting the row. With take/2 a retry only
%% happens when the row has really disappeared, so every retry makes
%% progress.
pop(_Id, 0) ->
    [];
pop(Id, N) ->
    case ets:first(Id) of
        %% Numbers sort before atoms in Erlang term order, so in an
        %% ordered_set the bookkeeping row comes first only when no data
        %% entries are left.
        'counters' ->
            [];
        %% Table holds no rows at all.
        '$end_of_table' ->
            [];
        Key ->
            case ets:take(Id, Key) of
                %% We removed the row, so the object is ours. The third
                %% element is the legacy lock counter and is ignored.
                [{Key, Obj, _}] ->
                    [Obj | pop(Id, N - 1)];
                %% Row was concurrently popped or evicted by push/2;
                %% look for the next oldest entry.
                [] ->
                    pop(Id, N)
            end
    end.
%% | |
%% Tests | |
%% | |
%% Manual test: creates buffer srb_pt (capacity 100) and starts one linked
%% producer that pushes forever with a 1 ms pause. Returns the producer pid.
producer_test() ->
    Buffer = new(srb_pt, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    spawn_link(Producer).
%% Manual test: creates buffer srb_mpt (capacity 100) and starts four linked
%% producers pushing into it concurrently. Returns the list of their pids.
many_producers_test() ->
    Buffer = new(srb_mpt, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    [spawn_link(Producer) || _ <- lists:seq(1, 4)].
%% Manual test: creates buffer srb_ct (capacity 100) and starts one linked
%% consumer polling it with a 1 ms pause. Returns the consumer pid.
consumer_test() ->
    Buffer = new(srb_ct, 100),
    Consumer = fun() -> consumer_loop(Buffer, 1) end,
    spawn_link(Consumer).
%% Manual test: creates buffer srb_cnc (capacity 100), then starts one
%% linked producer followed by one linked consumer on it.
%% Returns the consumer pid.
concurrent_test() ->
    Buffer = new(srb_cnc, 100),
    Producer = fun() -> producer_loop(Buffer, 0, 1) end,
    Consumer = fun() -> consumer_loop(Buffer, 1) end,
    spawn_link(Producer),
    spawn_link(Consumer).
%% Manual test: many-to-many. Creates a buffer of capacity 100 and starts
%% four linked producers and four linked consumers working on it
%% concurrently. Returns the list of consumer pids.
%%
%% The table has its own name (srb_m2m). It previously reused srb_cnc,
%% the name concurrent_test/0 registers, so running both tests from the
%% same process made the second ets:new/2 fail with badarg.
m2m_concurrent_test() ->
    Id = new(srb_m2m, 100),
    [spawn_link(
        fun() ->
            producer_loop(Id, 0, 1)
        end) || _ <- [1, 2, 3, 4]],
    [spawn_link(
        fun() ->
            consumer_loop(Id, 1)
        end) || _ <- [1, 2, 3, 4]].
%% Endless producer: pushes {self(), N} into buffer Id, prints what was
%% pushed, sleeps Delay milliseconds, then repeats with N + 1.
%% Never returns.
producer_loop(Id, N, Delay) ->
    Self = self(),
    Item = {Self, N},
    push(Id, Item),
    io:format("~p | Pushed: ~p~n", [Self, Item]),
    timer:sleep(Delay),
    Next = N + 1,
    producer_loop(Id, Next, Delay).
%% Endless consumer: pops one item from buffer Id, prints the result of
%% pop/1 ({ok, Object} or empty), sleeps Delay milliseconds, then repeats.
%% Never returns.
consumer_loop(Id, Delay) ->
    Result = pop(Id),
    Self = self(),
    io:format("~p | Poped: ~p~n", [Self, Result]),
    timer:sleep(Delay),
    consumer_loop(Id, Delay).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment