Last active
July 18, 2018 22:05
-
-
Save cwharris/5dbb4e4971505dc9a53c71b34217d6f7 to your computer and use it in GitHub Desktop.
Rx Grouping by continuity of a key
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Linq; | |
namespace ConsoleApplication1 | |
{ | |
internal class Program | |
{ | |
public static void Main(string[] args) | |
{ | |
var updates = Observable.Return(new Dictionary<string, string>()); | |
var continuousGroupings = | |
updates.GroupValuesByContinuity( | |
x => x.Key, | |
x => x.Value | |
); | |
} | |
} | |
public static class ObservableExtensions | |
{ | |
public static IObservable<IGroupedObservable<TKey, TValue>> GroupValuesByContinuity<T, TKey, TValue>( | |
this IObservable<ICollection<T>> source, | |
Func<T, TKey> keySelector, | |
Func<T, TValue> valueSelector | |
) | |
{ | |
return source | |
.Aggregate( | |
ContinuityTrackingCollection<TKey, TValue>.Seed, | |
Aggregate | |
) | |
.SelectMany(ToContinuityIndications) | |
.GroupByUntil( | |
x => x.Key, | |
g => g.Where( | |
x => x.Exists == false | |
) | |
) | |
.SelectMany( | |
g => g.GroupBy( | |
x => g.Key, | |
x => x.Value | |
) | |
); | |
ContinuityTrackingCollection<TKey, TValue> Aggregate( | |
ContinuityTrackingCollection<TKey, TValue> prev, | |
ICollection<T> latest | |
) | |
{ | |
var removedItems = | |
latest | |
.Select(keySelector) | |
.Where(key => prev.Items.Keys.Contains(key) == false); | |
var items = | |
latest | |
.ToDictionary( | |
keySelector, | |
valueSelector | |
); | |
return new ContinuityTrackingCollection<TKey, TValue>(items, removedItems); | |
} | |
} | |
private static IEnumerable<ContinuityIndication<TKey, TValue>> ToContinuityIndications<TKey, TValue>( | |
ContinuityTrackingCollection<TKey, TValue> continuityTrackingCollection | |
) | |
{ | |
foreach (var key in continuityTrackingCollection.RemovedKeys) | |
{ | |
yield return ContinuityIndication<TKey, TValue>.WithoutValue(key); | |
} | |
foreach (var value in continuityTrackingCollection.Items) | |
{ | |
yield return ContinuityIndication<TKey, TValue>.WithValue(value.Key, value.Value); | |
} | |
} | |
private class ContinuityTrackingCollection<TKey, TValue> | |
{ | |
public IDictionary<TKey, TValue> Items { get; } | |
public HashSet<TKey> RemovedKeys { get; } | |
public ContinuityTrackingCollection( | |
IDictionary<TKey, TValue> items, | |
IEnumerable<TKey> removedKeys | |
) | |
{ | |
Items = items; | |
RemovedKeys = new HashSet<TKey>(removedKeys); | |
} | |
public static ContinuityTrackingCollection<TKey, TValue> Seed => | |
new ContinuityTrackingCollection<TKey, TValue>( | |
new Dictionary<TKey, TValue>(), | |
new List<TKey>() | |
); | |
} | |
private class ContinuityIndication<TKey, T> | |
{ | |
public TKey Key { get; } | |
public T Value { get; } | |
public bool Exists { get; } | |
private ContinuityIndication(TKey key, T value, bool exists) | |
{ | |
Key = key; | |
Value = value; | |
Exists = exists; | |
} | |
public static ContinuityIndication<TKey, T> WithValue(TKey key, T value) | |
{ | |
return new ContinuityIndication<TKey, T>(key, value, true); | |
} | |
public static ContinuityIndication<TKey, T> WithoutValue(TKey key) | |
{ | |
return new ContinuityIndication<TKey, T>(key, default(T), false); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment