Last active
August 29, 2015 14:14
-
-
Save alfeg/f7262474abf59a3897f2 to your computer and use it in GitHub Desktop.
Atomic blocking queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Isam.Esent.Collections.Generic;
namespace Esent | |
{ | |
/// <summary>
/// Thin wrapper around a process-wide <c>PersistentDictionary</c> opened on the
/// "data" directory, mapping string ids to <see cref="Entity"/> values.
/// </summary>
class Storage
{
    // One dictionary shared by every Storage instance.
    private static PersistentDictionary<string, Entity> entities =
        new PersistentDictionary<string, Entity>("data");

    /// <summary>Looks up the entity stored under <paramref name="id"/> via the dictionary indexer.</summary>
    public Entity GetById(string id)
    {
        Entity found = entities[id];
        return found;
    }

    /// <summary>Writes <paramref name="entity"/> under <paramref name="id"/>, replacing any existing value.</summary>
    public void Store(string id, Entity entity)
    {
        entities[id] = entity;
    }
}
/// <summary>
/// Value stored by <c>Storage</c>: an integer identifier plus a counter.
/// This is a struct, so <c>Storage.GetById</c> hands back a copy; a modified copy
/// has to be written back with <c>Storage.Store</c> for the change to persist.
/// </summary>
[Serializable] | |
public struct Entity | |
{ | |
/// <summary>Integer identifier of the entity.</summary>
public int Id { get; set; } | |
/// <summary>Counter value; <c>Program.Main</c> increments it once per queued task.</summary>
public int Counter { get; set; } | |
} | |
/// <summary>
/// Demo driver: starts 1000 tasks that each increment the same stored entity's
/// counter through a single <c>Atomic</c>, then prints the final counter.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        const int taskCount = 1000;
        var entityId = "ID";

        // Seed the store with a default entity (Counter == 0).
        var storage = new Storage();
        storage.Store(entityId, new Entity());

        // Read-modify-write of the stored entity; only safe when serialized.
        Action increment = () =>
        {
            var current = storage.GetById(entityId);
            current.Counter++;
            storage.Store(entityId, current);
        };

        var pending = new Task[taskCount];
        using (var atomic = new Atomic())
        {
            var index = 0;
            while (index < pending.Length)
            {
                // Each task blocks until its increment has run on the atomic queue.
                pending[index] = Task.Run(() => atomic.Execute(increment));
                index++;
            }

            // Wait before leaving the using block so nothing is queued after Dispose.
            Task.WaitAll(pending);
        }

        Console.WriteLine(storage.GetById(entityId).Counter);
    }
}
/// <summary>
/// Runs submitted actions one at a time, in submission order, on a single
/// dedicated consumer thread. Callers block in <see cref="Execute"/> until their
/// action has finished.
/// </summary>
public class Atomic : IDisposable
{
    // Per-instance queue. A static queue would be drained by the consumers of
    // every Atomic at once (no mutual exclusion) and completing it in one
    // instance's Dispose would shut down all the others.
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();

    // The single consumer draining this instance's queue.
    private readonly Task consumer;

    // 0 while alive, 1 once Dispose has started; makes Dispose idempotent.
    private int disposed;

    /// <summary>Starts the consumer that executes queued actions sequentially.</summary>
    public Atomic()
    {
        // LongRunning asks for a dedicated thread so the consumer cannot be starved
        // by pool threads that are themselves blocked inside Execute.
        consumer = Task.Factory.StartNew(
            () =>
            {
                foreach (var action in queue.GetConsumingEnumerable())
                {
                    action();
                }
            },
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Queues <paramref name="action"/> and blocks until the consumer has run it.
    /// </summary>
    /// <param name="action">Work to run exclusively; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The instance has been disposed.</exception>
    /// <remarks>
    /// An exception thrown by <paramref name="action"/> is rethrown on the calling
    /// thread. Do not call this from inside a queued action: the consumer would
    /// wait on itself.
    /// </remarks>
    public void Execute(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        Exception failure = null;

        // Intentionally not disposed: the consumer may still be returning from
        // Set() when Wait() unblocks, and no wait handle is ever allocated here.
        var done = new ManualResetEventSlim(false);

        queue.Add(() =>
        {
            try
            {
                action();
            }
            catch (Exception ex)
            {
                // Keep the consumer loop alive; hand the failure to the caller.
                failure = ex;
            }
            finally
            {
                // Always release the caller, even when the action failed.
                done.Set();
            }
        });

        done.Wait();

        if (failure != null)
        {
            // Rethrow on the caller's thread with the original stack trace intact.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
        }
    }

    /// <summary>
    /// Stops accepting work, lets already-queued actions finish, then releases the queue.
    /// Safe to call more than once. Must not be called from inside a queued action.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref disposed, 1) != 0)
        {
            return;
        }

        queue.CompleteAdding();

        // The consumer exits once the queue is drained; wait so the queue is not
        // disposed underneath it.
        consumer.Wait();
        queue.Dispose();
    }
}
/// <summary>
/// Spreads work over several <c>Atomic</c> queues: an id is mapped to a bucket key
/// by the hasher, and every action whose id maps to the same bucket runs on the
/// same <c>Atomic</c>, so those actions never overlap each other.
/// </summary>
public class AtomicHasher : IDisposable
{
    // Bucket key -> queue. Every access happens under lock (atoms).
    private readonly Dictionary<string, Atomic> atoms = new Dictionary<string, Atomic>();
    private readonly Func<string, string> hasher;

    /// <summary>Creates the hasher-backed dispatcher.</summary>
    /// <param name="hasher">
    /// Maps an id to a bucket key. When null, the default buckets ids by
    /// <c>GetHashCode() % Environment.ProcessorCount</c>.
    /// </param>
    public AtomicHasher(Func<string, string> hasher = null)
    {
        this.hasher = hasher ?? (id =>
        {
            // The remainder lies strictly between -ProcessorCount and ProcessorCount,
            // so Math.Abs cannot overflow here.
            return Math.Abs(id.GetHashCode() % Environment.ProcessorCount).ToString();
        });
    }

    /// <summary>
    /// Runs <paramref name="action"/> on the queue belonging to <paramref name="id"/>'s
    /// bucket and blocks until it has completed.
    /// </summary>
    /// <param name="id">Identifier passed to the hasher to select a bucket.</param>
    /// <param name="action">Work to run exclusively within that bucket.</param>
    public void Execute(string id, Action action)
    {
        var hash = hasher(id);

        Atomic atom;
        lock (atoms)
        {
            // Look up and create under the same lock so the dictionary is never read
            // while another thread is inserting into it.
            if (!atoms.TryGetValue(hash, out atom))
            {
                atom = new Atomic();
                atoms[hash] = atom;
            }
        }

        // Run outside the lock so different buckets can proceed in parallel.
        atom.Execute(action);
    }

    /// <summary>Disposes every queue created so far and forgets them.</summary>
    public void Dispose()
    {
        Atomic[] owned;
        lock (atoms)
        {
            owned = new Atomic[atoms.Count];
            atoms.Values.CopyTo(owned, 0);
            atoms.Clear();
        }

        // Dispose outside the lock; clearing alone would leave each consumer running.
        foreach (var atom in owned)
        {
            atom.Dispose();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment