Last active
April 19, 2022 19:36
-
-
Save emorydunn/e6b5c9803e5774c26926595a63b23f37 to your computer and use it in GitHub Desktop.
A publisher that delivers the messages from a WebSocket.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// WebSocketTaskPublisher.swift | |
// | |
// | |
// Created by Emory Dunn on 5/23/21. | |
// | |
import Foundation | |
import Combine | |
extension URLSession {

    /// Creates a publisher backed by a WebSocket task that this session opens for `url`.
    ///
    /// The provided URL must have a `ws` or `wss` scheme.
    /// - Parameter url: The WebSocket URL with which to connect.
    /// - Returns: A `WebSocketTaskPublisher` whose task is created by this session.
    func webSocketTaskPublisher(for url: URL) -> WebSocketTaskPublisher {
        let publisher = WebSocketTaskPublisher(with: url, session: self)
        return publisher
    }
}
/// A Combine publisher that emits the messages received over a WebSocket.
///
/// The underlying `URLSessionWebSocketTask` is created once, when the publisher
/// is initialized, and that same task is handed to every subscription.
public struct WebSocketTaskPublisher: Publisher {

    public typealias Output = URLSessionWebSocketTask.Message

    public typealias Failure = Error

    /// The WebSocket task whose incoming messages are published.
    let task: URLSessionWebSocketTask

    /// Builds a publisher by asking `session` for a WebSocket task targeting `url`.
    ///
    /// The provided URL must have a `ws` or `wss` scheme.
    /// - Parameters:
    ///   - url: The WebSocket URL with which to connect.
    ///   - session: The URLSession to create the WebSocket task.
    public init(with url: URL, session: URLSession = URLSession.shared) {
        task = session.webSocketTask(with: url)
    }

    /// Attaches `subscriber` by wrapping the task in a new `Subscription` and
    /// passing that subscription to the subscriber.
    ///
    /// - Parameter subscriber: The subscriber that will receive WebSocket messages.
    public func receive<S>(subscriber: S) where S : Subscriber, Error == S.Failure, URLSessionWebSocketTask.Message == S.Input {
        subscriber.receive(subscription: Subscription(task: task, target: subscriber))
    }
}
extension WebSocketTaskPublisher {

    /// The subscription that forwards messages from a WebSocket task to one subscriber.
    ///
    /// Once the subscriber has been sent a failure completion, or the subscription
    /// has been cancelled, `target` is cleared and nothing further is delivered.
    class Subscription<Target: Subscriber>: Combine.Subscription where Target.Input == Output, Target.Failure == Error {

        /// The WebSocket task messages are read from.
        let task: URLSessionWebSocketTask

        /// The subscriber to deliver to; `nil` once the subscription has ended
        /// (cancelled or failed).
        var target: Target?

        /// Creates a subscription delivering messages from `task` to `target`.
        ///
        /// - Parameters:
        ///   - task: The WebSocket task to read messages from.
        ///   - target: The subscriber that receives the messages.
        init(task: URLSessionWebSocketTask, target: Target) {
            self.task = task
            self.target = target
        }

        /// Resumes the task and starts listening for up to `demand` messages.
        ///
        /// Does nothing if the subscription has already ended.
        /// - Parameter demand: The number of messages the subscriber is willing to receive.
        func request(_ demand: Subscribers.Demand) {
            guard let target = target else { return }

            // Resume the task
            task.resume()

            listen(for: target, with: demand)
        }

        /// Registers a receive on the task and forwards its result to `target`,
        /// re-registering itself while demand remains.
        ///
        /// - Parameters:
        ///   - target: The subscriber to deliver the result to.
        ///   - demand: The outstanding demand; nothing is registered when it is zero.
        func listen(for target: Target, with demand: Subscribers.Demand) {
            // Never deliver a value the subscriber did not ask for.
            guard demand > 0 else { return }

            task.receive { [weak self] result in
                // Stop if the subscription was released, cancelled, or already
                // completed while this receive was pending.
                guard let self = self, self.target != nil else { return }

                var remaining = demand

                switch result {
                case let .success(message):
                    remaining -= 1
                    remaining += target.receive(message)
                case let .failure(error):
                    // A failure is terminal: clear the target first so no other
                    // path can deliver after the completion, and do not listen again.
                    self.target = nil
                    target.receive(completion: .failure(error))
                    return
                }

                if remaining > 0 {
                    self.listen(for: target, with: remaining)
                }
            }
        }

        /// Ends the subscription and cancels the underlying task.
        func cancel() {
            // Clear the target before cancelling so that a pending receive which
            // reports an error as a result is not forwarded to the subscriber.
            target = nil
            task.cancel()
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It seems that this code is incorrect: new data or error messages received from the task can still be sent to the subscriber after a `.failure()` completion has already been sent.