Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save imzjy/ebd66a0b806b047c14dbf774c98cccf0 to your computer and use it in GitHub Desktop.
Save imzjy/ebd66a0b806b047c14dbf774c98cccf0 to your computer and use it in GitHub Desktop.
BlockingCollection with priorities
public class PriorityCollection
{
private readonly BlockingCollection<Item> low = new BlockingCollection<Item>();
private readonly BlockingCollection<Item> middle = new BlockingCollection<Item>();
private readonly BlockingCollection<Item> high = new BlockingCollection<Item>();
private readonly BlockingCollection<Guid> main = new BlockingCollection<Guid>();
private readonly BlockingCollection<Item>[] queue;
private readonly Dictionary<Priority, BlockingCollection<Item>> priorityMap = new Dictionary<Priority, BlockingCollection<Item>>();
public List<Priority> TestList { get; private set; }
public PriorityCollection()
{
queue = new[]{high, middle, low};
TestList = new List<Priority>();
priorityMap.Add(Priority.Low, low);
priorityMap.Add(Priority.Middle, middle);
priorityMap.Add(Priority.High, high);
}
public void Publish(Item item)
{
var guid = Guid.NewGuid();
priorityMap[item.Priority].Add(item);
main.Add(guid);
Console.Out.WriteLine("item with priority {0} is published", item.Priority);
}
public void ProcessItems(CancellationToken token)
{
foreach (var guid in main.GetConsumingEnumerable(token))
{
Item item;
BlockingCollection<Item>.TakeFromAny(queue, out item);
var priority = item.Priority;
Console.Out.WriteLine("item with priority {0} is processed", priority);
TestList.Add(priority);
}
}
}
public enum Priority
{
High,
Middle,
Low
}
public class Item
{
public Priority Priority { get; set; }
}
public class PriorityCollectionTests
{
[Fact]
public void should_process_items_by_their_priorities()
{
var token = new CancellationTokenSource();
var priorityCollection = new PriorityCollection();
Task.Factory.ContinueWhenAll(new[]
{
Task.Factory.StartNew(() => priorityCollection.ProcessItems(token.Token)),
GenerateItems(priorityCollection, Priority.Low),
GenerateItems(priorityCollection, Priority.High),
GenerateItems(priorityCollection, Priority.Middle)
}, tasks => { });
Thread.Sleep(5000);
token.Cancel();
priorityCollection.TestList.Should().ContainInOrder(
Enumerable.Range(0, 10).Select(i => Priority.High).Union(
Enumerable.Range(0, 10).Select(i => Priority.Middle)).Union(
Enumerable.Range(0, 10).Select(i => Priority.Low)));
}
private static Task GenerateItems(PriorityCollection priorityCollection, Priority priority)
{
return Task.Factory.StartNew(() =>
Enumerable.Range(0, 10).ToList().ForEach(i => priorityCollection.Publish(new Item{Priority = priority})));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment