Last active
May 6, 2017 07:23
-
-
Save stephanenicolas/3d0674cca56f0acf327ae2e3e6bf5a72 to your computer and use it in GitHub Desktop.
Updates an adapter concurrently using Rx-managed Redux
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Maybe; | |
import io.reactivex.MaybeSource; | |
import io.reactivex.Observable; | |
import io.reactivex.ObservableTransformer; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.functions.Consumer; | |
import io.reactivex.functions.Function; | |
import io.reactivex.subjects.BehaviorSubject; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import rx.exceptions.OnErrorNotImplementedException; | |
import static io.reactivex.Observable.merge; | |
import static io.reactivex.schedulers.Schedulers.computation; | |
import static io.reactivex.schedulers.Schedulers.single; | |
public class Main { | |
public static void main(String[] args) { | |
final int controllerCount = 10; | |
FeatureController[] featureControllers = createControllers(controllerCount); | |
Adapter adapter = new Adapter(); | |
Observable<Long> storeObservable = createStore(); | |
storeObservable.doOnSubscribe(registerFeatureControllers(adapter, featureControllers)) | |
.compose(buildItemsConcurrently(featureControllers)) | |
.subscribeOn(computation()) | |
.observeOn(single()) | |
.subscribe(updateItems(adapter),// | |
Main::crash); | |
sleep(10000); | |
} | |
public static class FeatureController { | |
private int id; | |
public FeatureController(int id) { | |
this.id = id; | |
} | |
//can return null if there is no update to be performed | |
public List<Object> updateItemList(long bigModel) { | |
return Arrays.asList(bigModel * 2 + id, bigModel * 2 + id + 1); | |
} | |
} | |
public static class FeatureItemsUpdateEvent { | |
public int featureControllerId; | |
public List<Object> items; | |
public FeatureItemsUpdateEvent(int featureControllerId, List<Object> items) { | |
this.featureControllerId = featureControllerId; | |
this.items = items; | |
} | |
} | |
public static class Adapter { | |
List<List<Object>> itemList = new ArrayList<>(); | |
public void registerFeatureControllers(FeatureController... featureControllers) { | |
while (itemList.size() < featureControllers.length) { | |
itemList.add(null); | |
} | |
} | |
public void updateFeatureItems(int featureControllerId, List<Object> items) { | |
itemList.set(featureControllerId, items); | |
display(); | |
} | |
public void display() { | |
for (int featureControllerId = 0; featureControllerId < itemList.size(); featureControllerId++) { | |
List<Object> items = itemList.get(featureControllerId); | |
System.out.println(featureControllerId + " - " + items); | |
} | |
System.out.println(); | |
} | |
} | |
private static Consumer<? super Disposable> registerFeatureControllers(Adapter adapter, FeatureController... featureControllers) { | |
return ignore -> adapter.registerFeatureControllers(featureControllers); | |
} | |
private static ObservableTransformer<Long, FeatureItemsUpdateEvent> buildItemsConcurrently(FeatureController... featureControllers) { | |
return source -> { | |
List<Function<Long, MaybeSource<FeatureItemsUpdateEvent>>> list = new ArrayList<>(); | |
for (FeatureController featureController : featureControllers) { | |
list.add(toFeatureItemsUpdateEventObservable(featureController)); | |
} | |
Function<Long, MaybeSource<FeatureItemsUpdateEvent>>[] transformers = list.toArray(new Function[0]); | |
return source.compose(composeConcurrently(transformers)); | |
}; | |
} | |
private static <T, R> ObservableTransformer<T, R> composeConcurrently(Function<? super T, ? extends MaybeSource<R>>... transformers) { | |
return source -> source.publish(shared -> { | |
List<Observable<R>> list = new ArrayList<>(); | |
for (Function<? super T, ? extends MaybeSource<R>> transformer : transformers) { | |
list.add(shared.flatMapMaybe(transformer)); | |
} | |
return merge(list); | |
}); | |
} | |
private static Function<Long, MaybeSource<FeatureItemsUpdateEvent>> toFeatureItemsUpdateEventObservable(FeatureController featureController) { | |
return bigModel -> justFeatureItemsUpdateEvent(bigModel, featureController); | |
} | |
private static Maybe<FeatureItemsUpdateEvent> justFeatureItemsUpdateEvent(Long bigModel, FeatureController featureController) { | |
return Maybe // | |
.just(featureController.updateItemList(bigModel)) // | |
.filter(it -> it != null) // | |
.map(it -> new FeatureItemsUpdateEvent(featureController.id, it)); | |
} | |
private static Consumer<FeatureItemsUpdateEvent> updateItems(Adapter adapter) { | |
return event -> adapter.updateFeatureItems(event.featureControllerId, event.items); | |
} | |
private static Observable<Long> createStore() { | |
return BehaviorSubject.interval(1, TimeUnit.SECONDS); | |
} | |
private static FeatureController[] createControllers(int controllerCount) { | |
FeatureController[] featureControllers = new FeatureController[controllerCount]; | |
for (int controllerIndex = 0; controllerIndex < controllerCount; controllerIndex++) { | |
featureControllers[controllerIndex] = new FeatureController(controllerIndex); | |
} | |
return featureControllers; | |
} | |
private static void crash(Throwable error) { | |
throw new OnErrorNotImplementedException(error); | |
} | |
private static void sleep(int millis) { | |
try { | |
Thread.sleep(millis); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment