17
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

BrainPad Advent CalendarAdvent Calendar 2021

Day 18

2021年版ワークフローエンジンの紹介

Last updated at Posted at 2021-12-18

この記事は BrainPad Advent Calendar2021 18日目の記事となります。
プロダクトビジネス本部、開発部の吉田です。
データ処理パイプラインとしてよく名前が挙がるワークフローエンジンを紹介します。
近年、ワークフローエンジンの利用は

  1. 業務システムではバッチの依存関係の解消のため
  2. MLops と呼ばれる分野でデータ処理、モデル更新に

様々なシーンで活躍しており、広がっている印象です。
ここでは、ワークフローエンジンの紹介を行っていきたいと思います。

データ処理パイプライン|ワークフローエンジン

本稿では、ワークフローエンジンの紹介を行いますが、一番良いのは利用シーンから様々なワークフローエンジンの良し悪しを出していくことが望ましいと思います。
近年、ワークフローエンジンも多く増えてきており、開発のしやすさ、コードの書きやすさ、利便性(ワークフロー内部にサブセットのワークフローが組める、など)、ハードウェア起因の様々な問題の解消(メモリ、CPU、HA構成化)ができる、、、など多岐にわたる選定項目からワークフローエンジンを選択していくことが望ましいです。

特に利用シーンにおいては、HA構成で構築可能であるかという点で選ぶ、簡単に組んでワークフローエンジンの恩恵を受けたい(依存関係の解消)、など考えられます。こうした状況の中で、ワークフローエンジンの中に、ETL処理特化したワークフローエンジンを導入する、といった複数のワークフローエンジンを運用で選択するケースも起きているかと思います。
多くのワークフローエンジンを知って損がない状態であると思いますので、一覧紹介していきます。

紹介一覧

  • airflow
  • digdag
  • prefect
  • metaflow
  • luigi
  • argo workflow

airflow

2015年にリリースされた Airbnb 社で始まったワークフローエンジン
多岐にわたる Operator を利用し様々な利用ができるワークフローエンジンで Google Cloud Composer での利用が有名でしょうか。 image も提供されているため、サンプルなどは構築しやすいイメージです。
DAG を使用してワークフローを構築、多くの提供されている Operator を利用したデータ処理、Python スクリプト を時前で構築して処理、高機能なスケジューリング機能、 Web UI の提供、メールや Slack への通知、ジョブのモニタリング、 HA構成、あらゆる場面で利用可能なワークフローエンジンだと思います。

様々な Operator

このような提供済みの Operator を駆使して、 ETL 処理に利用できるか、自前で Operator を実装して処理したり、高機能なスケジューリング機能の恩恵を受けたい、大規模なデータ処理基板としての利用で活躍すると感じます。
弊社でも利用しており、利用してみた記事も過去に挙げています。

digdag

Treasure Data社が開発したオープンソースのワークフローエンジン
yaml 形式でフロー制御、タスク実行、スケジュール、モニタリングで利用可能で、多くの企業が採用しているワークフローエンジンの一つだと感じています。 java を利用するため、 java インストールから環境を作成する必要があります。
サーバーモードの機能を利用し、 --disable-executor-loop などで API サーバーに特化したインスタンス、ジョブのタスク処理に特化したインスタンス、など切り替えができます。

スケジュール、リトライ制御も豊富で、 syntax も多く提供しています。

prefect

python 製のライブラリである prefect は、データエンジニアリングに向けて提供を始めたライブラリらしいです。
基本的なフロー制御、簡単にタスク制御ができる印象です。簡単にワークフロー制御を組みたい場合は、非常にシンプルで選択肢に入るかと思います。ぜひ一度は使ってみてほしいワークフローエンジンです。

metaflow

2019 年にリリースされた Netflix社のワークフローエンジン
データエンジニアに向けたワークフローエンジンで、データ前処理、モデル構築、それらを運用に向けて進めていくためのサポートするツールの一つとして位置づけが強い印象です。こちらも軽量なワークフロー制御で、非常にシンプルに利用可能であることから、依存関係制御をメインに利用したいケースでは、選択肢の一つになると思います。
下記リンクは非常に個性のある紹介で、ぜひ見てほしいところです。

luigi

2012 年にSpotify社からリリースされたワークフローエンジン
古くからあることで非常に有名なワークフローエンジンの一つで、様々な企業がユーザーとして利用しているライブラリの一つと言えます。

argo workflow

Kubernetes 上で並列ジョブを構成するオープンソースのワークフローエンジン
個々のタスクをコンテナで構成することで、複雑なバッチ処理を可能にしており、機械学習用のコンテナ、データ処理用のコンテナといったコンテナごとに役割を設け、複雑な構成を作ることなくワークフローで必要な制御ができます。
ローカルで実行環境を作る場合 kubectl から apply する必要があります。 採用も増えつつあるワークフローエンジンの一つ?

リンク

airflow
https://airflow.apache.org/
https://airflow.apache.org/docs/
https://github.com/apache/airflow

digdag
https://www.digdag.io/
https://docs.digdag.io/
https://github.com/treasure-data/digdag

prefect
https://www.prefect.io/
https://github.com/PrefectHQ/prefect
https://docs.prefect.io/

argo workflow
https://argoproj.github.io/argo-workflows/
https://argoproj.github.io/argo-workflows/quick-start/
https://github.com/argoproj/argo-workflows

metaflow
https://metaflow.org/
https://github.com/Netflix/metaflow

luigi
https://luigi.readthedocs.io/en/stable/index.html
https://github.com/spotify/luigi

実行確認系(一部ワークフロー)

digdag

dockerイメージ

FROM openjdk:8u212-jre-alpine

ENV DIGDAG_VERSION 0.10.3

RUN apk add --no-cache \
  bash \
  curl \
  python3

RUN curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-${DIGDAG_VERSION}" && \
  chmod +x /usr/local/bin/digdag

WORKDIR /src

# entrypoint
ENTRYPOINT ["java", "-jar", "/usr/local/bin/digdag"]

実行確認

docker build -t docker-digdag .
docker run -v `pwd`:/src -t docker-digdag init mydag
cd mydag
docker run -v `pwd`:/src -t docker-digdag run mydag.dig # digdag 実行 - ローカルモード

2021-10-27 17:05:05 +0900: Digdag v0.10.3
2021-10-27 17:05:07 +0900 [WARN] (main): Using a new session time 2021-10-27T00:00:00+00:00.
2021-10-27 17:05:07 +0900 [INFO] (main): Using session /path/to/mydag/.digdag/status/20211027T000000+0000.
2021-10-27 17:05:07 +0900 [INFO] (main): Starting a new session project id=1 workflow name=mydag session_time=2021-10-27T00:00:00+00:00
2021-10-27 17:05:10 +0900 [INFO] (0017@[0:default]+mydag+setup): echo>: start 2021-10-27T00:00:00+00:00
start 2021-10-27T00:00:00+00:00
2021-10-27 17:05:11 +0900 [INFO] (0017@[0:default]+mydag+disp_current_date): echo>: 2021-10-27 00:00:00 +00:00
2021-10-27 00:00:00 +00:00
2021-10-27 17:05:11 +0900 [INFO] (0017@[0:default]+mydag+repeat): for_each>: {order=[first, second, third], animal=[dog, cat]}
2021-10-27 17:05:12 +0900 [INFO] (0020@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=1=cat): echo>: second cat
second cat
2021-10-27 17:05:12 +0900 [INFO] (0018@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=1=cat): echo>: first cat
first cat
2021-10-27 17:05:12 +0900 [INFO] (0021@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=0=dog): echo>: third dog
third dog
2021-10-27 17:05:12 +0900 [INFO] (0022@[0:default]+mydag+repeat^sub+for-0=order=2=third&1=animal=1=cat): echo>: third cat
third cat
2021-10-27 17:05:12 +0900 [INFO] (0017@[0:default]+mydag+repeat^sub+for-0=order=0=first&1=animal=0=dog): echo>: first dog
first dog
2021-10-27 17:05:12 +0900 [INFO] (0019@[0:default]+mydag+repeat^sub+for-0=order=1=second&1=animal=0=dog): echo>: second dog
second dog
2021-10-27 17:05:13 +0900 [INFO] (0019@[0:default]+mydag+teardown): echo>: finish 2021-10-27T00:00:00+00:00
finish 2021-10-27T00:00:00+00:00
Success. Task state is saved at /path/to/workflow/.digdag/status/20211027T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

prefect

インストール

pipenv install prefect
sample_prefect.py
from prefect import Flow, task

@task
def get_name():
    return "world"

@task
def hello(who):
    print("hello, {}!".format(who))

with Flow("HelloWorld") as flow:
    who = get_name()
    hello_world = hello(who)

flow.run()

実行確認

> pipenv shell
> python sample_prefect.py
[2021-12-18 19:47:38+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'HelloWorld'
[2021-12-18 19:47:38+0900] INFO - prefect.TaskRunner | Task 'get_name': Starting task run...
[2021-12-18 19:47:38+0900] INFO - prefect.TaskRunner | Task 'get_name': Finished task run for task with final state: 'Success'
[2021-12-18 19:47:38+0900] INFO - prefect.TaskRunner | Task 'hello': Starting task run...
hello, world!
[2021-12-18 19:47:38+0900] INFO - prefect.TaskRunner | Task 'hello': Finished task run for task with final state: 'Success'
[2021-12-18 19:47:38+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

metaflow

インストール

pipenv install metaflow
sample_metaflow.py
from metaflow import FlowSpec, step

class LinearFlow(FlowSpec):

    @step
    def start(self):
        self.my_var = 'hello world'
        self.next(self.a)

    @step
    def a(self):
        print('the data artifact is: %s' % self.my_var)
        self.next(self.end)

    @step
    def end(self):
        print('the data artifact is still: %s' % self.my_var)

if __name__ == '__main__':
    LinearFlow()

実行確認

> pipenv shell
> python sample_metaflow.py
Metaflow 2.4.7 executing LinearFlow for user:xxxxxx.yoshida
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!

'/path/to/sample_metaflow.py show' shows a description of this flow.
'/path/to/sample_metaflow.py run' runs the flow locally.
'/path/to/sample_metaflow.py help' shows all available commands and options.
17
5
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?