Created
December 24, 2019 03:54
-
-
Save programmation/ea04466f81790478d5a4e9efc1c54fe7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using Nito.AsyncEx; | |
namespace Core.Utility | |
{ | |
public abstract class BaseOperation<TInput, TOutput> | |
{ | |
public TaskCompletionSource<TOutput> TaskCompletionSource { get; } | |
= new TaskCompletionSource<TOutput>(); | |
} | |
public abstract class BaseReaderWriterOperation<TInput, TOutput> | |
: BaseOperation<TInput, TOutput> | |
{ | |
public abstract TOutput Execute(); | |
} | |
public abstract class ReaderOperation<TInput, TOutput> | |
: BaseReaderWriterOperation<TInput, TOutput> | |
{ | |
} | |
public abstract class WriterOperation<TInput, TOutput> | |
: BaseReaderWriterOperation<TInput, TOutput> | |
{ | |
} | |
public abstract class BaseAsyncReaderWriterOperation<TInput, TOutput> | |
: BaseOperation<TInput, TOutput> | |
{ | |
public abstract Task<TOutput> ExecuteAsync(); | |
} | |
public abstract class AsyncReaderOperation<TInput, TOutput> | |
: BaseAsyncReaderWriterOperation<TInput, TOutput> | |
{ | |
} | |
public abstract class AsyncWriterOperation<TInput, TOutput> | |
: BaseAsyncReaderWriterOperation<TInput, TOutput> | |
{ | |
} | |
public class ReaderWriterOperationQueue<TInput, TOutput> | |
{ | |
private AsyncReaderWriterLock _readerWriterLock; | |
private readonly TransformBlock<ReaderOperation<TInput, TOutput>, Task<TOutput>> _readerQueue; | |
private readonly TransformBlock<WriterOperation<TInput, TOutput>, Task<TOutput>> _writerQueue; | |
private readonly TransformBlock<AsyncReaderOperation<TInput, TOutput>, Task<TOutput>> _asyncReaderQueue; | |
private readonly TransformBlock<AsyncWriterOperation<TInput, TOutput>, Task<TOutput>> _asyncWriterQueue; | |
public ReaderWriterOperationQueue() | |
{ | |
_readerWriterLock = new AsyncReaderWriterLock(); | |
_readerQueue = new TransformBlock<ReaderOperation<TInput, TOutput>, Task<TOutput>>(DoReaderOperationAsync); | |
_writerQueue = new TransformBlock<WriterOperation<TInput, TOutput>, Task<TOutput>>(DoWriterOperationAsync); | |
_asyncReaderQueue = new TransformBlock<AsyncReaderOperation<TInput, TOutput>, Task<TOutput>>(DoAsyncReaderOperationAsync); | |
_asyncWriterQueue = new TransformBlock<AsyncWriterOperation<TInput, TOutput>, Task<TOutput>>(DoAsyncWriterOperationAsync); | |
} | |
public async Task<TOutput> ReadAsync(ReaderOperation<TInput, TOutput> operation) | |
{ | |
if (await _readerQueue.SendAsync(operation)) | |
{ | |
return await operation.TaskCompletionSource.Task; | |
} | |
else | |
{ | |
throw new InvalidOperationException("Unable to queue read operation"); | |
} | |
} | |
public async Task<TOutput> WriteAsync(WriterOperation<TInput, TOutput> operation) | |
{ | |
if (await _writerQueue.SendAsync(operation)) | |
{ | |
return await operation.TaskCompletionSource.Task; | |
} | |
else | |
{ | |
throw new InvalidOperationException("Unable to queue write operation"); | |
} | |
} | |
public async Task<TOutput> ReadAsync(AsyncReaderOperation<TInput, TOutput> operation) | |
{ | |
if (await _asyncReaderQueue.SendAsync(operation)) | |
{ | |
return await operation.TaskCompletionSource.Task; | |
} | |
else | |
{ | |
throw new InvalidOperationException("Unable to queue async read operation"); | |
} | |
} | |
public async Task<TOutput> WriteAsync(AsyncWriterOperation<TInput, TOutput> operation) | |
{ | |
if (await _asyncWriterQueue.SendAsync(operation)) | |
{ | |
return await operation.TaskCompletionSource.Task; | |
} | |
else | |
{ | |
throw new InvalidOperationException("Unable to queue async write operation"); | |
} | |
} | |
private async Task<TOutput> DoReaderOperationAsync(ReaderOperation<TInput, TOutput> operation) | |
{ | |
try | |
{ | |
using (await _readerWriterLock.ReaderLockAsync()) | |
{ | |
var result = operation.Execute(); | |
operation.TaskCompletionSource.SetResult(result); | |
} | |
} | |
catch (Exception ex) | |
{ | |
operation.TaskCompletionSource.SetException(ex); | |
} | |
return await operation.TaskCompletionSource.Task; | |
} | |
private async Task<TOutput> DoWriterOperationAsync(WriterOperation<TInput, TOutput> operation) | |
{ | |
try | |
{ | |
using (await _readerWriterLock.WriterLockAsync()) | |
{ | |
var result = operation.Execute(); | |
operation.TaskCompletionSource.SetResult(result); | |
} | |
} | |
catch (Exception ex) | |
{ | |
operation.TaskCompletionSource.SetException(ex); | |
} | |
return await operation.TaskCompletionSource.Task; | |
} | |
private async Task<TOutput> DoAsyncReaderOperationAsync(AsyncReaderOperation<TInput, TOutput> operation) | |
{ | |
try | |
{ | |
using (await _readerWriterLock.ReaderLockAsync()) | |
{ | |
var result = await operation.ExecuteAsync(); | |
operation.TaskCompletionSource.SetResult(result); | |
} | |
} | |
catch (Exception ex) | |
{ | |
operation.TaskCompletionSource.SetException(ex); | |
} | |
return await operation.TaskCompletionSource.Task; | |
} | |
private async Task<TOutput> DoAsyncWriterOperationAsync(AsyncWriterOperation<TInput, TOutput> operation) | |
{ | |
try | |
{ | |
using (await _readerWriterLock.WriterLockAsync()) | |
{ | |
var result = await operation.ExecuteAsync(); | |
operation.TaskCompletionSource.SetResult(result); | |
} | |
} | |
catch (Exception ex) | |
{ | |
operation.TaskCompletionSource.SetException(ex); | |
} | |
return await operation.TaskCompletionSource.Task; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment