Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created November 19, 2024 08:00
Show Gist options
  • Select an option

  • Save monotykamary/84efc95e303dc19e55164dcee23e34a2 to your computer and use it in GitHub Desktop.

Select an option

Save monotykamary/84efc95e303dc19e55164dcee23e34a2 to your computer and use it in GitHub Desktop.
FLAME Elixir as a Decorator
defmodule DiscordCollector.FLAMEDecorator do
@moduledoc """
Provides FLAME function decorators that wrap functions to run in FLAME pools.
## Usage
defmodule MyModule do
use DiscordCollector.FLAMEDecorator
@decorate flame(MyApp.Pool)
def my_function(arg1, arg2) do
# This will run in the MyApp.Pool FLAME pool
end
end
"""
use Decorator.Define, [flame: 1]
@doc """
Wraps a function to run in a FLAME pool.
## Options
- The first argument must be the FLAME pool module name
## Examples
@decorate flame(MyApp.MessageProcessor)
def process_message(msg) do
# Runs in MyApp.MessageProcessor pool
end
"""
def flame(pool, body, context) do
quote do
# Validate pool is an atom/alias at compile time
pool = unquote(pool)
unless is_atom(pool), do: raise "FLAME pool must be a module name, got: #{inspect(pool)}"
# Get function name for better error messages
function_name = "#{unquote(context.module)}.#{unquote(context.name)}/#{unquote(context.arity)}"
# Wrap the function in a FLAME.call
FLAME.call(pool, fn ->
try do
unquote(body)
rescue
e ->
# Add function context to error
reraise %{e | message: "#{function_name}: #{Exception.message(e)}"}, __STACKTRACE__
end
end)
end
end
end
# Example usage with configuration
defmodule DiscordCollector.MessageProcessor do
use DiscordCollector.FLAMEDecorator
# Optional: Define pool settings as module attributes
@pool_settings [
min_size: 1,
max_size: 5,
max_concurrency: 2
]
@decorate flame(DiscordCollector.MessageCollector)
def process_message(message) do
# This runs in the MessageCollector pool
{:ok, processed} = do_process(message)
processed
end
@decorate flame(DiscordCollector.FileProcessor)
def save_message(message) do
# This runs in the FileProcessor pool
{:ok, saved} = do_save(message)
saved
end
end
# Example of pool initialization
defmodule DiscordCollector.Pools do
@moduledoc """
Defines and starts FLAME pools for the application.
"""
def child_specs do
[
# Message processing pool
{FLAME.Pool,
name: DiscordCollector.MessageCollector,
min: 0,
max: 10,
max_concurrency: 5,
idle_shutdown_after: 30_000},
# File processing pool
{FLAME.Pool,
name: DiscordCollector.FileProcessor,
min: 0,
max: 5,
max_concurrency: 2,
idle_shutdown_after: 30_000}
]
end
end
defmodule DiscordCollector.Storage do
use DiscordCollector.FLAMEDecorator
alias DiscordCollector.Config
@decorate flame(DiscordCollector.FileProcessor)
def save_messages_chunk(messages, channel_info, year, month, chunk_num) do
bucket = "dwarvesf-discord"
filename = "discord_messages/#{channel_info.id}_#{year}_#{month}_chunk#{chunk_num}.parquet"
# Create Req S3 client
s3_client = Req.new(
base_url: "https://#{bucket}.s3.amazonaws.com",
auth: {:aws_v4, "s3", System.fetch_env!("AWS_REGION")}
)
|> ReqS3.attach()
messages
|> Enum.chunk_every(Config.chunk_size())
|> Flow.from_enumerable()
|> Flow.map(fn chunk ->
# Convert to parquet using Explorer
df = Explorer.DataFrame.new(chunk)
{:ok, buffer} = Explorer.DataFrame.to_parquet(df)
case ReqS3.put(s3_client, filename, body: buffer) do
{:ok, _response} ->
{:ok, filename}
{:error, reason} ->
IO.puts("Error saving to #{filename}: #{inspect(reason)}")
{:error, reason}
end
end)
|> Flow.partition()
|> Flow.reduce(
fn -> [] end,
fn
{:ok, filename}, acc -> [filename | acc]
{:error, _}, acc -> acc
end
)
|> Enum.to_list()
|> case do
[filename | _] -> {:ok, "s3://#{bucket}/#{filename}"}
[] -> {:error, "Failed to save any chunks"}
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment