3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksジョブでfor eachタスクがサポートされました!

Last updated at Posted at 2024-08-21

これも心待ちにしていた機能。マニュアルはこちらです。

for eachタスクとは

For eachタスクは、コレクション(テーブル名のリストなど)に対して繰り返しの処理を行い、動的なパラメーターの値で複数回処理を実行することができ、データドリブンなワークフローを構築することができます。
capture.gif

for eachタスクの作成

ループする処理を記述するノートブックを作成します。

sample notebook
print("I'm a task!")

ノートブックを指定してジョブのタスクを作成します。一度タスクを保存しないとループ処理に変更できません。
Screenshot 2024-08-21 at 10.09.27.png

保存するとタスクの上のメニューにループマークが表示されます。こちらをクリックすることでタスクをループ処理に変更することができます。
Screenshot 2024-08-21 at 10.10.32.png
Screenshot 2024-08-21 at 10.10.32.png

最低限、ここで入力する必要があるのは入力です。入力パラメーターをJSONの配列形式で指定します。以下の配列を指定してタスクを保存します。

[1,2,3]

Screenshot 2024-08-21 at 11.14.45.png

これでジョブを実行してみます。すると、ループの過程を確認することができます。
Screenshot 2024-08-21 at 10.17.20.png
Screenshot 2024-08-21 at 10.18.18.png

入力パラメーターの引き渡し

上の例では同じタスクをループしただけでした。ループの場合、イテレーションごとに異なるパラメーターを入力して処理することが一般的です。それでは、その方法を試してみます。

ノートブックのロジックを変更します。従来からタスクでのパラメーター受け取りで用いているdbutils.widgets.getを使います。

sample notebook
#print(dbutils.widgets.get("input_parameter"))

タスクを表示し、ループのボックスの中にあるループされるタスクをクリックします。パラメータを追加します。
Screenshot 2024-08-21 at 10.23.09.png

ループで指定されているインプットは{{input}}で動的に参照することができます。キーはノートブックで参照しているキーinput_parameterにそろえてタスクを保存します。
Screenshot 2024-08-21 at 10.25.23.png

再度ジョブを実行します。すると、ループごとに異なるインプットが渡されていることを確認できます。
Screenshot 2024-08-21 at 10.27.18.png
Screenshot 2024-08-21 at 10.27.58.png
Screenshot 2024-08-21 at 10.28.14.png

動的な入力パラメーターの引き渡し

上の例ではインプットの配列は静的なものでしたが、これを動的に生成される配列にしたいケースもあるかと思います。ここでは、タスクバリューを用いて動的に生成された配列をループのインプットにします。

create input
job_task = []
# ランダムな国名のキーと値を作成するためにrandomモジュールを使用
import random

# オブジェクトを格納するためのリスト
job_task = []

# 国名のリスト
country_names = ["USA", "Canada", "Mexico", "Brazil", "Argentina", "Chile", "France", "Germany", "Spain", "Italy", "China", "Japan", "India", "Australia", "South Africa"]

# ランダムな国名のキーと値を持つ100個のオブジェクトの作成
for _ in range(100):
    country_key = random.randint(1000, 9999)
    country_name = random.choice(country_names)
    obj = {"country_key": country_key, "country_name": country_name}
    job_task.append(obj)

# dbutilsを用いてタスクバリューを設定
dbutils.jobs.taskValues.set(key = "countries", value = job_task)

上記ノートブックをポイントするタスクcreate_inputを作成し、最初に作ったタスクがこのタスクに依存するように設定します。そして、入力をタスクバリューの動的参照{{tasks.create_input.values.countries}}に設定します。

そして、値を参照する方のノートブックも変更を加えます。

sample notebook
country_key = dbutils.widgets.get("country_key")
country_name = dbutils.widgets.get("country_name")

print("country_key:" + country_key, "country_name:" + country_name)

ループされるタスクのパラメータも入力の形式にそろえます。

キー バリュー
country_key {{input.country_key}}
country_name {{input.country_name}}

Screenshot 2024-08-21 at 10.44.12.png

これでジョブを実行します。すると、最初のタスクで国のキーと名前のペアを格納するリストが生成されます。
Screenshot 2024-08-21 at 10.41.58.png

そして、後段のタスクにリストが引き渡され、それに従ってループ処理が行われます。
Screenshot 2024-08-21 at 10.46.12.png
Screenshot 2024-08-21 at 10.46.19.png

各イテレーションに値も引き渡されています。
Screenshot 2024-08-21 at 10.46.39.png

同時実行

ループのイテレーション間に依存関係がある場合は逐次実行すべきですが、そうでなければ同時実行することも可能です。
Screenshot 2024-08-21 at 11.09.49.png

同時実行度を上げればループ全体の処理を高速化できます。
Screenshot 2024-08-21 at 11.11.02.png

これまで以上に柔軟にジョブを組めるようになりましたので、是非ご活用ください!

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?