Mix.install([
{:kino, "~> 0.12.3"}
])
A receive
block will wait until a message matching one of its clauses is received. Here we wait for any message, and print it out. However, this now means the current process is blocked, so the message we attempt to send ourself isn’t received. The receive
block is set to time out after 10ms, otherwise it would hang forever – after it times out the send
will be executed, but no-one’s listening.
Note: All the :ok
lines at the end of these examples are just to avoid LiveBook showing the result of the last line in the block as a return value, which can be confusing (in this case, it would have been "Hello"
, because send
returns the message it sent).
receive do
msg -> IO.inspect(msg, label: :received)
after
10 -> IO.puts("Bored now")
end
send(self(), "Hello")
:ok
We never handled the message, so it’s still in our process mailbox. Let’s receive it now:
receive do
msg -> IO.inspect(msg, label: :received)
end
:ok
We can only look for messages we sent to ourself after we sent them, which may not be very useful. But we can use spawn
to create a new process, and send messages to that. The two processes run concurrently, so don’t block each other.
pid =
spawn(fn ->
receive do
msg -> IO.inspect(msg, label: :received)
end
end)
send(pid, "Hello")
send(pid, "Hello again")
:ok
In the example above, the spawned process received one message then immediately terminated. If we want to receive more than one message, we need to recursively call our receive
block. Let’s put our function into a module to make this cleaner, and also look for a specific message to tell our receiver to terminate, rather than timing out.
We’re also using Kino to render a sequence diagram of the messages which are being passed.
defmodule Echo do
def run(caller_pid) do
receive do
:stop ->
IO.puts("Stopping")
msg ->
send(caller_pid, "I got #{inspect(msg)}")
run(caller_pid)
end
end
end
Kino.Process.render_seq_trace(fn ->
self = self()
echo_pid = spawn(fn -> Echo.run(self) end)
send(echo_pid, "Hello")
receive do
msg -> IO.inspect(msg, label: :reply_1)
end
send(echo_pid, {:foo, %{value: 42, bar: "baz"}})
receive do
msg -> IO.inspect(msg, label: :reply_2)
end
send(echo_pid, :stop)
end)
GenServer
is an abstraction round processes and message passing, and is part of OTP (Open Telecom Platform). It implements battle-tested patterns for process management, supervision etc, and provides a number of callbacks for handling messages – we’ll just look at handle_call
(for messages requiring a reply) and handle_cast
(for “fire and forget” messages).
defmodule Counter do
use GenServer
@impl GenServer
def init(value) do
{:ok, value}
end
@impl GenServer
def handle_cast(:inc, value) do
{:noreply, value + 1}
end
@impl GenServer
def handle_call(:value, _from, value) do
{:reply, value, value}
end
end
Kino.Process.render_seq_trace(fn ->
{:ok, pid} = GenServer.start_link(Counter, 0)
GenServer.cast(pid, :inc)
GenServer.cast(pid, :inc)
value = GenServer.call(pid, :value)
IO.inspect(value, label: :value)
end)
:ok
Calling GenServer
functions directly isn’t particularly user-friendly, so let’s wrap the calls in an API.
It’s important to note that the client functions run in the calling process, and the callbacks in the server.
defmodule Counter2 do
use GenServer
# Client
def start_link(value) do
GenServer.start_link(__MODULE__, value)
end
def inc(pid) do
GenServer.cast(pid, :inc)
end
def value(pid) do
GenServer.call(pid, :value)
end
# Server
@impl GenServer
def init(value) do
{:ok, value}
end
@impl GenServer
def handle_cast(:inc, value) do
{:noreply, value + 1}
end
@impl GenServer
def handle_call(:value, _from, value) do
{:reply, value, value}
end
end
{:ok, pid} = Counter2.start_link(0)
Counter2.inc(pid)
Counter2.inc(pid)
value = Counter2.value(pid)
IO.inspect(value, label: :value)
:ok
An Agent
is a specialised version of a GenServer
, for when you just want to retain some persistent state.
An Agent
is started with a function which is called on the server process to return the initial state (in the example below this is just a number, but could be any data structure), and provides functions to get and update the state. These functions also take a function as an argument, and the provided function is called with the current state. In the case of update
, the function needs to return the new state, and for get
, it is applied to the state to produce the value to return (in the example below it simply returns the entire state).
{:ok, pid} = Agent.start_link(fn -> 32 end)
Agent.update(pid, fn n -> n + 10 end)
Agent.get(pid, fn n -> n end)
Again, we can wrap this in a module rather than interacting with the Agent
directly:
defmodule Total do
def start_link(initial_value) do
Agent.start_link(fn -> initial_value end)
end
def add(pid, increment) do
Agent.update(pid, fn value -> value + increment end)
end
def value(pid) do
Agent.get(pid, & &1)
end
end
{:ok, total_pid} = Total.start_link(32)
Total.add(total_pid, 10)
Total.value(total_pid)
A Task
is another specialised GenServer
, providing a simple way of running code concurrently in the background and (optionally) waiting for it to return a result.
IO.puts("Before")
task_1 =
Task.async(fn ->
Process.sleep(2000)
IO.puts("Finished task 1")
"Hello from task 1"
end)
task_2 =
Task.async(fn ->
Process.sleep(1000)
IO.puts("Finished task 2")
"Hello from task 2"
end)
IO.puts("After")
Task.await(task_1) |> IO.inspect(label: "Task 1 result")
Task.await(task_2) |> IO.inspect(label: "Task 2 result")
:ok