flow wrap
Creates a wrapper for a function returning Flow accepting no arguments
fun $NAME$(onEach: ($TYPE$) -> Unit, onCompletion: (Throwable?) -> Unit): Cancellable =
$NAME$().collect(onEach, onCompletion)
import Combine | |
import Foundation | |
import shared | |
typealias OnEach<Output> = (Output) -> Void | |
typealias OnCompletion<Failure> = (Failure?) -> Void | |
typealias OnCollect<Output, Failure> = (@escaping OnEach<Output>, @escaping OnCompletion<Failure>) -> shared.Cancellable | |
/** |
import Combine | |
import Foundation | |
import shared | |
extension KotlinThrowable: Error { | |
} | |
struct PublisherFailures { | |
/** | |
The action to invoke when a failure is dropped as the result of a `Publisher` returned by |
collect(taskService.tasks) | |
.completeOnFailure() | |
.sink { [weak self] tasks in | |
print("Received \(tasks.count) tasks") | |
} | |
.store(in: &subscriptions) |
import Combine | |
import Foundation | |
import shared | |
typealias OnEach<Output> = (Output) -> Void | |
typealias OnCompletion<Failure> = (Failure?) -> Void | |
typealias OnCollect<Output, Failure> = (@escaping OnEach<Output>, @escaping OnCompletion<Failure>) -> shared.Cancellable | |
/** |
fun <T> Flow<T>.collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable { | |
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main) | |
scope.launch { | |
try { | |
collect { | |
onEach(it) | |
} | |
onCompletion(null) |
interface TaskService { | |
val tasks: Flow<List<Task>> | |
fun tasks(onEach: (List<Task>) -> Unit, onCompletion: (Throwable?) -> Unit): Cancellable = | |
tasks.collect(onEach, onCompletion) | |
} |
interface Cancellable { | |
fun cancel() | |
} |
class AnyFlow<T>(source: Flow<T>): Flow<T> by source |
class AnyFlow<T>(source: Flow<T>): Flow<T> by source { | |
fun collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable { | |
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main) | |
scope.launch { | |
try { | |
collect { | |
onEach(it) | |
} |