Last active
December 17, 2020 10:09
-
-
Save makomweb/8529192 to your computer and use it in GitHub Desktop.
Rx playground. Create a new Rx operator which can be used for processing data. It is parametrizable with a scheduler.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading.Tasks; | |
using FluentAssertions; | |
using NUnit.Framework; | |
namespace Tests | |
{ | |
/// <summary>
/// NUnit theories that run the <c>Process</c> and <c>ProcessAsync</c> operators
/// once for every scheduler exposed below as a datapoint.
/// </summary>
public class RxPlayground
{
    // Each [Datapoint] field supplies one IScheduler argument to the theories below.
    [Datapoint]
    public IScheduler Current = Scheduler.CurrentThread;

    [Datapoint]
    public IScheduler Default = Scheduler.Default;

    [Datapoint]
    public IScheduler Immediate = Scheduler.Immediate;

    [Datapoint]
    public IScheduler TaskPool = TaskPoolScheduler.Default;

    [Datapoint]
    public IScheduler NewThread = NewThreadScheduler.Default;

    /// <summary>
    /// Pushes one message through <c>Process</c> on the given scheduler and
    /// checks that the first emitted element carries the processed suffix.
    /// </summary>
    [Theory]
    public async Task Process_using_scheduler(IScheduler scheduler)
    {
        // A ReplaySubject buffers the message so the later subscription still sees it.
        var subject = new ReplaySubject<string>();
        subject.OnNext("message");

        IObservable<string> processed = subject.Process(scheduler);
        string first = await processed.FirstAsync();

        first.Should().Be("message + processed!");
    }

    /// <summary>
    /// Pushes one message through <c>ProcessAsync</c> on the given scheduler and
    /// checks that the first emitted element carries the processed suffix.
    /// </summary>
    [Theory]
    public async Task Process_async_using_scheduler(IScheduler scheduler)
    {
        // A ReplaySubject buffers the message so the later subscription still sees it.
        var subject = new ReplaySubject<string>();
        subject.OnNext("message");

        IObservable<string> processed = subject.ProcessAsync(scheduler);
        string first = await processed.FirstAsync();

        first.Should().Be("message + processed!");
    }
}
/// <summary>
/// Rx operators that append a " + processed!" marker to every string element,
/// running the work on a caller-supplied scheduler.
/// </summary>
public static class MyRxOperator
{
    /// <summary>
    /// Processes every element of <paramref name="source"/> using <see cref="Scheduler.Immediate"/>.
    /// </summary>
    /// <param name="source">The sequence whose elements are processed.</param>
    /// <returns>A sequence of processed elements.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<string> Process(this IObservable<string> source)
    {
        return Process(source, Scheduler.Immediate);
    }

    /// <summary>
    /// Processes every element of <paramref name="source"/> on <paramref name="scheduler"/>.
    /// Errors and completion of the source are forwarded to the subscriber.
    /// </summary>
    /// <param name="source">The sequence whose elements are processed.</param>
    /// <param name="scheduler">The scheduler on which each element is processed and delivered.</param>
    /// <returns>A sequence of processed elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<string> Process(this IObservable<string> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // ObserveOn moves all three notification kinds onto the scheduler, so
        // OnError/OnCompleted are no longer dropped, and unsubscribing also cancels
        // work that is still queued. Select then does the processing downstream of
        // that hop, i.e. on the scheduler.
        return source
            .ObserveOn(scheduler)
            .Select(x => Process(x));
    }

    /// <summary>
    /// Starts asynchronous processing of every element of <paramref name="source"/> from
    /// <paramref name="scheduler"/> and emits the results in source order.
    /// The result sequence completes only after all started work has finished.
    /// </summary>
    /// <param name="source">The sequence whose elements are processed.</param>
    /// <param name="scheduler">The scheduler from which processing of each element is started.</param>
    /// <returns>A sequence of processed elements.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<string> ProcessAsync(this IObservable<string> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // Select starts one task per element as soon as the element arrives on the
        // scheduler. Concat (the overload for sequences of tasks) emits the task results
        // in order and forwards a task failure as OnError, so completion of the source
        // can no longer overtake results that are still being computed.
        return source
            .ObserveOn(scheduler)
            .Select(x => ProcessAsync(x))
            .Concat();
    }

    /// <summary>Appends the processed marker to <paramref name="data"/>.</summary>
    private static string Process(string data)
    {
        return data + " + processed!";
    }

    /// <summary>Runs <see cref="Process(string)"/> as a thread-pool task.</summary>
    private static Task<string> ProcessAsync(string data)
    {
        // Task.Run always targets the default task scheduler, whereas
        // Task.Factory.StartNew would pick up whatever TaskScheduler.Current is.
        return Task.Run(() => Process(data));
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment