I have developed an Azure Durable Functions app that triggers on new Service Bus queue messages. It works fine when no errors occur, but when an error occurs in an activity function, the failure is logged and the message is gone from the queue for good. What could be causing that, and how do I prevent the message from disappearing from the queue on error?

Here is the reproducible code. It is the code generated by the new Azure Function template in VS2017, with two changes: an exception is thrown when the city is "Seattle", and it uses a ServiceBusTrigger instead of an HttpTrigger.

    [FunctionName("Test")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var outputs = new List<string>();

        // Replace "hello" with the name of your Durable Activity Function.
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Tokyo"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "Seattle"));
        outputs.Add(await context.CallActivityAsync<string>("Test_Hello", "London"));

        // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
        return outputs;
    }

    [FunctionName("Test_Hello")]
    public static string SayHello([ActivityTrigger] string name, ILogger log)
    {
        log.LogInformation($"Saying hello to {name}.");
        if (name == "Seattle")
            throw new Exception("An error occurs");
        return $"Hello {name}!";
    }

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the Service Bus queue message.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }

Update: When I throw the exception in the orchestration client function instead, it behaves correctly: the message is retried and, if it still fails after x attempts, it is moved to the dead-letter queue.

So I managed to work around this by adding a while loop to the client function that checks for a Failed, Terminated, or Canceled status.

    [FunctionName("Test_HttpStart")]
    public static async Task ServiceBusStart(
        [ServiceBusTrigger("somequeue", Connection = "ServiceBusQueueListenerConnectionString")]string queuemsg,
        [OrchestrationClient]DurableOrchestrationClient starter,
        ILogger log)
    {
        // Function input comes from the Service Bus queue message.
        var msg = JsonConvert.DeserializeObject<IncomingMessage>(queuemsg);
        string instanceId = await starter.StartNewAsync("Test", msg);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        var status = await starter.GetStatusAsync(instanceId);

        while (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
        {
            // Wait without blocking the thread before polling again.
            await Task.Delay(1000);
            status = await starter.GetStatusAsync(instanceId);
            if (status.RuntimeStatus == OrchestrationRuntimeStatus.Failed 
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
                || status.RuntimeStatus == OrchestrationRuntimeStatus.Canceled)
            {
                throw new Exception("Orchestration failed with error: " + status.Output);
            }
        }

    }

However, it seems like a hack to me, and I have not seen this type of code in any Microsoft sample. I would expect the Durable Functions framework to take care of this. Is there another way to make the Service Bus trigger work with Durable Functions?

rayh

2 Answers

This behavior is by design. Starting an orchestration is asynchronous: the StartNewAsync API does not wait for the orchestration to run or complete. Internally, StartNewAsync just drops a message into an Azure Storage queue and writes an entry into an Azure Storage table. If that succeeds, your Service Bus function continues running and completes successfully, at which point the Service Bus message is deleted.

Your workaround is acceptable if you truly need the Service Bus queue message to retry, but I question why you would need to do this. The orchestration itself can manage its own retries without relying on Service Bus. For example, you could use CallActivityWithRetryAsync to retry internally within the orchestration.

See the Error Handling topic of the Durable Functions documentation.
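
As a rough sketch of the retry suggestion above, the orchestrator from the question could call the activity through CallActivityWithRetryAsync. The retry interval and attempt count here are assumptions chosen for illustration, not recommended values, and the class name is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class RetryingOrchestration
{
    [FunctionName("Test")]
    public static async Task<List<string>> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // Assumed policy: wait 5 seconds before the first retry,
        // and give up after 3 attempts in total.
        var retryOptions = new RetryOptions(
            firstRetryInterval: TimeSpan.FromSeconds(5),
            maxNumberOfAttempts: 3);

        var outputs = new List<string>();
        foreach (var city in new[] { "Tokyo", "Seattle", "London" })
        {
            // Each activity call is retried independently under the same policy.
            outputs.Add(await context.CallActivityWithRetryAsync<string>(
                "Test_Hello", retryOptions, city));
        }

        return outputs;
    }
}
```

If every attempt fails, the exception still surfaces in the orchestrator and the instance ends up in the Failed state, so the retries happen inside the orchestration rather than through Service Bus redelivery.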

Chris Gillum

I know this is an old thread, but I wanted to share how I got this working with a ServiceBusTrigger and WaitForCompletionOrCreateCheckStatusResponseAsync.

[FunctionName(nameof(QueueTriggerFunction))]
public async Task QueueTriggerFunction(
    [ServiceBusTrigger("queue-name", Connection = "connectionstring-key")]string queueMessage,
    MessageReceiver messageReceiver,
    string lockToken,
    string messageId,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    //note: autocomplete is disabled
    try
    {
        //start durable function
        var instanceId = await starter.StartNewAsync(nameof(OrchestratorFunction), queueMessage);

        //get the payload (we want to use the status uri)
        var payload = starter.CreateHttpManagementPayload(instanceId);

        //instruct QueueTriggerFunction to wait for response
        await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(new HttpRequestMessage(HttpMethod.Get, payload.StatusQueryGetUri), instanceId);

        //response ready, get status
        var status = await starter.GetStatusAsync(instanceId);

        //act on status
        if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
        {
            //like completing the message
            await messageReceiver.CompleteAsync(lockToken);
            log.LogInformation($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} succeeded [MessageId={messageId}]");
        }
        else
        {
            //or deadletter the sob
            await messageReceiver.DeadLetterAsync(lockToken);
            log.LogError($"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: {nameof(OrchestratorFunction)} failed [MessageId={messageId}]");
        }
    }
    catch (Exception ex)
    {
        //not sure what went wrong, let the lock expire and try again (until max retry attempts is reached)
        log.LogError(ex, $"{nameof(Functions)}.{nameof(QueueTriggerFunction)}: handler failed [MessageId={messageId}]");
    }
}

The thing is, all the examples on the internet use an HttpTrigger and pass that trigger's HTTP request when checking for completion, but you don't have that with a ServiceBusTrigger. Moreover, I don't think that's correct anyway; you should use the status URI from the payload call, as I'm doing here with the instanceId of the orchestrator function.
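
The "autocomplete is disabled" note in the code refers to a host-level setting. A minimal host.json sketch, assuming the Microsoft.Azure.ServiceBus-based extension that provides MessageReceiver (the 3.x/4.x versions), might look like this; newer extension versions are configured differently, so check the setting name against the version you use.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "messageHandlerOptions": {
        "autoComplete": false
      }
    }
  }
}
```

With autoComplete off, the function itself has to complete, abandon, or dead-letter each message, which is what the CompleteAsync and DeadLetterAsync calls above do.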

RvanDalen
    This really helped me! An additional consideration: since `WaitForCompletionOrCreateCheckStatusResponseAsync` has a timeout (10 seconds by default), I added a check for the Running/Pending status, at which point you can either abandon the message so it is picked up again later (make your orchestrator actions idempotent) or add a `Delay()` and recheck the status. – 147 Oct 21 '21 at 17:35
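
One way to read the comment's "delay and recheck" idea is a small polling helper like the one below. This is only a sketch: the class and method names are hypothetical, and the wait and poll intervals are left to the caller.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class OrchestrationWaiter
{
    // Hypothetical helper: polls until the orchestration is no longer
    // pending/running or until maxWait has elapsed, then returns the
    // last status it observed (which may still be Running).
    public static async Task<DurableOrchestrationStatus> WaitForTerminalStatusAsync(
        IDurableOrchestrationClient client,
        string instanceId,
        TimeSpan maxWait,
        TimeSpan pollInterval)
    {
        var deadline = DateTime.UtcNow + maxWait;
        var status = await client.GetStatusAsync(instanceId);

        while (IsStillRunning(status) && DateTime.UtcNow < deadline)
        {
            await Task.Delay(pollInterval);
            status = await client.GetStatusAsync(instanceId);
        }

        return status;
    }

    // A null status is treated as "not started yet" here, which is an assumption.
    private static bool IsStillRunning(DurableOrchestrationStatus status) =>
        status == null
        || status.RuntimeStatus == OrchestrationRuntimeStatus.Pending
        || status.RuntimeStatus == OrchestrationRuntimeStatus.Running
        || status.RuntimeStatus == OrchestrationRuntimeStatus.ContinuedAsNew;
}
```

In the queue trigger, the returned status could then decide between CompleteAsync, DeadLetterAsync, or, if the orchestration is still running, AbandonAsync so the message is delivered again later. Keeping maxWait below the message lock duration is probably necessary, since the lock token is no longer valid once the lock expires.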