-
-
Save dch/47df5d251bad7dd88437 to your computer and use it in GitHub Desktop.
A multi-master (AKA distributed) counter implemented as a Conflict-free Replicated Data Type.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(mmc). | |
% A multi-master counter implemented as a CRDT. Each server holds an array of counters (implemented as a dict here...) | |
% and Strong Eventual Consistency is acheived by using commutive increments on each of the counters. The current | |
% value read at any server is the sum of all the counter values (again, one per server) stored on that machine. | |
% As updates are synced between servers, all servers will eventually converge on a counter sum value with no | |
% conflicts possible. Also note that this does not use any consensus algorithm, so it remains fast as the number of | |
% servers scales. See http://research.microsoft.com/apps/video/default.aspx?id=153540&r=1 | |
-behaviour(gen_server). | |
% Client API | |
-export([start_link/0, read/1, increment/1, send_state/2, stop/1]). | |
% Server Implementation | |
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]). | |
%--------------------------------------------------------------------------------------------- | |
% Client API | |
%--------------------------------------------------------------------------------------------- | |
start_link() -> gen_server:start_link(?MODULE, [], []). | |
% Get the value of the counter. | |
read(ServerPid) -> gen_server:call(ServerPid, read). | |
% Tell the cluster (via ANY server) to increment the distributed counter. | |
increment(ServerPid) -> gen_server:call(ServerPid, increment). | |
% Command this server to send it's current state to another server. | |
send_state(SourceServerPid, DestinationServerPid) -> gen_server:call(SourceServerPid, {send_state_to, DestinationServerPid}). | |
stop(ServerPid) -> gen_server:cast(ServerPid, stop). | |
%--------------------------------------------------------------------------------------------- | |
% Server Implementation | |
%--------------------------------------------------------------------------------------------- | |
init([]) -> | |
Dict = dict:store(self(), 0, dict:new()), | |
{ok, Dict}. | |
handle_cast(stop, Dict) -> {stop, normal, Dict}; | |
handle_cast({update_remote_state, RemoteServerId, RemoteValue}, Dict) -> | |
Dict1 = dict:store(RemoteServerId, RemoteValue, Dict), | |
{noreply, Dict1}; | |
handle_cast(_any, State) -> {noreply, State}. | |
handle_call(read, _From, Dict) -> | |
Val = dict:fold(fun(_K, V, Acc) -> V + Acc end, 0, Dict), | |
{reply, Val, Dict}; | |
handle_call(increment, _From, Dict) -> | |
Dict1 = dict:update_counter(self(), 1, Dict), | |
{reply, ok, Dict1}; | |
handle_call({send_state_to, DestinationServerPid}, _From, Dict) -> | |
SourceServerPid = self(), | |
SourceValue = dict:fetch(self(), Dict), | |
% Use cast to avoid a deadlock! | |
gen_server:cast(DestinationServerPid, {update_remote_state, SourceServerPid, SourceValue}), | |
{reply, ok, Dict}. | |
handle_info(Message, Dict) -> | |
io:format("Unexpected message ~p~n", [Message]), | |
{noreply, Dict}. | |
terminate(normal, _State) -> ok; | |
terminate(shutdown, _State) -> ok. | |
code_change(_OldVsn, State, _Extra) -> {ok, State}. | |
%--------------------------------------------------------------------------------------------- | |
% Unit Tests | |
%--------------------------------------------------------------------------------------------- | |
-include_lib("eunit/include/eunit.hrl"). | |
all_test() -> | |
{ok, Server1} = start_link(), | |
{ok, Server2} = start_link(), | |
?assertEqual(0, read(Server1)), | |
?assertEqual(0, read(Server2)), | |
increment(Server1), % Begin concurrent updates... | |
?assertEqual(1, read(Server1)), | |
?assertEqual(0, read(Server2)), | |
increment(Server2), % Concurrent updates complete. | |
?assertEqual(1, read(Server1)), | |
?assertEqual(1, read(Server2)), % Replicas have not converged... | |
% This is kind of like a split-brain... pretend it healed and begin sending updates throughout the cluster: | |
send_state(Server1, Server2), % Replicate the update on S1 to S2 | |
?assertEqual(1, read(Server1)), | |
?assertEqual(2, read(Server2)), % Server two has converged... | |
send_state(Server2, Server1), % Replicate the update on S2 to S1 | |
?assertEqual(2, read(Server1)), % Server one has converged... | |
?assertEqual(2, read(Server2)), % The DISTRIBUTED value of the replica set has converged completely! SEC worked without conflict :) | |
stop(Server1), | |
stop(Server2). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment