|
import Foundation |
|
import Testing |
|
@testable import TaskQueueKit |
|
|
|
struct TasksTests { |
|
|
|
/// Upper bound, in milliseconds, of the random pause each queued job takes;
/// jobs sleep for a value drawn from `delay / 2 ... delay`.
let delay: Int = 1_000
|
|
|
/// Global actor the tests use to run queued closures on a dedicated,
/// non-main isolation domain (via `@TasksTestsActor in`).
@globalActor
public final actor TasksTestsActor {
    /// The shared instance required by the `@globalActor` attribute.
    public static let shared: TasksTestsActor = .init()
}
|
|
|
/// Actor-isolated collector for the numbers produced by queued jobs.
///
/// Jobs call `append(number:)`; a test suspends in `untilDone()` until `max`
/// numbers have been collected.
actor Buffer {
    /// Callers currently suspended in `untilDone()`.
    ///
    /// A list (rather than a single optional) so that a second concurrent
    /// waiter cannot overwrite — and thereby leak — the first one.
    private var waiters: [CheckedContinuation<Void, Never>] = []

    /// Number of collected values at which the buffer counts as complete.
    let max: Int

    /// Collected values, in the order `append(number:)` received them.
    var numbers: [Int] = []

    /// Creates an empty buffer that is complete once it holds `max` values.
    init(max: Int) {
        self.max = max
    }

    /// Stores `number` and, once `max` values are present, resumes every
    /// caller suspended in `untilDone()`.
    func append(number: Int) {
        numbers.append(number)

        guard numbers.count >= max, !waiters.isEmpty else {
            return
        }

        // Clear the list before resuming so each continuation is resumed exactly once.
        let pending = waiters
        waiters.removeAll()
        for waiter in pending {
            waiter.resume()
        }
    }

    /// Suspends until `max` values have been appended; returns immediately
    /// when the buffer is already complete.
    func untilDone() async {
        guard numbers.count < max else {
            return
        }

        await withCheckedContinuation { continuation in
            self.waiters.append(continuation)
        }
    }
}
|
|
|
/// Half of the active processor count, but never less than 2.
///
/// The previous body returned the full processor count, contradicting the name.
var halfCoreCount: Int {
    max(ProcessInfo.processInfo.activeProcessorCount / 2, 2)
}
|
|
|
/// Prints whether `numbers` is a run of consecutive ascending integers.
///
/// Each adjacent pair whose second element is not exactly one greater than the
/// first counts as one break; the number of breaks is reported on standard output.
///
/// - Parameter numbers: The sequence to inspect.
func validateOrder(numbers: [Int]) {
    let breaks = zip(numbers, numbers.dropFirst())
        .filter { pair in pair.0 != pair.1 - 1 }
        .count

    guard breaks > 0 else {
        print("👌 Numbers are in order")
        return
    }
    print("💥 There are \(breaks) numbers out of order")
}
|
|
|
/// Submits twice the processor count of numbered jobs to a `.serial` queue.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. The test expects the numbers to
/// be recorded in exactly the order they were submitted.
@Test func testTaskQueueSerial() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(behavior: .serial, bufferingPolicy: .bufferingNewest(jobCount))
    let sink = Buffer(max: jobCount)

    for value in input {
        queue.receiveNext {
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            print(value, terminator: " ")
            await sink.append(number: value)
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded)
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs to a concurrent queue
/// configured with `maxConcurrentTasks: 2`.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. Completion order is not
/// asserted; the test only expects every number to be recorded (compared
/// against the sorted output).
@Test func testTaskQueueConcurrentTwo() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let workerLimit = 2
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: workerLimit),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    for value in input {
        queue.receiveNext {
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            print(value, terminator: " ")
            await sink.append(number: value)
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs to a concurrent queue
/// whose `maxConcurrentTasks` is `halfCoreCount`.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. Completion order is not
/// asserted; the test only expects every number to be recorded (compared
/// against the sorted output).
@Test func testTaskQueueConcurrentMany() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: halfCoreCount),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    for value in input {
        queue.receiveNext {
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            print(value, terminator: " ")
            await sink.append(number: value)
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs, each declared
/// `@MainActor`, to a concurrent queue with `maxConcurrentTasks: 3`.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. Completion order is not
/// asserted; the test only expects every number to be recorded (compared
/// against the sorted output).
@Test func testTaskQueueConcurrentManyOnMainActor() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: 3),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    for value in input {
        queue.receiveNext { @MainActor in
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            print(value, terminator: " ")
            await sink.append(number: value)
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs, each declared
/// `@TasksTestsActor`, to a concurrent queue with `maxConcurrentTasks: 3`.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. Completion order is not
/// asserted; the test only expects every number to be recorded (compared
/// against the sorted output).
@Test func testTaskQueueConcurrentManyOnTasksTestsActor() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: 3),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    for value in input {
        queue.receiveNext { @TasksTestsActor in
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            print(value, terminator: " ")
            await sink.append(number: value)
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs to a concurrent queue
/// with `maxConcurrentTasks: 3`, mixing isolation: odd numbers are submitted
/// as `@TasksTestsActor` closures, even numbers as closures without an actor
/// annotation.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds and then records its number. Completion order is not
/// asserted; the test only expects every number to be recorded (compared
/// against the sorted output).
@Test func testTaskQueueConcurrentManyOnTasksTestsActorForOdd() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: 3),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    for value in input {
        if value % 2 != 0 {
            // Odd numbers run on the custom global actor.
            queue.receiveNext { @TasksTestsActor in
                let pause = Int.random(in: sleepBounds)
                try? await Task.sleep(for: .milliseconds(pause))
                print(value, terminator: " ")
                await sink.append(number: value)
            }
        } else {
            // Even numbers carry no actor annotation.
            queue.receiveNext {
                let pause = Int.random(in: sleepBounds)
                try? await Task.sleep(for: .milliseconds(pause))
                print(value, terminator: " ")
                await sink.append(number: value)
            }
        }
    }

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
/// Submits twice the processor count of numbered jobs to a concurrent queue
/// whose `maxConcurrentTasks` equals the processor count, passing
/// `isBarrier: true` for every multiple of 7.
///
/// Each job sleeps for a random time between `delay / 2` and `delay`
/// milliseconds, prints its number (in square brackets for barrier jobs) and
/// records it. Only completeness is asserted (input compared against the
/// sorted output); the relative ordering around barriers is not checked here.
@Test func testTaskQueueWithBarriers() async throws {
    print("There are \(ProcessInfo.processInfo.activeProcessorCount) cores")
    let jobCount = ProcessInfo.processInfo.activeProcessorCount * 2
    let input = Array(1...jobCount)
    let sleepBounds = (delay / 2)...delay

    let queue = TaskQueue(
        behavior: .concurrent(maxConcurrentTasks: ProcessInfo.processInfo.activeProcessorCount),
        bufferingPolicy: .bufferingNewest(jobCount)
    )
    let sink = Buffer(max: jobCount)

    print("Queueing", terminator: " ")
    for value in input {
        let actsAsBarrier = value % 7 == 0
        print(value, terminator: " ")
        queue.receiveNext(isBarrier: actsAsBarrier) {
            let pause = Int.random(in: sleepBounds)
            try? await Task.sleep(for: .milliseconds(pause))
            // Barrier jobs are bracketed in the log so they stand out.
            let label = actsAsBarrier ? "[\(value)]" : "\(value)"
            print(label, terminator: " ")
            await sink.append(number: value)
        }
    }
    print("")

    // Suspend until every job has reported its number.
    await sink.untilDone()
    print("")
    print("Input:", input)

    let recorded = await sink.numbers
    validateOrder(numbers: recorded)
    #expect(input == recorded.sorted())
    print("Output:", recorded)
}
|
|
|
} |