これも心待ちにしていた機能。マニュアルはこちらです。
for eachタスクとは
For eachタスクは、コレクション(テーブル名のリストなど)に対して繰り返しの処理を行い、動的なパラメーターの値で複数回処理を実行することができ、データドリブンなワークフローを構築することができます。
for eachタスクの作成
ループする処理を記述するノートブックを作成します。
print("I'm a task!")
ノートブックを指定してジョブのタスクを作成します。一度タスクを保存しないとループ処理に変更できません。
保存するとタスクの上のメニューにループマークが表示されます。こちらをクリックすることでタスクをループ処理に変更することができます。
最低限、ここで入力する必要があるのは入力です。入力パラメーターをJSONの配列形式で指定します。以下の配列を指定してタスクを保存します。
[1,2,3]
これでジョブを実行してみます。すると、ループの過程を確認することができます。
入力パラメーターの引き渡し
上の例では同じタスクをループしただけでした。ループの場合、イテレーションごとに異なるパラメーターを入力して処理することが一般的です。それでは、その方法を試してみます。
ノートブックのロジックを変更します。従来からタスクでのパラメーター受け取りで用いているdbutils.widgets.getを使います。
#print(dbutils.widgets.get("input_parameter"))
タスクを表示し、ループのボックスの中にあるループされるタスクをクリックします。パラメータを追加します。
ループで指定されているインプットは{{input}}
で動的に参照することができます。キーはノートブックで参照しているキーinput_parameter
にそろえてタスクを保存します。
再度ジョブを実行します。すると、ループごとに異なるインプットが渡されていることを確認できます。
動的な入力パラメーターの引き渡し
上の例ではインプットの配列は静的なものでしたが、これを動的に生成される配列にしたいケースもあるかと思います。ここでは、タスクバリューを用いて動的に生成された配列をループのインプットにします。
job_task = []
# ランダムな国名のキーと値を作成するためにrandomモジュールを使用
import random
# オブジェクトを格納するためのリスト
job_task = []
# 国名のリスト
country_names = ["USA", "Canada", "Mexico", "Brazil", "Argentina", "Chile", "France", "Germany", "Spain", "Italy", "China", "Japan", "India", "Australia", "South Africa"]
# ランダムな国名のキーと値を持つ100個のオブジェクトの作成
for _ in range(100):
country_key = random.randint(1000, 9999)
country_name = random.choice(country_names)
obj = {"country_key": country_key, "country_name": country_name}
job_task.append(obj)
# dbutilsを用いてタスクバリューを設定
dbutils.jobs.taskValues.set(key = "countries", value = job_task)
上記ノートブックをポイントするタスクcreate_input
を作成し、最初に作ったタスクがこのタスクに依存するように設定します。そして、入力をタスクバリューの動的参照{{tasks.create_input.values.countries}}
に設定します。
そして、値を参照する方のノートブックも変更を加えます。
country_key = dbutils.widgets.get("country_key")
country_name = dbutils.widgets.get("country_name")
print("country_key:" + country_key, "country_name:" + country_name)
ループされるタスクのパラメータも入力の形式にそろえます。
キー | バリュー |
---|---|
country_key | {{input.country_key}} |
country_name | {{input.country_name}} |
これでジョブを実行します。すると、最初のタスクで国のキーと名前のペアを格納するリストが生成されます。
そして、後段のタスクにリストが引き渡され、それに従ってループ処理が行われます。
同時実行
ループのイテレーション間に依存関係がある場合は逐次実行すべきですが、そうでなければ同時実行することも可能です。
これまで以上に柔軟にジョブを組めるようになりましたので、是非ご活用ください!