Skip to content

Instantly share code, notes, and snippets.

@Dykam
Last active August 29, 2015 14:06
Show Gist options
  • Select an option

  • Save Dykam/760c78312bee5d8d6919 to your computer and use it in GitHub Desktop.

Select an option

Save Dykam/760c78312bee5d8d6919 to your computer and use it in GitHub Desktop.
Event and Threading model proposal
A small, incomplete example design and implementation of Observables/Tasks integrated into events.
It is inspired by a mix of classic Observables and .NET's Tasks/SynchronizationContext.
class SpongeObservable<T> implements Observable<T>, Callback<T> {
protected final List<Callback<T>> callbacks = new ArrayList<Callback<T>>();
public Observable<T> on(Callback<T> callback) {
callbacks.add(callback);
}
public Observable<T> on(Runnable runnable) {
on(new Callback<T>(T value) {
runnable.run();
});
}
public <S> Observable<S> map(final Fn<T, S> mapper) {
final Observable<S> observable = new SpongeObservable<S>();
on(new Callback<T>(T value) {
observable.call(mapper(value));
});
}
public Observable<T> sync(SynchronizationContext context) {
Observable<T> observable = SpongeSyncObservable<T>(context);
on(observable);
return observable;
}
public void call(T value) {
for(Callback<T> callback : callbacks) {
callback.call(value);
}
}
}
class SyncObservable<T> extends SpongeObservable {
SynchronizationContext context;
public SpongeSyncObservable(SynchronizationContext context) {
this.context = context;
}
@Override
public void call(T value) {
context.run(new Runnable() {
super.call(value);
});
}
}
/**
 * Observable whose callbacks are invoked at most once: each call delivers the value to the
 * callbacks registered so far and then forgets them.
 *
 * @param <T> type of the values delivered to callbacks
 */
class SingleFireObservable<T> extends SpongeObservable<T> {
    /**
     * Delivers {@code value} to every callback registered before this call and drops them.
     *
     * <p>The pending callbacks are detached before any of them runs. A callback may therefore
     * register further callbacks while it executes: those are kept for the next call instead of
     * triggering a ConcurrentModificationException in the iteration or being cleared unseen.
     *
     * @param value value to deliver
     */
    @Override
    public void call(T value) {
        // Fully qualified so the block does not depend on the file's import list.
        final java.util.List<Callback<T>> pending = new java.util.ArrayList<Callback<T>>(callbacks);
        callbacks.clear();
        for (Callback<T> callback : pending) {
            callback.call(value);
        }
    }
}
// Implementation is pseudo. Each method has to somehow queue itself on the thread or synchronize otherwise
class SpongeSynchronizationContextFactory {
// A context which runs synchronized with all the contexts provided
public SynchronizationContext multiple(SynchronizationContext... contexts) {
return new MultipleSynchronizationContext(contexts);
}
// A context which runs synchronized with all the worlds provided
public SynchronizationContext world(World... worlds) {
if(worlds.length == null) {
throw new IllegalArgumentException("At least one world has to be provided");
}
if(worlds.length == 1) {
return new WorldSynchronizationContext(world);
}
SynchronizationContext[] contexts = new SynchronizationContext[worlds.length];
for(int i = 0; i < worlds.length; i++) {
contexts[i] = new WorldSynchroniztionContext(worlds[i]));
}
return multiple(contexts);
}
// A context which runs synchronized with the main thread
public SynchronizationContext main() {
return new MainSynchronizationContext(Sponge.instance);
}
// A context running without any synchronization (threadpool)
public SynchronizationContext async() {
return new AsyncSynchronizationContext(Threadpool.instance);
}
}
class MultipleSynchronizationContext implements SynchronizationContext {
SynchronizationContext contexts;
public MultipleSynchronizationContext(SynchronizationContext... contexts) {
this.contexts = contexts;
}
public void run(Runnable runnable) {
Iterator<SynchronizationContext> contextIterator = new ArrayIterator(contexts).iterator();
new Runnable() {
if(iterator.hasNext()) {
iterator.next().runSync(this);
} else {
runnable.run();
}
}.run();
}
}
/**
 * Placeholder for the context used to synchronize with a single world.
 *
 * NOTE(review): the body is still empty. The SynchronizationContext interface in this file
 * declares runAsync(Runnable) and runSync(Runnable), and the factory constructs this class with
 * a World argument, so both methods and a matching constructor are still missing.
 * NOTE(review): the class name is spelled "Synchroniztion" (missing an "a"); the factory refers
 * to it under both spellings.
 */
class WorldSynchroniztionContext implements SynchronizationContext {
// ... to be implemented
}
/**
 * Placeholder for the context used to synchronize with the main thread.
 *
 * NOTE(review): the body is still empty. The SynchronizationContext interface in this file
 * declares runAsync(Runnable) and runSync(Runnable), and the factory constructs this class with
 * Sponge.instance, so both methods and a matching constructor are still missing.
 */
class MainSynchronizationContext implements SynchronizationContext {
// ... to be implemented
}
/**
 * Placeholder for the context that runs work without synchronization (thread pool).
 *
 * NOTE(review): the body is still empty. The SynchronizationContext interface in this file
 * declares runAsync(Runnable) and runSync(Runnable), and the factory constructs this class with
 * Threadpool.instance, so both methods and a matching constructor are still missing.
 */
class AsyncSynchronizationContext implements SynchronizationContext {
// ... to be implemented
}
/**
 * Game implementation fragment that exposes the scheduler observable.
 */
class SpongeGame {
    // ...
    /**
     * Observable behind the scheduler. The tick handler in this file fires it with {@code null};
     * it is typed as the implementation because the Observable interface exposes no call(...).
     */
    final SingleFireObservable<Object> mainObservable = new SingleFireObservable<Object>();
    // ...

    /**
     * Original accessor name, kept for existing callers; same as {@link #scheduler()}.
     *
     * @param <T> value type expected by the caller
     * @return the scheduler observable
     */
    public <T> Observable<T> schedule() {
        return scheduler();
    }

    /**
     * Returns the scheduler observable under the name declared by the {@code Game} interface.
     *
     * @param <T> value type expected by the caller
     * @return the scheduler observable
     */
    @SuppressWarnings("unchecked")
    public <T> Observable<T> scheduler() {
        // Unchecked but harmless as long as only null is delivered, as the tick handler in this
        // file does: null is a valid value for every T.
        return (Observable<T>) (Observable<?>) mainObservable;
    }
    // ...
}
// somewhere
/**
 * Tick handler: fires the game's main observable once for every tick event it receives.
 *
 * NOTE(review): the method is named playerTick but its parameter is a server tick event.
 *
 * @param event the tick event; not read by this method
 */
@SubscribeEvent
public void playerTick(TickEvent.ServerTickEvent event) {
// Listeners get null as the value; the event itself is not forwarded.
SpongeGame.instance.mainObservable.call(null);
}
/**
 * A source of values of type T that callbacks can subscribe to, and from which derived
 * observables can be built.
 *
 * @param <T> type of the values delivered to callbacks
 */
interface Observable<T> {
/**
 * Registers a callback that receives the values of this observable.
 *
 * @param callback listener to add
 * @return an observable for chaining - presumably this one; confirm against the implementation
 */
Observable<T> on(Callback<T> callback);
/**
 * Registers a runnable to be run for the values of this observable; a Runnable cannot receive
 * the value itself.
 *
 * @param runnable action to run
 * @return an observable for chaining - presumably this one; confirm against the implementation
 */
Observable<T> on(Runnable runnable);
/**
 * Derives an observable of values converted by the given function.
 *
 * @param mapper conversion from T to S
 * @param <S> value type of the derived observable
 * @return the derived observable
 */
<S> Observable<S> map(Fn<T, S> mapper);
/**
 * Derives an observable tied to the given synchronization context.
 *
 * @param context context to use for the derived observable
 * @return the derived observable
 */
Observable<T> sync(SynchronizationContext context);
}
/**
 * Creates the SynchronizationContext instances that can be passed to Observable.sync(...).
 */
interface SynchronizationContextFactory {
/**
 * A context which runs synchronized with all the contexts provided.
 *
 * @param contexts contexts to combine
 * @return the combined context
 */
SynchronizationContext multiple(SynchronizationContext... contexts);
/**
 * A context which runs synchronized with all the worlds provided.
 *
 * @param worlds worlds to synchronize with
 * @return the context for those worlds
 */
SynchronizationContext world(World... worlds);
/**
 * A context which runs synchronized with the main thread.
 *
 * @return the main-thread context
 */
SynchronizationContext main();
/**
 * A context running without any synchronization (threadpool).
 *
 * @return the unsynchronized context
 */
SynchronizationContext async();
}
/**
 * Something work can be run "in sync with" (a world, the main thread, a thread pool, ...).
 * Offers a non-blocking and a blocking way to submit a runnable.
 */
interface SynchronizationContext {
/**
 * Run without blocking the caller. Generally this means queueing the runnable.
 * NOTE(review): the original comment said "callee"; "caller" appears to be what was meant.
 *
 * @param runnable work to run
 */
void runAsync(Runnable runnable);
/**
 * Run while blocking the caller, for example by locking the caller and unlocking it after the
 * runnable ran.
 * NOTE(review): the original comment introduced this with "either" but never named a second
 * option.
 *
 * @param runnable work to run
 */
void runSync(Runnable runnable);
}
/**
 * Game API fragment; only the scheduler accessor is shown here.
 */
interface Game {
// ...
/**
 * Returns the observable used to schedule work.
 *
 * @param <T> value type expected by the caller
 * @return the scheduler observable
 */
<T> Observable<T> scheduler();
// ...
}
/// Simple event
// on(...) hands back an observable of the events themselves, so the variable is typed and named
// for move events rather than for positions.
Observable<PlayerMoveEvent> moveEvents =
    server.events.playerMove().order(Order.MONITOR)
        .on(event -> Sponge.getLogger().log("Player moved to " + event.getPlayer().getPosition().toString()));
/// More complex example
Observable<Vector> positions =
server.events.playerMove().order(Order.MONITOR)
.map(event -> event.getPlayer().getPosition())
// Run async on some threadpool thread
.sync(Sponge.getSynchronizer().async())
.map(vector -> /* do complex slow math */)
// Ensure executed synchronized with someWorld's thread and the main thread
.sync(Sponge.getSynchronizer().multiple(Sponge.getSynchronizer().world(someWorld), Sponge.getSynchronizer().main()))
.on(vector -> {
// Requires main thread synchonicity
Vector otherVector = someGlobalVar.get(vector);
// Requires someWorld thread synchronicity
someWorld.getEntitiesNear(otherVector, 6);
}
});
// Run something async
Sponge.scheduler()
    .sync(Sponge.getSynchronizer().async())
    // A block body keeps the placeholder syntactically valid.
    .on(() -> { /* Do some heavy work */ });
/// Simple event
// on(...) hands back an observable of the events themselves, so the variable is typed and named
// for move events rather than for positions.
Observable<PlayerMoveEvent> moveEvents =
    server.events.playerMove().order(Order.MONITOR)
        .on(new Callback<PlayerMoveEvent>() {
            public void call(PlayerMoveEvent event) {
                Sponge.getLogger().log("Player moved to " + event.getPlayer().getPosition().toString());
            }
        });
/// More complex example
Observable<Vector> positions =
    server.events.playerMove().order(Order.MONITOR)
        .map(new Fn<PlayerMoveEvent, Vector>() {
            public Vector call(PlayerMoveEvent event) {
                return event.getPlayer().getPosition();
            }
        })
        // Run async on some threadpool thread
        .sync(Sponge.getSynchronizer().async())
        .map(new Fn<Vector, Vector>() {
            public Vector call(Vector vector) {
                // Placeholder: returns the vector unchanged; do complex slow math here
                return vector;
            }
        })
        // Ensure executed synchronized with someWorld's thread and the main thread
        .sync(Sponge.getSynchronizer().multiple(Sponge.getSynchronizer().world(someWorld), Sponge.getSynchronizer().main()))
        .on(new Callback<Vector>() {
            public void call(Vector vector) {
                // Requires main thread synchronicity
                Vector otherVector = someGlobalVar.get(vector);
                // Requires someWorld thread synchronicity
                someWorld.getEntitiesNear(otherVector, 6);
            }
        });
// Run something async
Sponge.scheduler()
.sync(Sponge.getSynchronizer().async())
.on(new Runnable() {
// Do some heavy work
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment