Skip to content

Instantly share code, notes, and snippets.

@Dorus
Last active December 8, 2016 12:33
Show Gist options
  • Save Dorus/69988d58dce7d6a979025656f68c72be to your computer and use it in GitHub Desktop.
Save Dorus/69988d58dce7d6a979025656f68c72be to your computer and use it in GitHub Desktop.
GroupCache
/// <summary>
/// A connectable observable that remembers the most recent element of every
/// group (as determined by a key selector) and replays those elements to
/// each new subscriber before forwarding further elements.
/// </summary>
/// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
/// <typeparam name="TKey">Type of the key used to partition the elements into groups.</typeparam>
/// <remarks>
/// The grouping pipeline is wired up once, in the constructor, on top of a
/// published (shared) view of the source. Nothing flows through it until
/// <see cref="Connect"/> is called. The disposables of the internal
/// subscriptions are intentionally not kept: they hang off the published
/// source and stay in place for the lifetime of this instance.
/// </remarks>
public sealed class GroupCache<TSource, TKey> : IConnectableObservable<TSource>
{
    // Shared, connectable view of the original source.
    private readonly IConnectableObservable<TSource> _source;

    // Merge of all per-group caches; this is what observers subscribe to.
    private readonly IObservable<TSource> _latestPerGroup;

    /// <summary>
    /// Builds the per-group cache on top of <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Sequence whose elements are cached per group.</param>
    /// <param name="keySelector">Function that extracts the group key from an element.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="keySelector"/> is <c>null</c>.
    /// </exception>
    public GroupCache(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        // Validate up front so nothing is constructed for invalid arguments.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (keySelector == null)
        {
            throw new ArgumentNullException("keySelector");
        }

        _source = source.Publish();

        // Unbounded replay of the groups themselves, so that a late
        // subscriber still gets to see every group that has appeared so far.
        var groups = new ReplaySubject<IObservable<TSource>>();

        _source
            .GroupBy(keySelector)
            .Select(group =>
            {
                // Replay buffer of size 1: keeps only the latest element of this group.
                var latest = new ReplaySubject<TSource>(1);
                group.Subscribe(latest);
                return latest.AsObservable();
            })
            .Subscribe(groups);

        _latestPerGroup = groups.Merge();
    }

    /// <summary>
    /// Connects the published source so that elements start flowing into the cache.
    /// </summary>
    /// <returns>A disposable that represents the connection to the source.</returns>
    public IDisposable Connect()
    {
        return _source.Connect();
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the merged per-group caches.
    /// </summary>
    /// <param name="observer">Observer that receives the cached and subsequent elements.</param>
    /// <returns>A disposable that ends this subscription.</returns>
    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return _latestPerGroup.Subscribe(observer);
    }
}
/// <summary>
/// Extension methods for building a <see cref="GroupCache{TSource, TKey}"/>.
/// </summary>
public static class myExtension
{
    /// <summary>
    /// Caches the latest item per group.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
    /// <typeparam name="TKey">Type of the key that identifies a group.</typeparam>
    /// <param name="source">Sequence whose elements are cached.</param>
    /// <param name="keySelector">Function that extracts the group key from an element.</param>
    /// <returns>
    /// A new <see cref="GroupCache{TSource, TKey}"/> wrapping <paramref name="source"/>,
    /// exposed as a connectable observable.
    /// </returns>
    public static IConnectableObservable<TSource> GroupedCache<TSource, TKey>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector)
    {
        var cache = new GroupCache<TSource, TKey>(source, keySelector);
        return cache;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment