Last active
October 25, 2020 00:08
-
-
Save MelbourneDeveloper/dc5f04782e54e20d5ef268b47d6adc08 to your computer and use it in GitHub Desktop.
Use a hot observable to share
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
Output | |
Name: One Message: Hi | |
Name: Two Message: Hi | |
Name: One Message: Hi | |
Name: Two Message: Hi | |
Name: One Message: Hi | |
Name: Two Message: Hi | |
Name: One Message: Hi | |
Name: Two Message: Hi | |
*/ | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using System; | |
using System.Diagnostics; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Observables | |
{ | |
class Subscriber | |
{ | |
public string Name; | |
//Listen for OnNext and write to the debug window when it happens | |
public Subscriber(IObservable<string> observable, string name) | |
{ | |
Name = name; | |
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}")); | |
} | |
} | |
[TestClass] | |
public class UnitTest1 | |
{ | |
static string GetData() => "Hi"; | |
[TestMethod] | |
public async Task Messaging() | |
{ | |
var cancellationSource = new CancellationTokenSource(); | |
var cancellationToken = cancellationSource.Token; | |
var coldObservable = Observable.Create<string>(observer => | |
{ | |
_ = Task.Run(async () => | |
{ | |
while (!cancellationToken.IsCancellationRequested) | |
{ | |
var data = GetData(); | |
observer.OnNext(data); | |
await Task.Delay(1000); | |
} | |
}, cancellationToken); | |
return Disposable.Empty; | |
}); | |
var publisher = coldObservable.Publish(); | |
var connection = publisher.Connect(); | |
new Subscriber(publisher, "One"); | |
new Subscriber(publisher, "Two"); | |
for (var i = 0; i < 5; i++) | |
{ | |
if (i == 4) | |
{ | |
cancellationSource.Cancel(); | |
} | |
await Task.Delay(1000); | |
} | |
connection.Dispose(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment