Skip to content

Instantly share code, notes, and snippets.

@tkersey
Created April 14, 2024 18:51
Show Gist options
  • Save tkersey/4c5fd81a0047aaa4065c572621673c92 to your computer and use it in GitHub Desktop.
Save tkersey/4c5fd81a0047aaa4065c572621673c92 to your computer and use it in GitHub Desktop.
An example of using AsyncStream
import Combine
import Foundation
import os
@MainActor public final class PollingDataModel<DataModels: Sendable> {
private let logger = Logger(subsystem: "Polling", category: "PollingDataModel")
private var pollingTask: Task<Void, Never>?
private var currentlyPolling = false
public init() {}
public func startPolling(
interval: TimeInterval = 10,
triggerInitially: Bool = false,
dataModels: [DataModels],
action: @escaping ([DataModels]) async -> ([DataModels], TimeInterval?),
update: @escaping ([DataModels]) -> Void
) {
guard !currentlyPolling else { return }
currentlyPolling = true
pollingTask = Task {
for await dataModels in await polling(
interval: interval,
triggerInitially: triggerInitially,
dataModels: dataModels,
action: action
) {
update(dataModels)
}
}
}
public func stopPolling() {
guard currentlyPolling else { return }
pollingTask?.cancel()
currentlyPolling = false
}
private func polling(
interval: TimeInterval?,
triggerInitially: Bool = false,
dataModels: [DataModels],
action: @escaping ([DataModels]) async -> ([DataModels], TimeInterval?)
) async -> AsyncStream<[DataModels]> {
AsyncStream { continuation in
var dataModels = dataModels
var interval = interval
Task {
if triggerInitially {
(dataModels, interval) = await action(dataModels)
}
if let currentInterval = interval {
let intervals = Timer.publish(every: currentInterval, on: .main, in: .common)
.autoconnect()
.values
for await _ in intervals {
(dataModels, interval) = await action(dataModels)
continuation.yield(dataModels)
}
}
}
return continuation.finish()
}
}
}
@Mrteller
Copy link

Mrteller commented Apr 11, 2025

Should not continuation.finish() be executed only when cutting off the AsyncStream is necessary?
Currently it's executed immediately after the Task is created, which prevents the polling from continuing.
Also task cancellation is not handled inside the Task. So stopPolling() might not do what it is supposed to.
Setting up cleanup when the AsyncStream is terminated won't hurt. This can be done by using the provided .onTermination handler.

private func polling(
    interval: TimeInterval?,
    triggerInitially: Bool = false,
    dataModels: [DataModels],
    action: @escaping ([DataModels]) async -> ([DataModels], TimeInterval?)
) async -> AsyncStream<[DataModels]> {
    AsyncStream { continuation in
        var dataModels = dataModels
        var interval = interval
        
        let task = Task {
            do {
                if triggerInitially {
                    let result = await action(dataModels)
                    dataModels = result.0
                    interval = result.1
                    continuation.yield(dataModels)
                }

                if let currentInterval = interval {
                    let intervals = Timer.publish(every: currentInterval, on: .main, in: .common)
                        .autoconnect()
                        .values

                    for await _ in intervals {
                        try Task.checkCancellation()
                        let result = await action(dataModels)
                        dataModels = result.0
                        interval = result.1
                        continuation.yield(dataModels)
                    }
                }
            } catch {
                // Handle task cancellation or other errors
            }
            continuation.finish()
        }
        
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

@tkersey
Copy link
Author

tkersey commented Apr 11, 2025

@Mrteller I think you're right. Never actually used this code in production. I appreciate the feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment