Skip to content

Instantly share code, notes, and snippets.

@lafka
Last active October 7, 2015 12:44
Show Gist options
  • Save lafka/9c21a60b6fb73c64e829 to your computer and use it in GitHub Desktop.
Save lafka/9c21a60b6fb73c64e829 to your computer and use it in GitHub Desktop.
Simple pub/sub with elixir and erlang/otp pg2 module
defmodule DBL.Message.Channel do
def subscribe([]), do: :ok
def subscribe([chan | rest]) do
case :pg2.join "stream/" <> chan, self do
{:error, {:no_such_group, _}} ->
:ok = :pg2.create "stream/" <> chan
:ok = :pg2.join "stream/" <> chan, self
:ok ->
:ok
end
subscribe rest
end
def publish(chans, ev), do: publish(chans, ev, [])
defp publish([], ev, acc) do
Enum.each acc, fn({pid, chan}) ->
Process.send(pid, {:msg, chan, ev})
end
end
defp publish([chan | rest], ev, acc) do
case :pg2.get_members "stream/" <> chan do
{:error, {:no_such_group, _}} ->
publish rest, ev, acc
pids ->
publish rest, ev, Enum.reduce(pids, acc, fn(pid, acc1) ->
Dict.put acc1, pid, chan
end)
end
end
end
defmodule MsgChanTest do
use ExUnit.Case, async: false
test "pub/sub - de-duplicate pids" do
parent = self
refs = [{1, make_ref}, {2, make_ref}, {3, make_ref}]
chans = Enum.map refs, fn({n, ref}) ->
subscriptions = ["app/#{n}/1", "app/#{n}/2"]
spawn fn() ->
:ok = DBL.Message.Channel.subscribe subscriptions
Process.send parent, {ref, :init}
:ok = recvloop ref, parent, subscriptions
end
receive do {^ref, :init} -> :ok end
{n, ref, subscriptions}
end
Enum.each chans, fn({n, ref, subscriptions}) ->
ev = "ev-#{n}"
DBL.Message.Channel.publish subscriptions, ev
# Check that data sent to both is actually only received once per proc
assert :ok == (receive do {^ref, ^ev} -> :ok
after 500 -> :timeout end)
assert :timeout == (receive do {^ref, ^ev} -> :ok
after 500 -> :timeout end)
end
end
test "pub/sub - broadcast" do
parent = self
refs = [{1, make_ref}, {2, make_ref}, {3, make_ref}]
chans = Enum.map refs, fn({n, ref}) ->
subscriptions = ["app/1", "app/1/#{n}"]
spawn fn() ->
:ok = DBL.Message.Channel.subscribe subscriptions
Process.send parent, {ref, :init}
:ok = recvloop ref, parent, subscriptions
end
receive do {^ref, :init} -> :ok end
{n, ref}
end
# check that each can receive a single message to non-broadcast channel
Enum.each chans, fn({n, ref}) ->
ev = "ev-#{n}"
DBL.Message.Channel.publish ["app/1/#{n}"], ev
assert :ok == (receive do {^ref, ^ev} -> :ok
after 500 -> :timeout end)
end
# check that broadcasts are received by all members
ev = "ev-broadcast"
DBL.Message.Channel.publish ["app/1"], ev
Enum.each chans, fn({_n, ref}) ->
assert :ok == (receive do {^ref, ^ev} -> :ok
after 500 -> :timeout end)
end
end
defp recvloop(ref, parent, subscriptions) do
receive do
{:msg, chan, ev} ->
if chan in subscriptions do
Process.send parent, {ref, ev}
end
recvloop ref, parent, subscriptions
after
5000 -> :timeout
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment