Skip to content

Instantly share code, notes, and snippets.

View Nillerr's full-sized avatar

Nicklas Jensen Nillerr

View GitHub Profile

Flow Wrapper Function (Live Templates)

flow wrap

Creates a callback-based wrapper for a no-argument function that returns a Flow

/**
 * Callback-based overload of the no-argument function of the same name.
 *
 * Calls the no-argument function, then hands both callbacks to `collect` on its result.
 *
 * @param onEach callback forwarded to `collect`; receives values of the template's element type.
 * @param onCompletion callback forwarded to `collect`; receives a nullable [Throwable].
 * @return the [Cancellable] returned by `collect`.
 */
fun $NAME$(onEach: ($TYPE$) -> Unit, onCompletion: (Throwable?) -> Unit): Cancellable =
    $NAME$().collect(onEach, onCompletion)
import Combine
import Foundation
import shared
/// Closure that receives a single `Output` value and returns nothing.
typealias OnEach<Output> = (Output) -> Void
/// Closure that receives an optional `Failure` and returns nothing.
/// NOTE(review): a `nil` argument presumably signals normal completion — confirm against the Kotlin `collect` helper.
typealias OnCompletion<Failure> = (Failure?) -> Void
/// Function that accepts an escaping `OnEach` and an escaping `OnCompletion` closure and returns a `shared.Cancellable`.
typealias OnCollect<Output, Failure> = (@escaping OnEach<Output>, @escaping OnCompletion<Failure>) -> shared.Cancellable
/**
import Combine
import Foundation
import shared
/// Declares that `KotlinThrowable` conforms to Swift's `Error` protocol; no members are added.
/// NOTE(review): presumably needed so `KotlinThrowable` can be used as a Combine `Failure` type — confirm.
extension KotlinThrowable: Error {
}
struct PublisherFailures {
/**
The action to invoke when a failure is dropped as the result of a `Publisher` returned by
// Usage example: pass `taskService.tasks` to `collect`, then print the number of tasks in each received value.
// NOTE(review): `collect` and `completeOnFailure()` are defined outside this snippet; presumably they bridge the
// Kotlin flow to a Combine publisher and handle failures respectively — confirm against their definitions.
collect(taskService.tasks)
.completeOnFailure()
.sink { [weak self] tasks in
// `self` is captured weakly here but is not referenced in this closure body.
print("Received \(tasks.count) tasks")
}
// The value returned by `sink` is stored in `subscriptions`.
.store(in: &subscriptions)
import Combine
import Foundation
import shared
/// Closure that receives a single `Output` value and returns nothing.
typealias OnEach<Output> = (Output) -> Void
/// Closure that receives an optional `Failure` and returns nothing.
/// NOTE(review): a `nil` argument presumably signals normal completion — confirm against the Kotlin `collect` helper.
typealias OnCompletion<Failure> = (Failure?) -> Void
/// Function that accepts an escaping `OnEach` and an escaping `OnCompletion` closure and returns a `shared.Cancellable`.
typealias OnCollect<Output, Failure> = (@escaping OnEach<Output>, @escaping OnCompletion<Failure>) -> shared.Cancellable
/**
fun <T> Flow<T>.collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable {
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
scope.launch {
try {
collect {
onEach(it)
}
onCompletion(null)
/**
 * Exposes task lists as a [Flow], together with a callback-based overload of the same name.
 */
interface TaskService {
    /** Flow emitting lists of [Task]. */
    val tasks: Flow<List<Task>>

    /**
     * Callback-based counterpart of the [tasks] property.
     *
     * Forwards both callbacks to `collect` on the [tasks] flow.
     *
     * @param onEach callback forwarded to `collect`; receives a list of [Task].
     * @param onCompletion callback forwarded to `collect`; receives a nullable [Throwable].
     * @return the [Cancellable] returned by `collect`.
     */
    fun tasks(onEach: (List<Task>) -> Unit, onCompletion: (Throwable?) -> Unit): Cancellable {
        return tasks.collect(onEach, onCompletion)
    }
}
/**
 * Handle exposing a single [cancel] operation.
 *
 * In this file it is the return type of the callback-based `collect` wrappers.
 */
interface Cancellable {
    /**
     * Requests cancellation.
     *
     * NOTE(review): the exact effect depends on the implementation, which is not shown here.
     */
    fun cancel()
}
@Nillerr
Nillerr / AnyFlow.kt
Created May 20, 2021 19:21
AnyFlow<T>
/**
 * Concrete [Flow] implementation that forwards every [Flow] member to the wrapped [source]
 * through interface delegation.
 *
 * @param T the element type of the wrapped flow.
 * @param source the flow that all [Flow] calls are delegated to.
 */
class AnyFlow<T>(
    source: Flow<T>
) : Flow<T> by source
@Nillerr
Nillerr / AnyFlow.kt
Last active May 25, 2021 17:12
AnyFlow<T> with collect
class AnyFlow<T>(source: Flow<T>): Flow<T> by source {
fun collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable {
val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
scope.launch {
try {
collect {
onEach(it)
}