Last active
August 29, 2015 14:18
-
-
Save yreynhout/f0f09f7541902bb930c8 to your computer and use it in GitHub Desktop.
Queue all projection requests - report back using TCS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Projac; | |
namespace ConsoleApplication666 | |
{ | |
/// <summary>
/// Funnels projection requests through a single queue in front of an
/// <see cref="AsyncSqlProjector"/>. Requests are started one at a time, in the order they
/// were enqueued; the next request is only started once the previous one has completed,
/// faulted or been cancelled. Each caller gets a <see cref="Task{TResult}"/> that mirrors
/// the outcome of its own request.
/// </summary>
public class AsyncProjector
{
    private readonly AsyncSqlProjector _projector;
    private readonly ConcurrentQueue<IProjectOperation> _operations;

    // 0 = nobody is pumping the queue, 1 = a pump owns the queue (possibly while
    // waiting for an in-flight projection to finish).
    private int _processing;

    /// <summary>
    /// Creates a queueing front for the given projector.
    /// </summary>
    /// <param name="projector">The projector every queued request is handed to.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="projector"/> is <c>null</c>.</exception>
    public AsyncProjector(AsyncSqlProjector projector)
    {
        if (projector == null)
            throw new ArgumentNullException("projector");
        _projector = projector;
        _operations = new ConcurrentQueue<IProjectOperation>();
        _processing = 0;
    }

    /// <summary>
    /// Queues the projection of a single message without cancellation support.
    /// </summary>
    /// <param name="message">The message to project.</param>
    /// <returns>A task carrying the value reported by the underlying projector.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="message"/> is <c>null</c>.</exception>
    public Task<int> ProjectAsync(object message)
    {
        if (message == null) throw new ArgumentNullException("message");
        return ProjectAsync(message, CancellationToken.None);
    }

    /// <summary>
    /// Queues the projection of a single message.
    /// </summary>
    /// <param name="message">The message to project.</param>
    /// <param name="cancellationToken">
    /// Checked when the request reaches the front of the queue and passed on to the projector.
    /// </param>
    /// <returns>A task carrying the value reported by the underlying projector.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="message"/> is <c>null</c>.</exception>
    public Task<int> ProjectAsync(object message, CancellationToken cancellationToken)
    {
        if (message == null) throw new ArgumentNullException("message");
        return Enqueue(new ProjectMessageOperation(message, cancellationToken));
    }

    /// <summary>
    /// Queues the projection of a sequence of messages without cancellation support.
    /// </summary>
    /// <param name="messages">The messages to project as one request.</param>
    /// <returns>A task carrying the value reported by the underlying projector.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="messages"/> is <c>null</c>.</exception>
    public Task<int> ProjectAsync(IEnumerable<object> messages)
    {
        if (messages == null) throw new ArgumentNullException("messages");
        return ProjectAsync(messages, CancellationToken.None);
    }

    /// <summary>
    /// Queues the projection of a sequence of messages.
    /// </summary>
    /// <param name="messages">The messages to project as one request.</param>
    /// <param name="cancellationToken">
    /// Checked when the request reaches the front of the queue and passed on to the projector.
    /// </param>
    /// <returns>A task carrying the value reported by the underlying projector.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="messages"/> is <c>null</c>.</exception>
    public Task<int> ProjectAsync(IEnumerable<object> messages, CancellationToken cancellationToken)
    {
        if (messages == null) throw new ArgumentNullException("messages");
        return Enqueue(new ProjectMessagesOperation(messages, cancellationToken));
    }

    // Adds the operation to the queue and starts a pump on the thread pool when none is active.
    private Task<int> Enqueue(IProjectOperation operation)
    {
        _operations.Enqueue(operation);
        if (Interlocked.CompareExchange(ref _processing, 1, 0) == 0)
            ThreadPool.QueueUserWorkItem(ProcessQueue);
        return operation.Task;
    }

    // Pumps the queue. Only one pump is active at a time (guarded by _processing).
    private void ProcessQueue(object state)
    {
        do
        {
            IProjectOperation operation;
            while (_operations.TryDequeue(out operation))
            {
                operation.Project(_projector);
                if (!operation.Task.IsCompleted)
                {
                    // The projection is still in flight. Keep ownership of the queue
                    // (_processing stays 1) and resume pumping once it has finished, so
                    // the projector never sees two requests at the same time. If the task
                    // completes between the check above and this registration, the
                    // continuation is still scheduled.
                    operation.Task.ContinueWith(
                        completed => ProcessQueue(null),
                        CancellationToken.None,
                        TaskContinuationOptions.None,
                        TaskScheduler.Default);
                    return;
                }
            }
            Interlocked.Exchange(ref _processing, 0);
            // An operation may have been enqueued after the drain but before the flag was
            // released; its producer lost the CompareExchange, so re-check and re-acquire.
        } while (!_operations.IsEmpty && Interlocked.CompareExchange(ref _processing, 1, 0) == 0);
    }

    // A queued request: knows how to start itself and exposes the task handed to the caller.
    private interface IProjectOperation
    {
        void Project(AsyncSqlProjector projector);
        Task<int> Task { get; }
    }

    // Shared plumbing for queued requests: cancellation check, starting the projection and
    // relaying its outcome to the caller's task. Project never throws and always leads to
    // the caller's task being completed.
    private abstract class ProjectOperation : IProjectOperation
    {
        private readonly CancellationToken _cancellationToken;
        private readonly TaskCompletionSource<int> _source;

        protected ProjectOperation(CancellationToken cancellationToken)
        {
            _cancellationToken = cancellationToken;
            _source = new TaskCompletionSource<int>();
        }

        public Task<int> Task { get { return _source.Task; } }

        public void Project(AsyncSqlProjector projector)
        {
            if (_cancellationToken.IsCancellationRequested)
            {
                _source.TrySetCanceled();
                return;
            }

            try
            {
                // The relay continuation is deliberately NOT tied to the caller's token:
                // a cancelled continuation never runs, which would leave the caller's
                // task incomplete forever.
                Start(projector, _cancellationToken).ContinueWith(
                    Relay,
                    CancellationToken.None,
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);
            }
            catch (OperationCanceledException)
            {
                _source.TrySetCanceled();
            }
            catch (Exception exception)
            {
                // The projector threw before handing back a task; report it through the
                // caller's task instead of letting it escape on the pump thread.
                _source.TrySetException(exception);
            }
        }

        // Starts the actual projection on the projector.
        protected abstract Task<int> Start(AsyncSqlProjector projector, CancellationToken cancellationToken);

        // Copies the outcome of the projector's task onto the caller's task.
        private void Relay(Task<int> projection)
        {
            if (projection.IsCanceled)
            {
                _source.TrySetCanceled();
            }
            else if (projection.IsFaulted)
            {
                // Forward the inner exceptions so the caller does not get an
                // AggregateException nested inside another AggregateException.
                _source.TrySetException(projection.Exception.InnerExceptions);
            }
            else
            {
                _source.TrySetResult(projection.Result);
            }
        }
    }

    // Queued request for a single message.
    private class ProjectMessageOperation : ProjectOperation
    {
        private readonly object _message;

        public ProjectMessageOperation(object message, CancellationToken cancellationToken)
            : base(cancellationToken)
        {
            if (message == null)
                throw new ArgumentNullException("message");
            _message = message;
        }

        protected override Task<int> Start(AsyncSqlProjector projector, CancellationToken cancellationToken)
        {
            return projector.ProjectAsync(_message, cancellationToken);
        }
    }

    // Queued request for a sequence of messages.
    private class ProjectMessagesOperation : ProjectOperation
    {
        private readonly IEnumerable<object> _messages;

        public ProjectMessagesOperation(IEnumerable<object> messages, CancellationToken cancellationToken)
            : base(cancellationToken)
        {
            if (messages == null)
                throw new ArgumentNullException("messages");
            _messages = messages;
        }

        protected override Task<int> Start(AsyncSqlProjector projector, CancellationToken cancellationToken)
        {
            return projector.ProjectAsync(_messages, cancellationToken);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
And it's broken :(