Last active
October 25, 2016 10:41
-
-
Save Piasy/613ded2cee703b853464acd1645c7687 to your computer and use it in GitHub Desktop.
Demonstrate ValueRequestOperator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Random; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.junit.Test; | |
import rx.Emitter; | |
import rx.Observable; | |
import rx.Producer; | |
import rx.Subscriber; | |
import rx.Subscription; | |
import rx.schedulers.Schedulers; | |
public class ProducerTest { | |
private static Observable<Long> source() { | |
return Observable.fromEmitter(emitter -> { | |
new Thread(() -> { | |
while (true) { | |
long value = System.currentTimeMillis(); | |
emitter.onNext(value); | |
sleep(10); | |
} | |
}, "source thread").start(); | |
}, Emitter.BackpressureMode.DROP); | |
} | |
private static void sleep(long millis) { | |
try { | |
Thread.sleep(millis); | |
} catch (InterruptedException e) { | |
// ignore | |
} | |
} | |
private static void log(String message) { | |
System.out.println(message | |
+ " @ " | |
+ Thread.currentThread().getName() | |
+ ", " | |
+ System.currentTimeMillis()); | |
} | |
@Test | |
public void test() { | |
final Random random = new Random(System.currentTimeMillis()); | |
Subscription subscription = source() | |
.observeOn(Schedulers.computation(), 1) | |
.map(value -> { | |
sleep(50 + random.nextInt(50)); // simulate heavy job | |
if (value % 2 == 0) { | |
return String.valueOf(value); | |
} else { | |
return ""; | |
} | |
}) | |
.filter(str -> str.length() > 0) | |
.observeOn(Schedulers.io(), 1) | |
.lift(new ValueRequestOperator<>()) | |
.subscribe(new Subscriber<String>() { | |
@Override | |
public void onStart() { | |
request(1); | |
} | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(String s) { | |
log("final subscriber got " + s); | |
Observable.timer(1, TimeUnit.SECONDS) | |
.subscribe(l -> { | |
log("request another"); | |
request(1); | |
}); | |
} | |
}); | |
sleep(10_000); | |
subscription.unsubscribe(); | |
} | |
static class ValueRequestOperator<T> implements Observable.Operator<T, T> { | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super T> child) { | |
ValueRequestProducer<T> producer = new ValueRequestProducer<>(child); | |
Subscriber<T> subscriber = new Subscriber<T>() { | |
@Override | |
public void onCompleted() { | |
child.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(T s) { | |
producer.setValue(s); | |
} | |
}; | |
child.setProducer(producer); | |
return subscriber; | |
} | |
} | |
static class ValueRequestProducer<T> implements Producer { | |
private static final int NO_REQ_NO_VALUE = 1; | |
private static final int NO_REQ_HAS_VALUE = 2; | |
private static final int HAS_REQ_NO_VALUE = 3; | |
private final AtomicInteger mState = new AtomicInteger(NO_REQ_NO_VALUE); | |
private final AtomicReference<T> mValue = new AtomicReference<>(); | |
private final Subscriber<? super T> mChild; | |
ValueRequestProducer(Subscriber<? super T> child) { | |
mChild = child; | |
} | |
public void setValue(T value) { | |
while (true) { | |
int state = mState.get(); | |
if (state == NO_REQ_NO_VALUE) { | |
mValue.set(value); | |
if (!mState.compareAndSet(NO_REQ_NO_VALUE, NO_REQ_HAS_VALUE)) { | |
continue; | |
} | |
} else if (state == HAS_REQ_NO_VALUE) { | |
if (mState.compareAndSet(HAS_REQ_NO_VALUE, NO_REQ_NO_VALUE)) { | |
if (!mChild.isUnsubscribed()) { | |
mChild.onNext(value); | |
mValue.set(null); | |
} | |
} | |
} else if (state == NO_REQ_HAS_VALUE) { | |
mValue.set(value); | |
} | |
return; | |
} | |
} | |
@Override | |
public void request(long n) { | |
if (n == 0) { | |
return; | |
} | |
if (n < 0) { | |
throw new IllegalStateException("Request can't be negative! " + n); | |
} | |
while (true) { | |
int state = mState.get(); | |
if (state == NO_REQ_NO_VALUE) { | |
if (!mState.compareAndSet(NO_REQ_NO_VALUE, HAS_REQ_NO_VALUE)) { | |
continue; | |
} | |
} else if (state == NO_REQ_HAS_VALUE) { | |
if (mState.compareAndSet(NO_REQ_HAS_VALUE, NO_REQ_NO_VALUE)) { | |
if (!mChild.isUnsubscribed()) { | |
T value = mValue.getAndSet(null); | |
if (value != null && !mChild.isUnsubscribed()) { | |
mChild.onNext(value); | |
} | |
} | |
} | |
} | |
return; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The problem:
We have a data source, which will keep emitting values in a separate thread, then we need to process the emitted value, which will take some time, during the process, we may or may not send a value to downstream. At the subscriber, we need to show a dialog to the user, when the dialog is showing, no event should be sent to the subscriber.
The code above simulate this process, we use the
source()
function to generate values, and use amap()
operator to simulate to process, then we usefilter()
to filter out illegal values, at last welift()
theValueRequestOperator
on it. Besides, we may need to switch executing thread of different steps, doing the process incomputation
thread, and responding to the value inmain
thread (here I useio
in order to run the test in JVM). At last, we use a "1 second" later request to simulate user's action on the dialog.The
ValueRequestOperator
works like theSingleDelayedProducer
in http://akarnokd.blogspot.com/2015/05/operator-concurrency-primitives_86.html, except its state machine.So I have two questions:
ValueRequestOperator
correct?