Last active
March 3, 2016 14:25
-
-
Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.wizbii.wizbiiandroid.services

import rx.Observable
import rx.subjects.PublishSubject
/**
 * POC of a reactive data store.
 *
 * Keeps the most recently seen value in memory and pushes every new value to all
 * subscribers of [values]. Subclasses supply the remote read ([getRemoteData]) and
 * the remote write ([updateRemoteData]).
 *
 * NOTE(review): `value` and `bus` are plain mutable fields and this class performs no
 * synchronization of its own; confirm that callers confine usage to a single thread.
 */
abstract class ReactiveStore<T> {

    /**
     * Local version of the data; `null` until a first value has been received.
     */
    private var value: T? = null

    /**
     * Bus for notifying new versions of the data to all subscribers.
     *
     * Replaced by a fresh subject when a remote fetch fails, because the subject
     * that received the error is terminated and cannot deliver further values.
     */
    private var bus = PublishSubject.create<T>()

    /**
     * Observable for fetching the remote version of the data.
     */
    protected abstract fun getRemoteData(): Observable<T>

    /**
     * Updates the remote data and emits the updated value.
     */
    protected abstract fun updateRemoteData(parameters: Any): Observable<T>

    /**
     * Infinite stream of value updates.
     *
     * On subscription:
     * - if a value is already cached, it is emitted first and the subscriber is then
     *   attached to the bus for later updates;
     * - otherwise the subscriber is attached to the bus and a remote fetch is started;
     *   the fetched value reaches the subscriber through the bus.
     *
     * A failed fetch is delivered as `onError` to every subscriber of the bus the
     * fetch was started for, which terminates their streams.
     */
    fun values(): Observable<T> {
        return Observable.create { subscriber ->
            // Snapshot the mutable field so the null check and the emission see the same value.
            val cached = value
            if (cached != null) {
                subscriber.onNext(cached)
                bus.subscribe(subscriber)
            } else {
                // Remember which bus this fetch belongs to: the field may be swapped
                // by another failing fetch before this one completes.
                val fetchBus = bus
                fetchBus.subscribe(subscriber)
                getRemoteData().subscribe(
                        { fetched -> onNewValue(fetched) },
                        { error ->
                            // TODO wrap errors into values without stopping the observable
                            // Install the replacement bus BEFORE notifying, so that a
                            // subscriber resubscribing from its error callback attaches
                            // to a live subject instead of the terminated one. Only swap
                            // if no other failure has already replaced the bus.
                            if (bus === fetchBus) {
                                bus = PublishSubject.create<T>()
                            }
                            fetchBus.onError(error)
                        }
                )
            }
        }
    }

    /**
     * Stores [newValue] as the cached value and broadcasts it on the current bus.
     */
    private fun onNewValue(newValue: T) {
        value = newValue
        bus.onNext(newValue)
    }

    /**
     * Sends an update through [updateRemoteData] and, for each value the returned
     * observable emits, caches it and broadcasts it to the subscribers of [values].
     *
     * The side effect is attached with `doOnNext`, so it runs only when the returned
     * observable is subscribed to.
     *
     * @param parameters opaque update parameters forwarded unchanged to [updateRemoteData]
     * @return the observable produced by [updateRemoteData], with the caching side effect attached
     */
    fun updateValue(parameters: Any): Observable<T> {
        return updateRemoteData(parameters).doOnNext { updated -> onNewValue(updated) }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment