Skip to content

Instantly share code, notes, and snippets.

@RBusarow
Last active January 7, 2020 19:33
Show Gist options
  • Save RBusarow/90270a1c73c4ae40304e1ee05a769159 to your computer and use it in GitHub Desktop.
Save RBusarow/90270a1c73c4ae40304e1ee05a769159 to your computer and use it in GitHub Desktop.
A hot flow which will auto-start upon its first observer, broadcast to multiple observers, auto-close when there are no observers, and restart when observed again.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
 * A hot [Flow] that shares one producer between all of its collectors.
 *
 * The producer [block] is wrapped in a `callbackFlow` and broadcast in [scope]. The broadcast
 * channel is created on demand, shared by every concurrent collector, cancelled when the last
 * collector goes away, and re-created the next time the flow is collected.
 *
 * @param scope the scope in which the broadcast producer coroutine is launched.
 * @param capacity buffer size applied between the shared channel and each individual collector.
 * @param start how the broadcast producer coroutine is started.
 * @param block the producer body; it runs once per "active period" regardless of collector count.
 */
@FlowPreview
@ExperimentalCoroutinesApi
internal class BroadcastFlow<T>(
    private val scope: CoroutineScope,
    private val capacity: Int = Channel.Factory.BUFFERED,
    private val start: CoroutineStart = CoroutineStart.LAZY,
    private val block: suspend ProducerScope<T>.() -> Unit
) : Flow<T> {

    /** Number of collectors currently attached. Only mutated while [stateMutex] is held. */
    var count = 0

    /** Holder of the current shared channel; swapped for a fresh one when [count] drops to zero. */
    private var lazyChannelRef = lazyChannel()

    /**
     * Guards both [count] and [lazyChannelRef].
     *
     * A single lock is used on purpose: registering a collector and reading the channel must be
     * one atomic step, otherwise the last departing collector could cancel the channel between
     * the moment a new collector obtained it and the moment that collector was counted.
     */
    private val stateMutex = Mutex(locked = false)

    // LazyThreadSafetyMode.NONE is sufficient because `value` is only read under stateMutex.
    private fun lazyChannel() = lazy(LazyThreadSafetyMode.NONE) {
        callbackFlow<T> {
            block()
        }
            .broadcastIn(scope, start)
    }

    /** Returns the current shared broadcast channel, creating it if it does not exist yet. */
    suspend fun channel() = stateMutex.withLock { lazyChannelRef.value }

    /**
     * Returns a flow which, on every collection, registers itself as a collector of the shared
     * channel and unregisters when the collection ends for any reason.
     */
    suspend fun get(): Flow<T> = flow<T> {
        // Count this collector and grab the channel in one critical section.
        val shared = stateMutex.withLock {
            count++
            lazyChannelRef.value
        }
        try {
            emitAll(shared.asFlow().buffer(capacity))
        } finally {
            // The collector may already be cancelled here. Without NonCancellable a contended
            // lock() would throw instead of suspending, the decrement would be skipped and the
            // producer would never be shut down.
            withContext(NonCancellable) {
                stateMutex.withLock {
                    if (--count == 0) {
                        // Last collector gone: stop the producer and arm a fresh channel
                        // for the next collector.
                        shared.cancel()
                        lazyChannelRef = lazyChannel()
                    }
                }
            }
        }
    }

    @InternalCoroutinesApi
    override suspend fun collect(collector: FlowCollector<T>) = get().collect(collector)
}
/**
 * Public factory for a shared, hot [Flow] driven by [block].
 *
 * The arguments are forwarded unchanged to the internal `BroadcastFlow` implementation, which is
 * returned typed as a plain [Flow].
 *
 * @param scope the scope handed to the underlying broadcast.
 * @param capacity buffer capacity, [Channel.Factory.BUFFERED] unless specified.
 * @param start start mode of the underlying broadcast, [CoroutineStart.LAZY] unless specified.
 * @param block the producer body executed inside a [ProducerScope].
 * @return the newly created flow.
 */
@Suppress("NOTHING_TO_INLINE")
@ExperimentalCoroutinesApi
fun <T> broadcastFlow(
    scope: CoroutineScope,
    capacity: Int = Channel.Factory.BUFFERED,
    start: CoroutineStart = CoroutineStart.LAZY,
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T> {
    return BroadcastFlow(
        scope = scope,
        capacity = capacity,
        start = start,
        block = block
    )
}
import android.location.Location
import android.os.Looper
import com.google.android.gms.location.FusedLocationProviderClient
import com.google.android.gms.location.LocationCallback
import com.google.android.gms.location.LocationRequest
import com.google.android.gms.location.LocationResult
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
/**
 * Example implementation of a BroadcastFlow.
 *
 * Any existing location update requests will be replaced,
 * so we can only ever have one active request.
 *
 * [BroadcastFlow]'s producer will automatically make the request when it is being collected.
 *
 * If a second receiver begins collecting the _locations_ flow while the producer is already active,
 * it will receive the same updates as the first without making a second request.
 *
 * When there are no more receivers of the flow, the channel inside the ProducerScope will
 * automatically close and the request for location updates will be cancelled.
 *
 * If the Flow is observed again, a new producer coroutine will be created and
 * a new request for location updates will be made.
 *
 * If the location request itself fails, the failure is delivered to collectors of [locations]
 * as the flow's terminal exception.
 *
 * @param locationClient the client used to request and remove location updates.
 * @param coroutineScope the scope in which the shared producer runs.
 */
class SomeLocationService(
    private val locationClient: FusedLocationProviderClient,
    coroutineScope: CoroutineScope
) {
    // One update per second at the highest accuracy.
    private val locationRequest = LocationRequest().apply {
        fastestInterval = 1000
        interval = 1000
        priority = LocationRequest.PRIORITY_HIGH_ACCURACY
    }

    /** Shared stream of location updates; the request is only active while it is collected. */
    @ExperimentalCoroutinesApi
    val locations: Flow<Location> = broadcastFlow(coroutineScope) {
        val callback = object : LocationCallback() {
            override fun onLocationResult(locationResult: LocationResult) {
                // A result may carry no location at all; there is nothing to forward then.
                val latest = locationResult.lastLocation ?: return
                // A callback can still arrive after the channel was closed, and offer()
                // throws on a closed channel, so skip delivery in that case.
                if (!isClosedForSend) {
                    offer(latest)
                }
            }
        }
        // The producer may run on a thread that has no Looper; fall back to the main
        // looper instead of passing null through to the location client.
        val callbackLooper = Looper.myLooper() ?: Looper.getMainLooper()
        val task = locationClient.requestLocationUpdates(
            locationRequest, callback, callbackLooper
        )
        task.addOnCompleteListener { result ->
            val failure = result.exception
            if (!result.isSuccessful && failure != null) {
                // Throwing here would escape on the listener's callback thread; closing the
                // channel with the cause routes the failure to the flow's collectors instead.
                close(failure)
            }
        }
        // Suspends until the channel is closed or cancelled, then unregisters the callback.
        awaitClose {
            locationClient.removeLocationUpdates(callback)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment