Last active
August 29, 2015 14:06
-
-
Save Dykam/760c78312bee5d8d6919 to your computer and use it in GitHub Desktop.
Event and Threading model proposal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| A small, incomplete example design and implementation of Observables/Tasks integrated into events. | |
| It is inspired by a mix of classic Observables and .NET's Tasks/SynchronizationContext. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class SpongeObservable<T> implements Observable<T>, Callback<T> { | |
| protected final List<Callback<T>> callbacks = new ArrayList<Callback<T>>(); | |
| public Observable<T> on(Callback<T> callback) { | |
| callbacks.add(callback); | |
| } | |
| public Observable<T> on(Runnable runnable) { | |
| on(new Callback<T>(T value) { | |
| runnable.run(); | |
| }); | |
| } | |
| public <S> Observable<S> map(final Fn<T, S> mapper) { | |
| final Observable<S> observable = new SpongeObservable<S>(); | |
| on(new Callback<T>(T value) { | |
| observable.call(mapper(value)); | |
| }); | |
| } | |
| public Observable<T> sync(SynchronizationContext context) { | |
| Observable<T> observable = SpongeSyncObservable<T>(context); | |
| on(observable); | |
| return observable; | |
| } | |
| public void call(T value) { | |
| for(Callback<T> callback : callbacks) { | |
| callback.call(value); | |
| } | |
| } | |
| } | |
| class SyncObservable<T> extends SpongeObservable { | |
| SynchronizationContext context; | |
| public SpongeSyncObservable(SynchronizationContext context) { | |
| this.context = context; | |
| } | |
| @Override | |
| public void call(T value) { | |
| context.run(new Runnable() { | |
| super.call(value); | |
| }); | |
| } | |
| } | |
| class SingleFireObservable<T> extends SpongeObservable<T> { | |
| @Override | |
| public void call(T value) { | |
| super.call(value); | |
| callbacks.clear(); | |
| } | |
| } | |
| // Implementation is pseudo. Each method has to somehow queue itself on the thread or synchronize otherwise | |
| class SpongeSynchronizationContextFactory { | |
| // A context which runs synchronized with all the contexts provided | |
| public SynchronizationContext multiple(SynchronizationContext... contexts) { | |
| return new MultipleSynchronizationContext(contexts); | |
| } | |
| // A context which runs synchronized with all the worlds provided | |
| public SynchronizationContext world(World... worlds) { | |
| if(worlds.length == null) { | |
| throw new IllegalArgumentException("At least one world has to be provided"); | |
| } | |
| if(worlds.length == 1) { | |
| return new WorldSynchronizationContext(world); | |
| } | |
| SynchronizationContext[] contexts = new SynchronizationContext[worlds.length]; | |
| for(int i = 0; i < worlds.length; i++) { | |
| contexts[i] = new WorldSynchroniztionContext(worlds[i])); | |
| } | |
| return multiple(contexts); | |
| } | |
| // A context which runs synchronized with the main thread | |
| public SynchronizationContext main() { | |
| return new MainSynchronizationContext(Sponge.instance); | |
| } | |
| // A context running without any synchronization (threadpool) | |
| public SynchronizationContext async() { | |
| return new AsyncSynchronizationContext(Threadpool.instance); | |
| } | |
| } | |
| class MultipleSynchronizationContext implements SynchronizationContext { | |
| SynchronizationContext contexts; | |
| public MultipleSynchronizationContext(SynchronizationContext... contexts) { | |
| this.contexts = contexts; | |
| } | |
| public void run(Runnable runnable) { | |
| Iterator<SynchronizationContext> contextIterator = new ArrayIterator(contexts).iterator(); | |
| new Runnable() { | |
| if(iterator.hasNext()) { | |
| iterator.next().runSync(this); | |
| } else { | |
| runnable.run(); | |
| } | |
| }.run(); | |
| } | |
| } | |
| class WorldSynchroniztionContext implements SynchronizationContext { | |
| // ... to be implemented. Placeholder: no members yet; the factory's world(...) constructs it with a single World argument. NOTE(review): the class name is spelled without the second 'a' ("Synchroniztion") and the factory uses both spellings - confirm the intended name. | |
| } | |
| class MainSynchronizationContext implements SynchronizationContext { | |
| // ... to be implemented. Placeholder: no members yet; the factory's main() constructs it with Sponge.instance as its only argument. | |
| } | |
| class AsyncSynchronizationContext implements SynchronizationContext { | |
| // ... to be implemented. Placeholder: no members yet; the factory's async() constructs it with Threadpool.instance as its only argument. | |
| } | |
| class SpongeGame { | |
| // ... | |
| Observable mainObservable = new SingleFireObservable(); | |
| // ... | |
| public <T> Observable<T> schedule() { | |
| return mainObservable; | |
| } | |
| // ... | |
| } | |
| // somewhere | |
| @SubscribeEvent | |
| public void playerTick(TickEvent.ServerTickEvent event) { | |
| SpongeGame.instance.mainObservable.call(null); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| interface Observable<T> { | |
| Observable<T> on(Callback<T> callback); | |
| Observable<T> on(Runnable runnable); | |
| <S> Observable<S> map(Fn<T, S> mapper); | |
| Observable<T> sync(SynchronizationContext context); | |
| } | |
| interface SynchronizationContextFactory { | |
| // A context which runs synchronized with all the contexts provided | |
| SynchronizationContext multiple(SynchronizationContext... contexts); | |
| // A context which runs synchronized with all the worlds provided | |
| SynchronizationContext world(World... worlds); | |
| // A context which runs synchronized with the main thread | |
| SynchronizationContext main(); | |
| // A context running without any synchronization (threadpool) | |
| SynchronizationContext async(); | |
| } | |
/**
 * An execution context that tasks can be handed to, either without or with blocking the
 * caller.
 */
interface SynchronizationContext {

    /**
     * Runs the task without blocking the caller. Generally this means queueing it.
     *
     * @param runnable the task to run
     */
    void runAsync(Runnable runnable);

    /**
     * Runs the task while blocking the caller. One way to do this is to lock the caller and
     * unlock it after the runnable has run.
     *
     * @param runnable the task to run
     */
    void runSync(Runnable runnable);
}
| interface Game { | |
| // ... | |
| <T> Observable<T> scheduler(); | |
| // ... | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /// Simple event | |
| Observable<Vector> positions = | |
| server.events.playerMove().order(Order.MONITOR) | |
| .on(event ->Sponge.getLogger().log("Player moved to " + event.getPlayer().getPosition().toString())); | |
| /// More complex example | |
| Observable<Vector> positions = | |
| server.events.playerMove().order(Order.MONITOR) | |
| .map(event -> event.getPlayer().getPosition()) | |
| // Run async on some threadpool thread | |
| .sync(Sponge.getSynchronizer().async()) | |
| .map(vector -> /* do complex slow math */) | |
| // Ensure executed synchronized with someWorld's thread and the main thread | |
| .sync(Sponge.getSynchronizer().multiple(Sponge.getSynchronizer().world(someWorld), Sponge.getSynchronizer().main())) | |
| .on(vector -> { | |
| // Requires main thread synchonicity | |
| Vector otherVector = someGlobalVar.get(vector); | |
| // Requires someWorld thread synchronicity | |
| someWorld.getEntitiesNear(otherVector, 6); | |
| } | |
| }); | |
| // Run something async | |
| Sponge.scheduler() | |
| .sync(Sponge.getSynchronizer().async()) | |
| .on(() -> /* Do some heavy work */); | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The same usage examples written with anonymous classes (no lambdas).

/// Simple event
// on(...) hands back an observable of the event type, not of Vector.
Observable<PlayerMoveEvent> moves =
    server.events.playerMove().order(Order.MONITOR)
        .on(new Callback<PlayerMoveEvent>() {
            @Override
            public void call(PlayerMoveEvent event) {
                Sponge.getLogger().log("Player moved to " + event.getPlayer().getPosition().toString());
            }
        });

/// More complex example
Observable<Vector> positions =
    server.events.playerMove().order(Order.MONITOR)
        .map(new Fn<PlayerMoveEvent, Vector>() {
            @Override
            public Vector call(PlayerMoveEvent event) {
                return event.getPlayer().getPosition();
            }
        })
        // Run async on some threadpool thread
        .sync(Sponge.getSynchronizer().async())
        .map(new Fn<Vector, Vector>() {
            @Override
            public Vector call(Vector vector) {
                /* do complex slow math */
                return vector; // placeholder result so the example is a complete method
            }
        })
        // Ensure executed synchronized with someWorld's thread and the main thread
        .sync(Sponge.getSynchronizer().multiple(Sponge.getSynchronizer().world(someWorld), Sponge.getSynchronizer().main()))
        .on(new Callback<Vector>() {
            @Override
            public void call(Vector vector) {
                // Requires main thread synchronicity
                Vector otherVector = someGlobalVar.get(vector);
                // Requires someWorld thread synchronicity
                someWorld.getEntitiesNear(otherVector, 6);
            }
        });

// Run something async
Sponge.scheduler()
    .sync(Sponge.getSynchronizer().async())
    .on(new Runnable() {
        @Override
        public void run() {
            // Do some heavy work
        }
    });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment