@fschwiet
Last active September 28, 2016 19:21
Experimenting with Reactive schedulers
Experiment code:
// Requires System.Reactive (Subject, Observable, IScheduler, EventLoopScheduler)
// and ReactiveUI (RxApp). trace(...) is a logging helper not included in this gist.
var emitter = new Subject<string>();

// The three schedulers under test, keyed by a display name used in the trace output.
var schedulers = new[]
{
    new KeyValuePair<string, IScheduler>("EventLoop", new EventLoopScheduler()),
    new KeyValuePair<string, IScheduler>("RxApp.MainThreadScheduler", RxApp.MainThreadScheduler),
    new KeyValuePair<string, IScheduler>("RxApp.TaskpoolScheduler", RxApp.TaskpoolScheduler),
};

// Emit one event from each scheduler, 3 seconds apart (at 8s, 11s and 14s).
var interval = 5;
foreach (var scheduler in schedulers)
{
    interval += 3;
    Observable.Timer(TimeSpan.FromSeconds(interval), scheduler.Value)
        .Select(l => "event from " + scheduler.Key)
        .Subscribe(l =>
        {
            Debug.WriteLine("");
            trace("**** emit begin", l, null);
            emitter.OnNext(l);
            trace("**** emit blocking", l, null);
            Task.Delay(TimeSpan.FromSeconds(1)).Wait(); // Block for 1s, giving a chance for re-entrancy
            trace("**** emit exiting", l, null);
        });
}

// Case 1: plain subscription, no scheduler involved.
emitter.Subscribe(l =>
{
    trace("received", l, "immediate");
});

// Case 2: ObserveOn() each scheduler under test.
foreach (var scheduler in schedulers)
{
    emitter.ObserveOn(scheduler.Value).Subscribe(l =>
    {
        trace("received", l, "ObservedOn() " + scheduler.Key);
    });
}

// Case 3: ObserveOn() Scheduler.CurrentThread.
emitter.ObserveOn(Scheduler.CurrentThread).Subscribe(l =>
{
    trace("received", l, "ObservedOn() Schedular.CurrentThread");
});

// Case 4: plain subscription whose handler calls Schedule() on each scheduler.
foreach (var scheduler in schedulers)
{
    emitter.Subscribe(l =>
    {
        scheduler.Value.Schedule(() =>
        {
            trace("received", l, "Schedule()d by " + scheduler.Key);
        });
    });
}

// Case 5: Delay() by 0s and by 0.25s on each scheduler.
foreach (var scheduler in schedulers)
{
    emitter.Delay(TimeSpan.FromSeconds(0.0), scheduler.Value).Subscribe(l =>
    {
        trace("received", l, "Delayed 0s subscription on " + scheduler.Key);
    });
    emitter.Delay(TimeSpan.FromSeconds(0.25), scheduler.Value).Subscribe(l =>
    {
        trace("received", l, "Delayed 0.25s subscription on " + scheduler.Key);
    });
}

// Case 6: plain subscription whose handler starts a LongRunning task.
emitter.Subscribe(l => Task.Factory.StartNew(() =>
{
    trace("received", l, "LongRunning task");
}, TaskCreationOptions.LongRunning));
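The experiment code calls a trace(...) helper that is not part of the gist. The following is only a guess at what it might look like, inferred from the shape of the output lines ("thread #NN", then the action, the message, and an optional detail). The class name TraceHelper, the Format method, the parameter names, and the exact spacing are all assumptions.

```csharp
using System;
using System.Diagnostics;

// Hypothetical reconstruction of the trace helper; not the gist author's code.
static class TraceHelper
{
    // Builds one line such as "thread #01 received event from EventLoop immediate".
    // The detail part is omitted when it is null, as in the "**** emit ..." lines.
    public static string Format(int threadId, string action, string message, string detail)
    {
        var line = $"thread #{threadId:D2} {action} {message}";
        return detail == null ? line : line + " " + detail;
    }

    // Writes the formatted line to the debug output, tagged with the managed
    // thread id of whichever thread is running the callback.
    public static void trace(string action, string message, string detail)
    {
        Debug.WriteLine(Format(Environment.CurrentManagedThreadId, action, message, detail));
    }
}
```

With a `using static TraceHelper;` directive (C# 6 or later), the calls in the experiment code would compile unchanged against this sketch.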
Experiment output:
[0:]
[0:] thread #30 **** emit begin event from EventLoop
[0:] thread #30 received event from EventLoop immediate
Thread started: #32
[0:] thread #33 received event from EventLoop ObservedOn() RxApp.TaskpoolScheduler
[0:] thread #01 received event from EventLoop ObservedOn() RxApp.MainThreadScheduler
[0:] thread #30 received event from EventLoop ObservedOn() Schedular.CurrentThread
[0:] thread #09 received event from EventLoop Schedule()d by RxApp.TaskpoolScheduler
[0:] thread #01 received event from EventLoop Schedule()d by RxApp.MainThreadScheduler
Thread started: #33
[0:] thread #30 **** emit blocking event from EventLoop
[0:] thread #34 received event from EventLoop LongRunning task
Thread finished: #33
[0:] thread #31 received event from EventLoop Delayed 0s subscription on RxApp.TaskpoolScheduler
[0:] thread #01 received event from EventLoop Delayed 0s subscription on RxApp.MainThreadScheduler
The thread 'Unknown' (0x21) has exited with code 0 (0x0).
[0:] thread #01 received event from EventLoop Delayed 0.25s subscription on RxApp.MainThreadScheduler
[0:] thread #32 received event from EventLoop Delayed 0.25s subscription on RxApp.TaskpoolScheduler
[0:] thread #30 **** emit exiting event from EventLoop
[0:] thread #30 received event from EventLoop ObservedOn() EventLoop
[0:] thread #30 received event from EventLoop Schedule()d by EventLoop
[0:] thread #30 received event from EventLoop Delayed 0s subscription on EventLoop
[0:] thread #30 received event from EventLoop Delayed 0.25s subscription on EventLoop
[0:]
[0:] thread #01 **** emit begin event from RxApp.MainThreadScheduler
[0:] thread #01 received event from RxApp.MainThreadScheduler immediate
[0:] thread #30 received event from RxApp.MainThreadScheduler ObservedOn() EventLoop
[0:] thread #01 received event from RxApp.MainThreadScheduler ObservedOn() RxApp.MainThreadScheduler
[0:] thread #33 received event from RxApp.MainThreadScheduler ObservedOn() RxApp.TaskpoolScheduler
[0:] thread #01 received event from RxApp.MainThreadScheduler ObservedOn() Schedular.CurrentThread
[0:] thread #30 received event from RxApp.MainThreadScheduler Schedule()d by EventLoop
[0:] thread #01 received event from RxApp.MainThreadScheduler Schedule()d by RxApp.MainThreadScheduler
[0:] thread #16 received event from RxApp.MainThreadScheduler Schedule()d by RxApp.TaskpoolScheduler
[0:] thread #30 received event from RxApp.MainThreadScheduler Delayed 0s subscription on EventLoop
[0:] thread #31 received event from RxApp.MainThreadScheduler Delayed 0s subscription on RxApp.TaskpoolScheduler
Thread started: #34
[0:] thread #01 **** emit blocking event from RxApp.MainThreadScheduler
[0:] thread #35 received event from RxApp.MainThreadScheduler LongRunning task
Thread finished: #34
The thread 'Unknown' (0x22) has exited with code 0 (0x0).
[0:] thread #30 received event from RxApp.MainThreadScheduler Delayed 0.25s subscription on EventLoop
[0:] thread #32 received event from RxApp.MainThreadScheduler Delayed 0.25s subscription on RxApp.TaskpoolScheduler
[0:] thread #01 **** emit exiting event from RxApp.MainThreadScheduler
[0:] thread #01 received event from RxApp.MainThreadScheduler Delayed 0s subscription on RxApp.MainThreadScheduler
[0:] thread #01 received event from RxApp.MainThreadScheduler Delayed 0.25s subscription on RxApp.MainThreadScheduler
[0:]
[0:] thread #05 **** emit begin event from RxApp.TaskpoolScheduler
[0:] thread #05 received event from RxApp.TaskpoolScheduler immediate
[0:] thread #30 received event from RxApp.TaskpoolScheduler ObservedOn() EventLoop
[0:] thread #33 received event from RxApp.TaskpoolScheduler ObservedOn() RxApp.TaskpoolScheduler
[0:] thread #01 received event from RxApp.TaskpoolScheduler ObservedOn() RxApp.MainThreadScheduler
[0:] thread #05 received event from RxApp.TaskpoolScheduler ObservedOn() Schedular.CurrentThread
[0:] thread #30 received event from RxApp.TaskpoolScheduler Schedule()d by EventLoop
[0:] thread #01 received event from RxApp.TaskpoolScheduler Schedule()d by RxApp.MainThreadScheduler
[0:] thread #16 received event from RxApp.TaskpoolScheduler Schedule()d by RxApp.TaskpoolScheduler
[0:] thread #30 received event from RxApp.TaskpoolScheduler Delayed 0s subscription on EventLoop
[0:] thread #31 received event from RxApp.TaskpoolScheduler Delayed 0s subscription on RxApp.TaskpoolScheduler
Thread started: #35
[0:] thread #01 received event from RxApp.TaskpoolScheduler Delayed 0s subscription on RxApp.MainThreadScheduler
[0:] thread #05 **** emit blocking event from RxApp.TaskpoolScheduler
[0:] thread #36 received event from RxApp.TaskpoolScheduler LongRunning task
Thread finished: #35
[0:] thread #30 received event from RxApp.TaskpoolScheduler Delayed 0.25s subscription on EventLoop
The thread 'Unknown' (0x23) has exited with code 0 (0x0).
[0:] thread #32 received event from RxApp.TaskpoolScheduler Delayed 0.25s subscription on RxApp.TaskpoolScheduler
[0:] thread #01 received event from RxApp.TaskpoolScheduler Delayed 0.25s subscription on RxApp.MainThreadScheduler
[0:] thread #05 **** emit exiting event from RxApp.TaskpoolScheduler
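In the first block of output above, every line delivered through the EventLoop scheduler (ObservedOn(), Schedule()d, and both Delayed variants) appears only after "**** emit exiting", all on thread #30, the same thread that was running the emitting callback. A minimal sketch of that behavior in isolation, assuming the System.Reactive package is referenced; the class name EventLoopReentrancySketch and the Run method are hypothetical and not taken from the gist.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;

// Hypothetical stand-alone sketch; not part of the original experiment.
static class EventLoopReentrancySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var done = new ManualResetEventSlim();

        using (var loop = new EventLoopScheduler())
        {
            loop.Schedule(() =>
            {
                log.Add("outer begin");

                // Queued on the same EventLoopScheduler while its single
                // thread is still busy with the outer work item.
                loop.Schedule(() =>
                {
                    log.Add("inner");
                    done.Set();
                });

                Thread.Sleep(100); // Keep the event loop thread busy, like the 1s wait in the experiment
                log.Add("outer end");
            });

            done.Wait(); // Wait until the inner work item has run
        }

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The inner work item is logged after "outer end", because an EventLoopScheduler runs its queued work one item at a time on a single dedicated thread.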