|
func testListeningUpdates() async throws { |
|
let controller = SequenceController<Int>() |
|
let sequence = AsyncStream<Int> { [controller] continuation in |
|
controller.onPublishElement = { |
|
continuation.yield($0) |
|
} |
|
controller.onFinish = { |
|
continuation.finish() |
|
} |
|
} |
|
.shared() |
|
|
|
final class ItemsHolder: @unchecked Sendable { |
|
var values: [Int] = [] |
|
} |
|
let itemsHolder1 = ItemsHolder() |
|
let itemsHolder2 = ItemsHolder() |
|
let itemsHolder3 = ItemsHolder() |
|
|
|
let stream1 = sequence |
|
let stream2 = sequence |
|
let stream3 = SharedAsyncSequence(sequence) |
|
|
|
|
|
Task.detached { |
|
try await Task.sleep(nanoseconds: 100_000_000) |
|
controller.onPublishElement!(1) |
|
try await Task.sleep(nanoseconds: 100_000_000) |
|
controller.onPublishElement!(5) |
|
try await Task.sleep(nanoseconds: 100_000_000) |
|
controller.onPublishElement!(23) |
|
try await Task.sleep(nanoseconds: 100_000_000) |
|
controller.onFinish!() |
|
} |
|
|
|
try await withThrowingTaskGroup(of: Void.self) { [stream1, stream2, stream3] group in |
|
group.addTask { |
|
for try await value in stream1 { |
|
itemsHolder1.values.append(value) |
|
} |
|
} |
|
group.addTask { |
|
for try await value in stream2 { |
|
itemsHolder2.values.append(value) |
|
} |
|
} |
|
group.addTask { |
|
for try await value in stream3 { |
|
itemsHolder3.values.append(value) |
|
} |
|
} |
|
try await group.waitForAll() |
|
} |
|
|
|
XCTAssert(itemsHolder1.values == [1, 5, 23]) |
|
XCTAssert(itemsHolder2.values == [1, 5, 23]) |
|
XCTAssert(itemsHolder3.values == [1, 5, 23]) |
|
} |
|
|
|
final class SequenceController<Element> { |
|
var onPublishElement: ((Element) -> Void)? = nil |
|
var onFinish: (() -> Void)? = nil |
|
} |