Skip to content

Instantly share code, notes, and snippets.

@kerryb
Created June 11, 2024 15:27
Show Gist options
  • Save kerryb/130088a7067f841bd69af109d0edb5fa to your computer and use it in GitHub Desktop.
Save kerryb/130088a7067f841bd69af109d0edb5fa to your computer and use it in GitHub Desktop.

Concurrency in Elixir

Mix.install([
  {:kino, "~> 0.12.3"}
])

Basic message passing with send and receive

A receive block will wait until a message matching one of its clauses is received. Here we wait for any message, and print it out. However, this now means the current process is blocked, so the message we attempt to send ourself isn’t received. The receive block is set to time out after 10ms, otherwise it would hang forever – after it times out the send will be executed, but no-one’s listening.

Note: All the :ok lines at the end of these examples are just to avoid LiveBook showing the result of the last line in the block as a return value, which can be confusing (in this case, it would have been "Hello", because send returns the message it sent).

receive do
  msg -> IO.inspect(msg, label: :received)
after
  10 -> IO.puts("Bored now")
end

send(self(), "Hello")
:ok

We never handled the message, so it’s still in our process mailbox. Let’s receive it now:

receive do
  msg -> IO.inspect(msg, label: :received)
end

:ok

We can only look for messages we sent to ourself after we sent them, which may not be very useful. But we can use spawn to create a new process, and send messages to that. The two processes run concurrently, so don’t block each other.

pid =
  spawn(fn ->
    receive do
      msg -> IO.inspect(msg, label: :received)
    end
  end)

send(pid, "Hello")
send(pid, "Hello again")
:ok

In the example above, the spawned process received one message then immediately terminated. If we want to receive more than one message, we need to recursively call our receive block. Let’s put our function into a module to make this cleaner, and also look for a specific message to tell our receiver to terminate, rather than timing out.

We’re also using Kino to render a sequence diagram of the messages which are being passed.

defmodule Echo do
  def run(caller_pid) do
    receive do
      :stop ->
        IO.puts("Stopping")

      msg ->
        send(caller_pid, "I got #{inspect(msg)}")
        run(caller_pid)
    end
  end
end

Kino.Process.render_seq_trace(fn ->
  self = self()
  echo_pid = spawn(fn -> Echo.run(self) end)
  send(echo_pid, "Hello")

  receive do
    msg -> IO.inspect(msg, label: :reply_1)
  end

  send(echo_pid, {:foo, %{value: 42, bar: "baz"}})

  receive do
    msg -> IO.inspect(msg, label: :reply_2)
  end

  send(echo_pid, :stop)
end)

GenServer

GenServer is an abstraction round processes and message passing, and is part of OTP (Open Telecom Platform). It implements battle-tested patterns for process management, supervision etc, and provides a number of callbacks for handling messages – we’ll just look at handle_call (for messages requiring a reply) and handle_cast (for “fire and forget” messages).

defmodule Counter do
  use GenServer

  @impl GenServer
  def init(value) do
    {:ok, value}
  end

  @impl GenServer
  def handle_cast(:inc, value) do
    {:noreply, value + 1}
  end

  @impl GenServer
  def handle_call(:value, _from, value) do
    {:reply, value, value}
  end
end

Kino.Process.render_seq_trace(fn ->
  {:ok, pid} = GenServer.start_link(Counter, 0)
  GenServer.cast(pid, :inc)
  GenServer.cast(pid, :inc)
  value = GenServer.call(pid, :value)
  IO.inspect(value, label: :value)
end)

:ok

Calling GenServer functions directly isn’t particularly user-friendly, so let’s wrap the calls in an API.

It’s important to note that the client functions run in the calling process, and the callbacks in the server.

defmodule Counter2 do
  use GenServer

  # Client

  def start_link(value) do
    GenServer.start_link(__MODULE__, value)
  end

  def inc(pid) do
    GenServer.cast(pid, :inc)
  end

  def value(pid) do
    GenServer.call(pid, :value)
  end

  # Server

  @impl GenServer
  def init(value) do
    {:ok, value}
  end

  @impl GenServer
  def handle_cast(:inc, value) do
    {:noreply, value + 1}
  end

  @impl GenServer
  def handle_call(:value, _from, value) do
    {:reply, value, value}
  end
end

{:ok, pid} = Counter2.start_link(0)
Counter2.inc(pid)
Counter2.inc(pid)
value = Counter2.value(pid)
IO.inspect(value, label: :value)
:ok

Agent

An Agent is a specialised version of a GenServer, for when you just want to retain some persistent state.

An Agent is started with a function which is called on the server process to return the initial state (in the example below this is just a number, but could be any data structure), and provides functions to get and update the state. These functions also take a function as an argument, and the provided function is called with the current state. In the case of update, the function needs to return the new state, and for get, it is applied to the state to produce the value to return (in the example below it simply returns the entire state).

{:ok, pid} = Agent.start_link(fn -> 32 end)

Agent.update(pid, fn n -> n + 10 end)
Agent.get(pid, fn n -> n end)

Again, we can wrap this in a module rather than interacting with the Agent directly:

defmodule Total do
  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end)
  end

  def add(pid, increment) do
    Agent.update(pid, fn value -> value + increment end)
  end

  def value(pid) do
    Agent.get(pid, & &1)
  end
end

{:ok, total_pid} = Total.start_link(32)

Total.add(total_pid, 10)
Total.value(total_pid)

Task

A Task is another specialised GenServer, providing a simple way of running code concurrently in the background and (optionally) waiting for it to return a result.

IO.puts("Before")

task_1 =
  Task.async(fn ->
    Process.sleep(2000)
    IO.puts("Finished task 1")
    "Hello from task 1"
  end)

task_2 =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Finished task 2")
    "Hello from task 2"
  end)

IO.puts("After")

Task.await(task_1) |> IO.inspect(label: "Task 1 result")
Task.await(task_2) |> IO.inspect(label: "Task 2 result")
:ok
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment