-
-
Save orderthruchaos/939f6f0100d4732ff01bae5e6df8ec96 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule GCM.Pusher do
  @moduledoc """
  GenStage consumer that pulls push requests from a producer and sends them
  over an FCM connection.

  Demand is handled manually: `@max_demand` requests are asked for when the
  subscription is established, and one more is asked for each time
  `handle_response/2` processes a response. Requests that have been sent are
  tracked in `pending_requests`, keyed by the message id generated for them.
  """

  use GenStage

  # The maximum number of requests Firebase allows at once per XMPP connection
  @max_demand 100

  # `next_id` must be declared here: `init/1` sets it and `generate_id/1`
  # reads and updates it, and struct literals / updates reject unknown keys.
  defstruct [
    :producer,
    :producer_from,
    :fcm_conn_pid,
    :pending_requests,
    :next_id
  ]

  @doc """
  Starts the pusher stage.

    * `producer` - the stage to subscribe to for push requests.
    * `fcm_conn_pid` - passed to `FCM.Connection.send/2` for every request.
    * `opts` - forwarded unchanged to `GenStage.start_link/3`.
  """
  @spec start_link(GenStage.stage(), GenServer.server(), keyword()) :: GenServer.on_start()
  def start_link(producer, fcm_conn_pid, opts \\ []) do
    GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)
  end

  @impl true
  def init({producer, fcm_conn_pid}) do
    state = %__MODULE__{
      next_id: 1,
      pending_requests: %{},
      producer: producer,
      fcm_conn_pid: fcm_conn_pid
    }

    # Defer the subscription until after init/1 has returned.
    send(self(), :init)

    # Run as consumer
    {:consumer, state}
  end

  @impl true
  def handle_info(:init, %__MODULE__{producer: producer} = state) do
    # Subscribe to the Push Collector
    GenStage.async_subscribe(self(), to: producer, cancel: :temporary)
    {:noreply, [], state}
  end

  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    # Start demanding requests now that we are subscribed
    GenStage.ask(from, @max_demand)

    # :manual - this stage issues demand itself via GenStage.ask/2.
    {:manual, %{state | producer_from: from}}
  end

  @impl true
  def handle_events(push_requests, _from, state) do
    # We got some push requests from the Push Collector.
    # Let's send them.
    state = Enum.reduce(push_requests, state, &do_send/2)
    {:noreply, [], state}
  end

  # Send the message to FCM, track as a pending request.
  # Crashes (match error) if FCM.Connection.send/2 returns anything but :ok.
  defp do_send(push_request, %__MODULE__{fcm_conn_pid: fcm_conn_pid} = state) do
    {message_id, state} = generate_id(state)
    xml = PushRequest.to_xml(push_request, message_id)
    :ok = FCM.Connection.send(fcm_conn_pid, xml)

    %{state | pending_requests: Map.put(state.pending_requests, message_id, push_request)}
  end

  # FCM response handling: drop the matching pending request and ask the
  # producer for one more event.
  #
  # NOTE(review): nothing in this module calls this function, so the compiler
  # will report it as unused. Presumably a handle_info/2 clause for messages
  # coming from the FCM connection is meant to dispatch here - confirm the
  # message shape and wire it up.
  defp handle_response(
         %{message_id: message_id},
         %__MODULE__{pending_requests: pending_requests, producer_from: producer_from} = state
       ) do
    {_push_request, pending_requests} = Map.pop(pending_requests, message_id)

    # Since we finished a request, ask the Push Collector for more.
    GenStage.ask(producer_from, 1)

    %{state | pending_requests: pending_requests}
  end

  # Returns {id_as_string, state_with_incremented_counter}.
  defp generate_id(%__MODULE__{next_id: next_id} = state) do
    {to_string(next_id), %{state | next_id: next_id + 1}}
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment