Created
July 16, 2014 12:26
// 1: do func1(); which returns an Observable<Res1>
// 2: once Res1 has completed (onCompleted, so done without error):
// 3a: do func2();
// 3b: repeat 3a as long as the result does not match a given condition.
// 4: if done, return the original result from func1
In sync it would more or less look like:
Res1 res1 = func1();
done = false;
while(!done) {
    if (func2() == condition) {
        done = true;
    }
}
... for more context: I store a document on the server and then need to poll the server for its state (replicated, persisted), and once my needed state is satisfied I just return whatever got returned from the original store call.
return res1;
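The synchronous loop above can be made runnable with plain Java functional interfaces. This is only a sketch: `SyncPoll`, `storeThenPoll`, and the three parameters are hypothetical names standing in for `func1`, `func2`, and the condition, and no RxJava is involved.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class SyncPoll {
    /**
     * Calls func1 once, then keeps calling func2 until its result
     * satisfies the condition, and finally returns func1's result.
     */
    static <R, S> R storeThenPoll(Supplier<R> func1, Supplier<S> func2, Predicate<S> condition) {
        R res1 = func1.get();
        while (!condition.test(func2.get())) {
            // Busy polling for illustration only; a real client would
            // probably sleep or back off between attempts.
        }
        return res1;
    }
}
```

A call such as `SyncPoll.storeThenPoll(() -> "doc", poller, state -> state >= 3)` blocks until the poller reports a matching state and then hands back `"doc"`.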
@daschl in addition, you forgot to use take(1).
Not sure what's going on actually; the code below works as I expect. Not sure why your send is returning the same value.
upsert(1)
calling send() with 0
calling send() with 1
calling send() with 2
calling send() with 3
calling send() with 4
calling send() with 5
1
import rx.lang.scala.Observable

object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }
    var i = 0
    // Emits the current counter value, then increments it, on every subscription.
    def send() = Observable[Int](observer => {
      println(s"calling send() with $i")
      observer.onNext(i)
      i += 1
      observer.onCompleted()
    })
    def test(n: Int) = n < 5
    // Re-subscribe to send() until test fails, then emit the upsert result once.
    val zs = upsert(1).flatMap(x =>
      send().repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )
    zs.subscribe(z => println(z))
    readLine()
  }
}
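For readers without RxScala at hand, the shape of that chain (repeat, dropWhile, take(1), map back to the original) can be approximated with a pull-based `java.util.stream.Stream`. This is an analogy, not the Rx implementation: `PollSketch` and `pollUntil` are hypothetical names, and `Stream.dropWhile` needs Java 9 or later.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class PollSketch {
    /**
     * Calls send repeatedly, skips responses while notReady holds,
     * and returns the original value once the first ready response arrives.
     */
    static <R, S> Optional<R> pollUntil(R original, Supplier<S> send, Predicate<S> notReady) {
        return Stream.generate(send)        // like repeat: send is invoked for every element
                .dropWhile(notReady)        // like dropWhile / skipWhile
                .findFirst()                // like take(1)
                .map(ignored -> original);  // like map(_ => x)
    }
}
```

With a counter-backed `send` and `n -> n < 5` as the predicate, the stream pulls the values 0 through 5 and then yields the original value, mirroring the output listed above.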
Author
@headinthebox, @zsxwing thanks much :) .. My thought then was around maybe using the scheduler to schedule into an observable and then using this code so N responses get emitted from one observable.
Not sure if this is the right way :)
Author
Ha! I think this works:
what do you think guys?
private <D extends Document<?>> Observable<D> observe(Observable<D> original, PersistTo persistTo, ReplicateTo replicateTo) {
    return original.flatMap(new Func1<D, Observable<? extends D>>() {
        @Override
        public Observable<? extends D> call(final D original) {
            // simulate with persistto master for now
            return Observable.defer(new Func0<Observable<ObserveResponse>>() {
                @Override
                public Observable<ObserveResponse> call() {
                    return core.send(new ObserveRequest(original.id(), original.cas(), true, (short) 0, bucket));
                }
            }).repeat()
            .skipWhile(new Func1<ObserveResponse, Boolean>() {
                @Override
                public Boolean call(ObserveResponse observeResponse) {
                    // also simulate with persistto master for now
                    System.err.println(observeResponse.observeStatus());
                    return observeResponse.observeStatus() != ObserveResponse.ObserveStatus.FOUND_PERSISTED;
                }
            }).take(1).map(new Func1<ObserveResponse, D>() {
                @Override
                public D call(ObserveResponse observeResponse) {
                    return original;
                }
            });
        }
    });
}
Yup, that looks good.
import rx.lang.scala.Observable
import rx.lang.scala.subjects.AsyncSubject

object MainScala {
  def main(args: Array[String]): Unit = {
    def upsert(i: Int) = {
      println(s"upsert($i)")
      Observable.items(i)
    }
    var i = 0
    // Each call runs eagerly and caches its single result in an AsyncSubject.
    def send() = {
      val result = AsyncSubject[Int]()
      Observable[Int](observer => {
        println(s"calling send() with $i")
        observer.onNext(i)
        i += 1
        observer.onCompleted()
      }).subscribe(result)
      result
    }
    def test(n: Int) = n < 5
    // defer makes repeat call send() again on every re-subscription.
    val zs = upsert(1).flatMap(x =>
      Observable.defer{ send() }.repeat.dropWhile(n => test(n))
        .take(1).map(_ => x)
    )
    zs.subscribe(z => println(z))
    readLine()
  }
}
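Why the defer matters when send() caches its result can be shown with a small plain-Java analogy. This is a sketch under assumptions, not RxJava code: `DeferSketch`, `newSend`, `eager`, and `deferred` are hypothetical names, and a `Supplier` plays the role of the deferred call.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class DeferSketch {
    /** Hypothetical stand-in for send(): yields 0, 1, 2, ... on successive calls. */
    static Supplier<Integer> newSend() {
        int[] next = {0};
        return () -> next[0]++;
    }

    /** Eager: send runs once and the cached value is replayed on every repeat. */
    static List<Integer> eager(Supplier<Integer> send, int times) {
        Integer cached = send.get();
        return Stream.generate(() -> cached).limit(times).collect(Collectors.toList());
    }

    /** Deferred: send runs again for every repeat, so each poll sees a fresh value. */
    static List<Integer> deferred(Supplier<Integer> send, int times) {
        return Stream.generate(send).limit(times).collect(Collectors.toList());
    }
}
```

In the eager variant a polling loop that waits for the value to change would never finish, which matches the symptom of send returning the same value each time.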
Use https://github.com/Netflix/RxJava/wiki/Creating-Observables#defer.