Created
November 19, 2024 08:00
-
-
Save monotykamary/84efc95e303dc19e55164dcee23e34a2 to your computer and use it in GitHub Desktop.
FLAME Elixir as a Decorator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DiscordCollector.FLAMEDecorator do
  @moduledoc """
  Provides FLAME function decorators that wrap functions to run in FLAME pools.

  ## Usage

      defmodule MyModule do
        use DiscordCollector.FLAMEDecorator

        @decorate flame(MyApp.Pool)
        def my_function(arg1, arg2) do
          # This will run in the MyApp.Pool FLAME pool
        end
      end
  """

  use Decorator.Define, flame: 1

  @doc """
  Wraps a function to run in a FLAME pool.

  The decorated function body is executed inside `FLAME.call/2` against `pool`.

  ## Arguments

    * `pool` - the FLAME pool module name. It is checked when the decorated
      function is called; a non-atom value raises a `RuntimeError`.
    * `body` - the quoted body of the decorated function.
    * `context` - the decorator context; its `module`, `name` and `arity`
      fields are used to label errors.

  ## Errors

  Exceptions raised by the body are re-raised with the original stacktrace.
  When the exception struct has a `:message` field, the message is prefixed
  with `Module.function/arity`; exceptions without such a field are re-raised
  unchanged.

  ## Examples

      @decorate flame(MyApp.MessageProcessor)
      def process_message(msg) do
        # Runs in MyApp.MessageProcessor pool
      end
  """
  def flame(pool, body, context) do
    # Built once while the decorator expands, so the generated code only
    # carries a string literal such as "MyApp.Worker.run/1".
    function_name = Exception.format_mfa(context.module, context.name, context.arity)

    quote do
      # `pool` may be any expression, so this check runs when the decorated
      # function is called, not while it is compiled.
      pool = unquote(pool)

      if not is_atom(pool) do
        raise "FLAME pool must be a module name, got: #{inspect(pool)}"
      end

      # Wrap the function in a FLAME.call
      FLAME.call(pool, fn ->
        try do
          unquote(body)
        rescue
          exception ->
            # Add function context to the error without losing the stacktrace.
            reraise unquote(__MODULE__).__put_context__(exception, unquote(function_name)),
                    __STACKTRACE__
        end
      end)
    end
  end

  # Prefixes the exception message with the decorated function's name.
  #
  # Only exceptions that actually carry a `:message` field can be updated with
  # the struct-update syntax; others (for example `MatchError`) are returned
  # untouched so the original error is never replaced by a `KeyError`.
  @doc false
  def __put_context__(exception, function_name) when is_map_key(exception, :message) do
    %{exception | message: "#{function_name}: #{Exception.message(exception)}"}
  end

  def __put_context__(exception, _function_name), do: exception
end
| # Example usage with configuration | |
defmodule DiscordCollector.MessageProcessor do
  @moduledoc """
  Example of sending individual functions to FLAME pools with
  `@decorate flame(...)`.

  `do_process/1` and `do_save/1` are called by this module but are not
  defined in the portion of the example shown here.
  """

  use DiscordCollector.FLAMEDecorator

  # Optional: pool settings kept as a module attribute.
  # Nothing in this module reads the attribute.
  @pool_settings [min_size: 1, max_size: 5, max_concurrency: 2]

  @doc """
  Processes `message` in the `DiscordCollector.MessageCollector` pool.

  Expects `do_process/1` to return `{:ok, value}` and returns `value`;
  any other result raises a `MatchError`.
  """
  @decorate flame(DiscordCollector.MessageCollector)
  def process_message(message) do
    {:ok, result} = do_process(message)
    result
  end

  @doc """
  Saves `message` in the `DiscordCollector.FileProcessor` pool.

  Expects `do_save/1` to return `{:ok, value}` and returns `value`;
  any other result raises a `MatchError`.
  """
  @decorate flame(DiscordCollector.FileProcessor)
  def save_message(message) do
    {:ok, result} = do_save(message)
    result
  end
end
| # Example of pool initialization | |
defmodule DiscordCollector.Pools do
  @moduledoc """
  Child specifications for the application's FLAME pools.
  """

  # Value passed as :idle_shutdown_after to every pool.
  @idle_shutdown_after 30_000

  @doc """
  Returns the `FLAME.Pool` child specs, in order: the message processing pool
  (`DiscordCollector.MessageCollector`) followed by the file processing pool
  (`DiscordCollector.FileProcessor`).
  """
  def child_specs do
    [
      pool_spec(DiscordCollector.MessageCollector, 10, 5),
      pool_spec(DiscordCollector.FileProcessor, 5, 2)
    ]
  end

  # Builds one `{FLAME.Pool, opts}` tuple; every pool starts from `min: 0`.
  defp pool_spec(pool_name, max_runners, concurrency) do
    {FLAME.Pool,
     name: pool_name,
     min: 0,
     max: max_runners,
     max_concurrency: concurrency,
     idle_shutdown_after: @idle_shutdown_after}
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DiscordCollector.Storage do
  @moduledoc """
  Writes batches of Discord messages to S3 as Parquet files.
  """

  use DiscordCollector.FLAMEDecorator

  @bucket "dwarvesf-discord"

  @doc """
  Encodes `messages` as a single Parquet file and uploads it to S3.

  Runs in the `DiscordCollector.FileProcessor` FLAME pool.

  ## Arguments

    * `messages` - enumerable of rows handed to `Explorer.DataFrame.new/1`
      (presumably a list of maps; confirm against the caller).
    * `channel_info` - must expose an `id` field, used in the object key.
    * `year`, `month`, `chunk_num` - interpolated into the object key.

  ## Returns

    * `{:ok, "s3://<bucket>/<key>"}` when the upload gets a 2xx response.
    * `{:error, "Failed to save any chunks"}` when `messages` is empty or the
      upload fails; the failure reason is printed before returning.

  ## Raises

  Raises when `AWS_REGION`, `AWS_ACCESS_KEY_ID` or `AWS_SECRET_ACCESS_KEY` is
  not set, and when the messages cannot be encoded as Parquet.
  """
  @decorate flame(DiscordCollector.FileProcessor)
  def save_messages_chunk(messages, channel_info, year, month, chunk_num) do
    filename = "discord_messages/#{channel_info.id}_#{year}_#{month}_chunk#{chunk_num}.parquet"

    case Enum.to_list(messages) do
      [] ->
        {:error, "Failed to save any chunks"}

      rows ->
        # One call maps to exactly one object key, so the whole batch goes into
        # a single Parquet file. Splitting it and uploading every piece to the
        # same key would make the pieces overwrite each other.
        rows
        |> encode_parquet()
        |> upload(filename)
    end
  end

  # Serialises the rows to an in-memory Parquet binary.
  defp encode_parquet(rows) do
    {:ok, buffer} =
      rows
      |> Explorer.DataFrame.new()
      |> Explorer.DataFrame.dump_parquet()

    buffer
  end

  # PUTs the binary under `filename`; only a 2xx response counts as success.
  defp upload(buffer, filename) do
    case Req.put(s3_client(), url: filename, body: buffer) do
      {:ok, %{status: status}} when status in 200..299 ->
        {:ok, "s3://#{@bucket}/#{filename}"}

      {:ok, %{status: status, body: body}} ->
        upload_failed(filename, {:http_error, status, body})

      {:error, reason} ->
        upload_failed(filename, reason)
    end
  end

  # Reports the failure and returns the error tuple callers already handle.
  defp upload_failed(filename, reason) do
    IO.puts("Error saving to #{filename}: #{inspect(reason)}")
    {:error, "Failed to save any chunks"}
  end

  # Builds the Req client for the bucket, signing requests with AWS SigV4.
  defp s3_client do
    [
      base_url: "https://#{@bucket}.s3.amazonaws.com",
      aws_sigv4: aws_sigv4_options()
    ]
    |> Req.new()
    |> ReqS3.attach()
  end

  # Signing options are read from the environment at call time, so they come
  # from whichever node actually executes the upload.
  defp aws_sigv4_options do
    required = [
      service: :s3,
      region: System.fetch_env!("AWS_REGION"),
      access_key_id: System.fetch_env!("AWS_ACCESS_KEY_ID"),
      secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY")
    ]

    # A session token is only present with temporary credentials.
    case System.get_env("AWS_SESSION_TOKEN") do
      nil -> required
      token -> Keyword.put(required, :token, token)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment