Last active
October 7, 2015 12:44
-
-
Save lafka/9c21a60b6fb73c64e829 to your computer and use it in GitHub Desktop.
Simple pub/sub with elixir and erlang/otp pg2 module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DBL.Message.Channel do
  @moduledoc """
  Simple pub/sub on top of Erlang's `:pg2` process groups.

  Each channel name is mapped to the `:pg2` group `"stream/" <> channel`.
  Subscribers join the groups for the channels they care about; publishers
  look up the members of the groups and send each member one message of the
  form `{:msg, channel, event}`.

  NOTE(review): `:pg2` was removed from Erlang/OTP in release 24. Running this
  module on a newer OTP requires porting it to `:pg` (which needs a started
  scope), so that migration is intentionally left out of this module.
  """

  @group_prefix "stream/"

  @doc """
  Subscribes the calling process to every channel in `chans`.

  A group that does not exist yet is created on demand. Returns `:ok`; any
  unexpected `:pg2` result raises a `MatchError` or `CaseClauseError`.
  """
  @spec subscribe([String.t()]) :: :ok
  def subscribe(chans) when is_list(chans) do
    Enum.each(chans, &join/1)
  end

  @doc """
  Publishes `ev` to all processes subscribed to any channel in `chans`.

  Each subscribed process receives exactly one `{:msg, chan, ev}` message per
  call, even when it is subscribed to several of the given channels. In that
  case `chan` is the last matching channel in `chans`. Channels without a
  group are skipped. Returns `:ok`.
  """
  @spec publish([String.t()], term()) :: :ok
  def publish(chans, ev) when is_list(chans) do
    chans
    |> Enum.reduce(%{}, &collect_members/2)
    |> Enum.each(fn {pid, chan} -> send(pid, {:msg, chan, ev}) end)
  end

  # Joins the calling process to the group for `chan`. Joining is attempted
  # first and the group is only created when it turns out to be missing.
  defp join(chan) do
    name = group(chan)

    case :pg2.join(name, self()) do
      :ok ->
        :ok

      {:error, {:no_such_group, _}} ->
        :ok = :pg2.create(name)
        :ok = :pg2.join(name, self())
    end
  end

  # Adds every member of `chan`'s group to the `pid => chan` map. Keying the
  # map by pid is what guarantees a single delivery per process.
  defp collect_members(chan, members) do
    case :pg2.get_members(group(chan)) do
      {:error, {:no_such_group, _}} ->
        members

      pids ->
        Enum.reduce(pids, members, fn pid, acc -> Map.put(acc, pid, chan) end)
    end
  end

  # Builds the :pg2 group name used for a channel.
  defp group(chan), do: @group_prefix <> chan
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MsgChanTest do
  # Runs synchronously: the tests share globally named process groups.
  use ExUnit.Case, async: false

  alias DBL.Message.Channel

  # How long (ms) to wait for a message before an assertion gives up.
  @timeout 500

  test "pub/sub - de-duplicate pids" do
    subscribers =
      for n <- 1..3 do
        subscriptions = ["app/#{n}/1", "app/#{n}/2"]
        {n, start_subscriber(subscriptions), subscriptions}
      end

    Enum.each(subscribers, fn {n, ref, subscriptions} ->
      ev = "ev-#{n}"
      Channel.publish(subscriptions, ev)

      # Check that data sent to both channels is received only once per process
      assert_receive {^ref, ^ev}, @timeout
      refute_receive {^ref, ^ev}, @timeout
    end)
  end

  test "pub/sub - broadcast" do
    subscribers =
      for n <- 1..3 do
        {n, start_subscriber(["app/1", "app/1/#{n}"])}
      end

    # check that each can receive a single message to non-broadcast channel
    Enum.each(subscribers, fn {n, ref} ->
      ev = "ev-#{n}"
      Channel.publish(["app/1/#{n}"], ev)
      assert_receive {^ref, ^ev}, @timeout
    end)

    # check that broadcasts are received by all members
    ev = "ev-broadcast"
    Channel.publish(["app/1"], ev)

    Enum.each(subscribers, fn {_n, ref} ->
      assert_receive {^ref, ^ev}, @timeout
    end)
  end

  # Spawns a process subscribed to `subscriptions` and waits until it reports
  # that the subscription is in place. Returns the reference the subscriber
  # tags its forwarded events with.
  defp start_subscriber(subscriptions) do
    parent = self()
    ref = make_ref()

    # Linked so that a crash in the subscriber fails the test instead of
    # leaving it waiting for the handshake.
    spawn_link(fn ->
      :ok = Channel.subscribe(subscriptions)
      send(parent, {ref, :init})
      # recvloop/3 only ever returns :timeout, so its result is not matched.
      recvloop(ref, parent, subscriptions)
    end)

    assert_receive {^ref, :init}, @timeout
    ref
  end

  # Forwards every event published on one of `subscriptions` to `parent` as
  # `{ref, ev}`. Returns :timeout after 5 seconds without any message.
  defp recvloop(ref, parent, subscriptions) do
    receive do
      {:msg, chan, ev} ->
        if chan in subscriptions do
          send(parent, {ref, ev})
        end

        recvloop(ref, parent, subscriptions)
    after
      5000 -> :timeout
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment