Skip to content

Instantly share code, notes, and snippets.

@sorentwo
Created August 29, 2021 16:52
Show Gist options
  • Save sorentwo/7091331bd6270946f7eae0fa036af638 to your computer and use it in GitHub Desktop.
Save sorentwo/7091331bd6270946f7eae0fa036af638 to your computer and use it in GitHub Desktop.
Tenant based partitioned rate limiting in Oban Pro
defmodule MyApp.TenantWorker do
@moduledoc false
use Oban.Worker
@impl Worker
def perform(%Job{conf: conf, args: %{"tenant" => tenant, "id" => id}}) do
time =
Time.utc_now()
|> Time.truncate(:second)
|> Time.to_string()
IO.inspect({time, node: conf.node, tenant: tenant, id: id})
:ok
end
end
Oban.Pro.Repo.start_link()
Oban.Pro.Repo.delete_all(Oban.Job)
Oban.Pro.Repo.delete_all(Oban.Pro.Producer)
opts = [
repo: Oban.Pro.Repo,
engine: Oban.Pro.Queue.SmartEngine,
queues: [
default: [
local_limit: 7,
rate_limit: [
allowed: 2,
period: {5, :seconds},
partition: [fields: [:args], keys: [:tenant]]
]
]
]
]
IO.puts """
Partitioned rate limiting 297 jobs (99 ids / 3 tenants) across 2 nodes using:
#{inspect(opts, pretty: true)}
"""
Process.sleep(5_000)
Oban.start_link(Keyword.merge(opts, name: Oban.A, node: "Demo.A"))
Oban.start_link(Keyword.merge(opts, name: Oban.B, node: "Demo.B"))
changesets =
(for id <- 1..99, tenant <- 1..3, do: {id, tenant})
|> Enum.shuffle()
|> Enum.map(&MyApp.TenantWorker.new(%{id: elem(&1, 0), tenant: elem(&1, 1)}))
Oban.insert_all(Oban.A, changesets)
receive do
:finish -> :ok
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment