Skip to content

Instantly share code, notes, and snippets.

@thecoolwinter
Last active July 20, 2025 04:00
Show Gist options
  • Save thecoolwinter/e5248dcf11c01196a4ba5ec05aaa0dea to your computer and use it in GitHub Desktop.
Save thecoolwinter/e5248dcf11c01196a4ba5ec05aaa0dea to your computer and use it in GitHub Desktop.
Swift extension on Swift Concurrency's task group to process an array of data in parallel, with logic for setting a maximum number of parallel items to process at once.
//
// TaskGroup+addTasks.swift
// Anchor
//
// Copyright (C) 2025 Khan Winter
//
extension TaskGroup {
/// Add tasks to a task group using an array of data, and set a maximum number of parallel tasks to enqueue.
///
/// Usage:
/// ```swift
/// func processItem(_ item: Int) async throws { /* long work... */ }
///
/// let itemsToProcess = [1, 2, 3, 4, 5, 6]
/// let (stream, continuation) = AsyncStream<Int>.makeStream()
/// try await withThrowingTaskGroup(of: Int.self) { group in
/// group.addTasks(
/// items: itemsToProcess,
/// maximumParallelTasks: 2
/// ) { newElement in
/// continuation.yield(newElement)
/// } body: { item in
/// try await processItem(item)
/// }
/// }
/// ```
///
/// - Parameters:
/// - items: The items to process.
/// - maximumParallelTasks: The number of tasks to add to the task group at a time.
/// - isolation: The task group isolation.
/// - onReceive: Called when an element is finished processing and returned by the group.
/// - body: Called on each item in turn, in parallel.
mutating func addTasks<T: Sendable>(
items: [T],
maximumParallelTasks: Int,
isolation: isolated (any Actor)? = #isolation,
onReceive: (@isolated(any) (ChildTaskResult) -> Void)?,
body: @Sendable @escaping @isolated(any) (T) async -> ChildTaskResult
) async throws {
let maxConcurrentTasks = Swift.min(items.count, maximumParallelTasks)
var sentTasks = 0
func addNewTask() {
let taskData = items[sentTasks]
addTask {
await body(taskData)
}
sentTasks += 1
}
for _ in 0..<maxConcurrentTasks {
addNewTask()
}
for try await result in self {
await onReceive?(result)
try Task.checkCancellation()
if sentTasks < items.count {
addNewTask()
}
}
}
}
extension ThrowingTaskGroup {
/// Add tasks to a task group using an array of data, and set a maximum number of parallel tasks to enqueue.
///
/// Usage:
/// ```swift
/// func processItem(_ item: Int) async throws { /* long work... */ }
///
/// let itemsToProcess = [1, 2, 3, 4, 5, 6]
/// let (stream, continuation) = AsyncStream<Int>.makeStream()
/// try await withThrowingTaskGroup(of: Int.self) { group in
/// group.addTasks(
/// items: itemsToProcess,
/// maximumParallelTasks: 2
/// ) { newElement in
/// continuation.yield(newElement)
/// } body: { item in
/// try await processItem(item)
/// }
/// }
/// ```
///
/// - Parameters:
/// - items: The items to process.
/// - maximumParallelTasks: The number of tasks to add to the task group at a time.
/// - isolation: The task group isolation.
/// - onReceive: Called when an element is finished processing and returned by the group.
/// - body: Called on each item in turn, in parallel.
mutating func addTasks<T: Sendable>(
items: [T],
maximumParallelTasks: Int,
isolation: isolated (any Actor)? = #isolation,
onReceive: (@isolated(any) (ChildTaskResult) -> Void)?,
body: @Sendable @escaping @isolated(any) (T) async throws -> ChildTaskResult
) async throws {
let maxConcurrentTasks = Swift.min(items.count, maximumParallelTasks)
var sentTasks = 0
func addNewTask() {
let taskData = items[sentTasks]
addTask {
try await body(taskData)
}
sentTasks += 1
}
for _ in 0..<maxConcurrentTasks {
addNewTask()
}
for try await result in self {
await onReceive?(result)
try Task.checkCancellation()
if sentTasks < items.count {
addNewTask()
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment