Skip to content

Instantly share code, notes, and snippets.

@mazz
Last active December 9, 2024 15:41
Show Gist options
  • Save mazz/10529e98ebda6ec2dd5f0776741f1d0c to your computer and use it in GitHub Desktop.
Save mazz/10529e98ebda6ec2dd5f0776741f1d0c to your computer and use it in GitHub Desktop.
broadway broken
defmodule App.Simplified.Broadway do
use Broadway
require Logger
alias Broadway.Message
def start_link(_opts) do
Broadway.start_link(App.Simplified.Broadway,
name: BroadwayBlueskeySimplified,
producer: [
module: {App.Simplified.Producer, []},
concurrency: 1
],
processors: [
default: [concurrency: 2]
],
batchers: [
default: [concurrency: 1, batch_size: 15]
],
partition_by: &partition/1
)
end
def partition(_msg), do: Enum.random([0, 1])
@impl true
def handle_message(:default, %Message{data: {:text, event}} = message, _context) do
dbg(event)
end
@impl true
def handle_batch(:default, messages, _batch_info, _context) do
# serving = if batch_info.partition == 0, do: BertServing1, else: BertServing2
posts = Enum.map(messages, & &1.data)
messages
end
end
defmodule Bags.Simplified.Producer do
@behaviour Broadway.Producer
require Logger
alias Broadway.Message
def start_link(_argse) do
GenStage.start_link(__MODULE__, [])
end
def insert(message) do
dbg(message)
# Bags.Simplified.Broadway
# |> Broadway.producer_names()
# |> Enum.random()
# |> GenStage.call({:insert, message})
# |> Bags.Simplified.Producer
GenStage.call({:insert, message})
end
def init(_opts) do
initial_demand = 0
# {:producer, {[], 0}}
{:producer, initial_demand}
end
@impl true
def handle_demand(incoming_demand, existing_demand) do
{:noreply, [], existing_demand + incoming_demand}
end
@impl true
def handle_call({:insert, _message}, _from, demand) when demand <= 0 do
dbg(":insert")
dbg("demand <= 0")
{:reply, {:error, :pipeline_full}, [], demand}
end
def handle_call({:insert, message}, _from, demand) do
dbg(":insert")
dbg(demand)
{:reply, :ok, [broadway_transform(message)], demand - 1}
end
defp broadway_transform(message) do
dbg(message)
%Broadway.Message{
acknowledger: Broadway.NoopAcknowledger.init(),
data: message
}
end
end
defmodule Bags.Websocket do
use WebSockex
def start_link(state) do
WebSockex.start_link("wss://wbs.mexc.com/ws", __MODULE__, state)
end
def handle_connect(_conn, state) do
WebSockex.cast(self(), :subscribe)
{:ok, state}
end
def handle_cast(:subscribe, state) do
# if we use Jason.encode!() mexc complains about "invalid msg format"
{:reply, {:text, Poison.encode!(state)}, state}
end
def handle_frame({:text, msg}, state) do
dbg(msg)
decoded = Jason.decode!(msg)
atomized = Bags.Extensions.MapExt.atomize_keys(decoded)
dbg(atomized)
Bags.Simplified.Producer.insert(atomized)
{:ok, state}
end
def handle_disconnect({:local, reason}, state) do
dbg("handle_disconnect")
# stuff...
{:reconnect, state}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment