Created
September 19, 2011 23:37
-
-
Save caleb-vear/1227920 to your computer and use it in GitHub Desktop.
RX Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Polls the workflow service on a fixed interval and hands the current K2 activity of
/// every item in <paramref name="data"/> to <paramref name="activitySetter"/>.
/// </summary>
/// <typeparam name="T">Type of the items being refreshed.</typeparam>
/// <param name="data">Items to refresh. The sequence is re-enumerated on every tick.</param>
/// <param name="activitySetter">
/// Invoked through <c>Execute.OnUIThread</c> with the item and its current activity, or with
/// <c>null</c> when the service returned no collection or anything other than exactly one activity.
/// </param>
/// <param name="k2ProcessIdSelector">
/// Returns the K2 process id of an item as text. Items whose id is null, empty, or not an
/// integer are skipped.
/// </param>
/// <param name="interval">Polling period in seconds.</param>
/// <returns>The subscription handle; dispose it to stop receiving updates.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="data"/>, <paramref name="activitySetter"/> or
/// <paramref name="k2ProcessIdSelector"/> is <c>null</c>.
/// </exception>
public IDisposable SubscribeToK2WorkflowUpdatesFor<T>(
    IEnumerable<T> data,
    Action<T, K2Activity> activitySetter,
    Func<T, string> k2ProcessIdSelector,
    int interval)
{
    // Fail fast on the calling thread instead of with a NullReferenceException on a
    // thread-pool thread once the first tick fires.
    if (data == null)
    {
        throw new ArgumentNullException("data");
    }

    if (activitySetter == null)
    {
        throw new ArgumentNullException("activitySetter");
    }

    if (k2ProcessIdSelector == null)
    {
        throw new ArgumentNullException("k2ProcessIdSelector");
    }

    var itemsToRefresh = Observable
        .Interval(TimeSpan.FromSeconds(interval), Scheduler.ThreadPool)
        .SelectMany(tick => data)
        .Synchronize()
        .SelectMany(entity =>
        {
            // Evaluate the selector once per item and tick.
            var processIdText = k2ProcessIdSelector(entity);
            if (string.IsNullOrEmpty(processIdText))
            {
                return Observable.Empty<K2RefreshResult>();
            }

            // string.Format is used explicitly: Debug.WriteLine(string, string) treats the
            // second argument as a category, not as a format argument.
            Debug.WriteLine(string.Format("Start Select for {0}", processIdText));

            // IsInteger is defined elsewhere; TryParse additionally guards against values it
            // may accept but that do not fit in an Int32, which would otherwise throw here.
            int processId;
            if (!IsInteger(processIdText) || !int.TryParse(processIdText, out processId))
            {
                return Observable.Empty<K2RefreshResult>();
            }

            Debug.WriteLine(string.Format("Start Service Call for {0}", processIdText));

            var getCurrentActivity = Observable.FromAsyncPattern<int, ObservableCollection<K2Activity>>(
                _workflowService.BeginGetCurrentProcessActivity,
                _workflowService.EndGetCurrentProcessActivity);

            return getCurrentActivity(processId)
                .Select(activities =>
                {
                    var result = new K2RefreshResult
                    {
                        Entity = entity,
                        UpdatedActivity = activities
                    };
                    Debug.WriteLine(string.Format("End Service Call for {0}", processIdText));
                    return result;
                })
                // A failed call for one item must not terminate the whole polling sequence
                // (and, with no onError handler below, surface as an unhandled exception).
                // Log it and skip this item until the next tick.
                .Catch((Exception error) =>
                {
                    Debug.WriteLine(string.Format(
                        "Service Call failed for {0}: {1}", processIdText, error.Message));
                    return Observable.Empty<K2RefreshResult>();
                });
        })
        // Ends the sequence as soon as _disposedObserver produces a value, so no separate
        // cleanup of outstanding requests is needed (presumably it signals when the owning
        // object is disposed - confirm against its declaration).
        // No SubscribeOn(Scheduler.ThreadPool) is required: the async callback already
        // arrives on a thread-pool thread.
        .TakeUntil(_disposedObserver);

    return itemsToRefresh.Subscribe(
        refreshed =>
        {
            Debug.WriteLine("Subscription Received response");
            Debug.WriteLine("Subscription Received updated entity in response");
            Execute.OnUIThread(() =>
            {
                Debug.WriteLine("Updating UI");

                // Only an unambiguous single activity is reported; anything else clears it.
                var activities = refreshed.UpdatedActivity;
                var current = (activities != null && activities.Count == 1)
                    ? activities[0]
                    : null;

                activitySetter((T) refreshed.Entity, current);
            });
        });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment