Skip to content

Instantly share code, notes, and snippets.

@dealproc
Created April 7, 2018 16:29
Show Gist options
  • Save dealproc/2fcfcadef61d7103c81ecf5f84425728 to your computer and use it in GitHub Desktop.
Save dealproc/2fcfcadef61d7103c81ecf5f84425728 to your computer and use it in GitHub Desktop.
CQS with MediatR
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
public interface INotificationWithIdentification : INotification
{
Guid RequestID { get; set; }
}
public interface IBrokeredRequest<TResponse> : IRequest<TResponse>
where TResponse : INotificationWithIdentification
{
Guid RequestID { get; set; }
}
public interface IBrokeredRequestHandler<TBrokeredRequest, TResponse> : IRequestHandler<TBrokeredRequest, TResponse>
where TBrokeredRequest : IBrokeredRequest<TResponse>
where TResponse : INotificationWithIdentification
{
}
public abstract class BrokeredRequestHandler<TBrokeredRequest, TResponse> : IBrokeredRequestHandler<TBrokeredRequest, TResponse>, INotificationHandler<TResponse>
where TBrokeredRequest : IBrokeredRequest<TResponse>
where TResponse : INotificationWithIdentification
{
protected static ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> Requests { get; private set; } = new ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>>();
public abstract Task Execute(TBrokeredRequest request, CancellationToken cancellationToken);
Task<TResponse> IRequestHandler<TBrokeredRequest, TResponse>.Handle(TBrokeredRequest request, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
Requests.TryAdd(request.RequestID, tcs);
Task.Run(async () =>
{
try
{
await Execute(request, cancellationToken).ConfigureAwait(false);
}
catch (Exception exc)
{
if (Requests.TryRemove(request.RequestID, out TaskCompletionSource<TResponse> removed))
{
removed.SetException(exc);
}
}
}, cancellationToken);
return tcs.Task;
}
Task INotificationHandler<TResponse>.Handle(TResponse notification, CancellationToken cancellationToken)
{
if (Requests.TryRemove(notification.RequestID, out TaskCompletionSource<TResponse> tcs))
{
tcs.SetResult(notification);
}
return Task.CompletedTask;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment