LoginSignup
6
4

More than 5 years have passed since last update.

pytestでSparkアプリケーションのテストを書いてみる

Posted at

Sparkで動くアプリケーションをPythonで書いたので、pytestでテストしたい!
大規模データでもテストしたいので、YARNクラスタにも投げたい!

ある意味 pytestに入門してみたメモ の続編です。

pytestプラグインとかあるみたいですが

今回は手の内が分かったうえで自分でいろいろやりたかったので、前回の復習も兼ねて自前で作ってみることにします。

spark-submitコマンドを使わずにSparkにアクセス

Spark向けに書いたスクリプトを実行する時、普通はspark-submitコマンドを使うと思いますが、今回はpytest経由で実行したいので、spark-submitを使わずに普通のPythonからpysparkモジュールを呼びたいわけです。
そのためには、本来spark-submitがやっている諸々の設定を自分でやればいいはず。spark-submitの処理を追いかけ、設定部分を引っこ抜いてシェルスクリプトを作成します。

do_test.sh
#!/bin/bash

export PYTHONHASHSEED=0
export SPARK_HOME=/usr/local/spark2
export PYTHONPATH=`echo ${SPARK_HOME}/python/lib/py4j-*-src.zip`:${SPARK_HOME}/python:${PYTHONPATH}
SPARK_ENV=${SPARK_HOME}/conf/spark-env.sh
if [ -f ${SPARK_ENV} ]
then
    set -a
    source ${SPARK_ENV}
    set +a
fi
pytest

SPARK_HOMEの値は各自の環境に合わせて変えてください。
ちなみに set -a を実行すると、変数に代入された場合に自動的にexportされるようになります(Sparkのbin/load-spark-env.shで実際に使われています)。pytestの中でSparkのアプリが起動するときに使われる値ですので、エクスポートしなければ設定が反映されません。用事が終わったらset +aで自動エクスポートを解除します。

そしてテスト用のスクリプトとして、こういうのを用意してみました。

test_spark.py
# -*- coding: utf-8 -*-

import pyspark
import pytest

@pytest.fixture(scope="module", autouse=True)
def spark_session():
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    yield spark
    spark.stop()

@pytest.mark.parametrize(
    "data, sum_tobe", [
        ([(1, 2, 3), (4, 5, 6)], (5, 7, 9)),
        ([(10, 20), (40, 50), (100, 200)], (150, 270))
    ]
)
def test_dataframe(spark_session, data, sum_tobe):
    df = spark_session.createDataFrame(data)
    sum_asis = df.groupBy().sum().head()
    assert sum_asis == sum_tobe

spark_sessionは、このスクリプトに書いてあるテスト全体を通して1回だけ作る設定にしています。テストが増えても安心ですね。
実際にもっとテストが増えるとすれば、複数のスクリプトにまたがって書かれることになるでしょう。そういうときにはdef spark_session()はconftest.pyに移して、scope="session"と変更すれば、複数のスクリプトを回すときでも全体で1回だけ初期化することになるはずです。

2つのスクリプトを同じ場所に置き、実行。

terminal
$ bash do_test.sh
================================================== test session starts ===================================================
platform linux2 -- Python 2.7.8, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/foo/pytest, inifile:
collected 2 items                                                                                                        

test_spark.py ..                                                                                                   [100%]

=============================================== 2 passed in 11.91 seconds ================================================

良いですね。
現状だとローカルでの実行になっていますが、YARNクラスタに投げたい時はgetOrCreate()のところで設定できます。こうすればOK。

    spark = pyspark.sql.SparkSession.builder.master("yarn").getOrCreate()

但し、これだけだと速くなりません。なぜならmaster("yarn")だとspark.executor.coresの初期値が1だから。

  • spark.executor.cores
  • spark.executor.memory
  • etc.

を設定しましょう。

    spark = pyspark.sql.SparkSession.builder \
              .master("yarn") \
              .config("spark.executor.cores", 5) \
              .getOrCreate()

とか。

もっと言えば、yarnとかlocal[*]といった文字列をテスト実行時にコマンドライン引数で与えられるようにするか、@pytest.markを使って特定の種類のテストだけ呼べるようにするとか、考えた方がよいでしょうね。

その他

pytestを動かすのに使うPythonと、PYSPARK_DRIVER_PYTHONPYSPARK_PYTHONに指定しているPythonとが異なるバージョンだと、エラーで止まってしまいます。pytestはSparkの設定に書いたPythonに対応するpip(など)で入れるようにしましょう。

参考

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
https://stackoverflow.com/questions/40975360/testing-spark-with-pytest-cannot-run-spark-in-local-mode

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4