Skip to content

Instantly share code, notes, and snippets.

@w495
Created July 16, 2012 13:24
Show Gist options
  • Save w495/3122708 to your computer and use it in GitHub Desktop.
Save w495/3122708 to your computer and use it in GitHub Desktop.
Еще одна простая обобщенная реализация параллельного, но не распределенного mapreduce
%%% @file smr.erl
%%%
%%% Еще одна простая обобщенная реализация параллельного, но не распределенного
%%% mapreduce.
%%%
%%% При вызове одной из функций mapreduce создается корневой поток.
%%% Внутри корневого потока создаются рабочие потоки по одному
%%% для каждого элемента данных. Элементы данных обрабатываются
%%% функцией отображения заданной в клиентском коде.
%%% Результаты обработки пересылается корневому потоку.
%%% После этого, с помощью функции свертки результаты вычислений
%%% собираются в одно результирующее значение.
%%% Это значение и является результатом выполнения функций mapreduce.
%%%
%%% Возможно, все это имеет смысл оформить в виде gen_server,
%%% но автор не уверен в целесообразности такого решения.
%%%
%%% Пример использования:
%%% smr:mapreduce(
%%% fun(I, _)-> [{a, 1}|I] end, % отображение
%%% fun(I, A, _) -> [I|A] end, % свертка
%%% [], % начальный аккумулятор
%%% [[{b, 1}], [{c, 1}]] % входной список
%%% ).
%%%
-module(smr).
-export([
mapreduce/4,
mapreduce/5,
mapreduce/6,
mapreduce/7
]).
%%%
%%% Максимальное ожидание на одном элементе
%%%
-define(SMR_HANDLE_TIMEOUT, 20000).
%%%
%%% Максимальное ожидание на полном наборе данных
%%%
-define(SMR_GLOBAL_TIMEOUT, 200000).
-spec mapreduce(
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()),
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()),
Racc ::any(),
List ::[any()]
) -> any().
-spec mapreduce(
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()),
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()),
Racc ::any(),
List ::[any()],
Timeout ::integer()
) -> any().
-spec mapreduce(
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()),
Mopts ::[{atom(), any()}],
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()),
Racc ::any(),
Ropts ::[{atom(), any()}],
List ::[any()]
) -> any().
-spec mapreduce(
Mfun ::fun((Item::any(), Opts::[{atom(),any()}]) -> any()),
Mopts ::[{atom(), any()}],
Rfun ::fun((Item::any(), Acc::any(), Opts::[{atom(),any()}]) -> any()),
Racc ::any(),
Ropts ::[{atom(), any()}],
List ::[any()],
Timeout ::integer()
) -> any().
%%%
%%%
%%% @spec mapreduce(
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()),
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% List::[any()]
%%% ) -> any().
%%%
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce.
%%% Отдельный поток выделяется в целях безопасности вычислений
%%% проводимых в рабочих потоках. Если обработка полного набора данных
%%% будет длиться более чем ?SMR_GLOBAL_TIMEOUT мкс,
%%% то будет вызвано исключение {timeout, ?MODULE, ?LINE}.
%%% Обертка для функции mapreduce/5.
%%%
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {parent_pid, Pid},
%%% где Pid --- pid() процесса вызывающего параллельную
%%% обработку.
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any())
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: аккумулятор для данных
%%% 3 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {from_pid, From},
%%% где From --- pid() процесса, от которого пришел
%%% результат.
%%% Racc::any()
%%% Начальное значение аккумулятора для данных.
%%% List::[any()]
%%% Список данных для обработки.
%%%
mapreduce(Mfun, Rfun, Racc, List)
when erlang:is_function(Mfun, 2)
andalso erlang:is_function(Rfun, 3)
andalso erlang:is_list(List) ->
mapreduce(Mfun, Rfun, Racc, List, ?SMR_GLOBAL_TIMEOUT).
%%%
%%% @spec mapreduce(
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()),
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% List::[any()],
%%% Timeout::integer()
%%% ) -> any().
%%%
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce.
%%% Отдельный поток выделяется в целях безопасности вычислений
%%% проводимых в рабочих потоках. Если обработка полного набора данных
%%% будет длиться более чем Timeout мкс, то будет вызвано исключение
%%% {timeout, ?MODULE, ?LINE}.
%%% Обертка для функции mapreduce/7.
%%%
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {parent_pid, Pid},
%%% где Pid --- pid() процесса вызывающего параллельную
%%% обработку.
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any())
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: аккумулятор для данных
%%% 3 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {from_pid, From},
%%% где From --- pid() процесса, от которого пришел
%%% результат.
%%% Racc::any()
%%% Начальное значение аккумулятора для данных.
%%% List::[any()]
%%% Список данных для обработки.
%%% Timeout::integer()
%%% Максимальное время обработки полного набора данных.
%%%
mapreduce(Mfun, Rfun, Racc, List, Timeout)
when erlang:is_function(Mfun, 2)
andalso erlang:is_function(Rfun, 3)
andalso erlang:is_list(List)
andalso erlang:is_integer(Timeout) ->
mapreduce(Mfun, [], Rfun, Racc, [], List, Timeout).
%%%
%%% @spec mapreduce(
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()),
%%% Mopts::[{atom(), any()}],
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% Ropts::[{atom(), any()}],
%%% List::[any()]
%%% ) -> any().
%%%
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce.
%%% Отдельный поток выделяется в целях безопасности вычислений
%%% проводимых в рабочих потоках. Если обработка полного набора данных
%%% будет длиться более чем ?SMR_GLOBAL_TIMEOUT мкс,
%%% то будет вызвано исключение {timeout, ?MODULE, ?LINE}.
%%% Обертка для функции mapreduce/7.
%%%
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {parent_pid, Pid},
%%% где Pid --- pid() процесса вызывающего параллельную
%%% обработку.
%%% Mopts::[{atom(), any()}],
%%% Начальное значение настроек для функции Mfun/2.
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any())
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: аккумулятор для данных
%%% 3 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {from_pid, From},
%%% где From --- pid() процесса, от которого пришел
%%% результат.
%%% Racc::any()
%%% Начальное значение аккумулятора для данных.
%%% Ropts::[{atom(), any()}]
%%% Начальное значение настроек для функции Rfun/3.
%%% В клиентском коде к настройкам может быть добавлены опции
%%% handle_timeout
%%% Время максимального ожидания обработки элемента
%%% (время в течении которого должен поступить
%%% результат работы Mfun/2).
%%% Если не задана используется ?SMR_HANDLE_TIMEOUT.
%%% use_throw_on_collect.
%%% Если она равна true, то
%%% при получении от рабочего процесса
%%% неопознанного сообщения вызывается исключение
%%% {unexpected_message, {Mess, ?MODULE, ?LINE}}
%%% при истечении тайм-аута максимального ожидания обработки
%%% одного элемента (если в течении определенного
%%% времени не поступил ) вызывается исключение
%%% {timeout, ?MODULE, ?LINE};
%%% Если она равна false, то
%%% при получении от рабочего процесса
%%% неопознанного сообщения оно игнорируется
%%% при истечении тайм-аута максимального ожидания обработки
%%% одного элемента возвращается текущее значение
%%% результата Rfun/3.
%%% List::[any()]
%%% Список данных для обработки.
%%%
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List)
when erlang:is_function(Mfun, 2)
andalso erlang:is_function(Rfun, 3)
andalso erlang:is_list(List) ->
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List, ?SMR_GLOBAL_TIMEOUT).
%%%
%%% @spec mapreduce(
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()),
%%% Mopts::[{atom(), any()}],
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% Ropts::[{atom(), any()}],
%%% List::[any()],
%%% Timeout::integer()
%%% ) -> any().
%%%
%%% @doc Создает отдельный поток, который вычисляет сам mapreduce.
%%% Отдельный поток выделяется в целях безопасности вычислений
%%% проводимых в рабочих потоках. Если обработка полного набора данных
%%% будет длиться более чем Timeout мкс, то будет вызвано исключение
%%% {timeout, ?MODULE, ?LINE}.
%%%
%%% @par Mfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {parent_pid, Pid},
%%% где Pid --- pid() процесса вызывающего параллельную
%%% обработку.
%%% Mopts::[{atom(), any()}],
%%% Начальное значение настроек для функции Mfun/2.
%%% Rfun::fun((any(), any(), [{atom(), any()}]) -> any())
%%% Функция для отображения. принимает 2 аргумента.
%%% 1 аргумент: элемент данных для обработки;
%%% 2 аргумент: аккумулятор для данных
%%% 3 аргумент: proplist с настройками. В момент вызова функции
%%% к настройкам добавляется {from_pid, From},
%%% где From --- pid() процесса, от которого пришел
%%% результат.
%%% Racc::any()
%%% Начальное значение аккумулятора для данных.
%%% Ropts::[{atom(), any()}]
%%% Начальное значение настроек для функции Rfun/3.
%%% В клиентском коде к настройкам может быть добавлены опции
%%% handle_timeout
%%% Время максимального ожидания обработки элемента
%%% (время в течении которого должен поступить
%%% результат работы Mfun/2).
%%% Если не задана используется ?SMR_HANDLE_TIMEOUT.
%%% use_throw_on_collect.
%%% Если она равна true, то
%%% при получении от рабочего процесса
%%% неопознанного сообщения вызывается исключение
%%% {unexpected_message, {Mess, ?MODULE, ?LINE}}
%%% при истечении тайм-аута максимального ожидания обработки
%%% одного элемента (если в течении определенного
%%% времени не поступил ) вызывается исключение
%%% {timeout, ?MODULE, ?LINE};
%%% Если она равна false, то
%%% при получении от рабочего процесса
%%% неопознанного сообщения оно игнорируется
%%% при истечении тайм-аута максимального ожидания обработки
%%% одного элемента возвращается текущее значение
%%% результата Rfun/3.
%%% List::[any()]
%%% Список данных для обработки.
%%% Timeout::integer()
%%% Максимальное время обработки полного набора данных.
%%%
mapreduce(Mfun, Mopts, Rfun, Racc, Ropts, List, Timeout)
when erlang:is_function(Mfun, 2)
andalso erlang:is_function(Rfun, 3)
andalso erlang:is_list(List)
andalso erlang:is_integer(Timeout) ->
Ppid = self(),
Ref = make_ref(),
Pid = spawn_link(fun()->
Ppid ! {
?MODULE, Ref, self(),
reduce(Rfun, Racc, Ropts, List, map(Mfun, Mopts, List))
}
end),
receive
{?MODULE, Ref, Pid, Result} -> Result
after Timeout -> throw({timeout, ?MODULE, ?LINE})
end.
%%%
%%% @spec map(
%%% Mfun::fun((any(), [{atom(),any()}]) -> any()),
%%% Mopts::[{atom(), any()}],
%%% List::[any()],
%%% ) -> ref().
%%%
%%% @doc Проходит по списку данных и для каждого элемента запускает
%%% параллельную обработку. Результат вычисления отправляется обратно
%%% в запустивший процесс. Возвращает уникальную ссылку (ref),
%%% для правильного определения процесса от которого будет получен
%%% результат вычислений.
%%%
map(Function, Options, List)
when erlang:is_function(Function, 2) ->
Ppid = self(),
Ref = make_ref(),
lists:foreach(fun(Item) -> spawn_link(fun()->
Ppid ! {
?MODULE, Ref, self(),
erlang:apply(Function,[Item,[{parent_pid, Ppid}|Options]])
}
end) end, List), Ref.
%%%
%%% @spec reduce(
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% Ropts::[{atom(), any()}],
%%% List::[any()],
%%% Ref:ref()
%%% ) -> any().
%%%
%%% @doc Собирает данные, которые до этого вычислялись параллельно.
%%% Обертка для функции collect/5.
%%%
reduce(Function, Acc, Options, List, Ref)
when erlang:is_function(Function, 3) ->
collect(Function, Acc, Options, erlang:length(List), Ref).
%%%
%%% @spec collect(
%%% Rfun::fun((any(), any(), [{atom(),any()}]) -> any()),
%%% Racc::any(),
%%% Ropts::[{atom(), any()}],
%%% Counter::integer(),
%%% Ref:ref()
%%% ) -> any().
%%%
%%% @doc Собирает данные, которые до этого вычислялись параллельно.
%%% Что происходит:
%%% 1) Получаем данные от параллельного map
%%% 2) Получаем новое значение Aсс на основании Function().
%%% 3) Уменьшаем размер счетчика (Counter) на 1.
%%% Счетчик нужен для того, чтобы определить сколько
%%% элементов требуется обработать, чтобы их число было не больше
%%% и не меньше требуемого.
%%%
collect(_function, Acc, _options, 0, _ref) ->
Acc;
collect(Function, Acc, Options, Counter, Ref)
when erlang:is_function(Function, 3) ->
receive
{?MODULE, Ref, From, Item} ->
collect(
Function,
erlang:apply(Function,[Item,Acc,[{from_pid, From}|Options]]),
Options,
Counter - 1,
Ref
);
Mess ->
case proplists:get_value(use_throw_on_collect, Options, false) of
true ->
throw({unexpected_message, {Mess, ?MODULE, ?LINE}});
_ ->
collect(Function, Options, Acc, Counter, Ref)
end
after
proplists:get_value(handle_timeout,Options,?SMR_HANDLE_TIMEOUT)->
case proplists:get_value(use_throw_on_collect, Options, false) of
true ->
throw({timeout, ?MODULE, ?LINE});
_ ->
Acc
end
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment