Created
March 9, 2016 17:43
-
-
Save zvkemp/3cff279388b89745313e to your computer and use it in GitHub Desktop.
Annotated multi-process mapping module in Elixir
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Parallel do
  @moduledoc """
  Annotated multi-process mapping module.

  `map/3` spreads the elements of an enumerable across a fixed pool of linked
  worker processes, applies a function to each element on a worker, and
  returns the results in the same order as the input.
  """

  @doc """
  Maps `function` over `enum` using `n` worker processes.

  ## Parameters

    * `enum` - any enumerable; it is traversed exactly once.
    * `function` - a one-argument function applied to each element on a worker.
    * `workers: n` - the number of worker processes to spawn; must be a
      positive integer.

  ## Returns

  A list with one result per element, in input order.

  Workers are started with `spawn_link/1`, so a crash inside `function`
  propagates to the calling process through the link.

  ## Examples

      Parallel.map(1..4, fn x -> x * 2 end, workers: 2)
      #=> [2, 4, 6, 8]
  """
  def map(enum, function, workers: n) when is_integer(n) and n > 0 do
    # A unique reference tags every reply for this call, so unrelated messages
    # already sitting in (or arriving at) the caller's mailbox are never
    # mistaken for results.
    ref = make_ref()
    workers = Enum.map(1..n, fn _ -> spawn_worker(ref) end)

    # Lazily pair each element (and its position) with a worker, round-robin:
    # [{pid0, {e0, 0}}, {pid1, {e1, 1}}, ...]. Counting while sending means the
    # enumerable is walked only once.
    sent =
      workers
      |> Stream.cycle()
      |> Stream.zip(Stream.with_index(enum))
      |> Enum.reduce(0, fn {pid, {element, index}}, count ->
        send(pid, {index, element, function})
        count + 1
      end)

    # Receive exactly as many replies as jobs were dispatched.
    results = collect(ref, sent, [])

    # Shut the pool down; otherwise every call would leave `n` idle processes
    # blocked in `receive` forever.
    Enum.each(workers, fn pid -> send(pid, :stop) end)

    # Replies arrive in completion order; the index restores input order.
    results
    |> List.keysort(0)
    |> Enum.map(fn {_index, value} -> value end)
  end

  # Collects `remaining` tagged replies from the mailbox as `{index, value}`
  # pairs. Prepending keeps collection linear; order is fixed up by the caller.
  defp collect(_ref, 0, acc), do: acc

  defp collect(ref, remaining, acc) do
    receive do
      {^ref, index, value} -> collect(ref, remaining - 1, [{index, value} | acc])
    end
  end

  # Starts one linked worker that reports back to the calling process.
  defp spawn_worker(ref) do
    # `self()` returns the pid of the *current* process. Binding it here, outside
    # the spawned function, captures the caller's pid; calling `self()` inside
    # the worker would return the worker's own pid instead.
    caller = self()
    spawn_link(fn -> worker_loop(caller, ref) end)
  end

  # Worker receive loop: waits for jobs and sends each result to `caller`.
  defp worker_loop(caller, ref) do
    # `receive` blocks until a message matching one of the patterns arrives.
    # Messages that match no pattern are left in the worker's mailbox.
    receive do
      :stop ->
        # Nothing happens and the recursion stops, so the process exits normally.
        :ok

      {index, data, function} ->
        send(caller, {ref, index, function.(data)})
        # Recurse to wait for the next message.
        worker_loop(caller, ref)
    end
  end
end
# Simulated unit of work: block for about one millisecond, then return the
# input unchanged.
pause = fn x ->
  :timer.sleep(1) # milliseconds
  x
end

# Runs `Parallel.map/3` over 2500 elements with the given number of workers and
# prints the elapsed time in seconds.
benchmark = fn workers ->
  IO.puts("#{workers} workers:")

  # `:timer.tc/1` returns `{elapsed_microseconds, return_value}`; only the
  # timing is of interest here, so the mapped list is discarded.
  {microseconds, _result} =
    :timer.tc(fn ->
      Parallel.map(1..2500, pause, workers: workers)
    end)

  IO.puts "in #{microseconds / 1_000_000.0}s"
end

# Sample timings from one run are shown next to each invocation.
benchmark.(1) #=> in 5.002589s
benchmark.(2) #=> in 2.500837s
benchmark.(5) #=> in 1.000895s
benchmark.(100) #=> in 0.051999s
benchmark.(250) #=> in 0.034882s
benchmark.(500) #=> in 0.038719s
benchmark.(2500) #=> in 0.05327s

# Mapping over 2500 elements at 1ms each, the time taken generally
# shrinks linearly with the number of workers (up to a point - the latency
# of inter-process communication is very small, but not nothing).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment