
I have an existing Function App with two functions and a storage queue. F1 is triggered by a message in a Service Bus topic. For each message received, F1 calculates some sub-tasks (T1, T2, ...) which have to be executed with varying amounts of delay, e.g. T1 is to be fired after 3 minutes, T2 after 5 minutes, etc. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay) and F2 is triggered whenever a message becomes visible in the queue. All works fine.

I now want to migrate this app to use Durable Functions. F1 now only starts the orchestrator. The orchestrator code looks something like this:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        //don't await as we want to fire-and-forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);
   
         //perform subtask work after delay! 
    }

I would only like to fan out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids calling 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.

My questions

  • Does it make sense to use Durable Functions for this workflow?

  • Is this the right approach to orchestrate 'Fan-out' workflow?

  • I do not like the fact that the activity function is running for the specified amount of time (3 or 5 minutes) doing nothing. It will incur costs, won't it?

  • Also, if a delay of more than 10 minutes is required, there is no way for an activity function to succeed with this approach!

  • My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code - 'Orchestrator code must never initiate any async operation' ?

     foreach (var value in results)
     {
             //calculate time to start (hypothetical example: relative to the orchestration's current time)
             var timeToStart = context.CurrentUtcDateTime.AddSeconds(value.DelayInSeconds);
             var pnTask = context.CreateTimer(timeToStart, CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
             tasks.Add(pnTask);
     }
    

UPDATE: using the approach suggested by Chris

Activity that calculates subtasks and delays

    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
    {
        //in reality time is obtained by calling an endpoint 
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
        };
    }

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

            var timeToStart = currentTime.Add(actualDelay);

            Task delayedActivityCall = context
                 .CreateTimer(timeToStart, CancellationToken.None)
                 .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }

        await Task.WhenAll(tasks);
    }

Simply scheduling tasks from within the orchestrator seems to work. In my case I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' at which the activity was run, so I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator. The activities specified by 'ContinueWith' just don't run and the orchestrator is always in the 'Running' state.

Can I not use the time recorded by an activity from within the orchestrator?

UPDATE 2

So the workaround suggested by Chris works!

Since I do not want to collect the results of the activities, I avoid calling 'await Task.WhenAll(tasks)' after scheduling all activities. I do this in order to reduce contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Completed' only after the last activity posts a 'done' message to the control queue.

Am I right? Is there any way to free the orchestrator earlier, i.e. right after scheduling all activities?

alwayslearning

4 Answers


The ContinueWith approach worked fine for me. I was able to simulate a version of your scenario using the following orchestrator code:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

And for what it's worth, here is the activity function code.

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

From the log output, I saw that all activity invocations ran 10 seconds apart from each other.

Another approach would be to fan out to multiple sub-orchestrations (as @jeffhollan suggested), which are simply a short sequence of a durable timer delay followed by your activity call.
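To illustrate that suggestion, here is a minimal sketch of such a sub-orchestrator (the function name "SubOrchestrator" and the TaskInfo shape are assumptions based on the question's code, not code from this thread):

```csharp
// Sketch: a sub-orchestration that is just "durable timer, then activity".
// The parent orchestrator fans out by calling one of these per sub-task.
[FunctionName("SubOrchestrator")]
public static async Task SubOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    TaskInfo info = context.GetInput<TaskInfo>();

    // Durable timer: the orchestration is unloaded while waiting, so no
    // compute is billed during the delay (unlike Task.Delay in an activity).
    DateTime fireAt = context.CurrentUtcDateTime.AddSeconds(info.DelayInSeconds);
    await context.CreateTimer(fireAt, CancellationToken.None);

    await context.CallActivityAsync("PerformSubtask", info);
}

// In the parent orchestrator, fan out without ContinueWith:
// foreach (var value in results)
//     tasks.Add(context.CallSubOrchestratorAsync("SubOrchestrator", value));
// await Task.WhenAll(tasks);
```

This keeps each timer-plus-activity pair inside orchestration code, avoiding the TPL continuation issue entirely.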

UPDATE: I tried using your updated sample and was able to reproduce your problem! If you run locally in Visual Studio and configure the exception settings to always break on exceptions, you should see the following:

System.InvalidOperationException: 'Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'

This means the thread which called context.CallActivityAsync("PerformSubtask", j) was not the same as the thread which called the orchestrator function. I don't know why my initial example didn't hit this, or why your version did. It has something to do with how the TPL decides which thread to use to run your ContinueWith delegate - something I need to look into more.

The good news is that there is a simple workaround, which is to specify TaskContinuationOptions.ExecuteSynchronously, like this:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

Please try that and let me know if that fixes the issue you're observing.

Ideally you wouldn't need to do this workaround when using Task.ContinueWith. I've opened an issue in GitHub to track this: https://github.com/Azure/azure-functions-durable-extension/issues/317

Since I do not want to collect the results of the activities, I avoid calling await Task.WhenAll(tasks) after scheduling all activities. I do this in order to reduce contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Completed' only after the last activity posts a 'done' message to the control queue.

This is expected. Orchestrator functions never actually complete until all outstanding durable tasks have completed. There isn't any way to work around this. Note that you can still start other orchestrator instances, there just might be some contention if they happen to land on the same partition (there are 4 partitions by default).

Chris Gillum
  • I will revisit this approach..don't know why it didn't work for me. Do I need to 'await Task.WhenAll(tasks)' if I am not interested in the results? Also, do sub-orchestrations use control queues? If yes, wouldn't I have scaling issues with them - max 16 partitions for a task hub? (https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-perf-and-scale#orchestrator-scale-out). Also, from a cost perspective, Durable Functions are probably more expensive (considering the number of services needed)? – alwayslearning May 18 '18 at 13:01
  • Simply scheduling tasks from within the orchestrator seems to work. In my case I am calculating the tasks and the delays in another activity (CalculateTasks, see the code snippet in my question) before the loop. I want the delays to be calculated using the 'current time' at which the activity was run. I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator. The activities specified by 'ContinueWith' just don't run and the orchestrator is always in the 'Running' state. Can I not use the time recorded in the activity in the orchestrator? – alwayslearning May 18 '18 at 15:40
  • See my update - I think I know what the issue is, and it's not necessarily related to where the time is being calculated. – Chris Gillum May 19 '18 at 21:34
  • The mystery continues :) I obviously had the 'Exceptions' enabled but just does not break on that exception. Your workaround works though, so I am happy :) . I have left out the 'await Task.WhenAll(tasks)'. Have updated the post with an additional question – alwayslearning May 22 '18 at 09:06
  • I added to my answer - basically leaving out await Task.WhenAll(tasks) doesn't really change anything - and this is by design. – Chris Gillum May 31 '18 at 23:08
  • Thanks. If I bump up to 16 partitions, am I right in assuming that at any given point in time there will be a maximum of 16 orchestrator instances running? If more than one belongs to a partition, they will queue up for allocation and will be allocated when the 'current' one awaits on an activity. Would be interesting to see/measure where scaling out starts getting under strain. – alwayslearning Jun 01 '18 at 11:53
  • You can have an unlimited number of orchestrations "running" regardless of your partition count. Partition count basically just determines the number of CPUs that are used to execute orchestrator functions. You can find more perf information in the official docs: https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-perf-and-scale – Chris Gillum Jun 01 '18 at 18:38

await Task.Delay is definitely not the best option: you will pay for this time while your function isn't doing anything useful. The maximum delay is also bounded to 10 minutes on the Consumption plan.

In my opinion, raw Queue messages are the best option for fire-and-forget scenarios. Set the proper visibility timeouts, and your scenario will work reliably and efficiently.
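For reference, the visibility-timeout technique this answer describes can be sketched with the modern Azure.Storage.Queues SDK (the queue name "subtasks" and the TaskInfo payload are assumptions carried over from the question; the original app may have used the older WindowsAzure.Storage client):

```csharp
// Sketch: fire-and-forget via a storage queue message whose visibility
// timeout delays when F2 (the queue-triggered function) first sees it.
using System;
using System.Text.Json;
using Azure.Storage.Queues;

var queue = new QueueClient(connectionString, "subtasks");
await queue.CreateIfNotExistsAsync();

// The message exists immediately but stays invisible for 3 minutes,
// so the queue trigger fires only after the intended delay.
await queue.SendMessageAsync(
    messageText: JsonSerializer.Serialize(taskInfo),
    visibilityTimeout: TimeSpan.FromMinutes(3),
    timeToLive: TimeSpan.FromDays(1));
```

No compute is billed during the delay; the cost is only the queue storage and the eventual F2 execution.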

The killer feature of Durable Functions is its awaits, which do their magic of pausing and resuming while keeping the scope. Thus, it's a great way to implement fan-in, but you don't need that here.

Mikhail Shilkov

I think durable definitely makes sense for this workflow. I do think the best option would be to leverage the delay/timer feature as you said, but given the synchronous nature of execution I don't think I would add everything to a task list, which really expects a .WhenAll() or .WhenAny() that you aren't aiming for. I think I would personally just do a sequential foreach loop with timer delays for each task. So, in pseudocode:

for (int x = 0; x < results.Length; x++)
{
    await context.CreateTimer(TimeSpan.FromMinutes(1), ...);
    await context.CallActivityAsync("PerformTaskAsync", results[x]);
}

You need those awaits in there regardless, so just avoiding the await Task.WhenAll(...) is likely causing some issues in the code sample above. Hope that helps.

jeffhollan
  • The problem with the above awaits is that the activities would be scheduled synchronously. So if I had 2 activities (3 and 5 mins) for which I want to start the timers asap, this will start the timer for the first, wait till it's triggered, schedule and execute the first, and then move to the next. I want to trigger all the timers asap and then just let them run to completion (execute after their timers run out). – alwayslearning May 17 '18 at 14:21
  • Ok so to clarify task T1 and Tn may execute in parallel, both with a 3 min delay? It’s not always scattering the executions to be 2 min from when last task finished? – jeffhollan May 17 '18 at 14:23
  • Ah I see - you also don’t want the task start time to be impacted by duration of execution. Yeah I’ll have to chew on this and get some other thoughts from folks like Chris Gillum – jeffhollan May 17 '18 at 14:25
  • For ex - T1 (2min), T2(4min), T3(5min).... Each task has an associated delay. I just need to start these timers and let the tasks execute after the timers. Yes they can run in parallel. Starting the timers one after the other would mean T2 would run after 6 mins instead of 4. – alwayslearning May 17 '18 at 14:27
  • Yep makes sense. Pinged team to see what durable could do in terms of scheduling activities – jeffhollan May 17 '18 at 15:25
  • My other thinking is if instead of the “task” being an activity, you call a sub-orchestration which is just a timer and then call the activity. Then you could do the “WhenAll” once sub-orchestrations have completed https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-sub-orchestrations – jeffhollan May 17 '18 at 15:28
  • Wouldn't the sub-orchestrations all be distributed on the control queues? i.e. a max of 16 orchestrations in total (normal + sub)? I am expecting 1000+ events on F1 and probably 2 subtasks per invocation. – alwayslearning May 17 '18 at 15:31

You should be able to use the IDurableOrchestrationContext.StartNewOrchestration() method that was added in 2019 to support this scenario. See https://github.com/Azure/azure-functions-durable-extension/issues/715 for context.
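A minimal sketch of how that could address the original question's "free the orchestrator after scheduling" goal (the child orchestrator name "SubOrchestrator" and the TaskInfo type are assumptions carried over from the question; this uses the Durable Functions 2.x API, where the context type is IDurableOrchestrationContext):

```csharp
// Sketch: true fire-and-forget from an orchestrator.
// StartNewOrchestration schedules an independent instance and returns
// immediately, so the parent completes without awaiting the children.
[FunctionName("Parent")]
public static async Task Parent(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
    foreach (var value in results)
    {
        // Each child does its own durable-timer delay, then the activity.
        context.StartNewOrchestration("SubOrchestrator", value);
    }
    // The parent moves to 'Completed' here; the children run on their own.
}
```

Unlike CallSubOrchestratorAsync, the parent holds no durable task for the children, so its status does not stay 'Running' until they finish.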

dvdvorle