Created March 28, 2012 11:14
distributed k/v store in erlang
% internal operations
-export([init/0, loop/1, write/3]).
% public system operations
-export([start_link/0, stop/0, sync/2, replicate/1, pid/0]).
% public k/v operations
-export([lookup/1, write/2]).
-define(TIMEOUT, 30000).
-record(state, {tags2Id, id2Value, value2Id, last_id=0, replicas=[]}).
start_link() ->
case ?MODULE:pid() of
undefined ->
Pid = spawn_link(?MODULE, init, []),
register(?MODULE, Pid);
_ ->
{error, already_started}
stop() ->
lookup(Key) ->
call({lookup, Key}).
write(Key, Value) ->
call({write, Key, Value}).
write(Key, Value, Replica) ->
call({write, Key, Value, Replica}).
sync(Node, Flag) when is_boolean(Flag) ->
call({sync, Node, Flag}).
replicate(Node) ->
call({replicate, Node}).
pid() -> whereis(?MODULE).
call(Command) ->
call(?MODULE:pid(), Command).
call(Db, Command) ->
io:format("Sending (~p) to (~p)~n", [Command, Db]),
Ref = make_ref(),
Db ! {self(), Ref, Command},
{reply, Ref, Value} -> Value
after ?TIMEOUT ->
{error, timeout}
init() ->
TagsId = ets:new(tags2Id, [duplicate_bag, named_table]),
IdId = ets:new(id2Value, [named_table]),
ValuesId = ets:new(value2Id, [named_table]),
loop(#state{tags2Id=TagsId, id2Value=IdId, value2Id=ValuesId}).
loop(State) ->
{Pid, Ref, stop} when is_pid(Pid), is_reference(Ref) ->
Pid ! {reply, Ref, ok};
{Pid, Ref, Command} when is_pid(Pid), is_reference(Ref) ->
{NewState, Return} = handle_call(State, Command),
Pid ! {reply, Ref, Return},
{nodedown, Node} ->
Replicas = lists:delete(Node, State#state.replicas),
io:format("Lost connection with ~p, ending sync~n", [Node]),
Other ->
io:format("Unknown message received: ~p~n", [Other]),
handle_call(State, {lookup, Key}) ->
Ids = [Id || {_Tag, Id} <- ets:lookup(State#state.tags2Id, Key)],
Return = [ets:lookup(State#state.id2Value, Id) || Id <- Ids],
{State, lists:flatten(Return)};
handle_call(State, {write, Key, Value}) ->
handle_call(State, {write, Key, Value, undefined});
handle_call(State, {write, Key, Value, Replica}) ->
ValueId = case ets:lookup(State#state.value2Id, Value) of
[{Value, Id}] ->
[] ->
Id = State#state.last_id + 1,
ets:insert(State#state.id2Value, {Id, Value}),
ets:insert(State#state.value2Id, {Value, Id}),
ets:insert(State#state.tags2Id, {Key, ValueId}),
ReplicateTo = lists:filter(fun(N) -> N /= Replica end, State#state.replicas),
io:format("Replicating to ~p~n", [ReplicateTo]),
rpc:multicall(ReplicateTo, ?MODULE, write, [Key, Value, node()]),
io:format("Tag/Value (~p) written and replicated (~p)~n",
[{Key, Value}, ReplicateTo]),
{State#state{last_id = ValueId}, ok};
handle_call(State, {sync, Node, false}) ->
monitor_node(Node, false),
Replicas = lists:delete(Node, State#state.replicas),
Remote = rpc:call(Node, erlang, whereis, [?MODULE]),
% need async to prevent deadlock
Remote ! {self(), make_ref(), {sync, node(), false}},
io:format("Writes will no longer sync to (~p)~n", [Node]),
{State#state{replicas=Replicas}, ok};
handle_call(State, {sync, Node, true}) ->
case lists:any(fun (N) -> N == Node end, State#state.replicas) of
true ->
{State, ok};
false ->
case net_adm:ping(Node) of
pong ->
monitor_node(Node, true),
Replicas = State#state.replicas,
Remote = rpc:call(Node, erlang, whereis, [?MODULE]),
% need async to prevent deadlock
Remote ! {self(), make_ref(), {sync, node(), true}},
io:format("Writes will sync to (~p)~n", [Node]),
{State#state{replicas=[Node | Replicas]}, ok};
pang ->
{State, {error, connection_failed}}
handle_call(State, {replicate, Node}) ->
case net_adm:ping(Node) of
pong ->
replicate(State, Node);
pang ->
{State, {error, connection_failed}}
replicate(State, Node) ->
% maybe setup sync
spawn(fun () ->
io:format("Sync started (~p)...~n", [now()]),
ets:safe_fixtable(State#state.tags2Id, true),
replicate(State, Node, ets:first(State#state.tags2Id)),
ets:safe_fixtable(State#state.tags2Id, false),
io:format("...Sync finished (~p)~n", [now()])
{State, started}.
replicate(_State, _Node, '$end_of_table') ->
io:format("End of table hit~n", []),
replicate(State, Node, Tag) ->
Ids = [Id || {_, Id} <- ets:lookup(State#state.tags2Id, Tag)],
Records = [ets:lookup(State#state.id2Value, Id) || Id <- Ids],
Values = [Value || {_, Value} <- Records],
io:format("Replicating tag (~p) and its values (~p) to (~p)~n", [Tag, Values, Node]),
lists:foreach(fun(Value) ->
Result = rpc:call(Node, ?MODULE, write, [Tag, Value, node()]),
io:format("Tag (~p) written to node (~p). Result: (~p)~n", [Tag, Node, Result])
end, Values),
replicate(State, Node, ets:next(State#state.tags2Id, Tag)).
