Last active
May 4, 2016 06:56
-
-
Save imran0101/fcf56dcc6bd38784d4d8f9e415f318d0 to your computer and use it in GitHub Desktop.
RxJava time bound operation. Debounce items without discarding, in the same sequence as the source observable. Raw
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscriber; | |
import rx.functions.Action0; | |
import rx.functions.Action1; | |
import rx.schedulers.Schedulers; | |
/** | |
* This operation queues the items and dispatches them based on time specified for individual item. | |
* <p/> | |
* Note: No item will be discarded. | |
* | |
* @param <T> | |
*/ | |
public class TimeBoundOperator<T> implements Observable.Operator<T, T> { | |
//scheduler for workers | |
Scheduler scheduler; | |
//maintaining the state of the values | |
TimeBoundState<T> state; | |
public TimeBoundOperator() { | |
this(Schedulers.computation()); | |
} | |
public TimeBoundOperator(Scheduler scheduler) { | |
this.scheduler = scheduler; | |
state = new TimeBoundState<>(this.scheduler); | |
} | |
/** | |
* compare each item and return a specific timeout | |
* @param t individual item in the order it was received | |
* @return time out in long for each item. | |
*/ | |
public abstract long timeout(T t); | |
/** | |
* compare each item and return a {@link TimeUnit} based on the timeout. {@link #timeout(Object)} | |
* @param t individual item in the order it was received | |
* @return {@link TimeUnit} | |
*/ | |
public abstract TimeUnit unit(T t); | |
/** | |
* observe the next item in queue. | |
* time for processing the item has started at this method. | |
* @param t | |
*/ | |
public abstract void onWaitNext(T t); | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { | |
return new Subscriber<T>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(final T t) { | |
TimeBoundValue<T> v = TimeBoundValue | |
.create(t, timeout(t), unit(t)) | |
.onStartAction(new Action1<T>() { | |
@Override | |
public void call(T t) { | |
onWaitNext(t); | |
} | |
}) | |
.onNextAction(new Action1<T>() { | |
@Override | |
public void call(T t) { | |
subscriber.onNext(t); | |
} | |
}); | |
state.add(v); | |
if (!state.executing) { | |
state.execute(); | |
} | |
} | |
}; | |
} | |
/** | |
* TimeBoundValue hold individual item values, timeunit and timeout | |
* @param <T> | |
*/ | |
private static class TimeBoundValue<T> { | |
T value; | |
long timeout; | |
TimeUnit unit; | |
Action1<T> onStartAction; | |
Action1<T> onNextAction; | |
public static <T> TimeBoundValue<T> create(T value, long timeout, TimeUnit unit) { | |
TimeBoundValue<T> tValue = new TimeBoundValue<>(); | |
tValue.value = value; | |
tValue.timeout = timeout; | |
tValue.unit = unit; | |
return tValue; | |
} | |
/** | |
* an action to notify when the item process has started | |
* @param action {@link Action1} | |
* @return {@link TimeBoundValue} | |
*/ | |
public TimeBoundValue<T> onStartAction(Action1<T> action) { | |
this.onStartAction = action; | |
return this; | |
} | |
/** | |
* an action to notify when the item process has finished onNext | |
* @param action {@link Action1} | |
* @return {@link TimeBoundValue} | |
*/ | |
public TimeBoundValue<T> onNextAction(Action1<T> action) { | |
this.onNextAction = action; | |
return this; | |
} | |
/** | |
* notify action that the item processing has started | |
*/ | |
public void onStart() { | |
if (this.onStartAction != null) { | |
this.onStartAction.call(value); | |
} | |
} | |
/** | |
* notify action that the item processing has finished | |
*/ | |
public void onNext() { | |
if (this.onNextAction != null) { | |
this.onNextAction.call(value); | |
} | |
} | |
} | |
/** | |
* TimeBoundState manages the values and works on individual items for sequential release | |
* @param <T> | |
*/ | |
private static class TimeBoundState<T> { | |
Scheduler scheduler; | |
List<TimeBoundValue<T>> values = new ArrayList<>(); | |
boolean executing = false; | |
public TimeBoundState() { | |
this.scheduler = Schedulers.computation(); | |
} | |
public TimeBoundState(Scheduler scheduler) { | |
this.scheduler = scheduler; | |
} | |
/** | |
* Add a {@link TimeBoundValue} to the queue | |
* @param value {@link TimeBoundValue} | |
*/ | |
public void add(TimeBoundValue<T> value) { | |
values.add(value); | |
} | |
/** | |
* Execute and work on the based in the sequence they were added. | |
*/ | |
public void execute() { | |
final TimeBoundValue<T> value = values.remove(0); | |
if (!executing) { | |
value.onStart(); | |
} | |
executing = true; | |
Scheduler.Worker worker = scheduler.createWorker(); | |
worker.schedule(new Action0() { | |
@Override | |
public void call() { | |
value.onNext(); | |
if (values.size() > 0) { | |
values.get(0).onStart(); | |
execute(); | |
} else { | |
executing = false; | |
} | |
} | |
}, value.timeout, value.unit); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final Integer[] array = new Integer[]{1, 2, 3, 4, 5}; | |
final long time = System.currentTimeMillis(); | |
Observable | |
.from(array) | |
.lift(new TimeBoundOperator<Integer>(AndroidSchedulers.mainThread()) { | |
@Override | |
public long timeout(Integer integer) { | |
return 5; | |
} | |
@Override | |
public TimeUnit unit(Integer integer) { | |
return TimeUnit.SECONDS; | |
} | |
@Override | |
public void onWaitNext(Integer integer) { | |
//next item work has started | |
System.out.println("Item of type " + integer.getClass() + ":" + integer + " is waiting.. "); | |
} | |
}) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(new Observer<Integer>() { | |
@Override | |
public void onCompleted() { | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(Integer integer) { | |
//item loading has finished. | |
System.out.println("Item of type " + integer.getClass() + ":" + integer + " Time Taken: " + (System.currentTimeMillis() - time) / 1000); | |
} | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment