Skip to content

Instantly share code, notes, and snippets.

@dayneo
Last active October 6, 2016 08:21
Show Gist options
  • Select an option

  • Save dayneo/42ddf2d04e59a999355e58f5401cc577 to your computer and use it in GitHub Desktop.

Select an option

Save dayneo/42ddf2d04e59a999355e58f5401cc577 to your computer and use it in GitHub Desktop.
Assists in the management of asynchronous execution of data service queries and their cancellations.
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data.Services.Client;
using System.Data.Services.Exceptions;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Manages the asynchronous execution of data service queries and their cancellations.
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncDataTaskHelper : INotifyPropertyChanged
{
#region INotifyPropertyChanged implementation
public event PropertyChangedEventHandler PropertyChanged;
/// <summary>
/// Raises the PropertyChanged event.
/// </summary>
/// <param name="e"></param>
protected void OnPropertyChanged(PropertyChangedEventArgs e)
{
PropertyChangedEventHandler handler = this.PropertyChanged;
if (handler != null)
{
handler(this, e);
}
}
/// <summary>
/// Raises the PropertyChanged event.
/// </summary>
/// <param name="propertyName"></param>
protected void OnPropertyChanged(string propertyName)
{
this.OnPropertyChanged(new PropertyChangedEventArgs(propertyName));
}
#endregion
/// <summary>
/// Instantiates a new instance of a QueryTaskHandler class.
/// </summary>
/// <param name="dataContext"></param>
public AsyncDataTaskHelper(DataServiceContext dataContext)
{
if (dataContext == null)
{
throw new ArgumentNullException("dataContext", "A data service context must be provided.");
}
this.DataContext = dataContext;
this.Status = TaskStatus.Created;
}
private CancellationTokenSource CancellationSource { get; set; }
private Task ActiveDataTask { get; set; }
/// <summary>
/// Gets the data context against which the queries are executed.
/// </summary>
public DataServiceContext DataContext { get; private set; }
private TaskStatus status;
/// <summary>
/// Gets the current status of the asynchronous load task.
/// </summary>
public TaskStatus Status
{
get
{
return this.status;
}
private set
{
if (this.status == value) return;
this.status = value;
this.OnPropertyChanged("Status");
}
}
/// <summary>
/// Executes the data task asynchronously.
/// </summary>
/// <returns></returns>
protected async Task<T> Execute<T>(Func<IAsyncResult> beginExecute, Func<IAsyncResult, T> endExecute)
{
if (this.ActiveDataTask != null
&& this.ActiveDataTask.Status == TaskStatus.Running)
{
this.Cancel();
}
try
{
this.CancellationSource = new CancellationTokenSource();
CancellationToken ct = this.CancellationSource.Token;
this.ActiveDataTask = Task.Run<T>(() =>
{
Guid dataTaskID = Guid.NewGuid();
IAsyncResult r = beginExecute();
System.Diagnostics.Trace.TraceInformation("DataTask begun: {0}", dataTaskID);
do
{
if (ct.IsCancellationRequested)
{
System.Diagnostics.Trace.TraceInformation("DataTask cancellation requested: {0}", dataTaskID);
this.DataContext.CancelRequest(r);
ct.ThrowIfCancellationRequested();
}
System.Threading.Thread.Sleep(5);
}
while (!r.IsCompleted);
System.Diagnostics.Trace.TraceInformation("DataTask completed: {0}", dataTaskID);
try
{
return endExecute(r);
}
catch (InvalidOperationException ex)
{
Exception baseEx = ex.GetBaseException();
if (baseEx is System.Data.Services.Client.DataServiceTransportException
|| baseEx is System.Net.WebException)
{
throw new NetworkException(baseEx.Message, ex);
}
Exception dsex;
bool hasparsed = DataServiceExceptionUtil.TryParse(baseEx, out dsex);
if (hasparsed
&& dsex.InnerException != null
&& dsex.InnerException.InnerException != null)
{
throw new DataServiceException(dsex.InnerException.InnerException.Message, ex);
}
// report a data service provider error
if (hasparsed)
{
throw new DataServiceException(dsex.Message, ex);
}
if (ex is DataServiceQueryException)
{
DataServiceQueryException dsqex = (DataServiceQueryException)ex;
QueryOperationResponse resp = dsqex.Response;
DataServiceRequest req = resp.Query;
throw new DataServiceException(string.Format("{0}\n\n(Status code {1}) {2}"
, ex.Message, resp.StatusCode, req.RequestUri)
, ex);
}
throw new DataServiceException(ex.Message, ex);
}
}, ct);
this.Status = TaskStatus.Running;
T data = await (Task<T>)this.ActiveDataTask;
this.Status = TaskStatus.RanToCompletion;
this.CancellationSource.Dispose();
return data;
}
catch (AggregateException ex)
{
foreach (Exception inner in ex.InnerExceptions)
{
System.Diagnostics.Trace.TraceInformation("Task aggregate exception: {0} \n{1}", ex.Message, ex.StackTrace);
}
this.Status = TaskStatus.Faulted;
throw;
}
catch (OperationCanceledException ex)
{
// Consuming the exception that we think must have come about due
// to task cancellation.
System.Diagnostics.Trace.TraceInformation("{0} \n{1}", ex.Message, ex.StackTrace);
this.Status = TaskStatus.Canceled;
throw;
}
catch (Exception ex)
{
System.Diagnostics.Trace.TraceError("{0} \n{1}", ex.Message, ex.StackTrace);
this.Status = TaskStatus.Faulted;
throw;
}
}
/// <summary>
/// Executes the query asynchronously.
/// </summary>
/// <returns></returns>
public async Task<IEnumerable<T>> Execute<T>(DataServiceQuery<T> query)
{
if (query == null)
{
throw new ArgumentNullException("query", "No query has been provided to execute.");
}
Func<IAsyncResult> beginExecute = () => { return query.BeginExecute(new AsyncCallback((o) => { }), query); };
Func<IAsyncResult, IEnumerable<T>> endExecute = (r) => { return query.EndExecute(r); };
return await this.Execute(beginExecute, endExecute);
}
/// <summary>
/// Executes a set of batched queries asynchronously.
/// </summary>
/// <returns></returns>
public async Task<DataServiceResponse> Execute(params DataServiceRequest[] batchRequests)
{
if (batchRequests == null)
{
throw new ArgumentNullException("batchRequests", "No Batch Request has been provided to execute.");
}
Func<IAsyncResult> beginExecute = () => { return this.DataContext.BeginExecuteBatch(new AsyncCallback((o) => { }), this, batchRequests); };
Func<IAsyncResult, DataServiceResponse> endExecute = (r) => { return this.DataContext.EndExecuteBatch(r); };
return await this.Execute(beginExecute, endExecute);
}
/// <summary>
/// Executes the uri asynchronously.
/// </summary>
/// <returns></returns>
public async Task<IEnumerable<T>> Execute<T>(Uri uri)
{
if (uri == null)
{
throw new ArgumentNullException("uri", "No uri has been provided to execute.");
}
Func<IAsyncResult> beginExecute = () => { return this.DataContext.BeginExecute<T>(uri, new AsyncCallback((o) => { }), uri, "GET", false); };
Func<IAsyncResult, IEnumerable<T>> endExecute = (r) => { return this.DataContext.EndExecute<T>(r); };
return await this.Execute(beginExecute, endExecute);
}
/// <summary>
/// Cancels the execution of the current query.
/// </summary>
public void Cancel()
{
if (this.ActiveDataTask == null
|| this.ActiveDataTask.Status != TaskStatus.Running)
{
throw new InvalidOperationException("No task is running.");
}
this.CancellationSource.Cancel();
try
{
this.ActiveDataTask.Wait();
}
catch (AggregateException ex)
{
// Consuming the exception that we think must have come about due
// to task cancellation.
foreach (Exception inner in ex.InnerExceptions)
{
System.Diagnostics.Trace.TraceInformation("Task aggregate exception: {0} \n{1}", ex.Message, ex.StackTrace);
}
}
this.CancellationSource.Dispose();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment