Created
December 3, 2013 23:51
-
-
Save sharwell/7779874 to your computer and use it in GitHub Desktop.
Cloud Queues example using multiple programming strategies
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace OpenStackNet.Testing.Unit.Providers.Rackspace | |
{ | |
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using net.openstack.Core.Domain.Queues; | |
using net.openstack.Core.Providers; | |
using net.openstack.Core.Synchronous; | |
using Newtonsoft.Json; | |
using Path = System.IO.Path; | |
/// <summary> | |
/// This class demonstrates the creation, claiming, renewal, and removal of a message in Cloud Queues | |
/// for projects targeting the .NET Framework 4.0. | |
/// </summary> | |
public class SampleQueues | |
{ | |
/// <summary> | |
/// The prefix to use for names of queues created during integration testing. | |
/// </summary> | |
public static readonly string TestQueuePrefix = "UnitTestQueue-"; | |
public async Task TestQueueClaimsAsyncAwait(IQueueingService provider, CancellationToken cancellationToken) | |
{ | |
QueueName queueName = CreateRandomQueueName(); | |
await provider.CreateQueueAsync(queueName, cancellationToken); | |
await provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"))); | |
QueueStatistics statistics; | |
using (Claim claim = await provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken)) | |
{ | |
Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive); | |
Assert.IsNotNull(claim.Messages); | |
Assert.AreEqual(1, claim.Messages.Count); | |
statistics = await provider.GetQueueStatisticsAsync(queueName, cancellationToken); | |
Assert.AreEqual(1, statistics.MessageStatistics.Claimed); | |
QueuedMessage message = await provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken); | |
Assert.IsNotNull(message); | |
TimeSpan age = claim.Age; | |
await TaskEx.Delay(TimeSpan.FromSeconds(2)); | |
await claim.RefreshAsync(cancellationToken); | |
Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2)); | |
await claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken); | |
Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive); | |
} | |
statistics = await provider.GetQueueStatisticsAsync(queueName, cancellationToken); | |
Assert.AreEqual(0, statistics.MessageStatistics.Claimed); | |
await provider.DeleteQueueAsync(queueName, cancellationToken); | |
} | |
public Task TestQueueClaimsAsync(IQueueingService provider, CancellationToken cancellationToken) | |
{ | |
QueueName queueName = CreateRandomQueueName(); | |
Task result = provider.CreateQueueAsync(queueName, cancellationToken); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
return provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"))); | |
}).Unwrap(); | |
QueueStatistics statistics = null; | |
Claim claim = null; | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
return provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken); | |
}).Unwrap() | |
.ContinueWith( | |
task => | |
{ | |
claim = task.Result; | |
}, TaskContinuationOptions.ExecuteSynchronously); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive); | |
Assert.IsNotNull(claim.Messages); | |
Assert.AreEqual(1, claim.Messages.Count); | |
return provider.GetQueueStatisticsAsync(queueName, cancellationToken); | |
}).Unwrap() | |
.ContinueWith( | |
task => | |
{ | |
statistics = task.Result; | |
}, TaskContinuationOptions.ExecuteSynchronously); | |
QueuedMessage message = null; | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.AreEqual(1, statistics.MessageStatistics.Claimed); | |
return provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken); | |
}).Unwrap() | |
.ContinueWith( | |
task => | |
{ | |
message = task.Result; | |
}, TaskContinuationOptions.ExecuteSynchronously); | |
TimeSpan age = default(TimeSpan); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.IsNotNull(message); | |
age = claim.Age; | |
Thread.Sleep(TimeSpan.FromSeconds(2)); | |
}); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
return claim.RefreshAsync(cancellationToken); | |
}).Unwrap(); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2)); | |
return claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken); | |
}).Unwrap(); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive); | |
return claim.DisposeAsync(cancellationToken); | |
}).Unwrap(); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
return provider.GetQueueStatisticsAsync(queueName, cancellationToken); | |
}).Unwrap() | |
.ContinueWith( | |
task => | |
{ | |
statistics = task.Result; | |
}, TaskContinuationOptions.ExecuteSynchronously); | |
result = result.ContinueWith( | |
task => | |
{ | |
task.PropagateExceptions(); | |
Assert.AreEqual(0, statistics.MessageStatistics.Claimed); | |
return provider.DeleteQueueAsync(queueName, cancellationToken); | |
}).Unwrap(); | |
return result; | |
} | |
public void TestQueueClaimsManualSync(IQueueingService provider, CancellationToken cancellationToken) | |
{ | |
QueueName queueName = CreateRandomQueueName(); | |
provider.CreateQueueAsync(queueName, cancellationToken).Wait(); | |
cancellationToken.ThrowIfCancellationRequested(); | |
provider.PostMessagesAsync(queueName, cancellationToken, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"))).Wait(); | |
cancellationToken.ThrowIfCancellationRequested(); | |
QueueStatistics statistics; | |
using (Claim claim = provider.ClaimMessageAsync(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1), cancellationToken).Result) | |
{ | |
cancellationToken.ThrowIfCancellationRequested(); | |
Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive); | |
Assert.IsNotNull(claim.Messages); | |
Assert.AreEqual(1, claim.Messages.Count); | |
statistics = provider.GetQueueStatisticsAsync(queueName, cancellationToken).Result; | |
cancellationToken.ThrowIfCancellationRequested(); | |
Assert.AreEqual(1, statistics.MessageStatistics.Claimed); | |
QueuedMessage message = provider.GetMessageAsync(queueName, claim.Messages[0].Id, cancellationToken).Result; | |
cancellationToken.ThrowIfCancellationRequested(); | |
Assert.IsNotNull(message); | |
TimeSpan age = claim.Age; | |
Thread.Sleep(2); // you wouldn't want to actually do this, but it shows how the refresh and renew operations work! | |
claim.RefreshAsync(cancellationToken).Wait(); | |
Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2)); | |
claim.RenewAsync(TimeSpan.FromMinutes(10), cancellationToken).Wait(); | |
Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive); | |
} | |
statistics = provider.GetQueueStatisticsAsync(queueName, cancellationToken).Result; | |
cancellationToken.ThrowIfCancellationRequested(); | |
Assert.AreEqual(0, statistics.MessageStatistics.Claimed); | |
provider.DeleteQueueAsync(queueName, cancellationToken).Wait(); | |
cancellationToken.ThrowIfCancellationRequested(); | |
} | |
public void SynchronousTestQueueClaims(IQueueingService provider) | |
{ | |
QueueName queueName = CreateRandomQueueName(); | |
provider.CreateQueue(queueName); | |
provider.PostMessages(queueName, new Message<SampleMetadata>(TimeSpan.FromSeconds(120), new SampleMetadata(3, "yes"))); | |
QueueStatistics statistics; | |
using (Claim claim = provider.ClaimMessage(queueName, null, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(1))) | |
{ | |
Assert.AreEqual(TimeSpan.FromMinutes(5), claim.TimeToLive); | |
Assert.IsNotNull(claim.Messages); | |
Assert.AreEqual(1, claim.Messages.Count); | |
statistics = provider.GetQueueStatistics(queueName); | |
Assert.AreEqual(1, statistics.MessageStatistics.Claimed); | |
QueuedMessage message = provider.GetMessage(queueName, claim.Messages[0].Id); | |
Assert.IsNotNull(message); | |
TimeSpan age = claim.Age; | |
Thread.Sleep(TimeSpan.FromSeconds(2)); | |
claim.Refresh(); | |
Assert.IsTrue(claim.Age >= age + TimeSpan.FromSeconds(2)); | |
claim.Renew(TimeSpan.FromMinutes(10)); | |
Assert.AreEqual(TimeSpan.FromMinutes(10), claim.TimeToLive); | |
} | |
statistics = provider.GetQueueStatistics(queueName); | |
Assert.AreEqual(0, statistics.MessageStatistics.Claimed); | |
provider.DeleteQueue(queueName); | |
} | |
[JsonObject(MemberSerialization.OptIn)] | |
private class SampleMetadata | |
{ | |
public SampleMetadata(int valueA, string valueB) | |
{ | |
ValueA = valueA; | |
ValueB = valueB; | |
} | |
[JsonProperty("valueA")] | |
public int ValueA | |
{ | |
get; | |
private set; | |
} | |
[JsonProperty("valueB")] | |
public string ValueB | |
{ | |
get; | |
private set; | |
} | |
} | |
/// <summary> | |
/// Creates a random queue name with the proper prefix for integration testing. | |
/// </summary> | |
/// <returns>A unique, randomly-generated queue name.</returns> | |
private QueueName CreateRandomQueueName() | |
{ | |
return new QueueName(TestQueuePrefix + Path.GetRandomFileName().Replace('.', '_')); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment