-
-
Save w495/3122708 to your computer and use it in GitHub Desktop.
Еще одна простая обобщенная реализация параллельного, но не распределенного mapreduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%% @file smr.erl | |
%%% | |
%%% Еще одна простая обобщенная реализация параллельного, но не распределенного | |
%%% mapreduce. | |
%%% | |
%%% При вызове одной из функций mapreduce создается корневой поток. | |
%%% Внутри корневого потока создаются рабочие потоки по одному | |
%%% для каждого элемента данных. Элементы данных обрабатываются | |
%%% функцией отображения заданной в клиентском коде. | |
%%% Результаты обработки пересылается корневому потоку. | |
%%% После этого, с помощью функции свертки результаты вычислений | |
%%% собираются в одно результирующее значение. | |
%%% Это значение и является результатом выполнения функций mapreduce. | |
%%% | |
%%% Возможно, все это имеет смысл оформить в виде gen_server, | |
%%% но автор не уверен в целесообразности такого решения. | |
%%% | |
%%% Пример использования: | |
%%% smr:mapreduce( | |
%%% fun(I, _)-> [{a, 1}|I] end, % отображение | |
%%% fun(I, A, _) -> [I|A] end, % свертка | |
%%% [], % начальный аккумулятор | |
%%% [[{b, 1}], [{c, 1}]] % входной список | |
%%% ). | |
%%% | |
-module(smr). | |
-export([ | |
mapreduce/4, | |
mapreduce/5, | |
mapreduce/6, | |
mapreduce/7 | |
]). | |
%%% | |
%%% Максимальное ожидание на одном элементе | |
%%% | |
-define(SMR_HANDLE_TIMEOUT, 20000). | |
%%% | |
%%% Максимальное ожидание на полном наборе данных | |
%%% | |
-define(SMR_GLOBAL_TIMEOUT, 200000). | |
-spec mapreduce( | |
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()), | |
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()), | |
Racc ::any(), | |
List ::[any()] | |
) -> any(). | |
-spec mapreduce( | |
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()), | |
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()), | |
Racc ::any(), | |
List ::[any()], | |
Timeout ::integer() | |
) -> any(). | |
-spec mapreduce( | |
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()), | |
Mopts ::[{atom(), any()}], | |
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()), | |
Racc ::any(), | |
Ropts ::[{atom(), any()}], | |
List ::[any()] | |
) -> any(). | |
-spec mapreduce( | |
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()), | |
Mopts ::[{atom(), any()}], | |
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()), | |
Racc ::any(), | |
Ropts ::[{atom(), any()}], | |
List ::[any()], | |
Timeout ::integer() | |
) -> any(). | |
%%% | |
%%% | |
%%% @spec mapreduce( | |
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()), | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% List::[any()] | |
%%% ) -> any(). | |
%%% | |
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce. | |
%%% Отдельный поток выделяется в целях безопасности вычислений | |
%%% проводимых в рабочих потоках. Если обработка полного набора данных | |
%%% будет длиться более чем ?SMR_GLOBAL_TIMEOUT мкс, | |
%%% то будет вызвано исключение {timeout, ?MODULE, ?LINE}. | |
%%% Обертка для функции mapreduce/5. | |
%%% | |
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {parent_pid, Pid}, | |
%%% где Pid --- pid() процесса вызывающего параллельную | |
%%% обработку. | |
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any()) | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: аккумулятор для данных | |
%%% 3 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {from_pid, From}, | |
%%% где From --- pid() процесса, от которого пришел | |
%%% результат. | |
%%% Racc::any() | |
%%% Начальное значение аккумулятора для данных. | |
%%% List::[any()] | |
%%% Список данных для обработки. | |
%%% | |
mapreduce(Mfun, Rfun, Racc, List) | |
when erlang:is_function(Mfun, 2) | |
andalso erlang:is_function(Rfun, 3) | |
andalso erlang:is_list(List) -> | |
mapreduce(Mfun, Rfun, Racc, List, ?SMR_GLOBAL_TIMEOUT). | |
%%% | |
%%% @spec mapreduce( | |
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()), | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% List::[any()], | |
%%% Timeout::integer() | |
%%% ) -> any(). | |
%%% | |
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce. | |
%%% Отдельный поток выделяется в целях безопасности вычислений | |
%%% проводимых в рабочих потоках. Если обработка полного набора данных | |
%%% будет длиться более чем Timeout мкс, то будет вызвано исключение | |
%%% {timeout, ?MODULE, ?LINE}. | |
%%% Обертка для функции mapreduce/7. | |
%%% | |
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {parent_pid, Pid}, | |
%%% где Pid --- pid() процесса вызывающего параллельную | |
%%% обработку. | |
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any()) | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: аккумулятор для данных | |
%%% 3 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {from_pid, From}, | |
%%% где From --- pid() процесса, от которого пришел | |
%%% результат. | |
%%% Racc::any() | |
%%% Начальное значение аккумулятора для данных. | |
%%% List::[any()] | |
%%% Список данных для обработки. | |
%%% Timeout::integer() | |
%%% Максимальное время обработки полного набора данных. | |
%%% | |
mapreduce(Mfun, Rfun, Racc, List, Timeout) | |
when erlang:is_function(Mfun, 2) | |
andalso erlang:is_function(Rfun, 3) | |
andalso erlang:is_list(List) | |
andalso erlang:is_integer(Timeout) -> | |
mapreduce(Mfun, [], Rfun, Racc, [], List, Timeout). | |
%%% | |
%%% @spec mapreduce( | |
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()), | |
%%% Mopts::[{atom(), any()}], | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% Ropts::[{atom(), any()}], | |
%%% List::[any()] | |
%%% ) -> any(). | |
%%% | |
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce. | |
%%% Отдельный поток выделяется в целях безопасности вычислений | |
%%% проводимых в рабочих потоках. Если обработка полного набора данных | |
%%% будет длиться более чем ?SMR_GLOBAL_TIMEOUT мкс, | |
%%% то будет вызвано исключение {timeout, ?MODULE, ?LINE}. | |
%%% Обертка для функции mapreduce/7. | |
%%% | |
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {parent_pid, Pid}, | |
%%% где Pid --- pid() процесса вызывающего параллельную | |
%%% обработку. | |
%%% Mopts::[{atom(), any()}], | |
%%% Начальное значение настроек для функции Mfun/2. | |
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any()) | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: аккумулятор для данных | |
%%% 3 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {from_pid, From}, | |
%%% где From --- pid() процесса, от которого пришел | |
%%% результат. | |
%%% Racc::any() | |
%%% Начальное значение аккумулятора для данных. | |
%%% Ropts::[{atom(), any()}] | |
%%% Начальное значение настроек для функции Rfun/3. | |
%%% В клиентском коде к настройкам может быть добавлены опции | |
%%% handle_timeout | |
%%% Время максимального ожидания обработки элемента | |
%%% (время в течении которого должен поступить | |
%%% результат работы Mfun/2). | |
%%% Если не задана используется ?SMR_HANDLE_TIMEOUT. | |
%%% use_throw_on_collect. | |
%%% Если она равна true, то | |
%%% при получении от рабочего процесса | |
%%% неопознанного сообщения вызывается исключение | |
%%% {unexpected_message, {Mess, ?MODULE, ?LINE}} | |
%%% при истечении тайм-аута максимального ожидания обработки | |
%%% одного элемента (если в течении определенного | |
%%% времени не поступил ) вызывается исключение | |
%%% {timeout, ?MODULE, ?LINE}; | |
%%% Если она равна false, то | |
%%% при получении от рабочего процесса | |
%%% неопознанного сообщения оно игнорируется | |
%%% при истечении тайм-аута максимального ожидания обработки | |
%%% одного элемента возвращается текущее значение | |
%%% результата Rfun/3. | |
%%% List::[any()] | |
%%% Список данных для обработки. | |
%%% | |
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List) | |
when erlang:is_function(Mfun, 2) | |
andalso erlang:is_function(Rfun, 3) | |
andalso erlang:is_list(List) -> | |
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List, ?SMR_GLOBAL_TIMEOUT). | |
%%% | |
%%% @spec mapreduce( | |
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()), | |
%%% Mopts::[{atom(), any()}], | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% Ropts::[{atom(), any()}], | |
%%% List::[any()], | |
%%% Timeout::integer() | |
%%% ) -> any(). | |
%%% | |
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce. | |
%%% Отдельный поток выделяется в целях безопасности вычислений | |
%%% проводимых в рабочих потоках. Если обработка полного набора данных | |
%%% будет длиться более чем Timeout мкс, то будет вызвано исключение | |
%%% {timeout, ?MODULE, ?LINE}. | |
%%% | |
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {parent_pid, Pid}, | |
%%% где Pid --- pid() процесса вызывающего параллельную | |
%%% обработку. | |
%%% Mopts::[{atom(), any()}], | |
%%% Начальное значение настроек для функции Mfun/2. | |
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any()) | |
%%% Функция для отображения. принимает 2 аргумента. | |
%%% 1 аргумент: элемент данных для обработки; | |
%%% 2 аргумент: аккумулятор для данных | |
%%% 3 аргумент: proplist с настройками. В момент вызова функции | |
%%% к настройкам добавляется {from_pid, From}, | |
%%% где From --- pid() процесса, от которого пришел | |
%%% результат. | |
%%% Racc::any() | |
%%% Начальное значение аккумулятора для данных. | |
%%% Ropts::[{atom(), any()}] | |
%%% Начальное значение настроек для функции Rfun/3. | |
%%% В клиентском коде к настройкам может быть добавлены опции | |
%%% handle_timeout | |
%%% Время максимального ожидания обработки элемента | |
%%% (время в течении которого должен поступить | |
%%% результат работы Mfun/2). | |
%%% Если не задана используется ?SMR_HANDLE_TIMEOUT. | |
%%% use_throw_on_collect. | |
%%% Если она равна true, то | |
%%% при получении от рабочего процесса | |
%%% неопознанного сообщения вызывается исключение | |
%%% {unexpected_message, {Mess, ?MODULE, ?LINE}} | |
%%% при истечении тайм-аута максимального ожидания обработки | |
%%% одного элемента (если в течении определенного | |
%%% времени не поступил ) вызывается исключение | |
%%% {timeout, ?MODULE, ?LINE}; | |
%%% Если она равна false, то | |
%%% при получении от рабочего процесса | |
%%% неопознанного сообщения оно игнорируется | |
%%% при истечении тайм-аута максимального ожидания обработки | |
%%% одного элемента возвращается текущее значение | |
%%% результата Rfun/3. | |
%%% List::[any()] | |
%%% Список данных для обработки. | |
%%% Timeout::integer() | |
%%% Максимальное время обработки полного набора данных. | |
%%% | |
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List, Timeout) | |
when erlang:is_function(Mfun, 2) | |
andalso erlang:is_function(Rfun, 3) | |
andalso erlang:is_list(List) | |
andalso erlang:is_integer(Timeout) -> | |
Ppid = self(), | |
Ref = make_ref(), | |
Pid = spawn_link(fun()-> | |
Ppid ! { | |
?MODULE, Ref, self(), | |
reduce(Rfun, Racc, Ropts, List, map(Mfun, Mopts, List)) | |
} | |
end), | |
receive | |
{?MODULE, Ref, Pid, Result} -> Result | |
after Timeout -> throw({timeout, ?MODULE, ?LINE}) | |
end. | |
%%% | |
%%% @spec map( | |
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()), | |
%%% Mopts::[{atom(), any()}], | |
%%% List::[any()], | |
%%% ) -> ref(). | |
%%% | |
%%% @doc Проходит по списку данных и для каждого элемента запускает | |
%%% параллельную обработку. Результат вычисления отправляется обратно | |
%%% в запустивший процесс. Возвращает уникальную ссылку (ref), | |
%%% для правильного определения процесса от которого будет получен | |
%%% результат вычислений. | |
%%% | |
map(Function, Options, List) | |
when erlang:is_function(Function, 2) -> | |
Ppid = self(), | |
Ref = make_ref(), | |
lists:foreach(fun(Item) -> spawn_link(fun()-> | |
Ppid ! { | |
?MODULE, Ref, self(), | |
erlang:apply(Function,[Item,[{parent_pid, Ppid}|Options]]) | |
} | |
end) end, List), Ref. | |
%%% | |
%%% @spec reduce( | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% Ropts::[{atom(), any()}], | |
%%% List::[any()], | |
%%% Ref:ref() | |
%%% ) -> any(). | |
%%% | |
%%% @doc Собирает данные, которые до этого вычислялись параллельно. | |
%%% Обертка для функции collect/5. | |
%%% | |
reduce(Function, Acc, Options, List, Ref) | |
when erlang:is_function(Function, 3) -> | |
collect(Function, Acc, Options, erlang:length(List), Ref). | |
%%% | |
%%% @spec collect( | |
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()), | |
%%% Racc::any(), | |
%%% Ropts::[{atom(), any()}], | |
%%% Counter::integer(), | |
%%% Ref:ref() | |
%%% ) -> any(). | |
%%% | |
%%% @doc Собирает данные, которые до этого вычислялись параллельно. | |
%%% Что происходит: | |
%%% 1) Получаем данные от параллельного map | |
%%% 2) Получаем новое значение Aсс на основании Function(). | |
%%% 3) Уменьшаем размер счетчика (Counter) на 1. | |
%%% Счетчик нужен для того, чтобы определить сколько | |
%%% элементов требуется обработать, чтобы их число было не больше | |
%%% и не меньше требуемого. | |
%%% | |
collect(_function, Acc, _options, 0, _ref) -> | |
Acc; | |
collect(Function, Acc, Options, Counter, Ref) | |
when erlang:is_function(Function, 3) -> | |
receive | |
{?MODULE, Ref, From, Item} -> | |
collect( | |
Function, | |
erlang:apply(Function,[Item,Acc,[{from_pid, From}|Options]]), | |
Options, | |
Counter - 1, | |
Ref | |
); | |
Mess -> | |
case proplists:get_value(use_throw_on_collect, Options, false) of | |
true -> | |
throw({unexpected_message, {Mess, ?MODULE, ?LINE}}); | |
_ -> | |
collect(Function, Options, Acc, Counter, Ref) | |
end | |
after | |
proplists:get_value(handle_timeout,Options,?SMR_HANDLE_TIMEOUT)-> | |
case proplists:get_value(use_throw_on_collect, Options, false) of | |
true -> | |
throw({timeout, ?MODULE, ?LINE}); | |
_ -> | |
Acc | |
end | |
end. | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment