/// <summary>
/// Extension methods for joining two keyed observable change sets.
/// </summary>
public static class DynamicDataJoinEx
{
    /// <summary>
    /// Joins the left and right observable data sources, combining the content into a single
    /// observable change set keyed by the left key. The right item matching a left key, when
    /// there is one, is handed to <paramref name="resultSelector"/> as an <c>Optional</c>.
    /// </summary>
    /// <typeparam name="TLeft">The object type of the left data source.</typeparam>
    /// <typeparam name="TLeftKey">The key type of the left data source.</typeparam>
    /// <typeparam name="TRight">The object type of the right data source.</typeparam>
    /// <typeparam name="TRightKey">The key type of the right data source.</typeparam>
    /// <typeparam name="TDestination">The type of the object produced by <paramref name="resultSelector"/>.</typeparam>
    /// <param name="left">The left data source.</param>
    /// <param name="right">The right data source.</param>
    /// <param name="rightKeySelector">Selects the foreign key (a left key) from an item of the right data source.</param>
    /// <param name="resultSelector">The result selector, used to transform the combined data into the destination object. Example: (left, right) => new CustomObject(left, right)</param>
    /// <returns>An observable change set of <typeparamref name="TDestination"/> keyed by <typeparamref name="TLeftKey"/>.</returns>
    /// <exception cref="System.ArgumentNullException">Thrown when any argument is null.</exception>
    public static IObservable<IChangeSet<TDestination, TLeftKey>> JoinOne<TLeft, TLeftKey, TRight, TRightKey, TDestination>(this IObservable<IChangeSet<TLeft, TLeftKey>> left,
        [NotNull] IObservable<IChangeSet<TRight, TRightKey>> right,
        [NotNull] Func<TRight, TLeftKey> rightKeySelector,
        [NotNull] Func<TLeft, Optional<TRight>, TDestination> resultSelector)
    {
        // Validate every argument at the public boundary so a failure points at the caller
        // rather than at the internal operator's constructor.
        if (left == null) throw new ArgumentNullException(nameof(left));
        if (right == null) throw new ArgumentNullException(nameof(right));
        if (rightKeySelector == null) throw new ArgumentNullException(nameof(rightKeySelector));
        if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));

        return new JoinOne<TLeft, TLeftKey, TRight, TRightKey, TDestination>(left, right, rightKeySelector, resultSelector).Run();
    }
}
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Kernel;
namespace DynamicData.Cache.Internal
/// <summary>
/// Combines a left change set with a right change set whose items are re-keyed by the supplied
/// right key selector, writing results into a cache keyed by <typeparamref name="TLeftKey"/>.
/// </summary>
// NOTE(review): braces, several case bodies / break statements and the arguments of the final
// CompositeDisposable are not visible in this listing - confirm against the complete source.
internal class JoinOne<TLeft, TLeftKey, TRight, TRightKey, TDestination>
private readonly IObservable<IChangeSet<TLeft, TLeftKey>> _left;
private readonly IObservable<IChangeSet<TRight, TRightKey>> _right;
private readonly Func<TRight, TLeftKey> _rightKeySelector;
private readonly Func<TLeft, Optional<TRight>, TDestination> _resultSelector;
/// <summary>
/// Stores the two sources and the two selectors after checking each of them for null.
/// </summary>
/// <exception cref="System.ArgumentNullException">Thrown when any argument is null.</exception>
public JoinOne(IObservable<IChangeSet<TLeft, TLeftKey>> left,
IObservable<IChangeSet<TRight, TRightKey>> right,
Func<TRight, TLeftKey> rightKeySelector,
Func<TLeft, Optional<TRight>, TDestination> resultSelector)
if (left == null) throw new ArgumentNullException(nameof(left));
if (right == null) throw new ArgumentNullException(nameof(right));
if (rightKeySelector == null) throw new ArgumentNullException(nameof(rightKeySelector));
if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector));
_left = left;
_right = right;
_rightKeySelector = rightKeySelector;
_resultSelector = resultSelector;
/// <summary>
/// Builds the joined change-set observable. The left, right and joined caches are created
/// inside the Observable.Create callback, and both sources are synchronized on one lock object.
/// </summary>
public IObservable<IChangeSet<TDestination, TLeftKey>> Run()
return Observable.Create<IChangeSet<TDestination, TLeftKey>>(observer =>
//a single lock object is shared by both sources
var locker = new object();
//create local backing stores
var leftCache = _left.Synchronize(locker).AsObservableCache();
//right items are re-keyed with the right key selector so they can be looked up by left key
var rightCache = _right.Synchronize(locker).ChangeKey(_rightKeySelector).AsObservableCache();
//joined is the final cache
var joinedCache = new IntermediateCache<TDestination, TLeftKey>();
//left side: every change in an incoming change set is applied inside one Edit call
var leftLoader = leftCache.Connect()
.Subscribe(changes =>
joinedCache.Edit(innerCache =>
changes.ForEach(change =>
switch (change.Reason)
case ChangeReason.Add:
case ChangeReason.Update:
//Update with left (and right if it is present)
var left = change.Current;
var right = rightCache.Lookup(change.Key);
innerCache.AddOrUpdate(_resultSelector(left, right), change.Key);
case ChangeReason.Remove:
//remove from result because a left value is expected
//NOTE(review): no statement is visible for this case; presumably it removes change.Key from innerCache - confirm
case ChangeReason.Evaluate:
//propagate upstream
//NOTE(review): no statement is visible for this case - confirm how the evaluate is forwarded
//right side: a result is only written when a left item exists for the (re-keyed) right key
var rightLoader = rightCache.Connect()
.Subscribe(changes =>
joinedCache.Edit(innerCache =>
changes.ForEach(change =>
var right = change.Current;
var left = leftCache.Lookup(change.Key);
switch (change.Reason)
case ChangeReason.Add:
case ChangeReason.Update:
if (left.HasValue)
//Update with left and right value
innerCache.AddOrUpdate(_resultSelector(left.Value, right), change.Key);
//remove if it is already in the cache
//NOTE(review): the branch this comment belongs to (presumably an else that removes the key) is not visible - confirm
case ChangeReason.Remove:
if (left.HasValue)
//Update with no right value
innerCache.AddOrUpdate(_resultSelector(left.Value, Optional<TRight>.None), change.Key);
//remove if it is already in the cache
//NOTE(review): as above, the statement for the no-left-value branch is not visible - confirm
case ChangeReason.Evaluate:
//propagate upstream
//NOTE(review): no statement is visible for this case - confirm how the evaluate is forwarded
//NOTE(review): the argument list of this call is cut off in this listing - confirm which disposables it holds
return new CompositeDisposable(
This code differs from the original in that you need to maintain a cache of devices and a cache of the metadata. The joined result then looks after itself.
//1. Define your datasource like this
var deviceCache = new SourceCache<Device, string>(device => device.Name);
var metadataCache = new SourceCache<DeviceMetaData, string>(device => device.Name);
//Join them together like this. It will include all devices and match any meta data matching the foreign key
var deviceWithMetadata = deviceCache.Connect()
.JoinOne(metadataCache.Connect(), meta => meta.Name, (device, meta) => new DeviceWithMetadata(device,meta))
NB: DeviceWithMetadata is the immutable equivalent of the DeviceViewModel.
This is updated each time either deviceCache or metadataCache is amended.
/// <summary>
/// Immutable pairing of a device with its metadata, which may be absent.
/// Two instances are equal when both their devices and their metadata are equal.
/// </summary>
public class DeviceWithMetadata : IEquatable<DeviceWithMetadata>
{
    /// <summary>The device.</summary>
    public Device Device { get; }

    /// <summary>The metadata supplied for the device, if any.</summary>
    public Optional<DeviceMetaData> MetaData { get; }

    /// <summary>Creates the pairing from a device and its optional metadata.</summary>
    /// <param name="device">The device.</param>
    /// <param name="metaData">The metadata for the device, or an empty optional when there is none.</param>
    public DeviceWithMetadata(Device device, Optional<DeviceMetaData> metaData)
    {
        //Optional is the functional equivalent of Nullable<T> but can be used on a class or a struct
        Device = device;
        MetaData = metaData;
    }

    /// <summary>Compares this instance with another by device and metadata.</summary>
    /// <param name="other">The instance to compare with; may be null.</param>
    /// <returns>True when <paramref name="other"/> holds an equal device and equal metadata.</returns>
    public bool Equals(DeviceWithMetadata other)
    {
        if (ReferenceEquals(other, null)) return false;
        if (ReferenceEquals(this, other)) return true;

        // object.Equals(a, b) is null-safe, so a null Device on either side is handled.
        return Equals(Device, other.Device) && MetaData.Equals(other.MetaData);
    }

    /// <inheritdoc />
    public override bool Equals(object obj)
    {
        return Equals(obj as DeviceWithMetadata);
    }

    /// <inheritdoc />
    public override int GetHashCode()
    {
        unchecked
        {
            // Must agree with Equals: built from the same two members.
            var deviceHash = ReferenceEquals(Device, null) ? 0 : Device.GetHashCode();
            return (deviceHash * 397) ^ MetaData.GetHashCode();
        }
    }
}
//2. Change DeviceViewModelComparer to use DeviceWithMetadata
//3. For both of these, sorting will work. Also FilterOnProperty is inefficient so better to filter on values which are immutable
//NOTE(review): this statement and the next one declare the same local name and bind to the same
//field (this.autoConnectedDevices). The second, with the negated predicate, presumably targets a
//separate collection for devices that are not auto-connect enabled - confirm the intended names.
var autoConnectedDeviceViewModels = deviceWithMetadata
.Connect(device => device.IsAutoConnectEnabled)
.Bind(out this.autoConnectedDevices)
//same pipeline with the predicate negated
var autoConnectedDeviceViewModels = deviceWithMetadata
.Connect(device => !device.IsAutoConnectEnabled)
.Bind(out this.autoConnectedDevices)
//4. Replace this
.Where(deviceMetadatas => deviceMetadatas != null)
.Do( deviceMetadatas =>
//With something like this
.Do(deviceMetadatas =>
metadataCache .Edit(innerCache =>
//work on the inner cache because it will produce a single changeset which is very efficient
var previous = innerCache.Items.Where(meta => meta.Name != null).ToArray();
//calculate all changes
var adds = deviceMetadatas.Except(previous); //may need to specify a comparer which compares key fields
var removes = previous.Except(deviceMetadatas); //may need to specify a comparer which compares key fields
var updates = previous.Intersect(deviceMetadatas); //may need to specify a comparer which compares key fields and checks whether there are changes [prevents an unneeded update]
//now do the update
//A simpler but less efficient method would be to
// innerCache.Clear();
// innerCache.AddOrUpdate(deviceMetadatas.Where(meta => meta.Name != null)); //Prevent Nulls
// as before but do it within the edit method, and apply changes to innerCache.
//5. also optimise population of Devices
//6. This is no longer required as the JoinOne() does the job
.Filter(device => !this.devices.Any(deviceViewModel => deviceViewModel.Name == device.Name))
.Transform(device => deviceViewModelFactory(Observable.Return(device), null, timer))
