開発日報

窓際エンジニアの開発備忘。日報は嘘です。

AWS StepFunctionsでECSタスクを起動してみる

仕事でバッチ処理とその周りのワークフロー制御をどうするか調べてみた結果、AWSStepFunction良いんじゃね?

ってなったので、調査検証メモ。

参考記事

AWS Step Functions とは

Step Functions で Amazon ECS または Fargate タスクを管理する

ContainerOverride

AWS Step Functions のよくある質問

AWS Step Functionsでバッチ処理を行う手段を考える

Manage a Container Task (Amazon ECS, Amazon SNS)

InputPath およびパラメータ

AWSCDK aws-stepfunctions module

Step Functionとは?

ワークフロー制御をやってくれるサービスです。 複数のジョブにまたがった処理の実行順序の制御や、リトライなどの制御をしてくれます。

私の関わるPJでは、AWS LambdaやECS Fargateの1関数・1タスクでは終わらないバッチ処理の制御をさせるために使用を検討しました。

↓のようなイメージです。

f:id:yuuu1993g:20200614115506p:plain

できること

バッチ処理用途に使う前提でざっくり書き出しました。

  • CloudWatch EventsをトリガーにしたStepFunctions起動(日時実行、AWSのリソース変化をトリガーにするなどができる。)。
  • StepFunctionsはあくまでワークフローの制御をするサービで、StepFunctionsから処理の実行環境を必要に応じてLambda・Fargate・AWSBatchなどから選ぶことができる。
  • ワークフローの視覚化。上の画像の通り。
  • ワークフローの各ステップ毎に並列実行可能
  • ECS Task(lambda awsbatchなども可)を呼び出して、結果に応じて次のECSタスクを呼び分ける。
  • ECS Taskにパラメータ情報を渡すには、
    • CloudWatchEvents/StepFunctionの実行時引数を使用て、ECSタスクの環境変数にいれる。
    • デフォルトのTask起動コマンドを上書きする
    • 環境変数環境変数ファイルを渡す
  • ログはCloudWatchに吐ける。
  • ワークフローの子ワークフローを作れる
  • AWS CDKでの定義も可。今度やってみたい。

やってみる

ステップファンクションは豊富にテンプレートが用意されているので、今回はそちらを利用します。
ECSタスクもテンプレートから自動で作成されますので、今回は自分では作りません。

使ったことがない方でも観点にサンプルを作成し、各種機能を実際に動かしてみることができます。

1. State Machineを作成する

  1. AWSのコンソールトップのFind Servicesのフォームより「Step Functions」に入る

  2. 画面左のサイドメニューより、「State machines」に入り「Create state mache」を押下 f:id:yuuu1993g:20200614144131p:plain f:id:yuuu1993g:20200614144810p:plain

  3. Define state machineの箇所は「Run a Sample Project」、Sample projectsは「Manage Container Task」を選択し、「Next」を押下します。

  4. 確認画面で「Deploy Resources」を押下します。

実行してみる

完了したら、早速「new execution」してみましょう。 以下のような感じでワークフローの進捗具体と各ステップの実行結果が確認できます。

f:id:yuuu1993g:20200614150329p:plain

定義や生成物の中身の確認

で、何をやっているのか、何ができたのか、みてみましょう。

↑の実行詳細画面の「Code」の箇所を押下すると、定義の詳細が見れます。 以下のような感じかと思います。

{
  "Comment": "An example of the Amazon States Language for notification on an AWS Fargate task completion",
  "StartAt": "Run Fargate Task",
  "TimeoutSeconds": 3600,
  "States": {
    "Run Fargate Task": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "arn:aws:ecs:ap-northeast-1:xxxxxxxx:cluster/StepFunctionsSample-ContainerTaskManagementxxxxx...",
        "TaskDefinition": "arn:aws:ecs:ap-northeast-1:xxxxxxxxxxx:task-definition/StepFunctionsSample-ContainerTaskManagementxxxxxxxxxxxxx...",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "subnet-xxxxxxxx",
              "subnet-yyyyyyyy"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "Next": "Notify Success",
      "Catch": [
          {
            "ErrorEquals": [ "States.ALL" ],
            "Next": "Notify Failure"
          }
      ]
    },
    "Notify Success": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "AWS Fargate Task started by Step Functions succeeded",
        "TopicArn": "arn:aws:sns:ap-northeast-1:xxxxxxxxxxxx:StepFunctionsSample-ContainerTaskManagementdbxxxxxxxxxxx"
      },
      "End": true
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "AWS Fargate Task started by Step Functions failed",
        "TopicArn": "arn:aws:sns:ap-northeast-1:152341231368:StepFunctionsSample-ContainerTaskManagementdbxxxxxxxxxx......
      },
      "End": true
    }
  }
}

読めば大体わかるかとは思いますが、今回のハイライトは「Run Fargate Task」の箇所です。

"Run Fargate Task": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "LaunchType": "FARGATE",
        "Cluster": "arn:aws:ecs:ap-northeast-1:xxxxxxxx:cluster/StepFunctionsSample-ContainerTaskManagementxxxxx...",
        "TaskDefinition": "arn:aws:ecs:ap-northeast-1:xxxxxxxxxxx:task-definition/StepFunctionsSample-ContainerTaskManagementxxxxxxxxxxxxx...",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": [
              "subnet-xxxxxxxx",
              "subnet-yyyyyyyy"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      },
      "Next": "Notify Success",
      "Catch": [
          {
            "ErrorEquals": [ "States.ALL" ],
            "Next": "Notify Failure"
          }
      ]
    }

解説すると、

  • ECSタスクを実行する箇所の処理の名前は"Run Fargate Task"となります。名前は自由に指定可能です。
  • "Parameters"が処理に対して渡す諸々の情報で、ここでECSクラスター名やタスク定義、実行するVPCなどを指定します
    • 詳しくはこちらを参照となりますが、Prametersの箇所にECSタスクとして実行されるタスク定義の環境変数を指定することもできます。基本的にStepFunctionで実行する処理がECSタスクの場合、コンテナに外から値を渡す術は環境変数になります。
  • "Next"で次の処理を指定しています。
  • "Catch"で実行したECSタスクが以上終了した際のエラーハンドリングが可能です。

生成されたECS情報を確認する。

最後に今回ステップファンクションで作成されたECSクラスターとタスクを確認してみましょう。

AWSコンソールよりECSに入ります。

(私の既存のものが入っていますが、、) 今回作成されたECSクラスターとタスクの定義が確認できます。
上記JSONのarnとECSのコンソール状に表示されているarnが同一であることを確認しましょう。

f:id:yuuu1993g:20200614152029p:plainf:id:yuuu1993g:20200614152041p:plain