Skip to content

Instantly share code, notes, and snippets.

@emorydunn
Last active April 19, 2022 19:36
Show Gist options
  • Save emorydunn/e6b5c9803e5774c26926595a63b23f37 to your computer and use it in GitHub Desktop.
Save emorydunn/e6b5c9803e5774c26926595a63b23f37 to your computer and use it in GitHub Desktop.
A publisher that delivers the messages from a WebSocket.
//
// WebSocketTaskPublisher.swift
//
//
// Created by Emory Dunn on 5/23/21.
//
import Foundation
import Combine
extension URLSession {
/// Returns a publisher that wraps a URL session WebSocket task for a given URL.
///
/// The provided URL must have a `ws` or `wss` scheme.
/// - Parameter url: The WebSocket URL with which to connect.
func webSocketTaskPublisher(for url: URL) -> WebSocketTaskPublisher {
WebSocketTaskPublisher(with: url, session: self)
}
}
/// A publisher that delivers the messages from a WebSocket.
public struct WebSocketTaskPublisher: Publisher {
public typealias Output = URLSessionWebSocketTask.Message
public typealias Failure = Error
let task: URLSessionWebSocketTask
/// Creates a WebSocket task publisher from the provided URL and URL session.
///
/// The provided URL must have a `ws` or `wss` scheme.
/// - Parameters:
/// - url: The WebSocket URL with which to connect.
/// - session: The URLSession to create the WebSocket task.
public init(with url: URL, session: URLSession = URLSession.shared) {
self.task = session.webSocketTask(with: url)
}
public func receive<S>(subscriber: S) where S : Subscriber, Error == S.Failure, URLSessionWebSocketTask.Message == S.Input {
let subscrption = Subscription(task: task, target: subscriber)
subscriber.receive(subscription: subscrption)
}
}
extension WebSocketTaskPublisher {
class Subscription<Target: Subscriber>: Combine.Subscription where Target.Input == Output, Target.Failure == Error {
let task: URLSessionWebSocketTask
var target: Target?
init(task: URLSessionWebSocketTask, target: Target) {
self.task = task
self.target = target
}
func request(_ demand: Subscribers.Demand) {
guard let target = target else { return }
// Resume the task
task.resume()
listen(for: target, with: demand)
}
func listen(for target: Target, with demand: Subscribers.Demand) {
var demand = demand
self.task.receive { [weak self] result in
switch result {
case let .success(message):
demand -= 1
demand += target.receive(message)
case let .failure(error):
target.receive(completion: .failure(error))
}
if demand > 0 {
self?.listen(for: target, with: demand)
}
}
}
func cancel() {
task.cancel()
target = nil
}
}
}
@ars-java
Copy link

It seems that code

...
              case let .failure(error):
                  target.receive(completion: .failure(error))
              }
              
              if demand > 0 {
                  self?.listen(for: target, with: demand)
              }
...

is incorrect because it's possible that you can send new data or error messages received from task to subscriber after you sent .failure() completion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment