Created
July 16, 2018 03:29
-
-
Save b-anand/7aa82d43a6231567bf90fc52ed6d02d8 to your computer and use it in GitHub Desktop.
Async Reader Writer Spin Lock
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
/// <summary>
/// An asynchronous reader/writer lock whose whole state lives in a single 64-bit word
/// that is updated with compare-and-swap. Callers that cannot make progress poll the
/// state with a short <see cref="Task.Delay(int)"/> between attempts instead of blocking
/// a thread.
/// </summary>
/// <remarks>
/// <para>
/// Layout of the state word: bit 63 is the "write lock held" flag, bits 47-62 count the
/// writers that hold or are waiting for the write lock, and bits 0-46 count the readers
/// currently holding a read lock.
/// </para>
/// <para>
/// New readers are refused for as long as the writer count is non-zero, so a waiting
/// writer is not starved by a continuous stream of readers.
/// </para>
/// <para>
/// The <see cref="IDisposable"/> handles returned by the acquire methods release the lock
/// when disposed. Disposing a handle more than once is a no-op.
/// </para>
/// </remarks>
public class AsyncReaderWriterSpinLock
{
    /// <summary>
    /// The maximum number of writers that may hold or wait for the write lock at once.
    /// </summary>
    public const ushort MaxWriterCount = ushort.MaxValue;

    /// <summary>
    /// The maximum number of readers that may hold the read lock at once (47 bits).
    /// </summary>
    public const ulong MaxReaderCount = 0x0000_7fff_ffff_ffffUL;

    /// <summary>
    /// Polling interval, in milliseconds, used while a writer is in the way.
    /// </summary>
    private const int WriteLockDelayInMilliseconds = 100;

    /// <summary>
    /// Polling interval, in milliseconds, used while only readers are in the way.
    /// </summary>
    private const int ReadLockDelayInMilliseconds = 10;

    /// <summary>
    /// The packed lock state. Stored as <see cref="long"/> because that is the type the
    /// <see cref="Interlocked"/> read/compare-exchange operations used here work on.
    /// </summary>
    private long lockStateLongValue;

    /// <summary>
    /// Registers the caller as a writer and waits until the write lock can be taken,
    /// i.e. until no other writer holds it and no reader holds a read lock.
    /// </summary>
    /// <returns>
    /// A task producing a handle that releases the write lock when disposed.
    /// </returns>
    public async Task<IDisposable> AcquireWriteLockAsync()
    {
        // Impact to CPU consumption due to this yield is negligible but it is needed
        // when there are too many writers and readers running in synchronous mode so that
        // they yield the CPU for other tasks requesting read/write lock.
        await Task.Yield();

        var writeLockManager = new WriteLockManager(this);
        await writeLockManager.AcquireLockAsync().ConfigureAwait(false);
        return writeLockManager;
    }

    /// <summary>
    /// Waits until no writer holds or is waiting for the write lock and then takes a
    /// read lock.
    /// </summary>
    /// <returns>
    /// A task producing a handle that releases the read lock when disposed.
    /// </returns>
    public async Task<IDisposable> AcquireReadLockAsync()
    {
        // Impact to CPU consumption due to this yield is negligible but it is needed
        // when there are too many writers and readers running in synchronous mode so that
        // they yield the CPU for other tasks requesting read/write lock.
        await Task.Yield();

        var readLockManager = new ReadLockManager(this);
        await readLockManager.AcquireLockAsync().ConfigureAwait(false);
        return readLockManager;
    }

    /// <summary>
    /// Repeatedly snapshots the lock state, lets <paramref name="asyncAction"/> compute a
    /// replacement, and publishes it with compare-and-swap. Retries when the action reports
    /// failure or when another caller changed the state in the meantime.
    /// </summary>
    /// <param name="asyncAction">
    /// Receives a copy of the current state and returns whether a transition is possible
    /// together with the desired new state. It is expected to delay before reporting
    /// failure, because this loop retries immediately.
    /// </param>
    /// <returns>
    /// A task that completes once a new state has been published.
    /// </returns>
    private async Task UpdateLockStateActionAsync(Func<LockState, Task<(bool, LockState)>> asyncAction)
    {
        while (true)
        {
            var observed = Interlocked.Read(ref this.lockStateLongValue);
            var (success, proposed) = await asyncAction(new LockState { LockStateValue = observed }).ConfigureAwait(false);
            if (!success)
            {
                continue;
            }

            if (Interlocked.CompareExchange(ref this.lockStateLongValue, proposed.LockStateValue, observed) == observed)
            {
                return;
            }
        }
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="UpdateLockStateActionAsync"/> for transitions
    /// that can always be applied (lock release). Used from <c>Dispose</c> so that releasing
    /// never has to block on a task.
    /// </summary>
    /// <param name="action">
    /// Maps a copy of the current state to the desired new state.
    /// </param>
    private void UpdateLockState(Func<LockState, LockState> action)
    {
        while (true)
        {
            var observed = Interlocked.Read(ref this.lockStateLongValue);
            var proposed = action(new LockState { LockStateValue = observed });
            if (Interlocked.CompareExchange(ref this.lockStateLongValue, proposed.LockStateValue, observed) == observed)
            {
                return;
            }
        }
    }

    /// <summary>
    /// A typed view over the packed 64-bit lock state.
    /// </summary>
    private struct LockState
    {
        /// <summary>
        /// Bit 63: the write lock is currently held.
        /// </summary>
        private const ulong WriteLockMask = 0x8000_0000_0000_0000UL;

        /// <summary>
        /// Position of the least significant bit of the writer count.
        /// </summary>
        private const int WritersShift = 47;

        /// <summary>
        /// Mask of the (unshifted) 16-bit writer count.
        /// </summary>
        private const ulong WritersFieldMask = 0xffffUL;

        /// <summary>
        /// Bits 0-46: the reader count.
        /// </summary>
        private const ulong ReadersMask = MaxReaderCount;

        /// <summary>
        /// The packed state.
        /// </summary>
        private ulong lockStateValue;

        /// <summary>
        /// Gets or sets the packed state reinterpreted as a signed 64-bit integer.
        /// </summary>
        public long LockStateValue
        {
            // The conversions are pure bit reinterpretations; 'unchecked' keeps them from
            // throwing when bit 63 is set and the assembly is compiled with overflow checks.
            get => unchecked((long)this.lockStateValue);
            set => this.lockStateValue = unchecked((ulong)value);
        }

        /// <summary>
        /// Gets or sets a value indicating whether the write lock is currently held.
        /// </summary>
        public bool WriteLock
        {
            get => (this.lockStateValue & WriteLockMask) != 0;
            set => this.lockStateValue = value
                ? this.lockStateValue | WriteLockMask
                : this.lockStateValue & ~WriteLockMask;
        }

        /// <summary>
        /// Gets or sets the number of writers that hold or are waiting for the write lock.
        /// </summary>
        public ushort Writers
        {
            get => (ushort)((this.lockStateValue >> WritersShift) & WritersFieldMask);
            set => this.lockStateValue =
                (this.lockStateValue & ~(WritersFieldMask << WritersShift))
                | ((value & WritersFieldMask) << WritersShift);
        }

        /// <summary>
        /// Gets or sets the number of readers currently holding a read lock.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The assigned value is greater than <see cref="MaxReaderCount"/>.
        /// </exception>
        public ulong Readers
        {
            get => this.lockStateValue & ReadersMask;
            set
            {
                if (value > MaxReaderCount)
                {
                    throw new ArgumentOutOfRangeException(nameof(value), value, "MaxReaderCount limit reached.");
                }

                this.lockStateValue = (this.lockStateValue & ~ReadersMask) | value;
            }
        }
    }

    /// <summary>
    /// Acquires the write lock for its parent and releases it when disposed.
    /// </summary>
    private sealed class WriteLockManager : IDisposable
    {
        /// <summary>
        /// The lock this handle belongs to.
        /// </summary>
        private readonly AsyncReaderWriterSpinLock parentSpinLock;

        /// <summary>
        /// 0 while the lock has not been released through this handle, 1 afterwards.
        /// </summary>
        private int released;

        /// <summary>
        /// Initializes a new instance of the <see cref="WriteLockManager"/> class.
        /// </summary>
        /// <param name="parentSpinLock">
        /// The lock this handle belongs to.
        /// </param>
        public WriteLockManager(AsyncReaderWriterSpinLock parentSpinLock)
        {
            this.parentSpinLock = parentSpinLock;
        }

        /// <summary>
        /// Registers this writer and then waits for the write lock.
        /// </summary>
        /// <returns>
        /// A task that completes once the write lock is held.
        /// </returns>
        public async Task AcquireLockAsync()
        {
            // Registering first makes new readers back off while this writer waits.
            await this.IncrementWritersCountAsync().ConfigureAwait(false);
            await this.AcquireWriteLockAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Releases the write lock. Calls after the first one do nothing, so a double
        /// dispose cannot underflow the writer count or clear another writer's lock flag.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref this.released, 1) == 0)
            {
                this.ReleaseWriteLock();
            }
        }

        /// <summary>
        /// Adds this writer to the writer count, polling while the count is at
        /// <see cref="MaxWriterCount"/>.
        /// </summary>
        /// <returns>
        /// A task that completes once the writer has been counted.
        /// </returns>
        private Task IncrementWritersCountAsync()
        {
            return this.parentSpinLock.UpdateLockStateActionAsync(
                async lockState =>
                {
                    if (lockState.Writers == MaxWriterCount)
                    {
                        await Task.Delay(WriteLockDelayInMilliseconds).ConfigureAwait(false);
                        return (false, lockState);
                    }

                    lockState.Writers = (ushort)(lockState.Writers + 1);
                    return (true, lockState);
                });
        }

        /// <summary>
        /// Sets the write-lock flag, polling while another writer holds it or any reader
        /// still holds a read lock.
        /// </summary>
        /// <returns>
        /// A task that completes once the flag has been set by this writer.
        /// </returns>
        private Task AcquireWriteLockAsync()
        {
            return this.parentSpinLock.UpdateLockStateActionAsync(
                async lockState =>
                {
                    if (lockState.WriteLock || lockState.Readers > 0)
                    {
                        // Poll slowly behind another writer, faster behind readers.
                        var delayTimeInMilliseconds = lockState.WriteLock
                            ? WriteLockDelayInMilliseconds
                            : ReadLockDelayInMilliseconds;
                        await Task.Delay(delayTimeInMilliseconds).ConfigureAwait(false);
                        return (false, lockState);
                    }

                    lockState.WriteLock = true;
                    return (true, lockState);
                });
        }

        /// <summary>
        /// Clears the write-lock flag and removes this writer from the writer count.
        /// </summary>
        private void ReleaseWriteLock()
        {
            this.parentSpinLock.UpdateLockState(
                lockState =>
                {
                    lockState.Writers = (ushort)(lockState.Writers - 1);
                    lockState.WriteLock = false;
                    return lockState;
                });
        }
    }

    /// <summary>
    /// Acquires a read lock for its parent and releases it when disposed.
    /// </summary>
    private sealed class ReadLockManager : IDisposable
    {
        /// <summary>
        /// The lock this handle belongs to.
        /// </summary>
        private readonly AsyncReaderWriterSpinLock parentSpinLock;

        /// <summary>
        /// 0 while the lock has not been released through this handle, 1 afterwards.
        /// </summary>
        private int released;

        /// <summary>
        /// Initializes a new instance of the <see cref="ReadLockManager"/> class.
        /// </summary>
        /// <param name="parentSpinLock">
        /// The lock this handle belongs to.
        /// </param>
        public ReadLockManager(AsyncReaderWriterSpinLock parentSpinLock)
        {
            this.parentSpinLock = parentSpinLock;
        }

        /// <summary>
        /// Waits for and takes a read lock.
        /// </summary>
        /// <returns>
        /// A task that completes once the read lock is held.
        /// </returns>
        public Task AcquireLockAsync()
        {
            return this.IncrementReadersCountAsync();
        }

        /// <summary>
        /// Releases the read lock. Calls after the first one do nothing, so a double
        /// dispose cannot underflow the reader count.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref this.released, 1) == 0)
            {
                this.DecrementReadersCount();
            }
        }

        /// <summary>
        /// Adds this reader to the reader count, polling while any writer is registered
        /// or the count is at <see cref="MaxReaderCount"/>.
        /// </summary>
        /// <returns>
        /// A task that completes once the reader has been counted.
        /// </returns>
        private Task IncrementReadersCountAsync()
        {
            return this.parentSpinLock.UpdateLockStateActionAsync(
                async lockState =>
                {
                    if (lockState.Writers > 0 || lockState.Readers == MaxReaderCount)
                    {
                        var delayTimeInMilliseconds = lockState.Writers > 0
                            ? WriteLockDelayInMilliseconds
                            : ReadLockDelayInMilliseconds;
                        await Task.Delay(delayTimeInMilliseconds).ConfigureAwait(false);
                        return (false, lockState);
                    }

                    // Keep the full 47-bit width: narrowing the sum to ushort would wrap
                    // the count to zero at 65,536 concurrent readers.
                    lockState.Readers = lockState.Readers + 1;
                    return (true, lockState);
                });
        }

        /// <summary>
        /// Removes this reader from the reader count.
        /// </summary>
        private void DecrementReadersCount()
        {
            this.parentSpinLock.UpdateLockState(
                lockState =>
                {
                    lockState.Readers = lockState.Readers - 1;
                    return lockState;
                });
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using FluentAssertions; | |
using Microsoft.IntelligencePlatform.TableStore.Utilities.Task; | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
/// <summary>
/// Unit tests for <see cref="AsyncReaderWriterSpinLock"/>.
/// </summary>
[TestClass]
public class AsyncReaderWriterSpinLockTests
{
    /// <summary>
    /// Starts five readers that each stay inside the read lock until all five have
    /// entered it, which can only finish if the read lock is shared.
    /// </summary>
    /// <returns>
    /// The <see cref="Task"/>.
    /// </returns>
    [TestMethod]
    public async Task MultipleReadersTestsAsync()
    {
        long readersInside = 0;
        var spinLock = new AsyncReaderWriterSpinLock();

        var pending = Enumerable.Range(0, 5).Select(_ => HoldReadLockAsync()).ToList();
        await Task.WhenAll(pending);

        readersInside.Should().Be(5);

        async Task HoldReadLockAsync()
        {
            using (await spinLock.AcquireReadLockAsync())
            {
                Interlocked.Increment(ref readersInside);

                // Stay inside the lock until every reader has entered it.
                while (Interlocked.Read(ref readersInside) != 5)
                {
                    await Task.Delay(1);
                }
            }
        }
    }

    /// <summary>
    /// Starts five writers and checks, inside the write lock, that no other writer is
    /// inside it at the same time.
    /// </summary>
    /// <returns>
    /// The <see cref="Task"/>.
    /// </returns>
    [TestMethod]
    public async Task MultipleWritersTestsAsync()
    {
        long writersInside = 0;
        var spinLock = new AsyncReaderWriterSpinLock();

        var pending = Enumerable.Range(0, 5).Select(_ => EnterExclusivelyAsync()).ToList();
        await Task.WhenAll(pending);

        async Task EnterExclusivelyAsync()
        {
            using (await spinLock.AcquireWriteLockAsync())
            {
                // Exactly one writer may be between the increment and the decrement.
                Interlocked.Increment(ref writersInside).Should().Be(1);
                Interlocked.Decrement(ref writersInside);
            }
        }
    }

    /// <summary>
    /// Runs five readers against one writer that flips every array element from 0 to 1
    /// and checks that no reader ever observes a partially written array.
    /// </summary>
    /// <returns>
    /// The <see cref="Task"/>.
    /// </returns>
    [TestMethod]
    public async Task ReaderWriterTestsAsync()
    {
        var values = new[] { 0, 0, 0, 0, 0 };
        var spinLock = new AsyncReaderWriterSpinLock();

        var pending = Enumerable.Range(0, 5).Select(_ => PollUntilWrittenAsync()).ToList();
        pending.Add(WriteOnesAsync());
        await Task.WhenAll(pending);

        foreach (var element in values)
        {
            element.Should().Be(1);
        }

        async Task PollUntilWrittenAsync()
        {
            while (true)
            {
                using (await spinLock.AcquireReadLockAsync())
                {
                    var snapshot = values.ToList();
                    var sawZero = snapshot.Any(v => v == 0);
                    var sawOne = snapshot.Any(v => v == 1);

                    if (sawZero && sawOne)
                    {
                        throw new Exception($"Mix of 1 and 0 found.");
                    }

                    if (!(sawZero || sawOne))
                    {
                        throw new Exception("Not reachable.");
                    }

                    // Exactly one of the two flags is set here; no zeros left means the
                    // writer has finished, so this reader is done.
                    if (!sawZero)
                    {
                        return;
                    }
                }
            }
        }

        async Task WriteOnesAsync()
        {
            using (await spinLock.AcquireWriteLockAsync())
            {
                for (var index = 0; index != values.Length; ++index)
                {
                    values[index] = 1;
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment