Skip to content

Instantly share code, notes, and snippets.

@sudeshim3
Last active June 30, 2019 17:56
Show Gist options
  • Save sudeshim3/55a09ba35625f65e4f96ab5efa1e4eff to your computer and use it in GitHub Desktop.
Save sudeshim3/55a09ba35625f65e4f96ab5efa1e4eff to your computer and use it in GitHub Desktop.
RxJava
// Subscribes an anonymous Subscriber (the RxJava 1.x callback type, with onCompleted)
// to the three values 1, 2, 3 and logs each callback.
Observable.just(1, 2, 3).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.d("", "all done. oncompleted called");
    }

    @Override
    public void onError(Throwable e) {
        // Report the failure instead of silently dropping it.
        Log.e("", "onError called", e);
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("", "here is my integer:" + integer.intValue());
    }
});
/*
just() – Emits each argument unchanged as one item, so an array argument counts as a single emission. Observable.just(new Integer[]{1, 2, 3}) makes one emission with Observer callback as onNext(Integer[] integers)
fromArray() – Makes N emissions, one per array element. Observable.fromArray(new Integer[]{1, 2, 3}) makes three emissions with Observer callback as onNext(Integer integer)
*/
//--------------------------------------------------------------------------------------------------------------
// Subscribe with one lambda per callback; judging by the handler names the
// order is: next item, error, completion, subscription.
Observable.just(Any()).subscribe(
    { item -> processOnNext(item) },
    { error -> processError(error) },
    { processCompletion() },
    { _ -> processSubscription() }
)
//---------------------------------------------------------------------------------------------------------------
/* Separate functions returning the observable and the observer.
Here we show an example where progress is emitted.
*/
/**
 * Builds an [Observable] that emits a fixed sequence of progress values
 * (10, 30, 40, 50, 70, 100) to each subscriber and then completes.
 *
 * Emission stops as soon as the subscriber has disposed; in that case no
 * further values and no completion signal are delivered.
 *
 * @param url not read by this implementation; the progress values are hard-coded.
 * @return an observable of progress values (presumably percentages — confirm with callers).
 */
fun getDownloadObservable(url: String): Observable<Int> =
    Observable.create { emitter ->
        for (progress in intArrayOf(10, 30, 40, 50, 70, 100)) {
            // Once the downstream has disposed, nothing more may be emitted.
            if (emitter.isDisposed) return@create
            emitter.onNext(progress)
        }
        // Terminate the stream so observers learn that the last value has been sent.
        if (!emitter.isDisposed) emitter.onComplete()
    }
/**
 * Creates an [Observer] of `Int` values in which every callback is a no-op.
 *
 * @return a new observer instance on each call.
 */
fun getObserver(): Observer<Int> = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) = Unit

    override fun onNext(t: Int) = Unit

    override fun onError(e: Throwable) = Unit

    override fun onComplete() = Unit
}
// Build the progress observable first, then attach a fresh observer to it.
val downloadProgress = getDownloadObservable("")
downloadProgress.subscribe(getObserver())
//----------------------------------------------------------------------------------------------------------
//SingleObservable
// With Java versions below 8 you have to write out the full override, which is quite verbose.
// Creates a Single that delivers "amit" unless the subscriber has already disposed.
Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> emitter) {
        if (!emitter.isDisposed()) {
            emitter.onSuccess("amit");
        }
    }
});
// With Kotlin you get access to lambda functions, making the code concise.
/**
 * Creates a [Single] that delivers the string "Sudesh" to its subscriber,
 * unless the subscriber has already disposed.
 */
fun getSingleObservable(): Single<String> = Single.create { emitter ->
    // Nothing to deliver once the subscriber is gone.
    if (emitter.isDisposed) return@create
    emitter.onSuccess("Sudesh")
}
/**
 * Creates a [SingleObserver] of `String` in which every callback is a no-op.
 */
fun getSingleObserver() = object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) = Unit

    override fun onSuccess(t: String) = Unit

    override fun onError(e: Throwable) = Unit
}
// This is how an observer is subscribed to the observable.
val nameSingle = getSingleObservable()
nameSingle.subscribe(getSingleObserver())
//callable
/**
 * Shows two ways of feeding a function into `Observable.fromCallable`: as a
 * method reference and as a separately declared [Callable].
 *
 * In both cases the work is subscribed on `Schedulers.io()` and the result is
 * handed to `receiver` on the Android main thread.
 *
 * Each subscription passes an explicit error callback. Without one, an
 * exception thrown by the callable is reported as an unhandled error through
 * RxJava's global error handler.
 *
 * NOTE(review): the Disposable returned by `subscribe` is not kept, so these
 * subscriptions cannot be cancelled from here — confirm that is acceptable.
 */
private fun callable() {
    // From callable: wraps a synchronous function into an asynchronous call.
    Observable.fromCallable(this::getString)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Placeholder error handling; replace with real reporting as needed.
        .subscribe(this::receiver, { error -> error.printStackTrace() })

    // ---- OR ----
    // Declare the callable as a separate value.
    val networkCallable = Callable<String> { serverRequest() }
    Observable.fromCallable(networkCallable)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Placeholder error handling; replace with real reporting as needed.
        .subscribe(this::receiver, { error -> error.printStackTrace() })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment