はじめに
TROCCOのワークフローで、待望の「条件分岐」が標準機能として追加されました!これまで多くの方からご要望をいただいていた、データ基盤の運用をより柔軟にする機能です。
条件分岐を使うことで、以下のようなことが実現できます。
- エラー発生時のリカバリ処理を自動化する
- 実行スケジュールを柔軟にする
- 将来の特定の日時以降に実行する
- 将来の特定の日時まで一時的に実行する
- 自社の営業日のみに実行する
- 不定期に担当者がアップロードするファイルの取込処理を最適化する
- アクセスが集中したときにTableauダッシュボードの更新を早くする
- データ件数に応じて処理を使い分ける
- 特定の環境のみで対象の処理を行う
これらは運用負荷の軽減や処理時間の最適化(=コスト削減)、意思決定の高速化、データ処理の柔軟化などにつながります。非常に嬉しい機能ですね・・・!
そこで本記事では、ワークフロー条件分岐の概要を解説しつつ、そのユースケースや設定方法の例についてもご紹介していきます。
こんな方におすすめ
- 新機能「ワークフロー条件分岐」の概要を知りたい方
- どのようなユースケースで使えるか、具体的な活用例を理解したい方
- 個々の使い方に応じた設定方法を参考にしたい方
TROCCOのワークフローとはどういうものかという前提から説明をしているので、
- 既にワークフローをご利用いただいている方は「条件分岐タスクとは」から
- ユースケースや設定方法が知りたいという方は「条件分岐タスクのユースケース例と設定方法」から
- ポイントだけ分かればいいという方は「設定のポイント」だけ
読んでいただければと思います!
TROCCOのワークフローとは
TROCCOには、
- 転送ジョブ(ある転送元から別の転送先にデータを転送する)
- データマートシンク(DWHに対してSQLを管理/実行する)
などの機能がありますが、これらの個々のタスクに対して、GUIで矢印を引くだけで依存関係を定義/実行管理できるのがワークフローです(以下のようなイメージです)。
ワークフローを使うことで、例えばSFA/CRMやプロダクトのDBからデータをDWHに転送してきて、それらのデータをDWH内で統合したあとに、SFA/CRMに対してプロダクトの利用者がどのような状況にあるのかをデータ転送をして戻すといった、一連のデータ活用の流れを簡単に実現することができます。
従来、ワークフローのなかでは、以下のようなタスクを利用することができました。
| タスク種別 | 概要 |
|---|---|
| 転送ジョブ | あるシステムから別のシステムにデータを転送する |
| マネージド転送設定 | 転送ジョブをまとめて実行する |
| データマートシンク | DWHに対してSQLを実行する |
| ワークフロー | タスク間の依存関係を定義して実行する; 親子関係にできる |
| dbtジョブ | dbtジョブを実行する |
| Google BigQuery/Snowflake/Amazon Redshiftデータチェック | SQLを実行してデータの状態を検証する |
| HTTPリクエスト | 外部通知等の任意のHTTPリクエスト(GET/POST)を実行する |
| Slack通知 | Slackに通知する |
| Tableauデータ抽出 | 抽出形式のTableauのデータを更新する |
今回、これらのタスクに加えて、「条件分岐」というタスク種別が新たに追加されました。
条件分岐タスクとは
条件分岐タスクとは、「ワークフローの前段のタスクの結果や条件に応じて、後段で実行するタスクを分岐できる機能」です。
以下のようなものを、条件分岐の条件として利用することができます。
- 実行日時
- 前段のタスクの成功/エラー
- 転送ジョブ/データマートシンクの処理レコード数
- データチェックのチェック結果
- HTTPリクエストタスクのステータスコード
その他の詳細については、公式ドキュメントをご確認ください。
ユースケース例と設定方法
では、よくありそうなユースケース例をもとに、具体的な設定方法を紹介していきます。
ユースケース例
まずはよくありそうなユースケースを整理していきましょう。なお、設定方法については後述するので、それぞれざっと見てみていただければ大丈夫です。
1. エラー発生時のリカバリ処理を自動化する
ある処理を行ったときに、一定の割合で決まったエラーが発生することがあります。そのようなときに、タスクのエラー時のみに実行するようなリカバリタスクを設定することで、リカバリ処理を自動化することができます。
2. 実行スケジュールを柔軟にする
標準のスケジュール設定だけでは難しい、実行スケジュールの柔軟化も実現できます。
将来の特定の日時以降に実行する
定常的な運用ではなく、ときには一時的に運用をしたいということもあるでしょう。このパターンでは、将来の特定の日時以降に実行するようにできます。
なお、厳密にはスケジュール設定した頻度でワークフローは実行され、条件分岐タスクの実行時間分だけ処理時間がかかることになります。
とはいえ、この部分の処理時間は非常に短いので、全体としては大した時間にならず、運用上のメリットの方が大きくなるでしょう。
将来の特定の日時まで一時的に実行する
先ほどの逆で、将来の特定の日時まで実行するパターンです。実行をしない方にはSlack通知を入れておくことで、スケジュールの停止を忘れないようにしておきます。
自社の営業日のみに実行する
処理時間の最適化を図ろうとしたときに、自社の営業日のみにデータを更新するというのが1つの方法としてあります。データチェックで営業日か否かを判定するタスクを入れることで、営業日のみという柔軟な実行制御が実現できます。
3. 不定期に担当者がアップロードするファイルの取込処理を最適化する
現場の運用を踏まえたときに、非開発部門の担当者がストレージに不定期にファイルをアップロードし、その後の処理を行うということはよくあることです。
この処理を最適化しようとするには、一定時間間隔でファイルの存在を確認しつつ、ファイルが存在するときのみ後続の処理を行うというのが1つの対応策になります。
これまでだと、データ転送後にデータチェックを入れつつ、エラー発生時も後続のタスクを実行するようにすることで、下記のように対応「できないわけではない」ものではありました。
しかし、この形だと子ワークフローを実行しないときには親ワークフローがエラーになってしまいます。これではエラーハンドリングに難があるという課題がありました。
以下は従来の課題のある例
条件分岐タスクを利用すると、処理レコード数をベースに条件分岐を入れることで、ワークフローを不必要にエラーにすることなく、ファイルの格納有無を確認しつつ後続の処理を最適化する流れを、自然に実装できるようになります。
以下は条件分岐を使う場合(非常にシンプル)
なお、転送ジョブの処理レコード数を確認しており、ファイルの存在確認はしていないので、厳密なファイル検知ではありません。
4. アクセスが集中したときにTableauダッシュボードの更新を早くする
ダッシュボードの更新を必要なときだけ高速化したいというのもあるでしょう。データ自体は毎時取り込みつつ、0時台の日次の更新またはデータ量が多くなったときだけダッシュボードを更新するみたいなこともできそうです。
5. データ件数に応じて処理を使い分ける
もっと凝ったものだと、データの状況に合わせて機械学習の運用を回すみたいなこともできそうです。「trocco x BigQueryMLで実現するMLOps (時系列予測編)」のようなMLOpsのパイプラインも、自然に組めるようになります。(ちょうどいい事例だったので参考にさせていただきました)
6. 特定の環境のみで対象の処理を行う
環境管理機能(参考:「もう間違えて本番環境を壊さない!TROCCOの環境管理で、堅牢なデータパイプライン開発を実現する」)のリリースに伴い、特定の環境のみで対象の処理を行うというものも追加で拡充されています。
設定方法
それでは、ここからは具体的な設定方法を説明していきます。
1. エラー発生時のリカバリ処理を自動化する
以下のサンプルはデータ基盤の運用に役立つ、INFORMATION_SCHEMAのビューに対して、日次でスナップショットを取る例です。
ビューのスキーマは不定期にカラム追加など変更がありますが、そのときにスキーマ変更のエラーに都度対応するのは地味に面倒です。そこで、スキーマ変更があってエラーになった際には、スキーマ変更に自動追従するようなリカバリ処理を実行するようにしています。
条件分岐の条件式としては、前段のタスクが「成功」であることを条件にします。
なお、このような処理を行う場合は、条件分岐の前にワークフローが止まることがないよう、タスクのエラーハンドリングとして「エラー時も後続のタスクを続行する」を選択するようにしておいてください。
そのほか、エラーとなったときには、途中まで処理したテーブルをタイムトラベルを使って過去の状態に復元するといったことも、役立つ場面があるかもしれません。
2. 実行スケジュールを柔軟にする
実行スケジュールについては、条件分岐で実行日時を利用するパターンと、データチェックを利用するパターンがあります。
将来の特定の日時以降に実行する
この例はシンプルなので、条件分岐の条件式のみ記載します。対象として「現在日時(=条件分岐タスクの実行日時)」を選択し、基準となる日時を記載するだけです。簡単ですね。
将来の特定の日時まで一時的に実行する
こちらは前の例の演算子を変更するだけなので、詳細は割愛します。
自社の営業日のみに実行する
営業日のみといった複雑な条件を利用する場合は、前段にデータチェックを設定します。
条件分岐の条件式として、チェック結果が成功であることを指定します。
なお、このままではチェック結果がエラーになるとデータチェックのタスク自体がエラーになり処理が停止するので、データチェックの方で「クエリ結果を条件分岐に利用する」の設定をしておいてください。
データチェックでは、例えば、
select
cast(current_date('Asia/Tokyo')
in (select ymd from `{営業日カレンダーのテーブル}` where date_type = '平日')
as integer)
とすると、true(=今日が営業日である)のときに1、false(=今日が営業日ではない)のときに0が返るクエリとなります。booleanをintegerにキャストすると1/0とできるのは若干トリッキーですが、これが一番短く書けるような・・・(実際に使う場合はコメントをつけるなどしましょう)
なお、自社独自の営業日カレンダーを作成する方法については、「TROCCO®でプロジェクト管理用の営業日データをBigQueryに整備する」で過去にまとめているので、ご参考までに。
3. 不定期に担当者がアップロードするファイルの取込処理を最適化する
この例では、転送ジョブの処理レコード数をベースに条件分岐させることで実現できます。
条件にレコード数を指定すればいいだけです。簡単ですね。
このサンプルのワークフローでは、転送後にそこからデータを変換して、ビジネスユーザーに通知するデータがあるときだけリバースを行いつつ、対象のデータがあったときに通知をするような形になっています。色々便利ですね。
4. アクセスが集中したときにTableauダッシュボードの更新を早くする
この例はこれまでの内容を組み合わせることで実現できます。
- 処理レコード数に応じた条件分岐
- データチェックのエラーの停止とチェック結果に応じた条件分岐
5. データ件数に応じて処理を使い分ける
この例も構造的には上の例と同様なので割愛します。
6. 特定の環境のみで対象の処理を行う
これは、対象に「環境」を選択し、値を対応する環境の名前にすることで実現できます。
設定のポイント
ここまでそれぞれの事例ベースで解説してきました。最後に設定の際のポイントを改めて整理しておきましょう。
| 条件の対象 | ポイント |
|---|---|
| 実行日時 | シンプルに条件設定すればよい |
| 前段のタスクの成功/エラー | 条件分岐の対象としているタスクがエラーとなった後に条件分岐まで実行させるには、ワークフローとして「エラー時も後続のタスクを続行する」を設定する必要がある |
| 転送ジョブ/データマートシンクの処理レコード数 | シンプルに条件設定すればよい |
| データチェックのチェック結果 | データチェックの方で「クエリ結果を条件分岐に利用する」の設定をしておく これによりデータチェックはエラーにならないので、「エラー時も後続のタスクを続行する」は不要 |
| HTTPリクエストタスクのステータスコード | HTTPリクエストタスクの方で「ステータスコードチェックを無効にする」の設定をしておく これによりHTTPリクエストタスクはエラーにならないので、「エラー時も後続のタスクを続行する」は不要 |
一覧で見ただけではなかなかピンとこないところもあるかもしれませんが、そのときは設定しながら試してみてください。
おわりに
ここまで、TROCCOのワークフロー条件分岐の概要、ユースケース、実装例について紹介してきました。こちらの記事を参考にしつつ、ワークフローの活用をより深めていただけると嬉しいです!









