はじめに
2023年の12月にtroccoのワークフロータスクに「HTTPリクエスト」が追加されました。ワークフロー内で任意のAPIを呼び出すことが可能となり、より柔軟なワークフローを構築することが可能となりました。
この記事ではワークフロータスク「HTTPリクエスト」を使った利用例をいくつか紹介します。
troccoとは
trocco®は、ETL/データ転送・データマート生成・ジョブ管理・データガバナンスなどのデータエンジニアリング領域をカバーした、分析基盤構築・運用の支援SaaSです。
あらゆるデータの連携・整備・運用を自動化し、スピーディーにデータ活用環境を整備。インサイトを得やすい状況に導きます。
ETLを行うためのSaaS型のサービスで多くのコネクターをサポートしたデータ転送機能、データマート、ワークフローといったさまざまなETLに関連する機能を持ったサービスとなります。詳細は上記リンクを参照ください。
ワークフローとは
各転送ジョブやデータマートジョブなどを1つのタスクとして、タスク間でDAGの依存関係をつくり、それに従ってジョブを実行できるような機能です。また、ワークフローの中で別の子ワークフローをネストさせることも可能です。似たような機能をDagsterやApache Airflow といったツールで提供されています。
タスクとして下記の画像左部分のようにいろいろな処理をフロー内で組み込むことが可能となっており、一例として3つの転送設定(例:S3 → Bigquery、Salesforce → Bigqueryなど)でデータウェアハウスにデータを入れ、BIツール参照用にデータマートジョブでテーブルを作成するといったワークフローは下記のように作成できます。
HTTPワークフロータスクとは
ワークフローの一部で任意のHTTPリクエストを可能にするタスクです。このタスクを使うことで、troccoのワークフロータスクでサポートしていない様々な外部システムと連携し、troccoワークフロー上で取り扱うことが可能になります。
利用可能な機能オプションについては以下です。
- OAuth2で認可したリクエスト
- HTTPメソッド: GET, POST
- 任意のクエリーパラメータ、リクエストボディ
以下が実際にHTTPワークフロータスクを組み込んだワークフローを作成する動画です。troccoから別サービス(自分が公開しているAPI、通知APIなど)へHTTPリクエストを叩くことでタスクが完了したことを通知することができます。
任意のAPIが叩けるのでどう利用するかもたくさん考えられるため以下に使用例をあげます。
ユースケース1(メール、slack以外の通知機能)
メールとslackでの通知はワークフロー機能の一部として組み込まれているため、それらに通知したい場合、問題にはなりませんが、例えばchatworkやteams、discordといった対応されていないチャットアプリケーションに通知したいケースがあるかと思います。そのような場合HTTPワークフロータスクを利用して実現可能になります。
ここではDiscordのWebhookを利用して通知すること例としてワークフローを作ります。HTTPタスクにDiscordのURLを叩く設定をします。
また、Oauthを利用する場合、最終的にAuthenticationヘッダーにトークンを乗せてリクエストをすることになると思いますが、タスクの設定でOauthを利用するとした場合、自動的にAuthentication ヘッダーは付与されます。
フロー編集にて、転送設定を2件、その後データマート処理をするワークフローを設定しました。
実行するとすべて成功し、最終的にdiscordに通知されることが確認できました。
今回の例ではタスクの最後にdiscordへの通知をしましたが、もしもっとワークフローが複雑な処理になっている場合、部分的に通知するといったことも可能です。
ユースケース2(lambda+自前運用のワークフロージョブを起動)
HTTPタスクを用いて、trocco のワークフロー内で、AWS環境にて自前で運用しているワークフロージョブをキックするユースケースを考えます。ワークフローツールとしてDagsterを用いてみます。
Dagster を簡単に説明すると、データベーステーブルやファイル、MLモデルといった asset に関心をおいたデータオーケストレーションサービスです。各タスクでは Pythonで asset の生成を定義します。Dagster のジョブは Dagster UI, Python API, GraphQL API, スケジュールによって実行できます。
今回は trocco の HTTP タスクから AWS Lambda の Function URL を叩き、 その中で Dagster の GraphQL API を叩き、ジョブを起動する構成をとります。AWS Lambda はシンプルにURLを叩いて Lambda を実行するだけであれば Function URL 機能をつかえるので、ここではAPI Gateway を挟まずにこの構成をとります。
続いて、trocco 側のワークフローは以下のように設定します。別の子ワークフローを呼び出し、その処理の後に dagster job を実行するシンプルな形にしています。「call dagster job」では lambda Function URL を指定しています。
実際にワークフローを動かすと、以下のようにジョブが実行され dagter のジョブが実行されました。
Dagster のジョブも実行されていることを確認しました。
ジョブのキックに限らず、AWS Lambda や Cloud Functions を使うことで柔軟なロジックを実行することができます。
サンプルの ソースコード
まとめ
今回はtroccoのワークフロータスクである「HTTPリクエスト」タスクを試しました。
HTTPリクエストを利用することで、通知先をカスタマイズするためにシンプルに外部のAPIを叩いたり、 AWS Lambda や Cloud Functions を使って柔軟に処理を組み込んだりしました。
HTTPメソッドを増やしたり、タスク間でデータを渡したり、条件分岐したりなど、ワークフロー機能を拡充していきますので楽しみにしていてください!