Skip to content

Instantly share code, notes, and snippets.

@caleb-vear
Created September 19, 2011 23:37
Show Gist options
  • Save caleb-vear/1227920 to your computer and use it in GitHub Desktop.
Save caleb-vear/1227920 to your computer and use it in GitHub Desktop.
RX Example
public IDisposable SubscribeToK2WorkflowUpdatesFor<T>(
IEnumerable<T> data,
Action<T, K2Activity> activitySetter,
Func<T, string> k2ProcessIdSelector,
int interval)
{
var itemsToRefresh = Observable
.Interval(TimeSpan.FromSeconds(interval), Scheduler.ThreadPool)
.SelectMany(l => data)
.Synchronize()
.Where(entity => !string.IsNullOrEmpty(k2ProcessIdSelector(entity)))
.SelectMany(entity =>
{
var processIdSelector = k2ProcessIdSelector(entity);
Debug.WriteLine("Start Select for {0}", processIdSelector);
if (!IsInteger(processIdSelector))
return Observable.Empty<K2RefreshResult>();
Debug.WriteLine("Start Service Call for {0}", processIdSelector);
return Observable.FromAsyncPattern<int, ObservableCollection<K2Activity>>(
_workflowService.BeginGetCurrentProcessActivity,
_workflowService.EndGetCurrentProcessActivity)(int.Parse(processIdSelector))
.Select(r =>
{
var result = new K2RefreshResult
{
Entity = entity,
UpdatedActivity = r
};
Debug.WriteLine("End Service Call for {0}", processIdSelector);
return result;
});
})
.TakeUntil(_disposedObserver); // This line will automatically cleanup any outstanding requests when the object is disposed.
//.SubscribeOn(Scheduler.ThreadPool); You don't need to subscribe on the thread pool the async callback is already on it.
return itemsToRefresh.Subscribe(
o =>
{
Debug.WriteLine("Subscription Received response");
// You shouldn't need this anymore as TakeUntil takes care of it.
//var response = o.Concat(_disposedObserver).FirstOrDefault();
Debug.WriteLine("Subscription Received updated entity in response");
Execute.OnUIThread(() =>
{
Debug.WriteLine("Updating UI");
if (o.UpdatedActivity == null ||
o.UpdatedActivity.Count != 1)
activitySetter((T) o.Entity, null);
else
activitySetter((T) o.Entity,
o.UpdatedActivity.
Single());
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment