Skip to content

Instantly share code, notes, and snippets.

@erdtsieck
Created November 4, 2016 10:19
Show Gist options
  • Save erdtsieck/03bf4020aabe24abeeb97e2edba665c1 to your computer and use it in GitHub Desktop.
Save erdtsieck/03bf4020aabe24abeeb97e2edba665c1 to your computer and use it in GitHub Desktop.
Attempt to prevent concurrency problems using Table Storage
using (var transaction = await TableStorageTransaction<IDomainEvent>.BeginTransaction(cloudBlob, command.AggregateRootId, eventsCollector))
{
var currentVersion = eventsQueryable.Where(e => e.PartitionKey = command.AggregateRootId).Max(e => e.Version);
if (expectedVersion != currentVersion)
{
throw new ConcurrencyException("expected version is not the same as current version");
}
var i = currentVersion;
foreach (var domainEvent in uncommittedEvents.OrderBy(e => e.Timestamp))
{
i++;
domainEvent.Version = i;
await eventsCollector.AddAsync(domainEvent);
}
await transaction.CommitAsync();
}
private class TableStorageTransaction<T> : IDisposable
{
private readonly ICloudBlob cloudBlob;
private readonly string transactionId;
private readonly IAsyncCollector<T> collector;
private TableStorageTransaction(ICloudBlob cloudBlob, string transactionId, IAsyncCollector<T> collector)
{
this.cloudBlob = cloudBlob;
this.transactionId = transactionId;
this.collector = collector;
}
public static async Task<TableStorageTransaction<T>> BeginTransaction(ICloudBlob cloudBlob, string transactionId, IAsyncCollector<T> collector)
{
var transaction = new TableStorageTransaction<T>(cloudBlob, transactionId, collector);
await cloudBlob.AcquireLeaseAsync(null, transactionId);
return transaction;
}
public async Task CommitAsync()
{
await collector.FlushAsync();
}
public void Dispose()
{
cloudBlob.ReleaseLease(new AccessCondition { LeaseId = transactionId });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment