Created
June 8, 2016 13:13
-
-
Save pardom-zz/bc6cef79f71bfb28d9f9bcb99afd4e72 to your computer and use it in GitHub Desktop.
RPM
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable | |
import java.util.LinkedHashMap | |
/**
 * A [MutableStreamMap] implemented on top of [LinkedHashMap].
 *
 * Every constructor forwards its arguments unchanged to the matching
 * [LinkedHashMap] constructor.
 */
class HashStreamMap<K> : LinkedHashMap<K, Observable<*>>, MutableStreamMap<K> {

    constructor() : super()

    constructor(initialCapacity: Int) : super(initialCapacity)

    constructor(initialCapacity: Int, loadFactor: Float) : super(initialCapacity, loadFactor)

    constructor(
            initialCapacity: Int,
            loadFactor: Float,
            accessOrder: Boolean
    ) : super(initialCapacity, loadFactor, accessOrder)

    constructor(m: MutableMap<out K, out Observable<*>>?) : super(m)

    /**
     * Looks up the stream stored under [key] and returns it typed as `Observable<T>`.
     *
     * The cast is unchecked: the element type [T] is chosen by the caller and is not
     * verified here. When no entry exists for [key] the lookup yields `null` and the
     * cast to the non-null return type fails with an exception.
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T : Any> stream(key: K): Observable<T> = get(key) as Observable<T>
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable | |
import rx.subjects.ReplaySubject | |
import rx.subjects.Subject | |
import rx.subscriptions.CompositeSubscription | |
/**
 * Base class that turns a map of input streams (keyed by [I]) into a map of output
 * streams (keyed by [O]).
 *
 * Subclasses build the outputs in [setUp]. The inputs handed to [setUp] are not the
 * caller's streams directly but an internal proxy map; [attach] subscribes the proxy's
 * subjects to the caller's streams and [detach] drops those subscriptions again.
 */
abstract class Model<I : Key, O : Key> {

    private val proxy = SourcesProxy<I>()

    // Built on first access (i.e. the first time attach() returns), by calling setUp(proxy).
    private val sinks: StreamMap<O> by lazy { setUp(proxy) }

    /**
     * Builds the output streams from [sources]. Invoked once, lazily, with the
     * internal proxy map as [sources].
     */
    protected abstract fun setUp(sources: StreamMap<I>): StreamMap<O>

    /** Hook called from [attach] after the sources have been connected. Does nothing by default. */
    protected open fun onAttach() {
        // no op
    }

    /** Hook called from [detach] after the sources have been disconnected. Does nothing by default. */
    protected open fun onDetach() {
        // no op
    }

    /**
     * Connects every stream in [sources] to the proxy, calls [onAttach], and
     * returns the output streams.
     */
    fun attach(sources: StreamMap<I>): StreamMap<O> {
        proxy.attach(sources)
        onAttach()
        return sinks
    }

    /** Drops the subscriptions created by [attach], then calls [onDetach]. */
    fun detach() {
        proxy.detach()
        onDetach()
    }

    /**
     * Marker for stream keys. A key can be invoked with a [StreamMap] to fetch the
     * stream registered under it, e.g. `SomeKey<String>(map)`.
     */
    interface Key {

        /**
         * Returns the stream stored under this key in [streamMap], typed as `Observable<T>`.
         * Both the map's key type and [T] are unchecked.
         */
        @Suppress("UNCHECKED_CAST")
        operator fun <T : Any> invoke(streamMap: StreamMap<out Key>): Observable<T> {
            val keyedMap = streamMap as StreamMap<Key>
            return keyedMap.stream<T>(this)
        }
    }

    // Proxy

    /**
     * Stream map whose entries are subjects created on demand, so a stream can be
     * requested for a key before any real source has been attached.
     */
    private class SourcesProxy<K : Key> : MutableStreamMap<K> by mutableStreamMapOf<K>() {

        private val subscriptions = CompositeSubscription()

        /**
         * Returns the subject stored under [key], creating and storing a new
         * `ReplaySubject.create<T>(1)` first when the key has no entry yet.
         *
         * NOTE(review): in RxJava 1.x the Int argument of `ReplaySubject.create` appears to
         * be an initial buffer capacity, not a replay limit (`createWithSize` is the bounded
         * variant) - confirm which behaviour is intended here.
         */
        @Suppress("UNCHECKED_CAST")
        override fun <T : Any> stream(key: K): Observable<T> =
                getOrPut(key) { ReplaySubject.create<T>(1) } as Observable<T>

        /**
         * Subscribes the proxy subject of every key in [sources] to the matching source
         * stream and remembers each subscription so [detach] can release it.
         */
        fun attach(sources: StreamMap<K>) {
            for (key in sources.keys) {
                val upstream = sources.stream<Any>(key)
                @Suppress("UNCHECKED_CAST")
                val relay = stream<Any>(key) as Subject<Any, Any>
                subscriptions.add(upstream.subscribe(relay))
            }
        }

        /** Clears all subscriptions collected by [attach]. */
        fun detach() {
            subscriptions.clear()
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable | |
// Maps
/**
 * Read-only map from keys of type [K] to [Observable] streams of any element type.
 */
interface StreamMap<K> : Map<K, Observable<*>> {

    /**
     * Returns the stream associated with [key], typed as `Observable<T>`.
     * The element type [T] is chosen by the caller.
     */
    fun <T : Any> stream(key: K): Observable<T>
}
/**
 * A [StreamMap] that additionally exposes the [MutableMap] operations, so streams
 * can be added and removed.
 */
interface MutableStreamMap<K> : StreamMap<K>, MutableMap<K, Observable<*>>
// Utils
/**
 * Creates a [StreamMap] containing the given key/stream [pairs].
 * The map is built by [hashStreamMapOf] and returned under the read-only type.
 */
fun <K> streamMapOf(vararg pairs: Pair<K, Observable<*>>): StreamMap<K> {
    return hashStreamMapOf(*pairs)
}
/**
 * Creates a [MutableStreamMap] containing the given key/stream [pairs].
 * The map is built by [hashStreamMapOf].
 */
fun <K> mutableStreamMapOf(vararg pairs: Pair<K, Observable<*>>): MutableStreamMap<K> {
    return hashStreamMapOf(*pairs)
}
/**
 * Creates a [HashStreamMap] sized for the number of [pairs] (via `mapCapacity`) and
 * inserts each pair in the order given.
 */
fun <K> hashStreamMapOf(vararg pairs: Pair<K, Observable<*>>): MutableStreamMap<K> {
    val result = HashStreamMap<K>(mapCapacity(pairs.size))
    for ((key, stream) in pairs) {
        result.put(key, stream)
    }
    return result
}
// Taken from Maps.kt
/** Largest power of two representable as a positive [Int]: 2^30 = 1073741824. */
private val INT_MAX_POWER_OF_TWO: Int = 1 shl 30
/**
 * Computes the initial capacity to request for a hash map expected to hold
 * [expectedSize] entries.
 *
 * - below 3 entries: `expectedSize + 1`
 * - below [INT_MAX_POWER_OF_TWO]: `expectedSize + expectedSize / 3`
 * - otherwise: [Int.MAX_VALUE]
 */
private fun mapCapacity(expectedSize: Int): Int = when {
    expectedSize < 3 -> expectedSize + 1
    expectedSize < INT_MAX_POWER_OF_TWO -> expectedSize + expectedSize / 3
    else -> Int.MAX_VALUE // any large value
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment