時間のかかる処理を開始して、その処理が完了次第すぐ別の処理を実行する、みたいなコトをしたい場合はよくある。
こういう時は、処理の状態(完了してるか否か)を一定間隔で取得(ポーリング)して、処理が完了していたら後続の処理を走らせる。
AWSでポーリングをする方法として、Step Functionsがある。
Step FunctionsはLambda関数と連携させられるので、Lambda関数で時間のかかる処理を開始して、その処理が完了したらすぐ別のLambda関数を動かす、というようなことを簡単にできる。
例えば「Lambda関数でEC2インスタンスを起動して、インスタンスの状態がrunningになったら通知を送る」というような処理は、Step Functionsでサッと作れる。
下記のドキュメントでは、AWS Batchのジョブのステータスをポーリングするためのサンプルについて説明されてる。
ジョブステータスポーリング
単一のLambda関数で、時間のかかる処理の開始と、その処理のポーリングをまとめてやるよりも、Step Functionsを併用する方がいいと思う。
Lambda関数は最大5分間でタイムアウトするので5分以上かかるような処理をポーリングできないし、Lambda関数でポーリングすると実行時間が長くなるので料金が増える。
「EC2インスタンスを起動して、インスタンスの状態がrunningになったら通知を送る」の実装
「EC2インスタンスを起動して、インスタンスの状態がrunningになったら通知を送る」処理をStep Functionsで実装してみる。Step Functionsのステートマシン定義の書き方のメモ書きも兼ねて。。。
概要
Lambda関数を3つ作って、ステートマシンから呼び出す
- インスタンスの起動(RunInstancesの実行)
- インスタンスの状態の確認(DescribeInstancesの実行)
- SNS通知の送信(Publishの実行)
ステートマシンでは、まず『インスタンスの起動』を呼んで、その後数秒おきに『インスタンスの状態の確認』を呼んで、ステータスがrunningになっていたら『SNS通知の送信』を呼ぶ。
Lambda関数: インスタンスの起動(RunInstancesの実行)
RunInstancesでインスタンスを起動するだけの関数を作る。
起動したインスタンスのインスタンスIDを返す。
AWS SDK for JavaScriptの、RunInstancesのリファレンスはここ。
Promiseを使って書いている。
AWS SDK for JavaScriptのPromiseについてはここ。
Lambda関数: インスタンスの状態の確認(DescribeInstancesの実行)
eventで渡されたインスタンスIDのインスタンスに対してDescribeInstancesする関数を作る。
インスタンスの状態がすべてrunningになっていたら、allInstancesAreRunningにtrueを格納して返す。
DescribeInstancesのリファレンスはここ。
Lambda関数: SNS通知の送信(Publishの実行)
起動したインスタンスのインスタンスIDを通知する。
Publishのリファレンスはここ。
ステートマシンの作成
ステートマシンの定義を、「Amazon ステートメント言語」で書く。
『ジョブステータスポーリング』のサンプルのステートマシンと、基本的な構造は同じ。
{
"Comment": "RunInstances, Poll instances, and Notify all instances are running",
"StartAt": "RunInstances",
"States": {
"RunInstances": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:RunInstancesFunction",
"Next": "Wait 3 Seconds",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
},
"Wait 3 Seconds": {
"Type": "Wait",
"Seconds": 3,
"Next": "DescribeInstances"
},
"DescribeInstances": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:DescribeInstancesFunction",
"Next": "Instances Running?",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
},
"Instances Running?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.allInstancesAreRunning",
"BooleanEquals": true,
"Next": "Publish"
}
],
"Default": "Wait 3 Seconds"
},
"Publish": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:PublishFunction",
"End": true,
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
]
}
}
}
まず"RunInstances"ステートが動く。
次に"Wait 3 Seconds"ステートで3秒待つ。
その後、“DescribeInstances"ステートでインスタンスの状態を確認する。
もしすべてのインスタンスがrunningではないなら、“Instances Running?“ステートから"Wait 3 Seconds"ステートに戻る。
すべてのインスタンスがrunningなら、“Publish"ステートに進んで通知して処理終了。
インスタンスの起動に失敗した時の処理も入れておいた方がいいけど、その辺りはちゃんと作ってない。
TaskステートのRetryについて
それぞれのLambda関数を呼ぶためのTaskステートで、Retryを記述してる。
個人的には、ステートマシンでLambda関数を呼ぶ時は、必ずRetryを書くのが良いと思う。実際、マネジメントコンソールで作れる『ジョブステータスポーリング』のサンプルのステートマシンでも、Retryが記述されている。
というのも、ステートマシンがLambda関数を呼ぶ時は、同期的に呼ばれる。
AWSブログにも書かれているし、TaskステートがLambda関数の返す値を後続のステートに渡せることからも同期的に呼ばれていると想像がつく。
AWS Step Functions – ビジュアルワークフローを使ったアプリケーションのビルドと配布
stateが実行されたときに何を起こさせるかを指定するために2つの方法があります。1つは、stateが実行されたときに同期的に呼び出されるLambda関数を使う方法です。
かつ、Lambda関数を同期的に呼ぶ時は、エラー時に自動的にリトライしてくれなくて、呼び出し元でリトライする必要がある。
同期呼び出し – 呼び出し元アプリケーションが 429 エラーを受け取り、再試行の処理が必要になります
なので、Lambda関数を呼ぶTaskステートには漏れなくRetryを記述して、エラーに備えるのが良いと思う。
ステートマシンの実行ログ
get-execution-historyコマンドで、ステートマシンの実行のログを見られる。
$ aws stepfunctions get-execution-history --execution-arn arn:aws:states:リージョン:アカウントID:execution:ステートマシン名:実行名
ログは下記のような感じ。
allInstancesAreRunningがFalseの間はwaitで待ち続けて、trueになったらPublishに遷移して終了していることがわかる。