Skip to content

Instantly share code, notes, and snippets.

@DarranShepherd
Last active January 29, 2018 13:43
Show Gist options
  • Save DarranShepherd/1125d4083f74e636552dba5c44bcdf83 to your computer and use it in GitHub Desktop.
Save DarranShepherd/1125d4083f74e636552dba5c44bcdf83 to your computer and use it in GitHub Desktop.
namespace Foo
{
[TableName("sequences")]
public class SequenceTableEntity : TableEntity
{
public static string CreatePartitionKey() => "DefaultPartitionKey";
public static string CreateRowKey(string name) => name.ToLowerInvariant();
public SequenceTableEntity() { }
public SequenceTableEntity(string name) : base(CreatePartitionKey(), CreateRowKey(name)) { }
public long Value { get; set; }
}
public static class IncrementSequence
{
public class Request : IAsyncRequest<long>
{
public Request(string name)
{
if(string.IsNullOrEmpty(name)) throw new ArgumentNullException(nameof(name));
Name = name;
}
public string Name { get; }
}
public class Handler : IAsyncRequestHandler<Request, long>
{
private const int Conflict = 409;
private const int PreconditionFailed = 412;
private static readonly TimeSpan Expiry = TimeSpan.FromSeconds(10);
private readonly ITableFactory _tableFactory;
public Handler(ITableFactory tableFactory)
{
if (tableFactory == null) throw new ArgumentNullException(nameof(tableFactory));
_tableFactory = tableFactory;
}
public async Task<long> Handle(Request message)
{
var pk = SequenceTableEntity.CreatePartitionKey();
var rk = SequenceTableEntity.CreateRowKey(message.Name);
var range = new TableRange(pk, rk, rk);
var table = await _tableFactory.GetTableForUpdateAsync<SequenceTableEntity>(range, Expiry);
var sequence = await Policy
.Handle<StorageException>(e =>
e.RequestInformation.HttpStatusCode == Conflict
|| e.RequestInformation.HttpStatusCode == PreconditionFailed)
.WaitAndRetryAsync(new[]
{
TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(200),
TimeSpan.FromMilliseconds(400)
})
.ExecuteAsync(() => IncrementSequence(table, pk, rk));
return sequence.Value;
}
private static async Task<SequenceTableEntity> IncrementSequence(ITable<SequenceTableEntity> table, string pk, string rk)
{
var sequence = await table.RetrieveAsync(pk, rk);
// Don't use InsertOrReplace as it doesn't perform concurrency checks on ETag
// See https://azure.microsoft.com/en-gb/blog/managing-concurrency-in-microsoft-azure-storage-2/
if (sequence == null)
{
sequence = new SequenceTableEntity(rk) { Value = 1 };
await table.InsertAsync(sequence);
}
else
{
sequence.Value++;
await table.ReplaceAsync(sequence);
}
return sequence;
}
}
}
public class IncrementSequenceFacts
{
public class TheRequestCtor
{
[Fact]
public void ThrowsNullArgumentWhenNameIsNull()
{
Assert.Throws<ArgumentNullException>(() => new IncrementSequence.Request(null));
}
[Fact]
public void ThrowsNullArgumentWhenNameIsEmpty()
{
Assert.Throws<ArgumentNullException>(() => new IncrementSequence.Request(string.Empty));
}
}
public class TheHandlerCtor
{
[Fact]
public void ThrowsNullArgumentWhenTableFactoryIsNull()
{
Assert.Throws<ArgumentNullException>(() => new IncrementSequence.Handler(null));
}
}
public class TheHandleMethod
{
private const string SequenceName = "TestSequence";
private readonly Mock<ITable<SequenceTableEntity>> _tableMock;
private readonly IncrementSequence.Handler _sut;
public TheHandleMethod()
{
_tableMock = new Mock<ITable<SequenceTableEntity>>();
_sut = new IncrementSequence.Handler(Mock.Of<ITableFactory>(f => f.GetTableForUpdateAsync<SequenceTableEntity>(It.IsAny<TableRange>(), It.IsAny<TimeSpan>()) == Task.FromResult(_tableMock.Object)));
}
[Fact]
public async Task InsertsEntityForNewSequence()
{
SetupEntityRetrieval(null);
await _sut.Handle(new IncrementSequence.Request(SequenceName));
_tableMock.Verify(t => t.InsertAsync(EntityPredicate(1)));
}
[Fact]
public async Task IncrementsEntityForExistingSequence()
{
SetupEntityRetrieval(new SequenceTableEntity(SequenceName) { Value = 99 });
await _sut.Handle(new IncrementSequence.Request(SequenceName));
_tableMock.Verify(t => t.ReplaceAsync(EntityPredicate(100)));
}
[Fact]
public async Task IncrementsEntityForExistingSequenceWithRetries()
{
SetupThrowingEntityRetrieval(
3,
new StorageException(new RequestResult { HttpStatusCode = 412 }, "Exception", null),
new SequenceTableEntity(SequenceName) { Value = 99 });
await _sut.Handle(new IncrementSequence.Request(SequenceName));
_tableMock.Verify(t => t.ReplaceAsync(EntityPredicate(100)));
}
[Fact]
public async Task ThrowsExceptionAfter4Failures()
{
SetupThrowingEntityRetrieval(
5,
new StorageException(new RequestResult { HttpStatusCode = 412 }, "Exception", null),
new SequenceTableEntity(SequenceName) { Value = 99 });
await Assert.ThrowsAsync<StorageException>(() => _sut.Handle(new IncrementSequence.Request(SequenceName)));
_tableMock.Verify(t => t.ReplaceAsync(EntityPredicate(100)), Times.Never);
}
private void SetupEntityRetrieval(SequenceTableEntity sequence) =>
_tableMock
.Setup(t => t.RetrieveAsync(SequenceTableEntity.CreatePartitionKey(), SequenceTableEntity.CreateRowKey(SequenceName)))
.ReturnsAsync(sequence);
private void SetupThrowingEntityRetrieval(int times, Exception e, SequenceTableEntity sequence)
{
var counter = 0;
_tableMock
.Setup(t => t.RetrieveAsync(SequenceTableEntity.CreatePartitionKey(), SequenceTableEntity.CreateRowKey(SequenceName)))
.Callback(() =>
{
if (++counter < times)
{
throw e;
}
})
.ReturnsAsync(sequence);
}
private SequenceTableEntity EntityPredicate(long value) =>
It.Is<SequenceTableEntity>(
e =>
e.RowKey == SequenceTableEntity.CreateRowKey(SequenceName)
&& e.Value == value);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment