Last active
January 7, 2020 19:33
-
-
Save RBusarow/90270a1c73c4ae40304e1ee05a769159 to your computer and use it in GitHub Desktop.
A hot flow which will auto-start upon its first observer, broadcast to multiple observers, auto-close when there are no observers, and restart when observed again.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.ProducerScope | |
import kotlinx.coroutines.flow.* | |
import kotlinx.coroutines.sync.Mutex | |
import kotlinx.coroutines.sync.withLock | |
@FlowPreview | |
@ExperimentalCoroutinesApi | |
internal class BroadcastFlow<T>( | |
private val scope: CoroutineScope, | |
private val capacity: Int = Channel.Factory.BUFFERED, | |
private val start: CoroutineStart = CoroutineStart.LAZY, | |
private val block: suspend ProducerScope<T>.() -> Unit | |
) : Flow<T> { | |
var count = 0 | |
private var lazyChannelRef = lazyChannel() | |
private fun lazyChannel() = lazy(LazyThreadSafetyMode.NONE) { | |
callbackFlow<T> { | |
block() | |
} | |
.broadcastIn(scope, start) | |
} | |
private val countMutex = Mutex(locked = false) | |
private val channelMutex = Mutex(locked = false) | |
suspend fun channel() = channelMutex.withLock { lazyChannelRef.value } | |
suspend fun get(): Flow<T> = channel() | |
.asFlow() | |
.buffer(capacity) | |
.onStart { | |
countMutex.withLock { | |
count++ | |
} | |
} | |
.onCompletion { | |
countMutex.withLock { | |
if (--count == 0) { | |
channelMutex.withLock { | |
lazyChannelRef.value.cancel() | |
lazyChannelRef = lazyChannel() | |
} | |
} | |
} | |
} | |
@InternalCoroutinesApi | |
override suspend fun collect(collector: FlowCollector<T>) = get().collect(collector) | |
} | |
@Suppress("NOTHING_TO_INLINE") | |
@ExperimentalCoroutinesApi | |
fun <T> broadcastFlow( | |
scope: CoroutineScope, | |
capacity: Int = Channel.Factory.BUFFERED, | |
start: CoroutineStart = CoroutineStart.LAZY, | |
block: suspend ProducerScope<T>.() -> Unit | |
): Flow<T> = BroadcastFlow(scope, capacity, start, block) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.location.Location | |
import android.os.Looper | |
import com.google.android.gms.location.FusedLocationProviderClient | |
import com.google.android.gms.location.LocationCallback | |
import com.google.android.gms.location.LocationRequest | |
import com.google.android.gms.location.LocationResult | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.ExperimentalCoroutinesApi | |
import kotlinx.coroutines.channels.awaitClose | |
import kotlinx.coroutines.flow.Flow | |
/** | |
* Example implementation of a BroadcastFlow. | |
* | |
* Any existing location update requests will be replaced, | |
* so we can only ever have one active request. | |
* | |
* [BroadcastFlow]'s producer will automatically make the request when it is being collected. | |
* | |
* If a second receiver begins collecting the _locations_ flow while the producer is already active, | |
* it will receive the same updates as the first without making a second request. | |
* | |
* When there are no more receivers of the flow, the channel inside the ProducerScope will | |
* automatically close and the request for location updates will be cancelled. | |
* | |
* If the Flow is observed again, a new producer coroutine will be created and | |
* a new request for location updates will be made. | |
*/ | |
class SomeLocationService( | |
private val locationClient: FusedLocationProviderClient, | |
coroutineScope: CoroutineScope | |
) { | |
private val locationRequest = LocationRequest().apply { | |
fastestInterval = 1000 | |
interval = 1000 | |
priority = LocationRequest.PRIORITY_HIGH_ACCURACY | |
} | |
@ExperimentalCoroutinesApi | |
val locations: Flow<Location> = broadcastFlow(coroutineScope) { | |
val callback = object : LocationCallback() { | |
override fun onLocationResult(locationResult: LocationResult) { | |
offer(locationResult.lastLocation) | |
} | |
} | |
val task = locationClient.requestLocationUpdates( | |
locationRequest, callback, Looper.myLooper() | |
) | |
task.addOnCompleteListener { | |
if (!it.isSuccessful) { | |
it.exception?.let { exception -> | |
throw exception | |
} | |
} | |
} | |
awaitClose { | |
locationClient.removeLocationUpdates(callback) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment