Last active
December 17, 2024 17:36
-
-
Save jrstrunk/c09fe4acf848a2a68eef8ad761eddac6 to your computer and use it in GitHub Desktop.
A small Gleam module that runs a pool of worker actors concurrently over a long list of tasks.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gleam/bool | |
import gleam/erlang/process | |
import gleam/function | |
import gleam/int | |
import gleam/list | |
import gleam/otp/actor | |
/// Runs `process_func` over every element of `inputs` using a pool of
/// `num_workers` worker actors, and blocks until every result is available.
///
/// Results are returned in the same order as `inputs`. Returns `[]` without
/// starting any actors when `num_workers <= 0` or `inputs` is empty. There is
/// no timeout: the caller waits for as long as the work takes.
pub fn work_pool_forever(
  inputs: List(a),
  num_workers: Int,
  process_func: fn(a) -> b,
) -> List(b) {
  // With no inputs no worker ever reports a result, so the collector would
  // never reply and the `select_forever` below would block permanently.
  use <- bool.guard(
    when: num_workers <= 0 || list.is_empty(inputs),
    return: [],
  )

  // Tag each input with its position so results can be re-ordered later.
  let work_items = list.index_map(inputs, fn(input, i) { WorkItem(i, input) })
  let process_work = fn(work_item: WorkItem(a)) {
    WorkItem(work_item.index, process_func(work_item.value))
  }

  let results_reply = process.new_subject()
  let assert Ok(results_collector) =
    actor.start(
      CollectorState(list.length(work_items), [], results_reply),
      handle_collector,
    )
  let assert Ok(sync_queue) = actor.start(work_items, handle_sync_queue)

  // Start the workers and monitor each one, so we can later tell when it has
  // finished draining the queue. The monitor is created before `DoWork` is
  // sent; monitoring an already-exited process still yields a down message.
  let worker_monitors =
    list.range(1, num_workers)
    |> list.map(fn(_worker_id) {
      let assert Ok(worker) =
        actor.start(
          WorkerState(sync_queue, process_work, results_collector),
          handle_worker,
        )
      let monitor = process.monitor_process(process.subject_owner(worker))
      process.send(worker, DoWork)
      monitor
    })

  let work_results =
    process.new_selector()
    |> process.selecting(results_reply, function.identity)
    |> process.select_forever
    |> list.sort(fn(a, b) { int.compare(a.index, b.index) })
    |> list.map(fn(a) { a.value })

  // The worker that delivered the final result still makes one more
  // `RequestNextItem` call before it stops. If the queue were shut down
  // before answering it, that call could never get a reply, and the workers
  // are started from (and linked to) this process. Wait for every worker to
  // exit before stopping the queue.
  list.each(worker_monitors, fn(monitor) {
    process.new_selector()
    |> process.selecting_process_down(monitor, fn(_down) { Nil })
    |> process.select_forever
  })
  process.send(sync_queue, ShutdownSyncQueue)
  work_results
}
/// A value paired with the position of the input it came from, so results
/// produced concurrently can be put back into input order.
type WorkItem(a) {
  WorkItem(
    // Zero-based position of the originating element in the input list.
    index: Int,
    // The input before processing, or the output after processing.
    value: a,
  )
}
/// State of the results-collector actor.
type CollectorState(b) {
  CollectorState(
    // Number of results the collector is still waiting for. It starts at the
    // number of work items and counts down by one per `WorkComplete`.
    items_to_process: Int,
    // Results received so far, most recent first (not in input order).
    processed_results: List(WorkItem(b)),
    // Where the complete result list is sent once every item has arrived.
    reply: process.Subject(List(WorkItem(b))),
  )
}

/// Messages accepted by the results-collector actor.
type CollectorMsg(b) {
  /// One item has been processed; carries the result with its input index.
  WorkComplete(WorkItem(b))
}

/// Accumulates processed items. Once the expected number has arrived, it
/// sends the full, unordered list to `state.reply` and stops normally.
///
/// Completion is detected with a countdown instead of re-measuring
/// `processed_results` with `list.length` on every message, which made
/// collecting n results O(n^2).
fn handle_collector(msg: CollectorMsg(b), state: CollectorState(b)) {
  case msg {
    WorkComplete(result) -> {
      let processed_results = [result, ..state.processed_results]
      let items_to_process = state.items_to_process - 1
      case items_to_process <= 0 {
        True -> {
          actor.send(state.reply, processed_results)
          actor.Stop(process.Normal)
        }
        False ->
          actor.continue(
            CollectorState(
              ..state,
              items_to_process: items_to_process,
              processed_results: processed_results,
            ),
          )
      }
    }
  }
}
/// Requests understood by the shared work-queue actor.
type SyncQueueMsg(a) {
  /// Ask for the next pending item; the answer is delivered to `reply`.
  RequestNextItem(reply: process.Subject(SyncQueueResponse(a)))
  /// Stop the queue actor with a normal exit reason.
  ShutdownSyncQueue
}
/// Answer to a `RequestNextItem` request.
type SyncQueueResponse(a) {
  /// The next item to work on.
  SyncQueueItem(a)
  /// Nothing is left to hand out.
  SyncQueueEmpty
}
/// Message handler for the shared work queue. The actor's state is the list
/// of items not yet handed out. Each request receives the head of that list,
/// or `SyncQueueEmpty` once the list is exhausted.
fn handle_sync_queue(msg, queue) {
  case msg {
    ShutdownSyncQueue -> actor.Stop(process.Normal)
    RequestNextItem(reply: reply_to) -> {
      let #(response, remaining) = case queue {
        [] -> #(SyncQueueEmpty, [])
        [next, ..rest] -> #(SyncQueueItem(next), rest)
      }
      actor.send(reply_to, response)
      actor.continue(remaining)
    }
  }
}
/// Everything a worker actor needs to drain the shared queue.
type WorkerState(a, b) {
  WorkerState(
    // Shared queue the worker pulls items from.
    work_queue: process.Subject(SyncQueueMsg(WorkItem(a))),
    // Transformation applied to each item pulled from the queue.
    process_func: fn(WorkItem(a)) -> WorkItem(b),
    // Actor that receives every processed item.
    collector: process.Subject(CollectorMsg(b)),
  )
}
/// Commands understood by a worker actor.
type WorkerMsg {
  /// Start pulling and processing items until the queue is empty.
  DoWork
}
/// Worker message handler. On `DoWork` it drains the shared queue, forwarding
/// each processed item to the collector, and then stops with a normal exit.
fn handle_worker(msg, state: WorkerState(a, b)) {
  let WorkerState(work_queue: queue, process_func: func, collector: sink) =
    state
  case msg {
    DoWork -> {
      process_sync_queue(queue, with: func, sending_to: sink)
      actor.Stop(process.Normal)
    }
  }
}
/// Repeatedly asks `queue` for the next item, applies `func` to it and sends
/// the result to `collector` as `WorkComplete`. Returns `Nil` once the queue
/// answers `SyncQueueEmpty`. Each request uses `process.call_forever`, so it
/// waits without a timeout.
fn process_sync_queue(queue, with func, sending_to collector) {
  let response = process.call_forever(queue, RequestNextItem)
  case response {
    SyncQueueEmpty -> Nil
    SyncQueueItem(item) -> {
      let processed = func(item)
      process.send(collector, WorkComplete(processed))
      process_sync_queue(queue, with: func, sending_to: collector)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment