自分の周囲でCloud Run熱が高まっているようなのですが、それに乗っかるためにやってみた系の記事です。
Cloud Runでバッチ処理をする際に気になってしまうタイムアウトについて、中断時点から容易に再開できるpythonのgokartというライブラリとCloud Tasksで気にしなくて済むようにしようという試みです。(新規性はわかりません)
コードはこちら
https://github.com/Ryusuketa/cloud-run-long-batch
Cloud Runをバッチ処理に利用する
意外とバッチでのデータ処理をいい感じにクラウド上で動かしてくれるサービスがあんまりありません。
そこそこ複雑な処理を書くにはGCPのCloud Workflowsでは物足りず、
甘ったれ園児ニアとしてはデータ処理に向いたPythonでぺっと自由に処理を書いてDocker imageに依存もパッケージングしてデプロイしていい感じに動いていて欲しいです。App EngineやEBもやりたいことに対して手間だなってレベルの園児です。
また実行頻度とコストの兼ね合いで、インスタンスを定常的に立てておきたくなかったりします。
というわけで適当にDocker imageを作ればサッと動かしてくれるCloud Runを使いたくなりました。
vCPU単位での料金は細かく課金され、デプロイがめちゃくちゃ簡単、特に何もしなくてもゼロスケールしてくれるのが魅力的でした。
ただし、Cloud Runのタイムアウトのリミットは1時間までです。
バッチ処理に利用する際、1時間で終わればいいのですが、終わらない場合が多々あると思います。
バッチ処理を完遂するために、処理を強制中断された場合に、中断された箇所からやり直す必要があります。またバッチ処理全てが完了するまでCloud Runを再起動して欲しいです。
言語はPythonで実装するとして、
中断された箇所からの再開する処理の実装をgokart or luigiで、完了するまでリトライする処理をCloud Tasksで実装し、問題なく完了させられるようにしたいと思います。
中断を考慮したバッチ処理の実装
再開処理を楽に実装するためにgokartというライブラリを利用するのでその説明をします。
その後実装を示します。
gokart/luigiとは
https://github.com/m3dev/gokart
gokartはpython用のワークフローエンジンです。luigiというライブラリのラッパーです。
luigiの特徴として次の点があります。
- 処理単位ごとにTaskと呼ばれる抽象クラス継承して作ったタスククラスで処理を定義する。
- 依存するタスク(前処理など)をタスク毎に書くだけで自動で実行DAGを定義してくれる。
- これしてくれるのはkedroとluigiだけな気がします。一部分に興味を持った時にその箇所に集中してコードリーディングできる点は楽です。
- 各タスクの処理結果はファイルシステムにdumpする。
- 途中で中断した場合、dumpされた処理結果の存在を確認して、未処理の部分から再開する。
- digdagやAirflowなどのワークフローエンジンと違い、分散処理に否定的でシングルプロセス上でDAGを処理する(マルチスレッドには対応してる)
太字で示した点が大事で、luigiで全体の処理を記述してしまえば、あとは中断しても再実行させるだけで良いという点です。
ただしdumpした処理結果はgcsなどのクラウドストレージにdumpする必要があります。
luigiはこの辺りのIOもサポートしているので実装は比較的軽量ですみます。
さらにそのラッパーであるgokartはluigiで必要とされていたIOのコードを省略したりできます。
実装コード
単にテキストファイルをGCSにdumpするだけのコードを書きます。
タスクが全部終了した際にall_results.txtが吐き出されるようにコードを書いています。
import gokart
import luigi
import os
import time
class RunTaskWithWait(gokart.TaskOnKart):
workspace_directory = ''
wait_time: int = luigi.IntParameter()
output_name: str = luigi.Parameter()
def output(self):
return self.make_target(f'gs://{os.environ["GCS_BUCKET_NAME"]}/{self.output_name}.txt', use_unique_id=False)
def run(self):
print(f'Task {self.output_name} start.')
time.sleep(self.wait_time)
self.dump(f'{self.output_name} is done')
print(f'Task {self.output_name} end.')
class Agg(gokart.TaskOnKart):
workspace_directory = ''
def output(self):
return self.make_target(f'gs://{os.environ["GCS_BUCKET_NAME"]}/all_results.txt', use_unique_id=False)
def requires(self):
return [RunTaskWithWait(wait_time=100, output_name=f'{i}') for i in range(27)]
def run(self):
res = self.load()
self.dump(str(res))
def run_pipeline():
gokart.build(Agg())
if __name__ == '__main__':
run_pipeline()
記法について、どこかに解説があればいいんですが纏まったものはおそらくないと思います。ここでもしません。
一つだけ書くとworkspace_directory = ''
はfsをgcsfilesystemにするためのおまじないです。これをいれておかないとローカルにdumpされます。外から全部に値を注入する記法もありますがただの検証なのでこうしておきます。
時間のかかるタスクを擬似的に再現するために100秒かかるタスクを27個用意して順次実行させます。(27はあんまり意味ないです)
特に優先順位は依存タスクがなければ順不同のため(優先度パラメータもあります)実行順はバラバラです。
cloud run上で検証
cloud runで走らせるにはHTTPリクエストを受け付けてそれをトリガーにして上記パイプラインを実行できるようにする必要があります。
これはfastapiでさっと実装しました。
from fastapi import FastAPI
from pipeline import run_pipeline
app = FastAPI()
@app.get("/")
@app.post("/")
async def root():
run_pipeline()
return {'Message': 'finished'}
(Dockerfileはgithubレポを確認してください)
Cloud Run上にイメージをデプロイし、タイムアウトは10秒にしておきます。
この状態で払い出されたURLにアクセスします。
無事タイムアウトしました。
ただしここからCloud Runのインスタンスはアイドル状態に入ります。このアイドル状態はリクエストが途絶えると終了して行きますが、終了まではそれなりの時間がかかるようなので、タイムアウト後も処理が続きます。
metricをみるとアイドル状態のインスタンス数が分かります。
その時点で生成されているファイルを確認すると、
all_results.txtが最終的に吐き出されるはずですがありません。途中で終了しました。
もう一度アクセスしてまた実行させアイドルインスタンスが終了するのを待ち、終了したらもう一度を繰り返すことで最後まで生成できます。
Cloud Tasksで自動リトライ
こんな感じで、中断時点から再開し、処理を完遂できましたがこれは流石に辛いです。
というわけでリトライもCloud Tasksを使って自動でやってもらいます。
Cloud Tasksはフルマネージドなメッセジーキューサービスです。
自動リトライができ、それに関してリトライ数、backoff係数など、いくつか設定ができます。
Max retry durationを100000000sに、max doublingを0に、Min backoffを900sにすることで完了するまで、15分毎にリトライさせます。(なぜかMin backoffが897sになってるけど)
gcloudコマンドでキューにタスクを発行します
gcloud tasks create-http-task\
--queue=cloud-run-retry\
--url=https://cloud-run-experiment-hogehoge.a.run.app/
こちらも無事all_results.txtが生成できました。
定期実行が必要な場合、Cloud Tasksにcloud schedulerからメッセージを送信することで実装できます。
終わりに 思うconsたち
正直、コンテナのアイドルタイムがあるためにリトライ制御が面倒です。アイドルタイム中はバッチも動いているので、その間にリクエストを飛ばすと多重起動になります。
この辺りは重複して動かさないようにアプリ側にプロセス管理する処理が必要かもしれません。他には1時間以内にプロセスを自殺させつつメッセージをpubsubに送信して、また自分に送り直す手もありそうです。(と教えてもらいました)
リソース面に関して、自由に使えるk8sクラスタがあるならそちらが良さそうです。cron jobがあるので定期実行までワンストップで済んで簡単ですし、MLの推論を行うなど重たいプロセスになった場合はCloud Runのインスタンスでは流石に厳しいと思います。
また目的外ですが、AI Platformを使うと、ワーカーインスタンスを最低2つ用意するため割高ですが、タイムアウトを気にせずDocker containerを走らせられるのでこちらでもいいと思います。
あと処理を分割して中間ファイルに起こしておくのがミソなのでそこを自前で書くのも手と言えば手です。gokart/luigiは割と書き方覚えるコストが大きいので…。
タスクの処理時間が一つで1時間より大きくなった場合、処理をタスクレベルでチャンク化するなども必要です。この辺りはタスクのパラメータを用意してあげることで割と楽に実装できるのでgokart/luigiを採用するメリットがあります。
他に、あまりCloud RunやCloud Tasksの機能や仕様を細かに調査しておらず、こちらのconsがあんまり調べられていません。
もしかするとCloud TasksのリトライとCloud Runのアイドルは都度チューニングしないといけないかもしれません。