Last active
December 9, 2024 15:41
-
-
Save mazz/10529e98ebda6ec2dd5f0776741f1d0c to your computer and use it in GitHub Desktop.
broadway broken
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule App.Simplified.Broadway do
  @moduledoc """
  Broadway pipeline fed by `Bags.Simplified.Producer`.

  The topology is registered under the name `BroadwayBlueskeySimplified` and
  runs one producer, two processors and a single batcher with a batch size
  of 15.
  """

  use Broadway
  require Logger
  alias Broadway.Message

  @doc """
  Starts the Broadway topology.

  The options argument is ignored; the configuration is fixed in this module.
  """
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: BroadwayBlueskeySimplified,
      producer: [
        # The producer defined alongside this pipeline lives in the `Bags`
        # namespace; the previous `App.Simplified.Producer` reference pointed
        # at a module that is not defined here.
        module: {Bags.Simplified.Producer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 15]
      ],
      partition_by: &partition/1
    )
  end

  @doc """
  Picks the partition (0 or 1) for a message.

  The choice is random, so messages are spread over both partitions without
  any ordering guarantee for related messages.
  """
  def partition(_msg), do: Enum.random([0, 1])

  @impl true
  def handle_message(:default, %Message{data: {:text, event}} = message, _context) do
    dbg(event)
    # Broadway requires the (possibly updated) message to be returned;
    # returning the bare event makes the processor fail the message.
    message
  end

  # The producer wraps already-decoded payloads, so data is not always a
  # `{:text, event}` tuple. Without this clause such messages would raise a
  # FunctionClauseError in the processor.
  def handle_message(:default, %Message{data: data} = message, _context) do
    dbg(data)
    message
  end

  @impl true
  def handle_batch(:default, messages, _batch_info, _context) do
    # serving = if batch_info.partition == 0, do: BertServing1, else: BertServing2
    # No batch-level work yet: hand the messages back unchanged so they are
    # acknowledged as successful.
    messages
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Bags.Simplified.Producer do
  @moduledoc """
  Push-style GenStage producer for the Broadway pipeline.

  The producer keeps the amount of outstanding demand as its state. Messages
  are pushed in through `insert/1`; each accepted message consumes one unit
  of demand, and inserts are rejected with `{:error, :pipeline_full}` while
  no demand is pending.
  """

  # `use GenStage` declares the GenStage behaviour (so `@impl true` on the
  # callbacks below is valid) and supplies default implementations for the
  # callbacks this module does not define.
  use GenStage
  @behaviour Broadway.Producer

  require Logger
  alias Broadway.Message

  # Name under which the Broadway topology is registered; used to look up the
  # producer process started by Broadway.
  @broadway_name BroadwayBlueskeySimplified

  @doc """
  Starts the producer as a standalone GenStage process.

  When running under Broadway the producer is started by Broadway itself.
  """
  def start_link(_args) do
    GenStage.start_link(__MODULE__, [])
  end

  @doc """
  Pushes `message` into the pipeline.

  Returns `:ok` when the message was dispatched, or
  `{:error, :pipeline_full}` when there is no pending demand.
  """
  def insert(message) do
    dbg(message)

    # `GenStage.call/2` needs the target process as its first argument, so
    # resolve one of the producer processes Broadway started for the topology.
    @broadway_name
    |> Broadway.producer_names()
    |> Enum.random()
    |> GenStage.call({:insert, message})
  end

  @impl true
  def init(_opts) do
    initial_demand = 0
    {:producer, initial_demand}
  end

  @impl true
  def handle_demand(incoming_demand, existing_demand) do
    # Nothing is buffered, so just record the demand; events are emitted only
    # when `insert/1` is called.
    {:noreply, [], existing_demand + incoming_demand}
  end

  @impl true
  def handle_call({:insert, _message}, _from, demand) when demand <= 0 do
    dbg(":insert")
    dbg("demand <= 0")
    {:reply, {:error, :pipeline_full}, [], demand}
  end

  def handle_call({:insert, message}, _from, demand) do
    dbg(":insert")
    dbg(demand)
    {:reply, :ok, [broadway_transform(message)], demand - 1}
  end

  # Wraps a raw payload in a Broadway message that needs no acknowledgement.
  defp broadway_transform(message) do
    dbg(message)

    %Message{
      acknowledger: Broadway.NoopAcknowledger.init(),
      data: message
    }
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Bags.Websocket do
  @moduledoc """
  WebSockex client for the MEXC websocket endpoint.

  The process state is the subscription payload: it is JSON-encoded and sent
  to the server after every (re)connect. Each incoming text frame is decoded
  and handed to `Bags.Simplified.Producer.insert/1`.
  """

  use WebSockex

  @doc """
  Connects to the MEXC websocket; `state` is the subscription payload.
  """
  def start_link(state) do
    WebSockex.start_link("wss://wbs.mexc.com/ws", __MODULE__, state)
  end

  @impl true
  def handle_connect(_conn, state) do
    # Frames cannot be sent from inside handle_connect, so schedule the
    # subscription as a cast to ourselves.
    WebSockex.cast(self(), :subscribe)
    {:ok, state}
  end

  @impl true
  def handle_cast(:subscribe, state) do
    # if we use Jason.encode!() mexc complains about "invalid msg format"
    {:reply, {:text, Poison.encode!(state)}, state}
  end

  @impl true
  def handle_frame({:text, msg}, state) do
    dbg(msg)
    decoded = Jason.decode!(msg)

    # NOTE(review): atomizing keys of externally supplied JSON can exhaust the
    # atom table if the helper creates new atoms — confirm how
    # `Bags.Extensions.MapExt.atomize_keys/1` converts keys.
    atomized = Bags.Extensions.MapExt.atomize_keys(decoded)
    dbg(atomized)

    # The result is intentionally not matched: when the pipeline has no
    # pending demand the producer answers `{:error, :pipeline_full}` and the
    # frame is dropped.
    Bags.Simplified.Producer.insert(atomized)
    {:ok, state}
  end

  @impl true
  def handle_disconnect(_connection_status, state) do
    # WebSockex passes a connection-status map (with `:reason`, `:conn` and
    # `:attempt_number`), not a `{:local, reason}` tuple; matching on the tuple
    # raised a FunctionClauseError instead of reconnecting.
    dbg("handle_disconnect")
    # stuff...
    {:reconnect, state}
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment