36

I have a use case with an AWS Step Function that is triggered when a file is uploaded to S3. From there, the first step runs ffprobe to get the duration of the file via an external service such as transloadit, which writes the output back to S3.

I can start a new step function from that event, but I was wondering if it is possible to have an Await promise inside the original step function and then continue to the next step, taking into account that it could take a while for the ffprobe result to come back.

Any advice on how to tackle this is much appreciated.

khinester

7 Answers

38

AWS Step Functions now supports asynchronous callbacks for long-running steps as a first-class feature.

This is similar to @mixja's answer above but simplified. A single state in your workflow can directly invoke Lambda, SNS, SQS, or ECS and wait for a call to SendTaskSuccess.

There is a good example documented for SQS, where a step function sends a message and pauses workflow execution until something provides a callback. Lambda would be equivalent (assuming the main processing, like transloadit, happens outside the Lambda itself).

Your step function definition would look like

"Invoke transloadit": {
  "Type": "Task",
  "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
  "Parameters": {
    "FunctionName": "InvokeTransloadit",
    "Payload": {
        "some_other_param": "...",
        "token.$": "$$.Task.Token"
     }
  },
  "Next": "NEXT_STATE"
}

Then in your Lambda you would do something like

def lambda_handler(event, context):
    token = event['token']

    # invoke transloadit via SSM, ECS, passing token along

Then, in your main long-running process, you would issue a callback with the token, for example aws stepfunctions send-task-success --task-token $token from a shell script / the CLI, or the equivalent API call.
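
For reference, a minimal sketch of the equivalent call with boto3 (the helper name and the duration payload are just illustrative assumptions):

import json
import boto3

sfn = boto3.client('stepfunctions')

def report_duration(task_token, duration_seconds):
    # Resumes the paused "Invoke transloadit" state; the JSON output
    # becomes that state's result in the execution
    sfn.send_task_success(
        taskToken=task_token,
        output=json.dumps({'duration': duration_seconds})
    )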

wrschneider
5

When you send the request to transloadit, save the taskToken for the step in s3 at a predictable key based on the uploaded file key. For example, if the media file is at 's3://my-media-bucket/foobar/media-001.mp3', you could make a JSON file containing the task token of the current step and store it with the same key in a different bucket, for example 's3://ffprobe-tasks/foobar/media-001.mp3.json'. At the end of your step that sends the media to transloadit do not call success or failure on the step -- leave it running.

Then when you get the S3 notification that the transloadit result is ready, you can determine the S3 key to get the task token ('s3://ffprobe-tasks/foobar/media-001.mp3.json'), load the JSON (and delete it from S3), and send success for that task. The step function will continue to the next state in the execution.
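
A rough sketch of both sides, assuming Lambda handlers and the bucket/key layout from the example above (the function names, bucket name and event wiring are assumptions):

import json
import boto3

s3 = boto3.client('s3')
sfn = boto3.client('stepfunctions')

TOKEN_BUCKET = 'ffprobe-tasks'  # assumed bucket holding the task-token JSON files


def start_transloadit(event, context):
    # Task state invoked with .waitForTaskToken: stash the token under the media key
    media_key = event['media_key']            # e.g. 'foobar/media-001.mp3'
    s3.put_object(
        Bucket=TOKEN_BUCKET,
        Key=media_key + '.json',
        Body=json.dumps({'taskToken': event['token']}),
    )
    # ...kick off the transloadit job here; do NOT call send_task_success yet...


def on_transloadit_result(event, context):
    # Triggered by the S3 notification for the transloadit output
    media_key = event['Records'][0]['s3']['object']['key']
    token_key = media_key + '.json'
    token = json.loads(
        s3.get_object(Bucket=TOKEN_BUCKET, Key=token_key)['Body'].read()
    )['taskToken']
    s3.delete_object(Bucket=TOKEN_BUCKET, Key=token_key)
    # Resume the waiting execution
    sfn.send_task_success(taskToken=token, output=json.dumps({'mediaKey': media_key}))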

ivo
1

I can't propose a simple solution, only a few directions to explore.

First, Step Functions has a specific way to handle long-running background work: activities (https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html). It is basically a queue.

If you want 100% serverless, this is going to be complicated or ugly.

  • either, as you said, create a new step function for each file
  • or an S3 poll loop in the state machine, using a custom error code and a Retry clause

If you can allocate a "1/8 micro" instance for a background worker, it's not elegant but it is easy and reacts instantly. The low hardware requirement hints that the machine is used only for synchronisation.

Define a Step Functions activity, named for example video-duration. Define an SQS queue for instant reaction, or poll S3 for duration results.

State machine pseudocode:

{
  "StartAt": "ffprobe",
  "States": {
    "ffprobe": {
      "Type": "Task",
      "Resource": "arn:...lambda:launch-ffprobe",
      "Next": "wait-duration"
    },
    "wait-duration": {
      "Type": "Task",
      "Resource": "arn:...activity:video-duration",
      "End": true
    }
  }
}

Background worker pseudocode:

statemap = dict/map filename to result

thread1:
  loop:
    taskToken, input = SF.GetActivityTask('video-duration')  # long poll
    sync(key=input.filename, waiter=taskToken)
thread2:
  loop:
    msg = SQS.ReceiveMessage(...)  # or poll S3
    sync(key=msg.filename, duration=msg.result)

function sync(key, waiter, duration):
  state = statemap[key]
  if waiter:
    state.waiter = waiter
  if duration:
    state.duration = duration
  if state.waiter and state.duration:
    SF.SendTaskSuccess(state.waiter, state.duration)

S3 trigger pseudocode:

if filename is video:
  SF.StartExecution(...)
else if filename is duration:
  content = S3.GetObject(filename)
  SQS.SendMessage(queue, content)
temoto
1

You generally want to initiate the asynchronous task as a Step Function activity. The keyword here is initiate - in other words, once your activity has a pending action, that's when you trigger your asynchronous action. The reason for this is that you need the task token associated with the pending activity - then as long as your "future" can include this token somehow (e.g. you might set it as a reference or request ID), then you can "complete" the activity with either success or failure using the SendTaskSuccess or SendTaskFailure call.

There are two approaches to initiating the task:

  1. Poll for a new activity. You would set up a CloudWatch scheduled event to make the GetActivityTask call every n minutes.

  2. Fire a new "initiator" task in parallel to your activity within the step function. This initiator performs the same role as #1 and makes the GetActivityTask call; the only difference is that it is triggered immediately and doesn't need a polling mechanism. The GetActivityTask call blocks until a new activity task becomes available, so there are no issues with race conditions. Note there is a chance you may pick up an activity from another execution, so this initiator must only consider the input of the activity and not the input the initiator itself receives.

Here is what #2 looks like in a Step Function:

Initiating an activity

And a basic code example associated with the InitiateManualApprovalActivity task:

import boto3
import time

client = boto3.client('stepfunctions')
activity = "arn:aws:states:us-east-1:123456789012:activity:ManualStep"

def lambda_handler(event, context):
    print(event)
    # This will block until an activity task becomes available
    task = client.get_activity_task(activityArn=activity, workerName="test")
    print(task)
    # Perform your task here
    # In this example we continue on in the same function,
    # but the continuation could be a separate event, 
    # just as long as you can retrieve the task token
    time.sleep(60)
    response = client.send_task_success(taskToken=task['taskToken'], output=task['input'])
    print(response)
    return "done"
mixja
1

If you know where transloadit will put the file in S3 once it is done, you can poll S3 in a loop. To poll you can use HeadObject and then check the status code of the response.

Such a poll loop is described in one of the sample projects in the AWS Step Functions documentation. Instead of using Lambdas, for which you have to pay per execution, you can call the S3 API directly as described here. Without Lambdas you only pay for state transitions in a Standard Workflow.

Diagram of state machine taken from documentation.
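
A rough sketch of what such a poll state could look like with a direct S3 SDK integration (the state name, bucket, key path, retry limits and the broad States.TaskFailed matcher are assumptions; the documented sample project defines its own error handling):

"Check for transloadit output": {
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:s3:headObject",
  "Parameters": {
    "Bucket": "my-output-bucket",
    "Key.$": "$.resultKey"
  },
  "Retry": [
    {
      "ErrorEquals": ["States.TaskFailed"],
      "IntervalSeconds": 30,
      "MaxAttempts": 20,
      "BackoffRate": 1.5
    }
  ],
  "Next": "NEXT_STATE"
}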

Tobske
0

Well, I would take inspiration from https://aws.amazon.com/blogs/compute/implementing-serverless-manual-approval-steps-in-aws-step-functions-and-amazon-api-gateway/

You can replace the API Gateway in that post with an AWS Lambda function, triggered by an S3 event for example (documentation: http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html). Just make sure your Task has an appropriate timeout.
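
For instance, a timeout on the waiting state could look roughly like this (the state name, activity ARN and the 7200-second limit are placeholders):

"WaitForFfprobeResult": {
  "Type": "Task",
  "Resource": "arn:aws:states:us-east-1:123456789012:activity:ffprobe-result",
  "TimeoutSeconds": 7200,
  "Next": "NEXT_STATE"
}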

ElFitz
0

I also ran into this problem when I tried to use Step Functions to orchestrate AWS Batch jobs. The practices suggested above are problematic, since you have to pass the taskToken: from a Lambda inside the state machine, you need to poll the taskToken from the activity queue and pass it to S3 or somewhere else, so that another Lambda can submit the activity status.

The problem is: when you poll for the taskToken, you can't know whether it belongs to your state machine execution; you might get a token from another execution of the same state machine instead. Personally, I think it would be great if AWS supported this functionality, which they easily could do...

RELW