こちらのアップデートです。
ジョブにおける下流タスクでのSQL出力の参照
同じジョブの下流タスクで、SQLタスクの出力を参照するために動的な値を使えるようになりました。For eachタスクでは、出力のデータの行に対してイテレーションを行うことができます。
動的値参照とはをご覧ください。
マニュアルの当該箇所はこちらのSQL出力オプションです。
SQLタスクの作成
この機能の肝は、動的参照する先のタスクはSQLタスクである必要があることです。SQLノートブックのタスクの場合、動的参照はできません。
以下のようなクエリーを作成して、SQLタスクとしてジョブに追加します。
-- 例データを生成
CREATE OR REPLACE TEMP VIEW example_sales AS
SELECT * FROM VALUES
(2020, 12),
(2021, 23),
(2022, 47),
(2023, 15),
(2024, 22)
AS example_sales(sales_year, num_sales);
-- 例データをクエリ
SELECT sales_year, num_sales FROM example_sales;
後段のタスクの作成
Pythonノートブックから参照してみます。SQLタスクの実行結果は以下のような文字列となります。
'[{"sales_year":"2020","num_sales":"12"},{"sales_year":"2021","num_sales":"23"},{"sales_year":"2022","num_sales":"47"},{"sales_year":"2023","num_sales":"15"},{"sales_year":"2024","num_sales":"22"}]'
このままではリストとしてアクセスできないので、以下のようにeval
を入れています。
result = eval(dbutils.widgets.get("sql_result"))
display(result)
これを後段のタスクとして設定すると、前段のSQLタスクsales_by_year
の動的参照のリストにoutput
が含まれています。これらがSQLタスクの結果にアクセスするための動的参照となります。ここでは、tasks.sales_by_year.output.rows
をキーsql_result
で参照できるようにします。
これで、SQLタスクの結果にアクセスできました。
For eachタスクからのアクセス
For each
タスクでは、行はネストされたタスクに繰り返し送信されます。この例では、For each タスクの入力 を{{tasks.sales_by_year.output.rows}}
に設定すると、ネストされたタスクで、構文{{input.<column_alias>}}
を使用して、行の値をパラメーターとして繰り返し送信できます。ネストされたタスク構成では、キーと値のペアyear:{{input.sales_year}}
とsales:{{input.num_sales}}
を使用して 2 つのパラメーターを作成できます。 ネストされたタスクが SQL タスクであると仮定すると、次のようなクエリを使用してコード内の値を参照できます。
言葉だけだと理解が難しいので、実際に設定してみます。
クエリーの設定
SQLタスクの出力の各行を読み込むクエリーを作成します。
-- 例: 前のクエリからデータにアクセスする:
SELECT concat(:year, ' 年の売り上げは ', :sales, ' でした。')
タスクの設定
それぞれ(For Each)
タスクを選択し、パラメータとして{{tasks.sales_by_year.output.rows}}
を設定します。
繰り返すタスクとして上のクエリーを選択したSQLタスクを作成し、クエリーで参照しているパラメータを以下のように設定します:
-
year:
{{input.sales_year}}
-
sales
{{input.num_sales}}
これでFor Eachタスクが追加されました。
実行すると、ループごとの結果が正しく表示されていることを確認できます。
このように、クエリーの結果をテーブルに永続化させることなく後段のタスクで参照できます。ご活用ください!