Skip to content

Instantly share code, notes, and snippets.

@danielt1263
Last active April 25, 2023 09:40
Show Gist options
  • Save danielt1263/17ebe60a1c7d9aa87c8b5393639a079e to your computer and use it in GitHub Desktop.
Save danielt1263/17ebe60a1c7d9aa87c8b5393639a079e to your computer and use it in GitHub Desktop.
I have implemented RetryWhen in Combine.
//
// RetryWhen.swift
// CombineSandbox
//
// Created by Daniel Tartaglia on 9/27/19.
// Copyright © 2019 Daniel Tartaglia. MIT License.
//
import Foundation
import Combine
extension Publisher {
    /// Wraps this publisher in a `Publishers.RetryWhen`.
    ///
    /// - Parameter handler: Receives a never-failing publisher of this
    ///   publisher's failures and returns the publisher that is handed to
    ///   `Publishers.RetryWhen` together with `self`. Its failure type must
    ///   match this publisher's failure type.
    /// - Returns: A `Publishers.RetryWhen` built from `self` and `handler`.
    public func retryWhen<P>(
        _ handler: @escaping (AnyPublisher<Self.Failure, Never>) -> P
    ) -> Publishers.RetryWhen<Self, P> where P: Publisher, P.Failure == Self.Failure {
        Publishers.RetryWhen(upstream: self, handler: handler)
    }
}
extension Publishers {
    /// A publisher that subscribes to `upstream` once up front and then again
    /// every time the publisher returned by `handler` emits a value.
    ///
    /// Failures of `upstream` are not sent downstream directly; they are
    /// caught and pushed into the publisher that `handler` receives as input.
    /// A failure of the handler's publisher is what reaches the downstream
    /// subscriber as a failure.
    public class RetryWhen<Upstream, Handler>: Publisher where Upstream: Publisher, Handler: Publisher, Handler.Failure == Upstream.Failure {
        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        /// The publisher that is (re)subscribed on every attempt.
        private let upstream: Upstream
        /// Maps the stream of upstream failures to the stream of retry triggers.
        private let handler: (AnyPublisher<Upstream.Failure, Never>) -> Handler

        /// - Parameters:
        ///   - upstream: The publisher to subscribe to on each attempt.
        ///   - handler: Called once per downstream subscription with a
        ///     publisher of `upstream`'s failures; each value emitted by the
        ///     publisher it returns starts a new attempt.
        public init(upstream: Upstream, handler: @escaping (AnyPublisher<Upstream.Failure, Never>) -> Handler) {
            self.handler = handler
            self.upstream = upstream
        }

        public func receive<S>(subscriber: S) where S: Subscriber, RetryWhen.Failure == S.Failure, RetryWhen.Output == S.Input {
            // Starts out as `nil` (no failure yet). A value-holding subject is
            // used so that a failure sent before the handler's publisher has
            // been subscribed is presumably still delivered once it is.
            let failures = CurrentValueSubject<Upstream.Failure?, Never>(nil)

            // One attempt: run `upstream`, and on failure report the error to
            // the handler's input and end the attempt without failing.
            let source = upstream
            let attempt = source.catch { (failure) -> Empty<Upstream.Output, Upstream.Failure> in
                failures.send(failure)
                return Empty(completeImmediately: true)
            }

            // The leading `nil` triggers the initial attempt; every later
            // element comes from the handler's publisher and triggers a retry.
            let retryTriggers = handler(failures.compactMap { $0 }.eraseToAnyPublisher())
                .map { Optional.some($0) }
                .prepend(nil)

            retryTriggers
                .flatMap { _ in attempt }
                .subscribe(subscriber)
        }
    }
}
//
// RetryWhenTests.swift
// CombineSandboxTests
//
// Created by Daniel Tartaglia on 9/27/19.
// Copyright © 2019 Daniel Tartaglia. MIT License.
//
import XCTest
import Combine
/// Tests for the `retryWhen` operator.
class RetryWhenTests: XCTestCase {

    /// A value sent by the upstream subject must reach the sink unchanged.
    func testPassthroughValue() {
        let valueReceived = XCTestExpectation(description: "testPassthroughValue")
        let source = PassthroughSubject<Int, Error>()
        // A handler whose publisher never emits, so no retry is triggered.
        let neverRetry: (AnyPublisher<Error, Never>) -> Empty<Int, Error> = { _ in
            Empty<Int, Error>()
        }

        let cancellable = source
            .retryWhen(neverRetry)
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { value in
                    XCTAssertEqual(value, 5)
                    valueReceived.fulfill()
                }
            )

        source.send(5)

        wait(for: [valueReceived], timeout: 5.0)
        cancellable.cancel()
    }

    /// The handler closure must run, and no value may reach the sink.
    func testHandlerCalledOnError() {
        let handlerInvoked = XCTestExpectation(description: "testHandlerCalledOnError")
        let source = PassthroughSubject<Int, Error>()
        let failingHandler: (AnyPublisher<Error, Never>) -> Fail<Int, Error> = { _ in
            handlerInvoked.fulfill()
            return Fail(error: TestError())
        }

        let cancellable = source
            .retryWhen(failingHandler)
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { _ in XCTFail() }
            )

        source.send(completion: .failure(TestError()))

        wait(for: [handlerInvoked], timeout: 5.0)
        cancellable.cancel()
    }

    /// An upstream that fails on its first subscription and emits 13 on the
    /// next one must deliver 13 once the handler has triggered a retry.
    func testPublisherResubscribed() {
        let valueReceived = XCTestExpectation(description: "testPublisherResubscribed")

        // Flipped by the first subscription so later ones take the success path.
        var hasFailedOnce = false
        let flakySource = Deferred { () -> AnyPublisher<Int, TestError> in
            guard hasFailedOnce else {
                hasFailedOnce = true
                return Fail<Int, TestError>(error: TestError())
                    .eraseToAnyPublisher()
            }
            return Just(13)
                .setFailureType(to: TestError.self)
                .eraseToAnyPublisher()
        }

        // Emits one retry trigger for every failure it is told about.
        let retryOnEveryError: (AnyPublisher<TestError, Never>) -> AnyPublisher<Int, TestError> = { failures in
            failures
                .flatMap { _ in Just(17) }
                .setFailureType(to: TestError.self)
                .eraseToAnyPublisher()
        }

        let cancellable = flakySource
            .retryWhen(retryOnEveryError)
            .sink(
                receiveCompletion: { _ in },
                receiveValue: { value in
                    XCTAssertEqual(value, 13)
                    valueReceived.fulfill()
                }
            )

        wait(for: [valueReceived], timeout: 5.0)
        cancellable.cancel()
    }
}
/// Payload-free error value used by the tests as a stand-in failure.
struct TestError: Error { }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment