Skip to content

Instantly share code, notes, and snippets.

@jrstrunk
Last active December 17, 2024 17:36
Show Gist options
  • Save jrstrunk/c09fe4acf848a2a68eef8ad761eddac6 to your computer and use it in GitHub Desktop.
Save jrstrunk/c09fe4acf848a2a68eef8ad761eddac6 to your computer and use it in GitHub Desktop.
A small module for running a long list of tasks concurrently on a fixed-size pool of workers
import gleam/bool
import gleam/erlang/process
import gleam/function
import gleam/int
import gleam/list
import gleam/otp/actor
/// Runs `process_func` over every element of `inputs` using a pool of up to
/// `num_workers` concurrently running worker actors. It blocks the caller
/// until every result has been collected.
///
/// Results are returned in the same order as `inputs`, regardless of the
/// order in which the workers finish.
///
/// Returns `[]` without starting any actors when `num_workers <= 0` or when
/// `inputs` is empty. At most `list.length(inputs)` workers are started,
/// because any extra worker would find the queue empty and stop straight
/// away.
///
/// There is no timeout: if a result never arrives, this call waits forever.
/// It panics if any of the helper actors fails to start.
pub fn work_pool_forever(
  inputs: List(a),
  num_workers: Int,
  process_func: fn(a) -> b,
) -> List(b) {
  use <- bool.guard(when: num_workers <= 0, return: [])
  // With no inputs, the collector would never receive a `WorkComplete`
  // message, so it would never reply and `select_forever` below would
  // block for good.
  use <- bool.guard(when: list.is_empty(inputs), return: [])

  let work_items = list.index_map(inputs, fn(input, i) { WorkItem(i, input) })
  let item_count = list.length(work_items)
  let process_work = fn(work_item: WorkItem(a)) {
    WorkItem(work_item.index, process_func(work_item.value))
  }

  let results_reply = process.new_subject()
  let assert Ok(results_collector) =
    actor.start(
      CollectorState(item_count, [], results_reply),
      handle_collector,
    )
  let assert Ok(sync_queue) = actor.start(work_items, handle_sync_queue)

  // Start each worker and tell it to begin pulling items from the queue.
  // There is no point starting more workers than there are items.
  list.range(1, int.min(num_workers, item_count))
  |> list.each(fn(_worker_id) {
    let assert Ok(worker) =
      actor.start(
        WorkerState(sync_queue, process_work, results_collector),
        handle_worker,
      )
    process.send(worker, DoWork)
  })

  // The collector replies with results in completion order. Restore the
  // input order using the index each item was tagged with.
  let work_results =
    process.new_selector()
    |> process.selecting(results_reply, function.identity)
    |> process.select_forever
    |> list.sort(fn(a, b) { int.compare(a.index, b.index) })
    |> list.map(fn(item) { item.value })

  process.send(sync_queue, ShutdownSyncQueue)
  work_results
}
/// An input or a result, tagged with its position in the original input
/// list. The tag lets results be put back into input order after
/// concurrent processing.
type WorkItem(a) {
WorkItem(index: Int, value: a)
}
/// State of the results-collector actor.
///
/// - `items_to_process`: set by `work_pool_forever` to the total number of
///   work items when the collector is started.
/// - `processed_results`: the results received so far, most recent first.
/// - `reply`: the subject that the collected results are sent to once they
///   have all arrived.
type CollectorState(b) {
CollectorState(
items_to_process: Int,
processed_results: List(WorkItem(b)),
reply: process.Subject(List(WorkItem(b))),
)
}
/// Message a worker sends to the collector for each item it has finished
/// processing.
type CollectorMsg(b) {
WorkComplete(WorkItem(b))
}
/// Actor handler for the results collector.
///
/// Each `WorkComplete` message does two things:
/// - prepends its result to `processed_results`;
/// - decrements `items_to_process`, which counts the results still
///   outstanding.
///
/// When the last outstanding result arrives, the collected list is sent to
/// `reply` and the actor stops normally. The list is in completion order,
/// most recent first.
fn handle_collector(msg, state: CollectorState(b)) {
  case msg {
    WorkComplete(result) -> {
      // Count down instead of calling `list.length` on every message, which
      // made collecting n results O(n²).
      let remaining = state.items_to_process - 1
      let processed_results = [result, ..state.processed_results]
      case remaining {
        0 -> {
          actor.send(state.reply, processed_results)
          actor.Stop(process.Normal)
        }
        _ ->
          actor.continue(
            CollectorState(
              ..state,
              items_to_process: remaining,
              processed_results: processed_results,
            ),
          )
      }
    }
  }
}
/// Messages accepted by the shared work-queue actor.
///
/// `RequestNextItem` asks for the next unclaimed item, and the answer is sent
/// to `reply`. `ShutdownSyncQueue` stops the queue actor.
type SyncQueueMsg(a) {
RequestNextItem(reply: process.Subject(SyncQueueResponse(a)))
ShutdownSyncQueue
}
/// Reply to `RequestNextItem`: either the next item, or `SyncQueueEmpty`
/// once every item has been handed out.
type SyncQueueResponse(a) {
SyncQueueItem(a)
SyncQueueEmpty
}
/// Actor handler for the shared work queue. Its state is the list of items
/// not yet handed out.
///
/// `RequestNextItem` pops the head of the list and sends it back as
/// `SyncQueueItem`, or sends `SyncQueueEmpty` once the list is exhausted.
/// Either way the actor keeps running with the remaining items.
/// `ShutdownSyncQueue` stops the actor normally.
fn handle_sync_queue(msg, queue) {
  case msg {
    ShutdownSyncQueue -> actor.Stop(process.Normal)
    RequestNextItem(reply: reply_to) -> {
      let #(response, remaining) = case queue {
        [] -> #(SyncQueueEmpty, [])
        [next, ..rest] -> #(SyncQueueItem(next), rest)
      }
      actor.send(reply_to, response)
      actor.continue(remaining)
    }
  }
}
/// State of a worker actor:
/// - the shared queue it pulls items from;
/// - the function it applies to each item;
/// - the collector it sends each result to.
type WorkerState(a, b) {
WorkerState(
work_queue: process.Subject(SyncQueueMsg(WorkItem(a))),
process_func: fn(WorkItem(a)) -> WorkItem(b),
collector: process.Subject(CollectorMsg(b)),
)
}
/// Message telling a worker to start draining the shared queue.
type WorkerMsg {
DoWork
}
/// Actor handler for a pool worker.
///
/// On `DoWork`, the worker drains the shared queue and then stops normally.
/// For each item it applies `process_func` and forwards the result to
/// `collector`. A worker therefore handles a single `DoWork` message in its
/// lifetime.
fn handle_worker(msg, state: WorkerState(a, b)) {
  let WorkerState(work_queue:, process_func:, collector:) = state
  case msg {
    DoWork -> {
      process_sync_queue(work_queue, with: process_func, sending_to: collector)
      actor.Stop(process.Normal)
    }
  }
}
/// Repeatedly takes the next item from `queue`, applies `func` to it, and
/// sends the result to `collector` as `WorkComplete`. It stops when the
/// queue replies `SyncQueueEmpty`.
///
/// A queue that has already stopped is also treated as "no more work".
/// `work_pool_forever` sends `ShutdownSyncQueue` as soon as all results are
/// collected. At that moment a worker may still be about to make its final
/// request: one that has just sent its last result, or one that has not yet
/// started pulling items.
///
/// NOTE(review): `call_forever` panics when the callee is down. Such a
/// panic would crash the worker and, presumably through the link created
/// by `actor.start`, the pool's caller. Confirm against the
/// gleam_erlang/gleam_otp versions in use.
fn process_sync_queue(queue, with func, sending_to collector) {
  case process.try_call_forever(queue, RequestNextItem) {
    Ok(SyncQueueItem(item)) -> {
      process.send(collector, WorkComplete(func(item)))
      process_sync_queue(queue, with: func, sending_to: collector)
    }
    Ok(SyncQueueEmpty) | Error(_) -> Nil
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment