In this post I’m going to explore Simple Workflow Service (SWF) available in AWS.

To understand what SWF is good for, we need to first understand what a workflow is. Wikipedia defines it as follows:

A workflow consists of an orchestrated and repeatable pattern of activity, enabled by the systematic organization of resources into processes that transform materials, provide services, or process information. It can be depicted as a sequence of operations, the work of a person or group, the work of an organization of staff, or one or more simple or complex mechanisms.

In computer systems we care about the part about processing information. Some things that could be modeled as workflows:

  • Deployment pipeline: We could receive some code as input and then build it in a worker machine. We can run tests in parallel in different machines. If all tests pass we can deploy the binaries to another set of machines.
  • Coordinate shipments: A user buys a product on an online store and the order is placed on a system. A human monitors this system and takes care of finding the products in a warehouse and shipping them to the correct address. When the shipment is made, the information is entered in a system. The workflow notices this information an e-mails the user the shipping details.
  • Asynchronous image processing: A system uploads files to a system for processing (let’s say, create thumbnails). A workflow uses multiple workers to execute the task. If any of the machines fails while processing a set of files, they same work can be taken over by another worker.

Those are some high level examples. In this post I’m going to go over one example in more detail.

Components

Before we start building a workflow, let’s learn a little about the components of an SWF:

  • Workflow: A set of activities, and some logic that defines how these work together to achieve some objective
  • Domain: A workflow lives in a domain. A Domain can contain multiple workflows. Workflows in different domains can’t interact
  • Execution: An instance of the workflow with its associated state
  • Event: Represents a change on the state of an execution
  • Starter: A program, or person that starts and execution
  • Activity: A type of task that needs to be performed, such as: resizing images, running tests, etc
  • Task: An invocation of an activity
  • Worker: Program that performs tasks
  • Decider: Program that defines the logic for the workflow

Machine repair workflow

To help us get familiar with SWF, we are going to create a workflow to model the process for fixing a broken machine in a fleet. It will look something like this:

Fixing broken machine workflow

This workflow can be used in a datacenter that runs a lot of machines. We can have the workflow probe machines to see if they are working well. If it notices something wrong, it sets the machine state as maintenance in a database. If it doesn’t find anything wrong, it finishes the execution.

Once a machine is drained we’ll do two things. We’ll have a person take a look at the machine and fix it and we’ll take the oportunity to reimage the machine so we have clean machine when it comes back.

Once the repair and the reimage are done, we can set the state back to available and finish the execution.

Getting ready

For our activities and the decider, we are going to need the AWS SDK. In this section I’m going to show how to get it ready.

We’ll use Ruby for our examples, since it’s easy to run and it’s very well supported. The latest version of the Ruby SDK at the time of this writing is version 3. We’ll create a Gemfile with dependencies:

1
2
3
source 'https://rubygems.org'

gem 'aws-sdk-swf', '~> 1'

We can then install them using:

1
bundle install

The AWS SDK will need to communicate with AWS, so we’ll need some credentials, these credentials can be set in environment variables like this:

1
2
export AWS_ACCESS_KEY_ID=<your key id>
export AWS_SECRET_ACCESS_KEY=<your secret>

Activities

Each of the boxes in the diagram above is an activity. Activities can be pretty self contained, so we’ll start building those.

An activity works by polling the workflow for pending tasks. If it finds that there is a task it can perform, it does so, and returns a result back. Polling then continues until there is more work to do.

Our activities will share some code with the decider, so let’s create a base class that will be shared between them (workflow_base.rb):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
require 'aws-sdk-swf'

class WorkflowBase
  DOMAIN_NAME = 'datacenter-domain'
  REGION = 'ap-southeast-2'
  TASK_LIST_NAME = 'repairs-workflow-task-list'
  VERSION = '14'

  def initialize
    @swf = Aws::SWF::Client.new(region: REGION)
    register_domain(REGION, DOMAIN_NAME)
  end

  # Register a domain for our workflow (if it doesn't already exist)
  def register_domain(region, domain_name)
    swf = Aws::SWF::Client.new(region: region)
    begin
      swf.register_domain({
        name: domain_name,
        workflow_execution_retention_period_in_days: '3'
      })
      puts "Domain #{domain_name} registered"
    rescue Aws::SWF::Errors::DomainAlreadyExistsFault
      puts "Domain #{domain_name} already exists"
    end
  end
end

This class defines some constants that are shared between the decider and activities. It also initializes the SWF client and register the domain if it doesn’t yet exist.

Because of the way SWF works, it is best if the code for all our activities is handled by a single program. This program will poll for any new tasks in the domain. Every time it sees a task it will execute it and send the result back to SWF. Because each task is blocking, we could spin many copies of this program to allow tasks to be executed in parallel if we wanted to.

The activities handler (activities.rb):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
require 'aws-sdk-swf'
require_relative 'workflow_base.rb'

class Activities < WorkflowBase
  ACTIVITIES = [
    'probe_machines',
    'drain_machine',
    'fix_machine',
    'reimage_machine',
    'enable_machine'
  ]

  def initialize
    super()
    register_activities
    poll
  end

  # Register the activities with the domain
  def register_activities()
    ACTIVITIES.each do |activity|
      begin
        @swf.register_activity_type({
          domain: DOMAIN_NAME,
          name: activity,
          version: VERSION,
          # Maximum time it can take to process an activity
          default_task_start_to_close_timeout: '60'
        })
        puts "Activity #{activity} registered"
      rescue Aws::SWF::Errors::TypeAlreadyExistsFault
        puts "Activity #{activity} already exists"
      end
    end
  end

  # Poll the domain for tasks for this activity
  def poll
    while true
      options = {
        domain: DOMAIN_NAME,
        task_list: {
          name: TASK_LIST_NAME
        }
      }
      task = @swf.poll_for_activity_task(options)

      if task.task_token == nil
        puts 'Polling expired for activities expired. Trying again'
        next
      end

      if !ACTIVITIES.include?(task.activity_id)
        raise "Activity #{task.activity_id} unknown"
      end

      # If execute is successfull, it will return the result, otherwise ti will
      # throw
      puts "Executing #{task.activity_id}"
      begin
        # Call the method for the activity
        result = send(task.activity_id, task, task.input)

        puts "Completing #{task.activity_id}"
        @swf.respond_activity_task_completed({
          task_token: task.task_token,
          # SWF doesn't provide a way to know which activity this result
          # belongs to, so we'll prepend the result with it
          result: "#{task.activity_id}:#{result}"
        })
      rescue => e
        puts e
        puts "Failing #{task.activity_id}"
        @swf.respond_activity_task_failed({
          task_token: task.task_token,
          reason: @failure
        })
      end
    end
  end

  def probe_machines(task, input)
    # Because this is just an example and I don't actually have machines to test,
    # I'm going to use some mock data
    machines = [
      'machine-A459Z',
      'machine-M3992',
      'machine-A873R'
    ]

    machines.each do | machine |
      # A machine is bad, set it to maintenance
      if check_machine(machine) == 'FAIL'
        # In a real scenario we would update the database with the new state
        puts "Set machine #{machine} to maintenance"
        return machine
      end
    end

    puts 'No bad machines found'
    return ''
  end

  # Randomly decide if it's drained. In a real scenario we would communicate with
  # the machine, or check a database
  def drain_machine(task, input)
    puts "Draining machine #{input}"
    random_number = rand(5)
    if random_number == 4
      puts 'Machine is drained'
      return
    end

    raise "#{input} is not drained"
  end

  # In real life we would check if a human has marked the task as fixed. In this
  # case, we'll just sleep and use a random number
  def fix_machine(task, input)
    puts "Check if machine #{input} is fixed"
    sleep(1)
    random_number = rand(2)
    if random_number == 1
      puts "Machine #{input} has been fixed"
      return
    end

    raise "#{input} is not fixed yet"
  end

  # In real life we would use something like chef to re-image the machine here
  # we'll just sleep and use a random number
  def reimage_machine(task, input)
    puts "Reimaging #{input}"
    sleep(1)
    random_number = rand(2)
    if random_number == 1
      puts "Machine has been reimaged"
      return
    end

    raise "#{input} reimage still in progress"
  end

  # Pretend we re-enabled the machine
  def enable_machine(task, input)
    puts "Marking #{input} as available"
  end

  # Returns a random state for a machine
  def check_machine(machine)
    random_number = rand(5)
    if random_number == 4
      return 'FAIL'
    end

    return 'SUCCESS'
  end
end

Activities.new

The most important part of the code above is the polling loop. The poll_for_activity_task method, will initiate a connection with SWF and will return an activity to execute if there is one available. If there are no activities to execute for 60 seconds, it will return an empty task_token. If that happens we just continue polling again.

The code to handle each of the activities is just dummy code to pretend we are doing some work.

Decider

The decider is the most complex part of the workflow. It takes care of making decisions on what needs to be done next. Because of the way we get data form SWF, making these decisions requires some grunt work over all the available events for the execution.

Let’s look at the code (repairs_workflow.rb):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
require 'aws-sdk-swf'
require_relative 'workflow_base.rb'

class RepairsWorkflow < WorkflowBase
  WORKFLOW_NAME = 'repairs-workflow'

  def initialize
    super
    register_workflow
    poll
  end

  def register_workflow
    begin
      @swf.register_workflow_type({
        domain: DOMAIN_NAME,
        name: WORKFLOW_NAME,
        version: VERSION,
        # Maximum time the decider can take to make a decision about this task
        default_task_start_to_close_timeout: '15'
      })
      puts "Workflow #{WORKFLOW_NAME} registered"
    rescue Aws::SWF::Errors::TypeAlreadyExistsFault
      puts "Workflow #{WORKFLOW_NAME} already exists"
    end
  end

  def poll
    while true
      options = {
        domain: DOMAIN_NAME,
        task_list: {
          name: TASK_LIST_NAME
        }
      }
      task = @swf.poll_for_decision_task(options)

      if task.task_token == nil
        puts "Polling expired for decision task. Trying again"
        next
      end

      process_events(task.workflow_execution.workflow_id, task.task_token, task.events)
    end
  end

  def process_events(workflow_id, token, events)
    # events contains all the event history for this workflow execution.
    # The lowest indexes [0] contains the oldest events. The first event is
    # always WorkflowExecutionStarted.
    # `events` can span multiple pages, in which case we would need to make
    # extra requests to get the other pages. We are not taking care of that
    # scenario in this example
    pending = 0
    in_progress = 0
    completed = 0
    results = {}
    events.each do | event |
      case event.event_type
        # This means at some point an activity was scheduled. It will either fail
        # or be grabbed by a worker
        when 'ActivityTaskScheduled'
          pending += 1
        # Activity couldn't be scheduled. This means something is wrong in the
        # workflow
        when 'ScheduleActivityTaskFailed'
          puts "There was a failure scheduling an activity"
        # A worker grabbed the task
        when 'ActivityTaskStarted'
          pending -= 1
          in_progress += 1
        # A task finished unsuccessfully
        when 'ActivityTaskFailed', 'ActivityTaskTimedOut'
          in_progress -= 1
        # A task completed successfully
        when 'ActivityTaskCompleted'
          if event.activity_task_completed_event_attributes
            parts = event.activity_task_completed_event_attributes.result.split(':')
            results[parts[0]] = parts[1]
          end
          completed += 1
          in_progress -= 1
      end
    end

    decide_task(workflow_id, token, completed, pending, in_progress, results)
  end

  # - If there are no `pending`, `in_progress` or `completed` activities, we
  #   need to schedule 'probe_machines'
  # - If 1 task has completed and the result includes a machine, we need to
  #   schedule 'drain_machine'. If there is no machine in the result, we can
  #   finish the workflow
  # - If 2 tasks have completed we need to schedule both 'fix_machine' and
  #   'reimage_machine'
  # - If 3 tasks have completed, it means that one of our parallel tasks has
  #   completed, but not the other. If it's still in progress, or pending we
  #   don't need to do anything. If that's not the case, we need to reschedule
  #   the missing task
  # - If 4 tasks have completed we need to schedule 'enable_machine'
  # - If 5 tasks have completed we need to can finish the workflow
  def decide_task(workflow_id, token, completed, pending, in_progress, results)
    puts "Completed: #{completed} Pending: #{pending} In progress: #{in_progress}"
    if completed == 0 && pending == 0 && in_progress == 0
      schedule_tasks(token, ['probe_machines'])
      return
    end

    if completed == 1 && pending == 0 && in_progress == 0
      if !results['probe_machines']
        # If there are no results, we can terminate the workflow
        end_execution(workflow_id)
      else
        schedule_tasks(token, ['drain_machine'], results['probe_machines'])
      end
      return
    end

    if completed == 2 && pending == 0 && in_progress == 0
      schedule_tasks(token, ['reimage_machine', 'fix_machine'], results['probe_machines'])
      return
    end

    if completed == 3 && pending == 0 && in_progress == 0
      # Figure out which one completed and schedule the other one
      if results.has_key?('fix_machine')
        schedule_tasks(token, ['reimage_machine'], results['probe_machines'])
      else
        schedule_tasks(token, ['fix_machine'], results['probe_machines'])
      end
      return
    end

    if completed == 4 && pending == 0 && in_progress == 0
      schedule_tasks(token, ['enable_machine'], results['probe_machines'])
      return
    end

    if completed == 5
      end_execution(workflow_id)
    end
  end

  def schedule_tasks(token, activities, input = nil)
    puts "Scheduling #{activities.join(',')}"
    decisions = []
    activities.each do |activity|
      decisions.push({
        decision_type: 'ScheduleActivityTask',
        schedule_activity_task_decision_attributes: {
          activity_type: {
            name: activity,
            version: VERSION
          },
          task_list: {
            name: TASK_LIST_NAME
          },
          input: input,
          activity_id: activity,
          # Time the workflow will wait for this task to be assigned to a worker
          schedule_to_start_timeout: '600',
          # Time the workflow will wait for this task to complete
          schedule_to_close_timeout: '3600',
          # Used for heartbeats. This example doesn't use them, so just setting
          # to the same value as schedule_to_close_timeout
          heartbeat_timeout: '3600'
        }
      })
    end
    @swf.respond_decision_task_completed({
      task_token: token,
      decisions: decisions
    })
  end

  def end_execution(id)
    puts "Terminating workflow"
    @swf.terminate_workflow_execution({
      domain: DOMAIN_NAME,
      workflow_id: id
    })
  end
end

RepairsWorkflow.new

I added comments explaining how we process the events and how we decide which activity needs to be executed next. process_events Looks through and figures out how many events have been completed and how many are in progress. decide_task schedules the next task based on the data from process_events.

Starter

The last thing we need for our workflow is the something to trigger an execution. We’ll have a little program do this (execution_starter.rb):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
require_relative 'workflow_base.rb'

class ExecutionStarter < WorkflowBase
  WORKFLOW_NAME = 'repairs-workflow'

  def initialize
    super
    @swf.start_workflow_execution({
      domain: DOMAIN_NAME,
      workflow_id: SecureRandom.uuid,
      workflow_type: {
        name: WORKFLOW_NAME,
        version: VERSION
      },
      task_list: {
        name: TASK_LIST_NAME
      },
      execution_start_to_close_timeout: '36000',
      child_policy: 'TERMINATE'
    })
  end
end

ExecutionStarter.new

Testing the workflow

Now that we have the pieces ready, we need to open a terminal window with the activities:

1
ruby activities.rb

Another window with the decider:

1
ruby repairs_workflow.rb

Then, we can start our workflow:

1
ruby execution_started.rb

Conclusion

Using SWF turned out to be a lot more complicated than I expected. Having to parse all to events to figure out which activity goes next, seems error prone and makes the code confusing. The documentation is also not very clear on how this should be done, so hopefully this example helps people interested.

[ architecture  automation  aws  productivity  ]
Instrumenting an Istio Cluster With Jaeger Tracing
Introduction to Jaeger Tracing
Kubernetes ConfigMaps
Kubernetes networking with Istio
Managing Kubernetes Objects With Yaml Configurations