Skip to content

Instantly share code, notes, and snippets.

@obiwanjacobi
Last active September 10, 2019 13:42
Show Gist options
  • Save obiwanjacobi/c95aa00809d2a1528f83cdf8db4b3119 to your computer and use it in GitHub Desktop.
Save obiwanjacobi/c95aa00809d2a1528f83cdf8db4b3119 to your computer and use it in GitHub Desktop.
Azure Queue Copy Function
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Admin.Storage.Queue.Functions
{
public class StorageQueue
{
public string QueueName { get; set; }
public string ConnectionString { get; set; }
}
public class QueueCopyParameters
{
public bool Peek { get; set; }
public StorageQueue SourceQueue { get; set; }
public StorageQueue TargetQueue { get; set; }
public int? MessageCount { get; set; }
}
public static class AdminQueueCopy
{
[FunctionName("Admin_QueueCopy")]
public static Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "queuecopy")]
QueueCopyParameters parameters)
{
#region Parameter checking
if (parameters.SourceQueue == null && parameters.TargetQueue == null && parameters.MessageCount == null)
{
throw new ArgumentException("Admin_QueueCopy did not receive any parameters. Is there an error in your Json?");
}
if (parameters.SourceQueue == null)
{
throw new ArgumentException("Admin_QueueCopy did not receive source queue configuration.");
}
if (parameters.TargetQueue == null)
{
throw new ArgumentException("Admin_QueueCopy did not receive target queue configuration.");
}
if (String.IsNullOrEmpty(parameters.SourceQueue.QueueName))
{
throw new ArgumentException("The source queue name was not specified.");
}
if (String.IsNullOrEmpty(parameters.SourceQueue.ConnectionString))
{
throw new ArgumentException("The source connection string was not specified.");
}
if (String.IsNullOrEmpty(parameters.TargetQueue.QueueName))
{
throw new ArgumentException("The target queue name was not specified.");
}
#endregion
return RunImpl(parameters);
}
public static async Task<IActionResult> RunImpl(QueueCopyParameters parameters)
{
// copy in same storage account if target is not specified
if (String.IsNullOrEmpty(parameters.TargetQueue.ConnectionString))
{
parameters.TargetQueue.ConnectionString = parameters.SourceQueue.ConnectionString;
}
var sourceQueue = OpenQueue(parameters.SourceQueue);
if (!await sourceQueue.ExistsAsync())
{
throw new InvalidOperationException($"The source queue '{parameters.SourceQueue.QueueName}' does not exist: '{parameters.SourceQueue.ConnectionString}'.");
}
var targetQueue = OpenQueue(parameters.TargetQueue);
if (!await targetQueue.ExistsAsync())
{
await targetQueue.CreateAsync();
}
var messageCount = parameters.MessageCount.GetValueOrDefault(1);
IEnumerable<CloudQueueMessage> messages;
if (parameters.Peek)
{
messages = await sourceQueue.PeekMessagesAsync(messageCount);
}
else
{
messages = await sourceQueue.GetMessagesAsync(messageCount);
}
foreach (var msg in messages)
{
await targetQueue.AddMessageAsync(msg);
if (!parameters.Peek)
{
await sourceQueue.DeleteMessageAsync(msg);
}
}
string actionTxt = parameters.Peek ? "Copied" : "Moved";
return new OkObjectResult($"{actionTxt} {messageCount} messages.");
}
private static CloudQueue OpenQueue(StorageQueue storageQueue)
{
var storageAccount = CloudStorageAccount.Parse(storageQueue.ConnectionString);
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference(storageQueue.QueueName);
return queue;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment