先に結果
以前書いた記事のジョブフローは、今回のStepfunctionsのアップデートではほとんど置き換えられなかった。Lambdaの作成や運用がなくなり便利なアップデートだが、StepFunctionsからAPI対応したサービスは全てではなく、さらにそれぞれのサービスのAPIも一部しか対応していないので、対応してるものはASL(SFのJSONのこと)で書きつつ、足りないものはLambdaで補う方針がよさそう。今後の拡張にも期待したい。
StepFunctionsのアップデートにともない以前書いた記事の更新
Step Functionsのアップデート
GlueやDDBなどをLambdaを介さず直接API連携できるようになりました。
以前書いたLambdaで連携したやつはこちらで、これを新機能でLambda使わずに連携してみます。
Glueの使い方的な⑦(Step Functionsでジョブフロー)
Step FunctionsでGlueのジョブフローを作る
今回はサーバーレスなジョブフローのサービスであるStep Functionsを使って、Glueクローラーを実行し正常終了したら後続のGlueジョブを実行するというフローを作ってみます。
全体の流れ
- Glue処理内容
- StepFunctionsの処理内容
- 前準備
- Step FunctionsでStateMachine作成
- 実行
Glue処理内容
"Glueの使い方的な①(GUIでジョブ実行)"(以後①と書きます)で実行したものと同じGlueクローラーとGlueジョブを使います。入力データも出力結果も①と同じです。
今回行うのはGlueクローラー処理が終わったら次のGlueジョブ処理開始というジョブフロー形成です。
あらためて①のクローラーとジョブの処理内容は以下の通りです
クローラーの内容
入力のCSVファイルからスキーマを作成します
ジョブの内容
S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する
Step Functionsを使ったジョブフローの内容
図の四角をStep Functionsでは"State"と呼びます。処理の1単位と思ってください。
ジョブフローは以下のような形です。
Stateごとに流れを説明します
- "Submit Crawler Job"でLambdaを使いGlueクローラーを実行
- (↑SFのAPI連携で置き換えられなかった)Glue Crawlerの APIが対応してなかった
- "Wait X Seconds"で指定時間待つ
- "Get Crawler Job Status"でLambdaを使いGlueクローラーの状態をポーリングして確認
- "Job Complete?"で状態を判定して結果によって3つに処理が分岐
- 失敗なら"Job Failed"エラー処理
終了なら"Run Final Glue Job"でLambdaを使い後続のGlueジョブを実行- (↑SFのAPI連携で置き換えた)終了なら"Run Final Glue Job"でGlueのStartJobRun APIで後続のGlueジョブを実行
- 処理中なら"Add Count"でLambdaを使いカウンタをインクリメント。
-
(↑SFのAPI連携で置き換えられなかった)DynamoDBのupdateItem APIがあえば加算カウンタが作れたがgetItemとputItemしか対応してなかった
- "Add Count"の後"Chk Count"でカウンタをチェックし3回以上になっていたら"Job Failed Timeout"でタイムアウト処理、3未満なら"Wait X Seconds"に戻りループ処理
※補足ですが、CloudWatchイベントがGlueクローラーのステータスを検知できるようになっていました。こちらも合わせて使うとよさそうです。
前準備
①と同じです
今回使うサンプルログファイル(19件)
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14
入力ファイルをS3に配置
$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27 0
2018-01-02 15:13:44 691 cvlog.csv
ディレクトリ構成
in0に入力ファイル、out0に出力ファイル
$ aws s3 ls s3://test-glue00/se2/
PRE in0/
PRE out0/
PRE script/
PRE tmp/
ジョブのPySparkスクリプト
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
入力のCSVデータのスキーマ
クローラーによって作成されるスキーマ
StepFunctionsで使うIAMロール作成
SFからGlueやDDBなどにアクセスするために、ロールに権限を追加
IAMの画面から[ロールの作成]をクリックする
[AWSサービス]をクリックし、"Step Functions"をクリックし、[次のステップ]をクリックする。次の画面はそのまま[次のステップ]をクリック、次の画面もそのまま[次のステップ]
次の画面でロール名を"demo-sf-glue-ddb-test"と入力し[ロールの作成]をクリック
作成したロールに以下のポリシーを追加する。
- AmazonDynamoDBFullAccess
- AWSGlueServiceRole
StepFunctionsでStateMachine作成
StepFunctionsは一連のジョブフローをJSONで定義しこれを"StateMachine"と呼びます。
StateMachine内の処理の1つ1つの四角をStateと呼びます。処理の1単位です。
このJSONの記述はASL(AmazonStatesLanguages)と呼ばれStateTypeとしてChoice(分岐処理)やWait(待ち)やParallel(並列実行)などがJSONだけで表現出来ます。またTaskというStateTypeからはLambdaやアクティビティ(EC2からStepFunctionsをポーリングする)を定義できます。前述の通り今回はLmabdaを使います。
マネージメントコンソールからいくつかあるテンプレートを元に作ることも出来ますが、カスタムでJSONを1から作ることもできます。今回は1から作ります。
新規StateMachine作成画面
"一から作成"にチェックを入れ、名前に"GlueJob2"と入力し[次へ]をクリック
"既存のロールを選択する"にチェックを入れ、作成したロール"demo-sf-glue-ddb-test"を選択し[ステートマシンの作成]をクリックする
作成したステートマシンGlueJob2の[編集]をクリックし
この後に書かれているJSONを貼り付け、[保存]をクリックし保存する
StateMachine
今回のStateMachieのJSONは以下です。
内容は前述の通りです。
※[AWSID]のところは自身のAWSIDと置き換えてください
{
"Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
"StartAt": "Submit Crawler Job",
"States": {
"Submit Crawler Job": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-cr1",
"ResultPath": "$.chkcount",
"Next": "Wait X Seconds",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 120,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
},
"Wait X Seconds": {
"Type": "Wait",
"SecondsPath": "$.wait_time",
"Next": "Get Crawler Job Status"
},
"Get Crawler Job Status": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-crcheck",
"Next": "Job Complete?",
"InputPath": "$",
"ResultPath": "$.response",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
},
"Job Complete?": {
"Type": "Choice",
"Choices": [{
"Variable": "$.response",
"StringEquals": "FAILED",
"Next": "Job Failed"
},
{
"Variable": "$.response",
"StringEquals": "READY",
"Next": "Run Final Glue Job"
}
],
"Default": "Add Count"
},
"Add Count": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-addcount",
"Next": "Chk Count",
"InputPath": "$",
"ResultPath": "$.chkcount",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
},
"Chk Count": {
"Type": "Choice",
"Choices": [{
"Variable": "$.chkcount",
"NumericGreaterThan": 3,
"Next": "Job Failed Timeout"
}],
"Default": "Wait X Seconds"
},
"Job Failed": {
"Type": "Fail",
"Cause": "Glue Crawler Job Failed",
"Error": "DescribeJob returned FAILED"
},
"Job Failed Timeout": {
"Type": "Fail",
"Cause": "Glue Crawler Job Failed",
"Error": "DescribeJob returned FAILED Because of Timeout"
},
"Run Final Glue Job": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "se2_job0"
},"End": true,
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
}
}
}
Lambda
今回使うLambdaは4つ3つです。流れも振り返りながら見ていきます
書き方はいろいろあるし今回はエラーハンドリングも甘いのであくまでも動きのイメージをつかむための参考程度にしてください。最後のGlueジョブの実行についてはジョブの終了判定とかはしてないです。
"Submit Crawler Job"
GlueのAPIを使ってクローラーのStartを行う
ここは置き換えたかったのですが出来ませんでした。
GlueサービスのAPIがGlue CrawlerのAPIには対応してませんでした。
Step Functions Glue API
# coding: UTF-8
import sys
import boto3
glue = boto3.client('glue')
def lambda_handler(event, context):
client = boto3.client('glue')
response = client.start_crawler(Name='se2_in0')
return 1
"Wait X Seconds"
Waitで指定秒数待つ
"Get Crawler Job Status"
GlueのAPIを使ってクローラーのステータスを取得します
# coding: UTF-8
import sys
import boto3
import json
glue = boto3.client('glue')
def lambda_handler(event, context):
client = boto3.client('glue')
response = client.get_crawler(Name='se2_in0')
response = response['Crawler']['State']
return response
"Job Complete?"
Choiceで取得したステータスが、"READY"なら正常終了、"FAILED"なら失敗、それ以外は実行中の分岐処理
"Job Failed"
ステータスが失敗なら
FailでStepFunctionsをエラーさせます
"Run Final Glue Job"
ステータスが正常終了なら
GlueのStartJobRun APIを使ってジョブをStartします
今回はここだけ置き換える事ができました。
ここで指定しているARNはMagic ARNと呼ばれるもので、StepFunctions側で定義されている少し特殊なARNです。ParametersにGlueサービスのAPIに渡す引数を指定します。
Step Functions Glue API
"Run Final Glue Job": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "se2_job0"
}
"Add Count"
クローラーがまだ実行中なので
カウンタにインクリメントします
ここも置き換えたかったのですが出来ませんでした。
DynamoDBサービスのAPIを使って、DynamoDBをカウンタとして1づつインクリメントする処理を書きたかったのですが、APIがget,putしか対応してなく無理して書いたら逆に複雑になり諦めました。
# coding: UTF-8
import sys
import boto3
import json
glue = boto3.client('glue')
def lambda_handler(event, context):
chkcount = event["chkcount"]
chkcount = chkcount + 1
return chkcount
"Chk Count"
choiceでカウンタが3未満か3以上かをチェックします
"Job Failed Timeout"
Failでカウンタが3以上だった時のエラー処理
"Wait X Seconds"
3未満の場合はここに戻りループ処理
実行
Step Functionsを実行
作成したStateMachineをクリックし、"実行の開始"をクリック
JSONに引数を入れて"実行の開始"をクリック
今回はJSON内で使う変数で"wait_time"を60秒で待ちの時間として入力しています
実行状況
その他
今回はクローラー実行後にジョブ実行というシンプルなフローでしたが、Step Functionsは並列度を替えたり引数の受け渡しをしたり、さらにLambdaでロジックを書くことができるので自由度高く複雑なフローの作成が行えます。Glueとの相性はいいのではないでしょうか?
JSON部分も30分もあれば学習完了というカジュアルさがありLambdaを使ってAPI操作で様々なAWSの処理を繋げるのにはとてもいい印象です。
かなりシンプルな処理だったのですがコードがやや多い印象で、より複雑な処理になると結構大きいJSONになりそうで、JSONなのでコメント書けないとか少し大変な部分が出て来るのかもしれません。
バージョン管理を考えるとCliでの処理で運用したほうが良さそうですが、こういったサービスはGUIでの良さもあるのでどちらに比重を置いた運用がいいかは考慮が必要かもです
本文中で使ったカウンタのステート情報はDynamoDBなどに入れた方が良いかもです。
マイクロサービス化しやすいので、極力本来の処理のロジックをLambda側にやらせてそれ以外のフロー処理(分岐とかカウンタインクリメントとか)をJSONで書くのがいいと思います。今回カウンタはLambdaでやってしまいましたが。
ログはCloudWatchLogsに出ます
参考
Glueの使い方的な⑦(Step Functionsでジョブフロー)
https://qiita.com/pioho07/items/f8a2fd946fc391f89c97
StepFunctions BlackBelt資料
https://www.slideshare.net/AmazonWebServicesJapan/20170726-black-beltstepfunctions-78267693
Step Functions Document (AWS Service Integration)
https://docs.aws.amazon.com/step-functions/latest/dg/concepts-connectors.html
Step Functions Document (Error Handling)
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-error-handling.html
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f