A Guide to Building Workflow-Based Applications with AWS SWF

Vinit Kumar

09 Jun 2017


Business workflows are very common in applications and often play the most critical role in them. In this post, we will explore how to use AWS Simple Workflow Service (SWF) to handle business workflows.

 

Before diving into SWF, let's get on the same page about what I mean by a workflow. A workflow is a sequence of activities that we perform to achieve a goal. The sequence may be dynamic, decided by some input: usually the output of the previous activity, or sometimes an external signal. We often represent a workflow with a flow diagram. For example, an online taxi booking system might have the following taxi booking workflow:

 

  1. Search for nearby taxis that match the requested taxi type.
  2. If the search returns results:
    2.1. Send a notification about the order to all those taxi drivers and wait a fixed period for a confirmation.
    2.2. If a confirmation arrives in time, send a confirmation to the customer. Otherwise, after the timeout, send an SMS asking the customer to retry after some time.
  3. Otherwise, send a message saying that no taxi is available and ask the customer to retry after some time.
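To see why keeping state is the hard part, here is a minimal in-process sketch of the booking steps above in plain Python, with no SWF involved. The function name `book_taxi`, its parameters and the returned messages are hypothetical. The search and the "notify drivers and wait" step are passed in as callables so the logic runs without real services. All progress lives in local variables, so if the process dies while waiting for a driver, the booking's state is simply lost.

```python
from typing import Callable, List, Optional


def book_taxi(
    find_nearby_taxis: Callable[[], List[str]],
    wait_for_acceptance: Callable[[List[str], int], Optional[str]],
    timeout_seconds: int = 240,
) -> str:
    """Run the booking steps in order and return what the customer is told.

    Hypothetical sketch: the callables stand in for the real search and for
    notifying drivers and waiting; nothing here survives a crash.
    """
    # Step 1: search for nearby taxis of the requested type
    taxis = find_nearby_taxis()
    if not taxis:
        # Step 3: no taxi available, ask the customer to retry later
        return "no taxi available, please retry later"
    # Step 2.1: notify the drivers and wait up to timeout_seconds for one to accept
    driver = wait_for_acceptance(taxis, timeout_seconds)
    if driver is not None:
        # Step 2.2: a driver accepted in time, so confirm the booking
        return "booking confirmed with driver " + driver
    # Step 2.2 (else): timed out, ask the customer to retry later
    return "no driver accepted in time, please retry later"
```

For example, `book_taxi(lambda: ["d1", "d2"], lambda taxis, t: taxis[0])` returns `"booking confirmed with driver d1"`. The hidden problem is the blocking wait in step 2.1: a real system must remember where each booking is for minutes at a time, across processes and restarts.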

 

Of course, this is just the booking flow. In reality, the complete order lifecycle will be more complicated. Here is the flowchart for this flow.

[Figure: flowchart of the taxi booking workflow]

You can easily see the technical challenges involved in handling even this simple workflow: we have to maintain the state at each step to decide the next one. Let's see what AWS SWF provides to solve this.
AWS SWF is a reliable and scalable service for running jobs that have parallel or sequential steps. It provides task coordination and state tracking, while leaving you in full control of the decision making and of how the activities are carried out.

 

SWF Terminology

The following are the key terms used in SWF.

  1. Worker: An application that performs some task. There are two types of workers in SWF: deciders and activity workers. Deciders make decisions: they take the workflow's state (its event history) and return the next activity task to perform, or complete the workflow execution. A decider corresponds to a diamond (decision) box in a flowchart. Activity workers perform the actual tasks.
  2. Task: SWF interacts with workers by handing them units of work called tasks. A task can be an activity task, performed by an activity worker; a decision task, performed by a decider; or a Lambda task, a special kind of activity task executed by an AWS Lambda function.
  3. WorkflowType: Every workflow in SWF must be registered with a name and a version, which together identify the workflow. For example, for the taxi booking system discussed above, we can have a workflow type named 'TaxiBookingWorkflow'.
  4. Domain: Provides a way to scope SWF resources within an AWS account. All tasks and workflows must be associated with a domain.
  5. Workflow Starter: An application that kicks off a workflow execution. In our taxi booking app, the backend API handler for booking requests could be the workflow starter.
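To make the decider's role concrete: a decider answers each decision task with a list of decisions. The sketch below builds the two decisions mentioned above, scheduling the next activity task and completing the workflow execution, as plain dictionaries in the shape expected by the `decisions` parameter of SWF's RespondDecisionTaskCompleted API (boto3's `respond_decision_task_completed`). The helper names are hypothetical; only the dictionary keys come from the SWF API.

```python
from typing import Any, Dict


def schedule_activity_decision(name: str, version: str, activity_id: str,
                               input: str = "") -> Dict[str, Any]:
    """Decision asking SWF to schedule an activity task for an activity worker."""
    return {
        "decisionType": "ScheduleActivityTask",
        "scheduleActivityTaskDecisionAttributes": {
            # An activity type is identified by its registered name and version
            "activityType": {"name": name, "version": version},
            # Must be unique among the activities of this workflow execution
            "activityId": activity_id,
            # Free-form string handed to the activity worker
            "input": input,
        },
    }


def complete_workflow_decision(result: str) -> Dict[str, Any]:
    """Decision asking SWF to close the workflow execution successfully."""
    return {
        "decisionType": "CompleteWorkflowExecution",
        "completeWorkflowExecutionDecisionAttributes": {"result": result},
    }
```

A decider would send such a list together with the task token of the decision task it is answering, e.g. `decisions=[schedule_activity_decision("findNearByTaxi", "1.0", "findNearByTaxi-42", "42")]`.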

 

How an SWF-Based Application Works

  1. Register a domain and a workflow type in SWF, then register the activity types the workflow uses.
  2. Start a workflow execution.
  3. Start the decider worker, which polls for decision tasks and determines the next step.
  4. Start the activity workers, which poll for activity tasks and perform them.
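Steps 3 and 4 share the same shape: a loop that long-polls SWF for a task, skips empty polls, and hands real tasks to a handler. Below is a minimal sketch of that loop with the poll and handle functions injected, so it runs without AWS. `run_poller` and `max_polls` are hypothetical names; the only SWF behaviour relied on is that a poll which receives no task within about 60 seconds returns a response without a `taskToken`.

```python
from typing import Any, Callable, Dict, Optional


def run_poller(
    poll: Callable[[], Dict[str, Any]],
    handle: Callable[[Dict[str, Any]], None],
    max_polls: Optional[int] = None,
) -> int:
    """Poll repeatedly and pass every real task to `handle`.

    Runs forever when max_polls is None, as a decider or worker normally
    would; returns the number of tasks handled.
    """
    handled = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        response = poll()
        # An empty poll has no taskToken: nothing to do, just poll again
        if "taskToken" not in response:
            continue
        handle(response)
        handled += 1
    return handled
```

With a fake poll that yields `{}`, `{"taskToken": "t1"}`, `{}`, `{"taskToken": "t2"}` and `max_polls=4`, the loop calls `handle` twice and returns 2. In a real decider, `poll` would wrap `poll_for_decision_task`; in a worker, `poll_for_activity_task`.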

 

Here is a simple diagram explaining how SWF works.

[Figure: SWF application model]

So, for the taxi booking system discussed above, we will:

  1. Register a domain (a name that scopes all the related SWF entities), a workflow type (an identifier for the booking workflow) and the activity types with SWF.
  2. Create a decider: a Python program that keeps polling the decision task list and outputs the next step.
  3. Create an activity worker: a Python program that keeps polling the activity task list and executes the tasks.

 

Let's see some code in action. There is no Flow Framework for Python, so we will use the low-level boto3 APIs.

# swf.py
import logging

import boto3
from botocore.config import Config

DOMAIN = 'myGola'
DOMAIN_DESCRIPTION = 'Domain for My Taxi Booking Flow'
WORKFLOW = 'TaxiBookingFlow'
WORKFLOW_VERSION = '1.0'
# Number of days (as a string) for which SWF keeps the execution history of
# closed workflows in this domain. '0' or 'NONE' disables retention; max is 90.
WORKFLOW_RETENTION_PERIOD = '3'
WORKFLOW_DESCRIPTION = 'Taxi Booking Workflow'
TASKLIST = 'TaxiBookingTaskList'
ACTIVITIES = ['findNearByTaxi', 'sendOrderConfirmation', 'sendRetrySMS', 'multiCastBookingDetail']
ACTIVITY_VERSION = '1.0'


def swf_client():
    """Get an SWF client object."""
    # SWF long polls can stay open for 60 seconds, so use a longer read timeout
    return boto3.client("swf", config=Config(read_timeout=70))


def create_workflow(client):
    """Create the workflow by registering the domain, the workflow type and the activity types."""
    try:
        # Register a new domain for the workflow. The domain scopes the
        # workflow resources. A single domain can hold multiple workflows and
        # generally holds all the workflows related to a single project.
        client.register_domain(
            name=DOMAIN,
            description=DOMAIN_DESCRIPTION,
            workflowExecutionRetentionPeriodInDays=WORKFLOW_RETENTION_PERIOD)
        # Register the workflow type. start_workflow_execution falls back to
        # these defaults (example values) when the caller does not pass them.
        client.register_workflow_type(
            domain=DOMAIN,
            name=WORKFLOW,
            version=WORKFLOW_VERSION,
            description=WORKFLOW_DESCRIPTION,
            defaultTaskList={"name": TASKLIST},
            # Max duration (seconds) of a whole workflow execution
            defaultExecutionStartToCloseTimeout="3600",
            # Max duration (seconds) a decider may spend on one decision task
            defaultTaskStartToCloseTimeout="60",
            defaultChildPolicy="TERMINATE")
        # Register one activity type per activity
        for activity in ACTIVITIES:
            client.register_activity_type(
                domain=DOMAIN,
                name=activity,
                version=ACTIVITY_VERSION,
                defaultTaskList={"name": TASKLIST},
                # Durations are strings of seconds; "NONE" means unlimited.
                # Default max duration a worker can take to process a task
                # of this activity type
                defaultTaskStartToCloseTimeout="NONE",
                # Default max time before which a worker processing a task of
                # this type must report progress via RecordActivityTaskHeartbeat
                defaultTaskHeartbeatTimeout="NONE",
                # Default max time a task of this type can wait before being
                # assigned to a worker
                defaultTaskScheduleToStartTimeout="NONE",
                # Default max total duration for a task of this activity type
                defaultTaskScheduleToCloseTimeout="NONE")
    except Exception:
        logging.exception("Failed to register the SWF domain, workflow or activities")
        raise


def send_signal_to_workflow(client, driver_id, booking_id, execution_id):
    """Signal the workflow execution that a driver accepted the booking."""
    try:
        client.signal_workflow_execution(
            domain=DOMAIN,
            workflowId=str(booking_id),
            runId=execution_id,
            signalName="OrderAccepted",
            input=str(driver_id))
    except Exception:
        logging.exception("Failed to signal the workflow for booking %s", booking_id)
        raise


def start_workflow_execution(client, booking_detail):
    """Start a workflow execution for the given booking detail.

    A workflow type can have many executions; in our case, one for every taxi
    booking. Starting an execution requires a workflowId that is unique among
    the open executions in the domain, so we use the booking id. SWF returns
    a runId that identifies this particular run.
    """
    booking_id = str(booking_detail.bookingId)
    try:
        response = client.start_workflow_execution(
            domain=DOMAIN,
            workflowId=booking_id,
            # Which workflow type to start
            workflowType={
                "name": WORKFLOW,
                "version": WORKFLOW_VERSION
            },
            # Task list for decision tasks. This overrides the default task
            # list specified at workflow type registration.
            taskList={
                "name": TASKLIST
            },
            # Input for the workflow, as a string
            input=booking_id)
        return response
    except Exception:
        logging.exception("Failed to start the workflow for booking %s", booking_id)
        raise


if __name__ == '__main__':
    client = swf_client()
    create_workflow(client)
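The registration calls above take every timeout as a string of seconds, with "NONE" meaning unlimited. Passing an integer such as `0` fails boto3's parameter validation, because these parameters are typed as strings. A small helper keeps that conversion in one place; `swf_duration` is a hypothetical name, not part of boto3.

```python
from typing import Optional


def swf_duration(seconds: Optional[int]) -> str:
    """Convert a timeout in seconds to the string form SWF expects.

    None means "no limit" and becomes "NONE"; otherwise the value must be a
    non-negative whole number of seconds.
    """
    if seconds is None:
        return "NONE"
    # bool is a subclass of int, so reject it explicitly
    if isinstance(seconds, bool) or not isinstance(seconds, int) or seconds < 0:
        raise ValueError("SWF durations must be non-negative integers, got %r" % (seconds,))
    return str(seconds)
```

For example, `defaultTaskHeartbeatTimeout=swf_duration(None)` passes "NONE", and a timer could use `startToFireTimeout=swf_duration(240)`. Note that not every SWF timeout accepts "NONE"; `defaultExecutionStartToCloseTimeout`, for instance, requires a number.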

# decider.py
import logging

from botocore.exceptions import ReadTimeoutError

from swf import swf_client
from swf import DOMAIN, TASKLIST, ACTIVITY_VERSION

# Seconds (as a string) a driver gets to accept the booking: 4 minutes
ACCEPT_TIMEOUT = "240"


def poll_for_decision_task(client):
    """Long-poll SWF for a decision task. Returns {} if the poll failed."""
    try:
        # Ask for events oldest first (reverseOrder=False); the decider below
        # relies on that order. Very long histories are split into pages via
        # nextPageToken; this example assumes the history fits in one page.
        return client.poll_for_decision_task(
            domain=DOMAIN,
            taskList={
                "name": TASKLIST
            },
            reverseOrder=False)
    except ReadTimeoutError:
        # The HTTP read timed out during the long poll; simply poll again
        return {}
    except Exception:
        logging.exception("Failed to poll for a decision task")
        return {}


def schedule_next_activity(client, task_token, activity_id, activity_name, input=""):
    """Schedule an activity task, passing `input` (the booking id) to the worker."""
    client.respond_decision_task_completed(
        taskToken=task_token,
        decisions=[
            {
                'decisionType': 'ScheduleActivityTask',
                'scheduleActivityTaskDecisionAttributes': {
                    'activityType': {
                        'name': activity_name,
                        'version': ACTIVITY_VERSION
                    },
                    # Unique within the execution, e.g. 'findNearByTaxi-<booking_id>'
                    'activityId': activity_id,
                    'input': input
                }
            }])


def schedule_task_complete(client, task_token, result):
    """Complete the workflow execution with the given result."""
    client.respond_decision_task_completed(
        taskToken=task_token,
        decisions=[
            {
                'decisionType': 'CompleteWorkflowExecution',
                'completeWorkflowExecutionDecisionAttributes': {
                    'result': result
                }
            }
        ])


def start_timer(client, task_token, timer, timer_id):
    """Start a timer; SWF records a TimerFired event when it expires."""
    client.respond_decision_task_completed(
        taskToken=task_token,
        decisions=[
            {
                'decisionType': 'StartTimer',
                'startTimerDecisionAttributes': {
                    'timerId': timer_id,
                    # Seconds (as a string) to wait before the timer fires
                    'startToFireTimeout': timer
                }
            }
        ])


def decider(client, response):
    """Decide the next step of the workflow.

    poll_for_decision_task keeps a long HTTP connection open for 60 seconds;
    if no decision task arrives in that time, the response has no taskToken.
    """
    if 'taskToken' not in response:
        print("Poll timed out without returning a task")
        return False
    task_token = response['taskToken']
    events = response['events']
    # The first event of every execution is WorkflowExecutionStarted, and it
    # carries the workflow input: the booking id.
    booking_id = events[0]['workflowExecutionStartedEventAttributes']['input']
    timer_id = 'TimerToWaitMultiCastBookingDetailSignal-{0}'.format(booking_id)
    # SWF records an event for everything, including the decision tasks
    # themselves (their scheduled/started/completed events), so ignore those
    # and look at the latest remaining event.
    event_history = [event for event in events if not event['eventType'].startswith('Decision')]
    last_event = event_history[-1]
    last_event_type = last_event['eventType']

    if last_event_type == 'WorkflowExecutionStarted':
        schedule_next_activity(client, task_token, 'findNearByTaxi-{0}'.format(booking_id),
                               'findNearByTaxi', booking_id)
    elif last_event_type == 'WorkflowExecutionSignaled':
        # A driver has accepted the booking, so we received a signal. Treat it
        # as the success case: stop the timer and confirm the order.
        client.respond_decision_task_completed(
            taskToken=task_token,
            decisions=[
                {
                    'decisionType': 'CancelTimer',
                    'cancelTimerDecisionAttributes': {'timerId': timer_id}
                },
                {
                    'decisionType': 'ScheduleActivityTask',
                    'scheduleActivityTaskDecisionAttributes': {
                        'activityType': {'name': 'sendOrderConfirmation',
                                         'version': ACTIVITY_VERSION},
                        'activityId': 'sendOrderConfirmation-{0}'.format(booking_id),
                        'input': booking_id
                    }
                }
            ])
    elif last_event_type == 'TimerFired':
        # No driver accepted in time: ask the customer to retry later
        schedule_next_activity(client, task_token, 'sendRetrySMS-{0}'.format(booking_id),
                               'sendRetrySMS', booking_id)
    elif last_event_type == 'ActivityTaskCompleted':
        # The last scheduled activity has completed. Event ids start at 1, so
        # its ActivityTaskScheduled event is at index scheduledEventId - 1.
        completed_attrs = last_event['activityTaskCompletedEventAttributes']
        scheduled_event = events[completed_attrs['scheduledEventId'] - 1]
        last_activity_attrs = scheduled_event['activityTaskScheduledEventAttributes']
        last_activity_name = last_activity_attrs['activityType']['name']
        last_activity_result = completed_attrs.get('result')
        if last_activity_name == "findNearByTaxi":
            # No result (or an empty one) means no taxi was found
            next_activity = "multiCastBookingDetail" if last_activity_result else "sendRetrySMS"
            schedule_next_activity(client, task_token, '{0}-{1}'.format(next_activity, booking_id),
                                   next_activity, booking_id)
        elif last_activity_name == "multiCastBookingDetail":
            # Wait for the external signal, but only up to ACCEPT_TIMEOUT
            start_timer(client, task_token, ACCEPT_TIMEOUT, timer_id)
        elif last_activity_name == "sendOrderConfirmation":
            schedule_task_complete(client, task_token, 'booking confirmed')
        elif last_activity_name == "sendRetrySMS":
            schedule_task_complete(client, task_token, 'no taxi available')
    return True


if __name__ == '__main__':
    client = swf_client()
    while True:
        try:
            response = poll_for_decision_task(client)
            decider(client, response)
        except Exception:
            # Don't let one failed decision task stop the decider
            logging.exception("Decision task failed")
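Because the decider only reads the event list, its bookkeeping can be checked offline. The sketch below isolates its two lookups: the latest event that is not a Decision* event, and the name of the activity that an ActivityTaskCompleted event refers to through `scheduledEventId`. The helper names and `SAMPLE_EVENTS` are hand-written for illustration; the sample only mimics the shape of a `poll_for_decision_task` response and is not captured SWF output. Finding the scheduled event by its `eventId` instead of by list position avoids depending on where the list starts.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]


def last_non_decision_event(events: List[Event]) -> Event:
    """Latest event (events oldest first) that is not a Decision* event."""
    relevant = [e for e in events if not e["eventType"].startswith("Decision")]
    return relevant[-1]


def completed_activity_name(events: List[Event], completed: Event) -> str:
    """Name of the activity whose ActivityTaskCompleted event is `completed`."""
    scheduled_id = completed["activityTaskCompletedEventAttributes"]["scheduledEventId"]
    # Look the ActivityTaskScheduled event up by its id, not by list index
    by_id = {e["eventId"]: e for e in events}
    scheduled = by_id[scheduled_id]
    return scheduled["activityTaskScheduledEventAttributes"]["activityType"]["name"]


# Hypothetical, hand-written history trimmed to the fields used above
SAMPLE_EVENTS: List[Event] = [
    {"eventId": 1, "eventType": "WorkflowExecutionStarted",
     "workflowExecutionStartedEventAttributes": {"input": "42"}},
    {"eventId": 2, "eventType": "DecisionTaskScheduled"},
    {"eventId": 3, "eventType": "DecisionTaskStarted"},
    {"eventId": 4, "eventType": "DecisionTaskCompleted"},
    {"eventId": 5, "eventType": "ActivityTaskScheduled",
     "activityTaskScheduledEventAttributes": {
         "activityType": {"name": "findNearByTaxi", "version": "1.0"}}},
    {"eventId": 6, "eventType": "ActivityTaskStarted"},
    {"eventId": 7, "eventType": "ActivityTaskCompleted",
     "activityTaskCompletedEventAttributes": {"scheduledEventId": 5, "result": "d1,d2"}},
    {"eventId": 8, "eventType": "DecisionTaskScheduled"},
    {"eventId": 9, "eventType": "DecisionTaskStarted"},
]
```

Here the latest relevant event is event 7, its activity is `findNearByTaxi`, and its result is non-empty, so the decider would schedule `multiCastBookingDetail` next.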

# worker.py
import logging

from botocore.exceptions import ReadTimeoutError

from swf import swf_client
from swf import DOMAIN, TASKLIST


def poll_for_activity_task(client):
    """Long-poll SWF for an activity task. Returns {} if the poll failed."""
    try:
        return client.poll_for_activity_task(
            domain=DOMAIN,
            taskList={
                "name": TASKLIST
            })
    except ReadTimeoutError:
        # The HTTP read timed out during the long poll; simply poll again
        return {}
    except Exception:
        logging.exception("Failed to poll for an activity task")
        return {}


def find_nearby_taxi(booking_id):
    """Get details from the booking id and find the list of
    taxis near the location of the booking request.
    """
    # Run some logic to find details
    # Place the result somewhere in your cache
    # Return comma-separated driver_ids
    # NOTE: We can only return a string as the result
    pass


def send_order_confirmation(booking_id):
    """Send a push notification and SMS to the user
    confirming the booking, with the cab details
    """
    # Add logic to send message and notification
    pass


def send_retry_sms(booking_id):
    """Send a message to the customer saying that no taxi
    is available at the moment
    """
    # Add logic to send retrySMS
    pass


def multi_cast_booking_detail(booking_id):
    """Send a notification about the new booking to all taxi drivers
    """
    # Send booking details to all the drivers by using driver_ids
    pass


# Map each registered activity type name to the function that implements it
activities = {
    "findNearByTaxi": find_nearby_taxi,
    "sendOrderConfirmation": send_order_confirmation,
    "sendRetrySMS": send_retry_sms,
    "multiCastBookingDetail": multi_cast_booking_detail
}

if __name__ == '__main__':
    client = swf_client()
    while True:
        try:
            response = poll_for_activity_task(client)
            # An empty poll (no task within ~60 seconds) has no taskToken
            if 'taskToken' in response:
                activity = activities[response['activityType']['name']]
                result = activity(response.get('input', ''))
                completion = {'taskToken': response['taskToken']}
                # The result must be a string; omit it if the activity returned None
                if result is not None:
                    completion['result'] = result
                client.respond_activity_task_completed(**completion)
        except Exception:
            logging.exception("Activity task failed")
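The worker's dispatch step can be exercised the same way. Below, a hypothetical `handle_activity_task` function receives the client as a parameter, so a small fake object that records `respond_activity_task_completed` calls can stand in for the boto3 client. `FakeClient` and the sample task are made up for illustration; only the keyword names `taskToken` and `result` match the real boto3 method.

```python
from typing import Any, Callable, Dict, List, Optional


def handle_activity_task(client: Any, response: Dict[str, Any],
                         activities: Dict[str, Callable[[str], Optional[str]]]) -> None:
    """Run the function registered for the task's activity type and report completion."""
    handler = activities[response["activityType"]["name"]]
    result = handler(response.get("input", ""))
    completion: Dict[str, Any] = {"taskToken": response["taskToken"]}
    # SWF results are strings; leave the result out when the activity returned None
    if result is not None:
        completion["result"] = result
    client.respond_activity_task_completed(**completion)


class FakeClient:
    """Stand-in for the boto3 SWF client that records completion calls."""

    def __init__(self) -> None:
        self.completed: List[Dict[str, Any]] = []

    def respond_activity_task_completed(self, **kwargs: Any) -> None:
        self.completed.append(kwargs)
```

Handing it a task for `findNearByTaxi` whose function returns `"d1,d2"` records one call with `taskToken` and `result`; an activity that returns `None` records a call with only `taskToken`.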

# views.py
from django.http import HttpResponse

from swf import swf_client, start_workflow_execution, send_signal_to_workflow

# save_booking, save_workflow_execution, get_booking_id, get_driver_id,
# get_execution_id and is_booking_open are application helpers not shown
# here (get_driver_id is a hypothetical addition, like the others).


def new_booking(request):
    """Handle a new booking request."""
    # Save the booking record in the DB
    booking_detail = save_booking(request)
    booking_id = booking_detail.bookingId
    # Start a workflow execution for this order
    client = swf_client()
    execution_id = start_workflow_execution(client, booking_detail)["runId"]
    # Save the runId for later use (signalling this execution)
    save_workflow_execution(booking_id, execution_id)
    return HttpResponse(booking_id)


def accept_order(request):
    """Handle a driver's request to accept a booking order."""
    booking_id = get_booking_id(request)
    driver_id = get_driver_id(request)
    execution_id = get_execution_id(booking_id)
    # Check whether the order is still open
    if is_booking_open(booking_id):
        # Signal the workflow execution that a driver accepted the order
        client = swf_client()
        send_signal_to_workflow(client, driver_id, booking_id, execution_id)
        return HttpResponse("Success")
    else:
        return HttpResponse("Failure", status=404)
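The views rely on helpers such as `save_workflow_execution` and `get_execution_id` that the post doesn't show. Purely as an illustration, here is an in-memory version of that pair, keyed by booking id. The class name is hypothetical, and a real deployment would store the runId in the database next to the booking, since an in-process dict is lost on restart and isn't shared between server processes.

```python
import threading
from typing import Dict, Optional


class WorkflowExecutionStore:
    """Hypothetical in-memory map from booking id to SWF runId (illustration only)."""

    def __init__(self) -> None:
        self._run_ids: Dict[str, str] = {}
        # A web server may handle requests on several threads in one process
        self._lock = threading.Lock()

    def save_workflow_execution(self, booking_id, run_id: str) -> None:
        with self._lock:
            # Normalise ids to strings, since SWF workflowIds are strings
            self._run_ids[str(booking_id)] = run_id

    def get_execution_id(self, booking_id) -> Optional[str]:
        with self._lock:
            return self._run_ids.get(str(booking_id))
```

Because ids are normalised to strings, a booking saved as `"42"` can be looked up with either `"42"` or `42`, matching how the booking id is sent to SWF as the `workflowId`.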

 

Summary

Of course, the code sample doesn't cover the edge cases or every flow, such as an order cancelled by the customer. But hopefully it has given you an idea of what AWS SWF is for and how to use it. The code may look verbose because it uses the low-level APIs; a higher-level library or framework such as the Flow Framework (currently available only for Java and Ruby) could make it more concise. To learn more, you can check these online references:

 

