Last active
July 20, 2025 04:00
-
-
Save thecoolwinter/e5248dcf11c01196a4ba5ec05aaa0dea to your computer and use it in GitHub Desktop.
Swift extension on Swift Concurrency's task group to process an array of data in parallel, with logic for setting a maximum number of parallel items to process at once.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// TaskGroup+addTasks.swift | |
// Anchor | |
// | |
// Copyright (C) 2025 Khan Winter | |
// | |
extension TaskGroup { | |
/// Add tasks to a task group using an array of data, and set a maximum number of parallel tasks to enqueue. | |
/// | |
/// Usage: | |
/// ```swift | |
/// func processItem(_ item: Int) async throws { /* long work... */ } | |
/// | |
/// let itemsToProcess = [1, 2, 3, 4, 5, 6] | |
/// let (stream, continuation) = AsyncStream<Int>.makeStream() | |
/// try await withThrowingTaskGroup(of: Int.self) { group in | |
/// group.addTasks( | |
/// items: itemsToProcess, | |
/// maximumParallelTasks: 2 | |
/// ) { newElement in | |
/// continuation.yield(newElement) | |
/// } body: { item in | |
/// try await processItem(item) | |
/// } | |
/// } | |
/// ``` | |
/// | |
/// - Parameters: | |
/// - items: The items to process. | |
/// - maximumParallelTasks: The number of tasks to add to the task group at a time. | |
/// - isolation: The task group isolation. | |
/// - onReceive: Called when an element is finished processing and returned by the group. | |
/// - body: Called on each item in turn, in parallel. | |
mutating func addTasks<T: Sendable>( | |
items: [T], | |
maximumParallelTasks: Int, | |
isolation: isolated (any Actor)? = #isolation, | |
onReceive: (@isolated(any) (ChildTaskResult) -> Void)?, | |
body: @Sendable @escaping @isolated(any) (T) async -> ChildTaskResult | |
) async throws { | |
let maxConcurrentTasks = Swift.min(items.count, maximumParallelTasks) | |
var sentTasks = 0 | |
func addNewTask() { | |
let taskData = items[sentTasks] | |
addTask { | |
await body(taskData) | |
} | |
sentTasks += 1 | |
} | |
for _ in 0..<maxConcurrentTasks { | |
addNewTask() | |
} | |
for try await result in self { | |
await onReceive?(result) | |
try Task.checkCancellation() | |
if sentTasks < items.count { | |
addNewTask() | |
} | |
} | |
} | |
} | |
extension ThrowingTaskGroup { | |
/// Add tasks to a task group using an array of data, and set a maximum number of parallel tasks to enqueue. | |
/// | |
/// Usage: | |
/// ```swift | |
/// func processItem(_ item: Int) async throws { /* long work... */ } | |
/// | |
/// let itemsToProcess = [1, 2, 3, 4, 5, 6] | |
/// let (stream, continuation) = AsyncStream<Int>.makeStream() | |
/// try await withThrowingTaskGroup(of: Int.self) { group in | |
/// group.addTasks( | |
/// items: itemsToProcess, | |
/// maximumParallelTasks: 2 | |
/// ) { newElement in | |
/// continuation.yield(newElement) | |
/// } body: { item in | |
/// try await processItem(item) | |
/// } | |
/// } | |
/// ``` | |
/// | |
/// - Parameters: | |
/// - items: The items to process. | |
/// - maximumParallelTasks: The number of tasks to add to the task group at a time. | |
/// - isolation: The task group isolation. | |
/// - onReceive: Called when an element is finished processing and returned by the group. | |
/// - body: Called on each item in turn, in parallel. | |
mutating func addTasks<T: Sendable>( | |
items: [T], | |
maximumParallelTasks: Int, | |
isolation: isolated (any Actor)? = #isolation, | |
onReceive: (@isolated(any) (ChildTaskResult) -> Void)?, | |
body: @Sendable @escaping @isolated(any) (T) async throws -> ChildTaskResult | |
) async throws { | |
let maxConcurrentTasks = Swift.min(items.count, maximumParallelTasks) | |
var sentTasks = 0 | |
func addNewTask() { | |
let taskData = items[sentTasks] | |
addTask { | |
try await body(taskData) | |
} | |
sentTasks += 1 | |
} | |
for _ in 0..<maxConcurrentTasks { | |
addNewTask() | |
} | |
for try await result in self { | |
await onReceive?(result) | |
try Task.checkCancellation() | |
if sentTasks < items.count { | |
addNewTask() | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment