Created
February 14, 2020 04:58
-
-
Save sachinsu/6fcbc36e6e5cc58c7b5ba9007e276afc to your computer and use it in GitHub Desktop.
Using System.Threading.Channels to implement a high-performance producer-consumer pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
using System.Diagnostics; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
namespace channels | |
{ | |
/// <summary>
/// Consumer side of a producer-consumer pipeline built on a bounded
/// <see cref="Channel{T}"/> of strings. Producers enqueue messages with
/// <see cref="Send"/>; <see cref="SetupChannel"/> starts the workers that
/// dequeue each message and hand it to <see cref="IWebService"/>.
/// </summary>
public class EventConsumer
{
    private readonly Channel<string> _channel;
    private readonly int _workerCount;
    private readonly IWebService _service;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the consumer and its backing bounded channel.
    /// </summary>
    /// <param name="logger">Logger used for per-message debug output.</param>
    /// <param name="service">Service whose <c>GetPage</c> is awaited for every message read.</param>
    /// <param name="maxMessageCount">Capacity passed to <see cref="Channel.CreateBounded{T}(int)"/>.</param>
    /// <param name="workerCount">Number of concurrent reader workers started by <see cref="SetupChannel"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> or <paramref name="service"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workerCount"/> is less than 1.</exception>
    public EventConsumer(ILogger<EventConsumer> logger, IWebService service, int maxMessageCount, int workerCount)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _service = service ?? throw new ArgumentNullException(nameof(service));

        // Fail fast: with no workers nothing would ever drain the channel.
        if (workerCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount), workerCount, "At least one worker is required.");
        }

        _workerCount = workerCount;
        _channel = Channel.CreateBounded<string>(maxMessageCount);
    }

    // Using ValueTask: https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/
    /// <summary>
    /// Starts the reader workers and waits until all of them have finished
    /// and the channel reader reports completion.
    /// </summary>
    /// <remarks>
    /// Nothing in this class completes the channel's writer, so unless the
    /// channel is completed by other means the returned task only finishes
    /// when every worker has faulted.
    /// </remarks>
    /// <returns>A task that completes when the workers and the channel are done.</returns>
    public async ValueTask SetupChannel()
    {
        var workers = new List<Task>(_workerCount);

        // Start exactly _workerCount workers (the loop is zero-based on purpose).
        for (var index = 0; index < _workerCount; index++)
        {
            // ProcessChannelMessage returns Task, so Task.Run picks the Func<Task>
            // overload and the resulting task tracks the whole worker loop,
            // including any exception it throws.
            workers.Add(Task.Run(() => ProcessChannelMessage(_channel.Reader)));
        }

        // Await the workers first so a worker failure is surfaced instead of
        // being hidden behind a channel completion that may never arrive.
        await Task.WhenAll(workers);
        await _channel.Reader.Completion;
    }

    /// <summary>
    /// Writes a message to the channel, awaiting until the write is accepted.
    /// </summary>
    /// <param name="request">The message to enqueue.</param>
    public async ValueTask Send(string request)
    {
        await _channel.Writer.WriteAsync(request);
    }

    /// <summary>
    /// Worker loop: drains messages from <paramref name="reader"/> and awaits
    /// <c>GetPage</c> on the service for each one until the channel completes.
    /// </summary>
    /// <param name="reader">The channel reader to consume from.</param>
    private async Task ProcessChannelMessage(ChannelReader<string> reader)
    {
        // Async methods compile to a state machine, so awaiting inside an
        // open-ended loop is safe. WaitToReadAsync returns false once the
        // channel is completed and empty, which ends the loop.
        while (await reader.WaitToReadAsync())
        {
            // With several readers and a single message only one reader wins
            // TryRead; the others simply go back to waiting.
            while (reader.TryRead(out string messageString))
            {
                var requestTime = DateTime.Now;

                // Message templates (not interpolated strings) keep braces in the
                // payload from being parsed as format placeholders.
                _logger.LogDebug("The listener just read {Message} at {RequestTime}!", messageString, requestTime);

                await _service.GetPage(messageString).ConfigureAwait(false);

                var responseTime = DateTime.Now;
                _logger.LogDebug("The listener done {Message} at {ResponseTime}!", messageString, responseTime);
            }
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
using System.Diagnostics; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
namespace channels { | |
/// <summary>
/// Simple load generator: raises <see cref="DataChanged"/> once per unit of
/// requested load, tagging each notification with its zero-based sequence number.
/// </summary>
public class EventGenerator {
    /// <summary>
    /// Payload delivered with every <see cref="DataChanged"/> notification.
    /// </summary>
    public struct StringArgs {
        internal StringArgs(string id) => Id = id;

        /// <summary>Identifier supplied when the payload was created.</summary>
        internal string Id { get; }
    }

    /// <summary>
    /// Raised by <see cref="GenerateLoad"/> for each generated item.
    /// </summary>
    public event EventHandler<StringArgs> DataChanged;

    /// <summary>
    /// Raises <see cref="DataChanged"/> <paramref name="loadFactor"/> times, with
    /// ids "0" through <c>loadFactor - 1</c>. Does nothing when the value is zero
    /// or negative.
    /// </summary>
    /// <param name="loadFactor">Number of notifications to raise.</param>
    public void GenerateLoad(int loadFactor) {
        var sequence = 0;
        while (sequence < loadFactor) {
            // Re-read the event on every pass so subscriptions changed
            // mid-run take effect on the next notification.
            var subscribers = DataChanged;
            if (subscribers != null) {
                subscribers(this, new StringArgs(sequence.ToString()));
            }
            sequence++;
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment