Skip to content

Instantly share code, notes, and snippets.

@mazz
Created November 17, 2024 00:59
Show Gist options
  • Save mazz/1430a3b695756b52b09dd58ed222e448 to your computer and use it in GitHub Desktop.
Save mazz/1430a3b695756b52b09dd58ed222e448 to your computer and use it in GitHub Desktop.
broadway
defmodule Markably.UserNotifications.Broadway do
  @moduledoc false

  use Broadway

  alias Broadway.Message

  require Logger

  # Starts the pipeline: a single producer stage
  # (Markably.UpdateMbyUserNotificationsProducer) feeding two `:default`
  # processors. The start options are ignored.
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayUserNotificationsProcessor,
      producer: [
        module: {Markably.UpdateMbyUserNotificationsProducer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  # Handles one message on the `:default` processor.
  #
  # Every clause returns a `%Broadway.Message{}`. Payload decoding is not wired
  # in yet, so recognised messages are passed through unchanged.
  @impl true
  def handle_message(:default, %Message{data: {:text, event}} = message, _context) do
    Logger.debug(fn -> "user notification text event: " <> inspect(event) end)
    message
  end

  # Payload shape emitted by
  # Markably.UpdateMbyUserNotificationsProducer.handle_cast/2.
  def handle_message(
        :default,
        %Message{data: %{enqueue_update_notifications: notifications}} = message,
        _context
      ) do
    Logger.debug(fn -> "user notification update: " <> inspect(notifications) end)
    message
  end

  # Anything else is marked as failed explicitly instead of raising a
  # FunctionClauseError inside the processor.
  def handle_message(:default, %Message{data: data} = message, _context) do
    Logger.warning("unsupported user notification payload: " <> inspect(data))
    Message.failed(message, "Unsupported payload")
  end
end
defmodule MarkablyWebApi.UserNotificationController do
  use MarkablyWeb, :controller

  alias Markably.UpdateMbyUserNotificationsProducer

  require Logger

  @doc """
  Queues the submitted user notifications for asynchronous processing.

  Reads the `"user_notifications"` entry from `params` and hands it to
  `Markably.UpdateMbyUserNotificationsProducer.enqueue_update_user_notifications/1`.

  Responds with status 200 and the `"ack.json"` template (`ack: "OK"`) once the
  payload has been handed over. When `"user_notifications"` is absent or `nil`,
  nothing is enqueued and the response is 422 with `ack: "ERROR"`.
  """
  def update_user_notifications(conn, params) do
    case params["user_notifications"] do
      nil ->
        # Do not acknowledge (or enqueue) a request that carries no payload.
        Logger.warning("update_user_notifications called without \"user_notifications\"")

        conn
        |> put_status(:unprocessable_entity)
        |> render("ack.json", %{ack: "ERROR"})

      user_notifications_attr_list ->
        UpdateMbyUserNotificationsProducer.enqueue_update_user_notifications(
          user_notifications_attr_list
        )

        conn
        |> put_status(200)
        |> render("ack.json", %{ack: "OK"})
    end
  end
end
defmodule Markably.UpdateMbyUserNotificationsProducer do
  @moduledoc false

  @behaviour Broadway.Producer

  use GenStage

  alias Broadway.Message
  alias Markably.Pipeline.Timing

  # Name of the Broadway pipeline that hosts this producer (see
  # Markably.UserNotifications.Broadway.start_link/1).
  @pipeline BroadwayUserNotificationsProcessor

  # Starts the producer as a standalone GenStage process. The process is
  # registered under `opts[:name]`, defaulting to this module's name.
  def start_link(opts) do
    {name, opts} = Keyword.pop(opts, :name, __MODULE__)
    GenStage.start_link(__MODULE__, opts, name: name)
  end

  # The state is never read by the callbacks below; events are pushed out as
  # soon as they arrive.
  @impl true
  def init(_opts) do
    {:producer, {[], 0}}
  end

  # Asynchronously enqueues `mby_notifications`, stamped with the current time
  # in Unix milliseconds. Returns `:ok`.
  def enqueue_update_user_notifications(mby_notifications) do
    GenStage.cast(
      producer(),
      {:enqueue_update_notifications, mby_notifications, Timing.unix_ms_now()}
    )
  end

  @impl true
  def handle_cast({:enqueue_update_notifications, mby_notifications, unix_ms}, state) do
    event = %{enqueue_update_notifications: mby_notifications, enqueued_at: unix_ms}
    {:noreply, [to_message(event)], state}
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def put_event(event, timeout \\ 5000) do
    GenStage.call(producer(), {:put, event}, timeout)
  end

  @impl true
  def handle_call({:put, event}, _from, state) do
    # Dispatch immediately
    {:reply, :ok, [to_message(event)], state}
  end

  # Demand alone never produces events; they are only emitted from
  # handle_cast/2 and handle_call/3.
  @impl true
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  # Resolves the process that should receive client requests. A process
  # registered under this module's name (standalone start_link/1) wins;
  # otherwise one of the pipeline's producer stages is picked.
  # NOTE(review): when neither is running, Broadway.producer_names/1 may raise
  # rather than the request being silently dropped — confirm this is acceptable.
  defp producer do
    case Process.whereis(__MODULE__) do
      nil -> @pipeline |> Broadway.producer_names() |> Enum.random()
      pid -> pid
    end
  end

  # Broadway expects producers to emit %Broadway.Message{} structs, so raw
  # events are wrapped with a no-op acknowledger. Existing messages pass through.
  defp to_message(%Message{} = message), do: message

  defp to_message(event) do
    %Message{data: event, acknowledger: Broadway.NoopAcknowledger.init()}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment