Skip to content

Instantly share code, notes, and snippets.

@MarioAriasC
Last active December 17, 2015 04:39
Show Gist options
  • Save MarioAriasC/5552266 to your computer and use it in GitHub Desktop.
Save MarioAriasC/5552266 to your computer and use it in GitHub Desktop.
Creating an rx.Observable from a function
import rx.Observer
import rx.Subscription
import rx.Observable
import rx.util.functions.*
/**
 * Turns this subscribe function into an [Observable].
 *
 * The receiver is wrapped in a [Func1] and handed to `Observable.create`; whenever
 * that wrapper is called with an [Observer], the receiver is invoked with it and the
 * [Subscription] it produces is returned unchanged.
 *
 * The `!!` assertions fail fast if the Java side supplies a null observer or
 * returns a null observable.
 */
fun<T> Function1<Observer<T>, Subscription>.asObservable(): Observable<T> {
    // Name the receiver so the call inside the lambda reads unambiguously.
    val onSubscribe = this
    val observable = Observable.create(Func1<Observer<T>, Subscription>{ observer ->
        onSubscribe(observer!!)
    })
    return observable!!
}
/**
 * Builds an [Observable] that emits "anotherValue_0" through "anotherValue_75"
 * (the range is inclusive) followed by `onCompleted()`.
 *
 * Emission runs on a newly started [Thread], so the subscribe call returns as soon
 * as the thread has been started instead of waiting for all values.
 *
 * The returned [Subscription] interrupts that thread. The worker polls the interrupt
 * flag before every emission and before completing, so once the interrupt is
 * observed no further `onNext` and no `onCompleted` is delivered.
 */
private fun customObservableNonBlocking(): Observable<String> {
    return {(observer: Observer<String>) ->
        val t = Thread(Runnable{
            var cancelled = false
            for (i in 0..75) {
                // interrupt() merely sets a flag; nothing in this loop blocks, so the
                // flag has to be polled explicitly for unsubscribe to have any effect.
                if (Thread.interrupted()) {
                    cancelled = true
                    break
                }
                observer.onNext("anotherValue_$i")
            }
            // Re-check so an interrupt that arrives right after the last value still
            // suppresses the completion signal.
            if (!cancelled && !Thread.interrupted()) {
                observer.onCompleted()
            }
        })
        t.start()
        Subscription{
            // Ask the worker thread to stop emitting.
            t.interrupt()
        }
    }.asObservable()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment