Last active
October 6, 2016 08:21
-
-
Save dayneo/42ddf2d04e59a999355e58f5401cc577 to your computer and use it in GitHub Desktop.
Assists in the management of asynchronous execution of data service queries and their cancellations.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.ComponentModel; | |
| using System.Data.Services.Client; | |
| using System.Data.Services.Exceptions; | |
| using System.Linq; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| /// <summary> | |
| /// Manages the asynchronous execution of data service queries and their cancellations. | |
| /// </summary> | |
| /// <typeparam name="T"></typeparam> | |
| public class AsyncDataTaskHelper : INotifyPropertyChanged | |
| { | |
| #region INotifyPropertyChanged implementation | |
| public event PropertyChangedEventHandler PropertyChanged; | |
| /// <summary> | |
| /// Raises the PropertyChanged event. | |
| /// </summary> | |
| /// <param name="e"></param> | |
| protected void OnPropertyChanged(PropertyChangedEventArgs e) | |
| { | |
| PropertyChangedEventHandler handler = this.PropertyChanged; | |
| if (handler != null) | |
| { | |
| handler(this, e); | |
| } | |
| } | |
| /// <summary> | |
| /// Raises the PropertyChanged event. | |
| /// </summary> | |
| /// <param name="propertyName"></param> | |
| protected void OnPropertyChanged(string propertyName) | |
| { | |
| this.OnPropertyChanged(new PropertyChangedEventArgs(propertyName)); | |
| } | |
| #endregion | |
| /// <summary> | |
| /// Instantiates a new instance of a QueryTaskHandler class. | |
| /// </summary> | |
| /// <param name="dataContext"></param> | |
| public AsyncDataTaskHelper(DataServiceContext dataContext) | |
| { | |
| if (dataContext == null) | |
| { | |
| throw new ArgumentNullException("dataContext", "A data service context must be provided."); | |
| } | |
| this.DataContext = dataContext; | |
| this.Status = TaskStatus.Created; | |
| } | |
| private CancellationTokenSource CancellationSource { get; set; } | |
| private Task ActiveDataTask { get; set; } | |
| /// <summary> | |
| /// Gets the data context against which the queries are executed. | |
| /// </summary> | |
| public DataServiceContext DataContext { get; private set; } | |
| private TaskStatus status; | |
| /// <summary> | |
| /// Gets the current status of the asynchronous load task. | |
| /// </summary> | |
| public TaskStatus Status | |
| { | |
| get | |
| { | |
| return this.status; | |
| } | |
| private set | |
| { | |
| if (this.status == value) return; | |
| this.status = value; | |
| this.OnPropertyChanged("Status"); | |
| } | |
| } | |
| /// <summary> | |
| /// Executes the data task asynchronously. | |
| /// </summary> | |
| /// <returns></returns> | |
| protected async Task<T> Execute<T>(Func<IAsyncResult> beginExecute, Func<IAsyncResult, T> endExecute) | |
| { | |
| if (this.ActiveDataTask != null | |
| && this.ActiveDataTask.Status == TaskStatus.Running) | |
| { | |
| this.Cancel(); | |
| } | |
| try | |
| { | |
| this.CancellationSource = new CancellationTokenSource(); | |
| CancellationToken ct = this.CancellationSource.Token; | |
| this.ActiveDataTask = Task.Run<T>(() => | |
| { | |
| Guid dataTaskID = Guid.NewGuid(); | |
| IAsyncResult r = beginExecute(); | |
| System.Diagnostics.Trace.TraceInformation("DataTask begun: {0}", dataTaskID); | |
| do | |
| { | |
| if (ct.IsCancellationRequested) | |
| { | |
| System.Diagnostics.Trace.TraceInformation("DataTask cancellation requested: {0}", dataTaskID); | |
| this.DataContext.CancelRequest(r); | |
| ct.ThrowIfCancellationRequested(); | |
| } | |
| System.Threading.Thread.Sleep(5); | |
| } | |
| while (!r.IsCompleted); | |
| System.Diagnostics.Trace.TraceInformation("DataTask completed: {0}", dataTaskID); | |
| try | |
| { | |
| return endExecute(r); | |
| } | |
| catch (InvalidOperationException ex) | |
| { | |
| Exception baseEx = ex.GetBaseException(); | |
| if (baseEx is System.Data.Services.Client.DataServiceTransportException | |
| || baseEx is System.Net.WebException) | |
| { | |
| throw new NetworkException(baseEx.Message, ex); | |
| } | |
| Exception dsex; | |
| bool hasparsed = DataServiceExceptionUtil.TryParse(baseEx, out dsex); | |
| if (hasparsed | |
| && dsex.InnerException != null | |
| && dsex.InnerException.InnerException != null) | |
| { | |
| throw new DataServiceException(dsex.InnerException.InnerException.Message, ex); | |
| } | |
| // report a data service provider error | |
| if (hasparsed) | |
| { | |
| throw new DataServiceException(dsex.Message, ex); | |
| } | |
| if (ex is DataServiceQueryException) | |
| { | |
| DataServiceQueryException dsqex = (DataServiceQueryException)ex; | |
| QueryOperationResponse resp = dsqex.Response; | |
| DataServiceRequest req = resp.Query; | |
| throw new DataServiceException(string.Format("{0}\n\n(Status code {1}) {2}" | |
| , ex.Message, resp.StatusCode, req.RequestUri) | |
| , ex); | |
| } | |
| throw new DataServiceException(ex.Message, ex); | |
| } | |
| }, ct); | |
| this.Status = TaskStatus.Running; | |
| T data = await (Task<T>)this.ActiveDataTask; | |
| this.Status = TaskStatus.RanToCompletion; | |
| this.CancellationSource.Dispose(); | |
| return data; | |
| } | |
| catch (AggregateException ex) | |
| { | |
| foreach (Exception inner in ex.InnerExceptions) | |
| { | |
| System.Diagnostics.Trace.TraceInformation("Task aggregate exception: {0} \n{1}", ex.Message, ex.StackTrace); | |
| } | |
| this.Status = TaskStatus.Faulted; | |
| throw; | |
| } | |
| catch (OperationCanceledException ex) | |
| { | |
| // Consuming the exception that we think must have come about due | |
| // to task cancellation. | |
| System.Diagnostics.Trace.TraceInformation("{0} \n{1}", ex.Message, ex.StackTrace); | |
| this.Status = TaskStatus.Canceled; | |
| throw; | |
| } | |
| catch (Exception ex) | |
| { | |
| System.Diagnostics.Trace.TraceError("{0} \n{1}", ex.Message, ex.StackTrace); | |
| this.Status = TaskStatus.Faulted; | |
| throw; | |
| } | |
| } | |
| /// <summary> | |
| /// Executes the query asynchronously. | |
| /// </summary> | |
| /// <returns></returns> | |
| public async Task<IEnumerable<T>> Execute<T>(DataServiceQuery<T> query) | |
| { | |
| if (query == null) | |
| { | |
| throw new ArgumentNullException("query", "No query has been provided to execute."); | |
| } | |
| Func<IAsyncResult> beginExecute = () => { return query.BeginExecute(new AsyncCallback((o) => { }), query); }; | |
| Func<IAsyncResult, IEnumerable<T>> endExecute = (r) => { return query.EndExecute(r); }; | |
| return await this.Execute(beginExecute, endExecute); | |
| } | |
| /// <summary> | |
| /// Executes a set of batched queries asynchronously. | |
| /// </summary> | |
| /// <returns></returns> | |
| public async Task<DataServiceResponse> Execute(params DataServiceRequest[] batchRequests) | |
| { | |
| if (batchRequests == null) | |
| { | |
| throw new ArgumentNullException("batchRequests", "No Batch Request has been provided to execute."); | |
| } | |
| Func<IAsyncResult> beginExecute = () => { return this.DataContext.BeginExecuteBatch(new AsyncCallback((o) => { }), this, batchRequests); }; | |
| Func<IAsyncResult, DataServiceResponse> endExecute = (r) => { return this.DataContext.EndExecuteBatch(r); }; | |
| return await this.Execute(beginExecute, endExecute); | |
| } | |
| /// <summary> | |
| /// Executes the uri asynchronously. | |
| /// </summary> | |
| /// <returns></returns> | |
| public async Task<IEnumerable<T>> Execute<T>(Uri uri) | |
| { | |
| if (uri == null) | |
| { | |
| throw new ArgumentNullException("uri", "No uri has been provided to execute."); | |
| } | |
| Func<IAsyncResult> beginExecute = () => { return this.DataContext.BeginExecute<T>(uri, new AsyncCallback((o) => { }), uri, "GET", false); }; | |
| Func<IAsyncResult, IEnumerable<T>> endExecute = (r) => { return this.DataContext.EndExecute<T>(r); }; | |
| return await this.Execute(beginExecute, endExecute); | |
| } | |
| /// <summary> | |
| /// Cancels the execution of the current query. | |
| /// </summary> | |
| public void Cancel() | |
| { | |
| if (this.ActiveDataTask == null | |
| || this.ActiveDataTask.Status != TaskStatus.Running) | |
| { | |
| throw new InvalidOperationException("No task is running."); | |
| } | |
| this.CancellationSource.Cancel(); | |
| try | |
| { | |
| this.ActiveDataTask.Wait(); | |
| } | |
| catch (AggregateException ex) | |
| { | |
| // Consuming the exception that we think must have come about due | |
| // to task cancellation. | |
| foreach (Exception inner in ex.InnerExceptions) | |
| { | |
| System.Diagnostics.Trace.TraceInformation("Task aggregate exception: {0} \n{1}", ex.Message, ex.StackTrace); | |
| } | |
| } | |
| this.CancellationSource.Dispose(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment