1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Live TablesのPython UDFでログを出力する

Last updated at Posted at 2022-06-18

やってみようとしたら意外に大変だったのでメモ。

Delta Live Tables(DLT)はDatabricks上で高信頼、高品質なデータパイプラインを作成できるソリューションです。宣言型ETL(最初のテーブルはこういう定義である、次のテーブルの定義は...)を用いることで、処理の手順を記述するのではなく、最終的にどのようなデータが必要なのかを宣言するだけで、途中の処理はDLTがよしなにやってくれます。

パイプラインの定義はPythonかSQLで行います。さらにPythonのUDF(ユーザー定義関数)を使用できるので、処理のモジュール化、共通化も行うことができます。

ただ、UDFを作成する際にはデバッグは不可避であり、その際にログ出力ができないと色々不便かと思います。なので、今回ログ出力方法を調査しました。

何を使ってログ出力するのか

パッと思いつくのはprintloggerかと思います。結果としてどちらでも行けました。

以下のようにUDFの中でログを出力します。

callee
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math

def log_udf(x) -> int:
    """
    logging udf
    :param x: data to log
    :return: x
    """
    
    import datetime
    import logging
    
    logger = logging.getLogger('test')
    dt_now = datetime.datetime.now()
     
    #raise Exception("デバッグ用メッセージ")
    print(f"{dt_now}: printのデバッグ用メッセージ")
    logger.error(f"{dt_now}: loggerのデバッグ用メッセージ")

    return x

spark.udf.register("log_udf", log_udf, IntegerType())

なお、Exceptionを使うとドライバーログ(log4jログ)にログ出力されますが、その度処理が止まるので現実的ではありません。

どこに出力されるのか

実はこちらで悩みました。DLTの実態はSparkの構造化ストリーミング + Delta Lakeなので、処理もSparkクラスターで実行されます。結論としては、Spark UIのExecutorタブで表示されるExecutorのstderrでした。

DLTの画面でコンピューティングSpark UIを開きます。ログはドライバーのログなのでこちらを見ても表示されません。
Screen Shot 2022-06-18 at 14.31.36.png

Executorsをクリックします。
Screen Shot 2022-06-18 at 14.32.26.png

下の方にあるLogsカラムのstderrリンクをクリックします。
Screen Shot 2022-06-18 at 14.33.10.png

このようにログを確認することができます。オートスケールするなどして複数のExecutorが存在する場合には、それぞれのログを確認する必要があるので注意してください。
Screen Shot 2022-06-18 at 14.34.37.png

ログを永続化する

DLTのクラスターは停止後は削除されてしまうので、ログも消えてしまいます。これを永続化するにはクラスターログデリバリーを活用して、ログをDBFSに永続化します。

  1. Delta Live Tableにアクセスし、対象のパイプラインを開きます。

  2. 設定をクリックします。

  3. パイプライン設定を編集JSONトグルスイッチをクリックします。

  4. "clusters"の下に以下の内容を追加します。"destination"の値は適宜変更してください。

    JSON
    "cluster_log_conf": {
                    "dbfs": {
                        "destination": "dbfs:/tmp/takaaki.yayoi@databricks.com/cluster_log"
                    }
    }
    
  5. 以下のようになっていることを確認して、保存をクリックします。
    Screen Shot 2022-06-24 at 9.53.15.png

  6. パイプラインを実行します。

  7. ノートブック上でファイルが作成されていることを確認します。/tmp/takaaki.yayoi@databricks.com/cluster_log/以降のパスはクラスターID/executor/アプリケーションID/エグゼキュータノードの連番という構成になります。

%fs
ls /tmp/takaaki.yayoi@databricks.com/cluster_log/0624-004506-zp6uwtgl/executor/app-20220624004529-0000/0

Screen Shot 2022-06-24 at 9.52.55.png

こちらをFileStoreにコピーしてダウンロードして中身を確認することもできます。

%fs
cp dbfs:/tmp/takaaki.yayoi@databricks.com/cluster_log/0624-004506-zp6uwtgl/executor/app-20220624004529-0000/0/stderr /FileStore/shared_uploads/takaaki.yayoi@databricks.com

Screen Shot 2022-06-24 at 10.00.13.png

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?