%% distributed k/v store in erlang
%% Published as GitHub gist copenhas/2225487 (created March 28, 2012 11:14).
%% Note from GitHub: the original file may contain bidirectional Unicode text
%% that is compiled differently from how it renders; review it in an editor
%% that reveals hidden Unicode characters.
-module(dstore).

%% Distributed key/value store: one locally registered process owns three
%% ETS tables and can forward writes to peer nodes running the same module.

%% Public k/v operations.
-export([lookup/1, write/2]).
%% Public system operations.
-export([start_link/0, stop/0, sync/2, replicate/1, pid/0]).
%% Internal operations. init/0 is the spawn entry point used by start_link/0
%% and write/3 is invoked on peers through rpc. loop/1 is exported too,
%% presumably for fully-qualified calls (TODO confirm).
-export([init/0, loop/1, write/3]).

%% Milliseconds a client waits for the store process to reply.
-define(TIMEOUT, 30000).

%% Store process state:
%%   tags2Id  - duplicate_bag table: Key -> value id (a key may hold many ids)
%%   id2Value - set table: value id -> Value
%%   value2Id - set table: Value -> value id (identical values share one id)
%%   last_id  - counter from which new value ids are allocated
%%   replicas - nodes that local writes are forwarded to
-record(state, {tags2Id,
                id2Value,
                value2Id,
                last_id = 0,
                replicas = []}).
%% Spawns the store process (running init/0), links it to the caller and
%% registers it locally under ?MODULE.
%% Returns the result of register/2 when nothing is registered yet, or
%% {error, already_started} when the name is already taken.
%% NOTE(review): the whereis/register pair is not atomic, so two concurrent
%% callers can both get past the check.
start_link() ->
    do_start_link(?MODULE:pid()).

%% Spawns and registers only when no process currently holds the name.
do_start_link(undefined) ->
    register(?MODULE, spawn_link(?MODULE, init, []));
do_start_link(_Registered) ->
    {error, already_started}.
%% ---------------------------------------------------------------------
%% Client API. Each function is a synchronous request to the locally
%% registered store process; its result is whatever call/2 returns.
%% ---------------------------------------------------------------------

%% Stops the store process; it replies ok before leaving its loop.
stop() -> call(stop).

%% Returns the {Id, Value} records stored under Key.
lookup(Key) -> call({lookup, Key}).

%% Stores Value under Key and forwards the write to every replica.
write(Key, Value) -> call({write, Key, Value}).

%% Like write/2, but Replica (the node the write came from) is skipped when
%% forwarding. This is the entry point peers reach through rpc.
write(Key, Value, Replica) -> call({write, Key, Value, Replica}).

%% Turns forwarding of writes to Node on (true) or off (false).
%% Any other Flag fails with function_clause.
sync(Node, true) -> call({sync, Node, true});
sync(Node, false) -> call({sync, Node, false}).

%% Starts a background copy of all local data to Node.
replicate(Node) -> call({replicate, Node}).

%% Whatever is registered locally as ?MODULE, or undefined.
pid() -> erlang:whereis(?MODULE).

%% Sends Command to the process currently registered as ?MODULE.
call(Command) ->
    Server = ?MODULE:pid(),
    call(Server, Command).
%% Sends Command to the store process Db and waits for its reply.
%%
%% The request is tagged with a monitor reference instead of a plain
%% make_ref(). If Db is dead or dies before answering, the caller gets a
%% 'DOWN' message at once instead of waiting out ?TIMEOUT. The monitor is
%% removed (flushing any pending 'DOWN') on every exit path. After a timeout,
%% any reply that slipped in is flushed so it cannot linger in the mailbox.
%%
%% Returns:
%%   the server's reply;
%%   {error, noproc}   when no server is registered (Db =:= undefined);
%%   {error, Reason}   when the server exits before replying;
%%   {error, timeout}  when no reply arrives within ?TIMEOUT ms.
call(undefined, _Command) ->
    {error, noproc};
call(Db, Command) ->
    io:format("Sending (~p) to (~p)~n", [Command, Db]),
    Ref = erlang:monitor(process, Db),
    Db ! {self(), Ref, Command},
    receive
        {reply, Ref, Value} ->
            erlang:demonitor(Ref, [flush]),
            Value;
        {'DOWN', Ref, process, _Object, Reason} ->
            {error, Reason}
    after ?TIMEOUT ->
        erlang:demonitor(Ref, [flush]),
        %% Drop a reply that may have arrived right at the deadline.
        receive
            {reply, Ref, _Late} -> ok
        after 0 ->
            ok
        end,
        {error, timeout}
    end.
%% Entry point of the store process (spawned by start_link/0).
%% Creates the three named ETS tables owned by this process, records them in
%% a fresh #state{} and enters the receive loop.
init() ->
    Tags = ets:new(tags2Id, [duplicate_bag, named_table]),
    ById = ets:new(id2Value, [named_table]),
    ByValue = ets:new(value2Id, [named_table]),
    InitialState = #state{tags2Id = Tags,
                          id2Value = ById,
                          value2Id = ByValue},
    loop(InitialState).
%% Receive loop of the store process.
%%
%% Messages handled:
%%   {Pid, Ref, stop}    - reply ok to Pid and return, ending the process;
%%   {Pid, Ref, Command} - run handle_call/2 and reply with its result;
%%   {reply, Ref, _}     - discarded quietly (see below);
%%   {nodedown, Node}    - delivered because of monitor_node/2; stop
%%                         forwarding writes to Node;
%%   anything else       - logged and dropped so the mailbox cannot fill up.
loop(State) ->
    receive
        {Pid, Ref, stop} when is_pid(Pid), is_reference(Ref) ->
            Pid ! {reply, Ref, ok};
        {Pid, Ref, Command} when is_pid(Pid), is_reference(Ref) ->
            {NewState, Return} = handle_call(State, Command),
            Pid ! {reply, Ref, Return},
            loop(NewState);
        {reply, Ref, _Ack} when is_reference(Ref) ->
            %% This process never waits for replies itself. These are peers
            %% acknowledging the fire-and-forget {sync, ...} notifications it
            %% sends, so they are expected and not worth a log line.
            loop(State);
        {nodedown, Node} ->
            Replicas = lists:delete(Node, State#state.replicas),
            io:format("Lost connection with ~p, ending sync~n", [Node]),
            loop(State#state{replicas = Replicas});
        Other ->
            io:format("Unknown message received: ~p~n", [Other]),
            loop(State)
    end.
%% Executes one client command against State and returns {NewState, Reply}.
%%
%%   {lookup, Key}
%%       Reply: the {Id, Value} records stored under Key.
%%   {write, Key, Value} | {write, Key, Value, Replica}
%%       Stores Value under Key, reusing the id of an identical stored value,
%%       and forwards the write through rpc to every replica except Replica
%%       (the node the write came from; undefined for local writes).
%%       Reply: ok.
%%   {sync, Node, true | false}
%%       Starts/stops forwarding writes to Node and tells Node's store to do
%%       the same towards this node. Reply: ok, or {error, connection_failed}
%%       when Node does not answer a ping.
%%   {replicate, Node}
%%       Starts a background copy of all local data to Node.
%%       Reply: started, or {error, connection_failed}.
handle_call(State, {lookup, Key}) ->
    Records = [Record || {_Tag, Id} <- ets:lookup(State#state.tags2Id, Key),
                         Record <- ets:lookup(State#state.id2Value, Id)],
    {State, Records};
handle_call(State, {write, Key, Value}) ->
    handle_call(State, {write, Key, Value, undefined});
handle_call(State, {write, Key, Value, Replica}) ->
    {ValueId, NewState} = value_id(State, Value),
    ets:insert(State#state.tags2Id, {Key, ValueId}),
    ReplicateTo = [N || N <- State#state.replicas, N =/= Replica],
    io:format("Replicating to ~p~n", [ReplicateTo]),
    rpc:multicall(ReplicateTo, ?MODULE, write, [Key, Value, node()]),
    io:format("Tag/Value (~p) written and replicated (~p)~n",
              [{Key, Value}, ReplicateTo]),
    {NewState, ok};
handle_call(State = #state{replicas = Replicas}, {sync, Node, false}) ->
    case lists:member(Node, Replicas) of
        false ->
            %% Already not syncing with Node. Notifying it again would make
            %% the two stores bounce the sync-off message back and forth
            %% forever.
            {State, ok};
        true ->
            monitor_node(Node, false),
            notify_peer(Node, false),
            io:format("Writes will no longer sync to (~p)~n", [Node]),
            {State#state{replicas = lists:delete(Node, Replicas)}, ok}
    end;
handle_call(State = #state{replicas = Replicas}, {sync, Node, true}) ->
    case lists:member(Node, Replicas) of
        true ->
            {State, ok};
        false ->
            case net_adm:ping(Node) of
                pong ->
                    monitor_node(Node, true),
                    notify_peer(Node, true),
                    io:format("Writes will sync to (~p)~n", [Node]),
                    {State#state{replicas = [Node | Replicas]}, ok};
                pang ->
                    {State, {error, connection_failed}}
            end
    end;
handle_call(State, {replicate, Node}) ->
    case net_adm:ping(Node) of
        pong -> replicate(State, Node);
        pang -> {State, {error, connection_failed}}
    end.

%% Returns {Id, State1}: the id already assigned to Value, or a newly
%% allocated one (last_id + 1) after storing Value in both value tables.
%% last_id is only advanced on allocation. Rewinding it to an existing
%% value's id would make the next new value reuse an id that is still in use
%% and overwrite its id2Value entry.
value_id(State = #state{id2Value = ById, value2Id = ByValue, last_id = Last},
         Value) ->
    case ets:lookup(ByValue, Value) of
        [{Value, Id}] ->
            {Id, State};
        [] ->
            Id = Last + 1,
            ets:insert(ById, {Id, Value}),
            ets:insert(ByValue, {Value, Id}),
            {Id, State#state{last_id = Id}}
    end.

%% Asks the store registered as ?MODULE on Node to set its sync flag for this
%% node to Flag.
%% Sent asynchronously rather than through call/2 to prevent deadlock: the
%% peer may be sending us the same request at the same time.
%% Addressing {?MODULE, Node} avoids fetching the peer's pid with rpc. That
%% lookup crashed this process with badarg whenever it returned undefined or
%% {badrpc, _}. Sending to a name on a remote node does not raise; the message
%% is dropped if nothing is registered there.
%% The peer's {reply, Ref, ok} arrives later as an unsolicited message.
notify_peer(Node, Flag) ->
    {?MODULE, Node} ! {self(), make_ref(), {sync, node(), Flag}},
    ok.
%% Starts a background process that copies every key and its values to Node.
%% Returns {State, started} immediately, so the store keeps serving requests
%% while the copy runs.
%%
%% tags2Id is fixed with ets:safe_fixtable/2 while it is walked with
%% ets:first/next, because the owner may keep inserting. The fixation is
%% released in an `after` clause even if the walk crashes.
%% Timestamps use erlang:timestamp/0, which returns the same
%% {MegaSecs, Secs, MicroSecs} tuple as the deprecated now/0.
%% NOTE(review): the original author considered also enabling sync here.
replicate(State, Node) ->
    Tags = State#state.tags2Id,
    spawn(fun () ->
                  io:format("Sync started (~p)...~n", [erlang:timestamp()]),
                  ets:safe_fixtable(Tags, true),
                  try
                      replicate(State, Node, ets:first(Tags))
                  after
                      ets:safe_fixtable(Tags, false)
                  end,
                  io:format("...Sync finished (~p)~n", [erlang:timestamp()])
          end),
    {State, started}.
%% Walks tags2Id from Tag to the end of the table. For each key, it pushes
%% every value stored under that key to Node by calling dstore:write/3 there
%% through rpc. This node is passed as the origin so Node does not forward
%% the write back.
replicate(_State, _Node, '$end_of_table') ->
    io:format("End of table hit~n", []),
    ok;
replicate(State, Node, Tag) ->
    %% ets:lookup/2 returns a list per id. The values must come from a nested
    %% generator. Matching {_, Value} directly against those lists matched
    %% nothing, so no value was ever replicated.
    Values = [Value || {_, Id} <- ets:lookup(State#state.tags2Id, Tag),
                       {_, Value} <- ets:lookup(State#state.id2Value, Id)],
    io:format("Replicating tag (~p) and its values (~p) to (~p)~n",
              [Tag, Values, Node]),
    lists:foreach(
      fun(Value) ->
              Result = rpc:call(Node, ?MODULE, write, [Tag, Value, node()]),
              io:format("Tag (~p) written to node (~p). Result: (~p)~n",
                        [Tag, Node, Result])
      end,
      Values),
    replicate(State, Node, ets:next(State#state.tags2Id, Tag)).
%% Comments on this gist are hosted on GitHub; sign in (or sign up for free)
%% to join the conversation.