Skip to content

Instantly share code, notes, and snippets.

@takemyoxygen
Last active August 29, 2015 14:01
Show Gist options
  • Save takemyoxygen/ed3a5f059942742c54a6 to your computer and use it in GitHub Desktop.
Save takemyoxygen/ed3a5f059942742c54a6 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace RxSample
{
static class Program
{
static void Main(string[] args)
{
var xs = new Subject<Tuple<string, int>>();
var ys = new Subject<Tuple<string, string>>();
Console.WriteLine("Type [key] [value] to add or \"quit\" to quit :)");
xs
.CombineByKey(ys)
.Subscribe(t => Console.WriteLine("Key: {0}, {1}-{2}", t.Item1, t.Item2, t.Item3));
var e = new ManualResetEvent(false);
Input()
.ToObservable()
.Subscribe(
s =>
{
var splitted = s.Split();
int x;
if (int.TryParse(splitted[1], out x))
{
xs.OnNext(Tuple.Create(splitted[0], x));
}
else
{
ys.OnNext(Tuple.Create(splitted[0], splitted[1]));
}
},
() => e.Set()
);
}
public static IEnumerable<string> Input()
{
while (true)
{
var line = Console.ReadLine();
if (line.ToLower() == "quit") yield break;
yield return line;
}
}
public static IObservable<Tuple<TKey, T1, T2>> CombineByKey<TKey, T1, T2>(
this IObservable<Tuple<TKey, T1>> xs,
IObservable<Tuple<TKey, T2>> ys)
{
return Observable.Create<Tuple<TKey, T1, T2>>(observer =>
{
var dx = new Dictionary<TKey, T1>();
var dy = new Dictionary<TKey, T2>();
Action<TKey> tryPublish = key =>
{
T1 x;
T2 y;
if (dx.TryGetValue(key, out x) && dy.TryGetValue(key, out y))
{
observer.OnNext(Tuple.Create(key, x, y));
}
};
var subscription = new CompositeDisposable(3)
{
xs.Subscribe(
tx =>
{
dx[tx.Item1] = tx.Item2;
tryPublish(tx.Item1);
},
observer.OnError,
observer.OnCompleted),
ys.Subscribe(
ty =>
{
dy[ty.Item1] = ty.Item2;
tryPublish(ty.Item1);
},
observer.OnError,
observer.OnCompleted),
Disposable.Create(() =>
{
dx.Clear();
dy.Clear();
})
};
return subscription;
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment