Skip to content

Instantly share code, notes, and snippets.

@ierhalim
Created January 1, 2021 11:27
Show Gist options
  • Select an option

  • Save ierhalim/21494e684405f87aa61306fcf31e6a30 to your computer and use it in GitHub Desktop.

Select an option

Save ierhalim/21494e684405f87aa61306fcf31e6a30 to your computer and use it in GitHub Desktop.
Y5O1 Paralel programlama
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace BlockingCollectionExample
{
class Program
{
static BlockingCollection<string> blockingCollection;
static CancellationTokenSource cancellationTokenSource;
// Consume işlemi yapan kısım.
static void TextOperation()
{
// GetConsumingEnumerable methodu koleksiyon üzerinden bir eleman döndürür ve o elemanı siler.
// Koleksiyonun içi boş olduğunda bekler, yani bu döngü normal yollarla tamamlanmaz. Döngüyü tamamlamak için parametre üzerinden gönderilen Token'ı cancel etmek gerekir.
foreach (var item in blockingCollection.GetConsumingEnumerable(cancellationTokenSource.Token))
{
Console.WriteLine("Operation start for: {0}", item);
Thread.Sleep(10);
Console.WriteLine("Operation done for: {0}", item);
}
}
// Produce işlemini yapan kısım
static void GenerateText()
{
for (int i = 0; i < 300; i++)
{
// Eğer BlockkingCollection sınırıra ulaşmıs ise Add methodu tamamlanmaz ve koleksiyon üzerinden bir veri eksilmesini bekler.
// Bu yüzden Add methodunu MainThread üzerinde kullanmak uygulanmanın kilitlenmesine sebep olur.
blockingCollection.Add($"Item-{i}");
Console.WriteLine($"Added Item-{i}");
}
Console.WriteLine("All items added");
// İşimiz bittiğinde Consumer'ın yeni eleman beklememesi için cancel ediyoruz.
cancellationTokenSource.Cancel();
}
static void Main(string[] args)
{
var concurentBag = new ConcurrentBag<string>();
blockingCollection = new BlockingCollection<string>(concurentBag, 5);
cancellationTokenSource = new CancellationTokenSource();
var task = Task.Run(() =>
{
var producer = Task.Run(GenerateText);
var consumer = Task.Run(TextOperation);
try
{
Task.WaitAll(new[] { producer, consumer });
}
catch (AggregateException ex)
{
// Cancellation implementasyonu işlem iptal edildiğinde exception fırlatır, bu exception Wait/WaitAll methodlarında ana thread e yansır.
// Bunu engellemek için OperationCanceledException larını handle edildiği olarak işaretliyoruz.
ex.Handle((e) =>
{
return e is OperationCanceledException;
});
}
Console.WriteLine("Task wait done");
});
task.Wait();
Console.WriteLine("Done..");
Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment