16

I'm wondering if there is a tool or library that can move messages between queues. Currently, I'm doing something like this:

public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();

    // Re-enqueue the poison message body unchanged on the main queue.
    queue.AddMessage(new CloudQueueMessage(message));
}
Mitch Wheat
vbn

9 Answers

8

As of 2018-09-11, version 1.4.1 of the Microsoft Azure Storage Explorer doesn't have the ability to move messages from one Azure queue to another.

I blogged a simple solution to transfer poison messages back to the originating queue and thought it might save someone a few minutes. Obviously, you'll need to have fixed the error that caused the messages to end up in the poison message queue!

You'll need to add a NuGet package reference to Microsoft.NET.Sdk.Functions:

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()
{
    const string queuename = "MyQueueName";

    string storageAccountString = "xxxxxx";

    RetryPoisonMesssages(storageAccountString, queuename);
}

private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

    int count = 0;
    while (true)
    {
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;

        // Deleting before adding means the message is lost if AddMessage fails.
        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    }

    return count;
}

private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);

    return queue;
}
Mitch Wheat
  • Great, thanks. If you're running this from the console, adding [WindowsAzure.Storage](https://www.nuget.org/packages/WindowsAzure.Storage/) is enough. You'll need to make the calls `GetMessage`, `DeleteMessage` and `AddMessage` async though, as those non-async methods have been removed from the library. – Knelis Oct 10 '18 at 10:04
7

Azure Storage Explorer version 1.15.0 can now do this as of 2020. https://github.com/microsoft/AzureStorageExplorer/issues/1064

Tanya Branagan
5

Essentially Azure Storage doesn't support moving messages from one queue to another. You would need to do this on your own.

One way to move messages from one queue to another is to dequeue them from the source queue (by calling GetMessages), read the contents of each message, create a new message with those contents in the target queue, and then delete the original from the source queue. You can do this with the Storage Client Library.
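The steps described above can be sketched roughly as follows. This is only an illustration, not a definitive implementation: `InMemoryQueue` is a hypothetical stand-in so the sketch runs without a storage account, and `move_messages` and `content_of` are names made up for this example. The method names `receive_messages`, `send_message` and `delete_message` mirror the ones used with the Python `QueueClient` elsewhere on this page; with a real client you would presumably pass `content_of=lambda m: m.content`.

```python
from collections import deque


class InMemoryQueue:
    """Hypothetical stand-in for a queue client, used only to run the sketch offline."""

    def __init__(self, items=()):
        self._items = deque(items)

    def receive_messages(self):
        # Return a snapshot of the current contents, as one receive pass would.
        return list(self._items)

    def send_message(self, content):
        self._items.append(content)

    def delete_message(self, message):
        self._items.remove(message)

    def __len__(self):
        return len(self._items)


def move_messages(source, target, content_of=lambda m: m):
    """Copy each message's content to `target`, then delete it from `source`."""
    moved = 0
    for message in source.receive_messages():
        target.send_message(content_of(message))  # create a new message in the target
        source.delete_message(message)            # only then remove the original
        moved += 1
    return moved


poison = InMemoryQueue(["order-1", "order-2", "order-3"])
main = InMemoryQueue()
moved = move_messages(poison, main)
print(moved, len(poison), len(main))
```

Sending before deleting is a deliberate choice in this sketch: if the send fails, the original is still in the source queue.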

One tool that comes to mind for moving messages is Cerebrata Azure Management Studio (a paid product with a 15-day free trial). It has this functionality.

As of 2018-09-11, version 1.4.1 of the Microsoft Azure Storage Explorer doesn't support moving queue messages.

Michael Freidgeim
Gaurav Mantri
4

Here's an updated version of Mitch's answer, using the latest Microsoft.Azure.Storage.Queue package. Simply create a new .NET Console application, add the above-mentioned package to it, and replace the contents of Program.cs with the following:

using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            const string queuename = "MyQueueName";

            string storageAccountString = "xxx";

            await RetryPoisonMesssages(storageAccountString, queuename);
        }

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        {
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

            var count = 0;
            while (true)
            {
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;

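                // Delete first, then add: if the add fails, the message is dropped rather than duplicated.
                // Adding first with this same msg object breaks the delete (404); see the comments below.
                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);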
                
                count++;
            }

            return count;
        }

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);

            return queue;
        }
    }
}

It's still pretty slow if you're working with >1000 messages though, so I'd recommend looking into batch APIs for higher quantities.
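As a rough illustration of the batching idea, here is a minimal sketch that pulls messages in batches instead of one per round trip. It assumes a batch size of 32, the figure the other answers on this page pass to their receive calls. `drain_in_batches`, `receive_batch` and `fake_receive` are hypothetical names for this example; the fake only simulates a backlog so the sketch runs offline.

```python
def drain_in_batches(receive_batch, handle, batch_size=32):
    """Call receive_batch(batch_size) until it returns nothing, handling each message.

    Returns (messages handled, receive calls made).
    """
    calls = 0
    total = 0
    while True:
        batch = receive_batch(batch_size)
        calls += 1
        if not batch:
            break
        for message in batch:
            handle(message)
            total += 1
    return total, calls


# Simulated backlog of 100 messages (an assumption for the demo only).
backlog = [f"msg-{i}" for i in range(100)]


def fake_receive(n):
    # Hand out up to n messages and remove them from the backlog.
    taken = backlog[:n]
    del backlog[:n]
    return taken


moved = []
total, calls = drain_in_batches(fake_receive, moved.append)
print(total, calls)
```

With 100 messages this takes 5 receive calls (32, 32, 32, 4, then an empty batch that ends the loop) instead of 101 single-message calls. Each message is still handled individually, so only the receive side is batched here.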

IGx89
  • I would recommend reversing the order of DeleteMessageAsync and AddMessageAsync so that if something goes wrong you would get a duplicate message instead of a lost message? – Andreas Öhlund Oct 29 '19 at 20:43
  • Good suggestion! In my case the queue messages trigger non-critical emails so I'd rather a message get dropped than duplicated – IGx89 Oct 29 '19 at 22:01
  • Yeah, the use cases might vary, but I'd argue that the safe default is to get a duplicate, since in the cloud you pretty much have to make all your processing idempotent: the best guarantee you can get is "at least once". Perhaps a comment in the code to make people aware of the decision at least? – Andreas Öhlund Oct 30 '19 at 06:12
  • I would hope that people weren't blindly copying and running my code, it's just an example of how to achieve this using the latest APIs, but probably still wise to do I suppose. Done! – IGx89 Oct 31 '19 at 18:10
  • Just a note: this code is broken in the latest storage account version. The reason is that AddMessageAsync will overwrite some info on the message and then DeleteMessageAsync will give a 404. The better solution is to copy the values into a new message for AddMessageAsync – Mikael Eliasson Dec 13 '19 at 14:47
  • @MikaelEliasson, I've posted an updated function based on your suggestion [Azure: How to move messages from poison queue to back to main queue?](//stackoverflow.com/a/59796384) – Michael Freidgeim Jan 18 '20 at 00:16
  • I had to run this code again and ran into the issue myself. I've reverted the code back to my original answer, since that works (and works great for my use case). If people want it more robust they can use the code from Michael's answer. – IGx89 Jul 30 '20 at 18:14
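To make the ordering trade-off from the comments above concrete, here is a small simulation. It is only a sketch: it uses plain Python lists rather than real queues, `FlakyTarget` and `transfer` are hypothetical names, and the failure is injected artificially for a single payload.

```python
class FlakyTarget:
    """Hypothetical target queue whose send fails for one specific payload."""

    def __init__(self, fail_on):
        self.fail_on = fail_on
        self.items = []

    def send(self, content):
        if content == self.fail_on:
            raise ConnectionError("simulated transient failure")
        self.items.append(content)


def transfer(source, target, delete_first):
    """Move items from the `source` list to `target`; return the items that were lost."""
    lost = []
    for content in list(source):
        try:
            if delete_first:
                source.remove(content)   # delete, then add
                target.send(content)
            else:
                target.send(content)     # add, then delete
                source.remove(content)
        except ConnectionError:
            # The item is lost only if it is already gone from the source.
            if content not in source:
                lost.append(content)
    return lost


source_a = ["a", "b", "c"]
target_a = FlakyTarget("b")
lost_a = transfer(source_a, target_a, delete_first=True)

source_b = ["a", "b", "c"]
target_b = FlakyTarget("b")
lost_b = transfer(source_b, target_b, delete_first=False)

print(lost_a, source_a)
print(lost_b, source_b)
```

With delete-first, "b" disappears from both queues; with add-first, it stays in the source and can be retried. The flip side of add-first is that a failure between the add and the delete leaves a duplicate, which is why the processing needs to be idempotent.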
1

Here's a Python script you may find useful. You'll need to install azure-storage-queue (a legacy release older than version 12, which is the one that provides QueueService):

from azure.storage.queue import QueueService

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    while queueService.peek_messages(queue.name):
      for message in queueService.get_messages(queue.name, 32):
        print(".", end="", flush=True)
        queueService.put_message(targetQueueName, message.content)
        queueService.delete_message(queue.name, message.id, message.pop_receipt)
Jon Canning
  • I am getting this error: Exception: [AttributeError] ("'QueueServiceClient' object has no attribute 'peek_messages'",) – Andrew Jun 26 '20 at 05:59
1

I just had to do this again and took the time to update my snippet to the new storage SDKs. See the post at https://www.bokio.se/engineering-blog/how-to-re-run-the-poison-queue-in-azure-webjobs/ for more info.

Here is the code I used

using Azure.Storage.Queues;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AzureQueueTransfer
{
    internal class Program
    {
        // Need Read, Update & Process (full url, can create in storage explorer)
        private const string sourceQueueSAS = ""; 

        // Need Add (full url, can create in storage explorer)
        private const string targetQueueSAS = "";
        private static async Task Main(string[] args)
        {
            var sourceQueue = new QueueClient(new Uri(sourceQueueSAS));
            var targetQueue = new QueueClient(new Uri(targetQueueSAS));

            var queuedAny = true;
            while (queuedAny)
            {
                await Task.Delay(30000); // Pause between batches so we don't build up too large a backlog; new messages can then be processed ahead of old ones
                queuedAny = false;
                foreach (var message in sourceQueue.ReceiveMessages(maxMessages: 32).Value)
                {
                    queuedAny = true;
                    var res = await targetQueue.SendMessageAsync(message.Body);

                    Console.WriteLine($"Transferred: {message.MessageId}");
                    await sourceQueue.DeleteMessageAsync(message.MessageId, message.PopReceipt);
                }

                Console.WriteLine($"Finished batch");
            } 
        }
    }
}

Mikael Eliasson
0

For anyone coming here looking for a Node equivalent of @MitchWheat's answer, here is one using an Azure Function.

import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> {
  try {
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    const targetQueue = poisonQueue.replace(/-poison$/, '') // strip only the "-poison" suffix so hyphenated queue names survive

    let count = 0

    while (true) {
      const message = await getMessage(poisonQueue)
      if (!message) { break; }
      if (message.messageText && message.messageId && message.popReceipt) {
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
      }
      count++
    }

    context.res = {
      body: `Replayed ${count} messages from ${poisonQueue} on ${targetQueue}`
    };
  } catch (e) {
    context.res = { status: 500 }
  }
}

To use the function, you need to provide connection information for the storage account that hosts your queues. This is supplied through environment variables: either AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY, or AZURE_STORAGE_CONNECTION_STRING. More on this is available in the Azure Storage SDK docs.

Also wrote a few lines about it in this Medium article

Christofer Eliasson
0

As Mikael Eliasson noted, the code in IGx89's answer is broken because

AddMessageAsync will overwrite some info on the message and then DeleteMessageAsync will give a 404. The better solution is to copy the values into a new message for AddMessageAsync

Here is an enhanced version of RetryPoisonMesssages that can process only a given list of message IDs (instead of everything in the queue) and can copy messages instead of moving them. It also logs success or failure for each message.

/// <param name="storageAccountString"></param>
/// <param name="queuename"></param>
/// <param name="idsToMove">If not null, only messages with listed IDs will be moved/copied</param>
/// <param name="deleteFromPoisonQueue">If false, messages will be copied; if true, they will be moved.
/// Warning: if the queue is big, keeping deleteFromPoisonQueue=false can cause the same message
/// from the poison queue to be copied more than once (the reason has not been confirmed; a likely cause is that
/// undeleted messages become visible again once their visibility timeout expires).</param>
/// <returns></returns>
private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename, string[] idsToMove=null, bool deleteFromPoisonQueue=false)
{
    var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    var poisonQueueName = queuename + "-poison";
    var poisonqueue = GetCloudQueueRef(storageAccountString, poisonQueueName);

    var count = 0;
    while (true)
    {
        var msg = await poisonqueue.GetMessageAsync();
        if (msg == null)
        {
            Console.WriteLine("No more messages in a queue " + poisonQueueName);
            break;
        }

        string action = "";
        try
        {
            if (idsToMove == null || idsToMove.Contains(msg.Id))
            {
                var msgToAdd = msg;
                if (deleteFromPoisonQueue)
                {
                    // AddMessageAsync overwrites some info on the message, after which DeleteMessageAsync gives a 404.
                    // The better solution is to copy the values into a new message for AddMessageAsync.
                     msgToAdd = new CloudQueueMessage(msg.AsBytes);
                }

                action = "adding";
                await targetqueue.AddMessageAsync(msgToAdd);
                Console.WriteLine("Added message ID " + msg.Id);
                if (deleteFromPoisonQueue)
                {
                    action = "deleting";
                    await poisonqueue.DeleteMessageAsync(msg);
                    Console.WriteLine("Deleted message ID " + msg.Id);
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error encountered when "+ action + " " + ex.Message + " at message ID " + msg.Id);
        }

        count++;
    }

    return count;
}
Michael Freidgeim
0

Updated Python, based on Jon Canning's answer:

from azure.storage.queue import QueueServiceClient


queueService = QueueServiceClient.from_connection_string(conn_str="DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net")

for queue in queueService.list_queues():
    if "poison" in queue.name:
        print(queue.name)
        targetQueueName = queue.name.replace("-poison", "")
        poisonQueue = queueService.get_queue_client(queue=queue.name)
        targetQueue = queueService.get_queue_client(queue=targetQueueName)
        while poisonQueue.peek_messages():
            messages = poisonQueue.receive_messages()
            for msg in messages:
                targetQueue.send_message(msg.content)
                poisonQueue.delete_message(msg)