Sparkで動くアプリケーションをPythonで書いたので、pytestでテストしたい!
大規模データでもテストしたいので、YARNクラスタにも投げたい!
ある意味 pytestに入門してみたメモ の続編です。
pytestプラグインとかあるみたいですが
今回は手の内が分かったうえで自分でいろいろやりたかったので、前回の復習も兼ねて自前で作ってみることにします。
spark-submitコマンドを使わずにSparkにアクセス
Spark向けに書いたスクリプトを実行する時、普通はspark-submit
コマンドを使うと思いますが、今回はpytest経由で実行したいので、spark-submit
を使わずに普通のPythonからpyspark
モジュールを呼びたいわけです。
そのためには、本来spark-submit
がやっている諸々の設定を自分でやればいいはず。spark-submit
の処理を追いかけ、設定部分を引っこ抜いてシェルスクリプトを作成します。
#!/bin/bash
export PYTHONHASHSEED=0
export SPARK_HOME=/usr/local/spark2
export PYTHONPATH=`echo ${SPARK_HOME}/python/lib/py4j-*-src.zip`:${SPARK_HOME}/python:${PYTHONPATH}
SPARK_ENV=${SPARK_HOME}/conf/spark-env.sh
if [ -f ${SPARK_ENV} ]
then
set -a
source ${SPARK_ENV}
set +a
fi
pytest
SPARK_HOME
の値は各自の環境に合わせて変えてください。
ちなみに set -a
を実行すると、変数に代入された場合に自動的にexport
されるようになります(Sparkのbin/load-spark-env.sh
で実際に使われています)。pytest
の中でSparkのアプリが起動するときに使われる値ですので、エクスポートしなければ設定が反映されません。用事が終わったらset +a
で自動エクスポートを解除します。
そしてテスト用のスクリプトとして、こういうのを用意してみました。
# -*- coding: utf-8 -*-
import pyspark
import pytest
@pytest.fixture(scope="module", autouse=True)
def spark_session():
spark = pyspark.sql.SparkSession.builder.getOrCreate()
yield spark
spark.stop()
@pytest.mark.parametrize(
"data, sum_tobe", [
([(1, 2, 3), (4, 5, 6)], (5, 7, 9)),
([(10, 20), (40, 50), (100, 200)], (150, 270))
]
)
def test_dataframe(spark_session, data, sum_tobe):
df = spark_session.createDataFrame(data)
sum_asis = df.groupBy().sum().head()
assert sum_asis == sum_tobe
spark_session
は、このスクリプトに書いてあるテスト全体を通して1回だけ作る設定にしています。テストが増えても安心ですね。
実際にもっとテストが増えるとすれば、複数のスクリプトにまたがって書かれることになるでしょう。そういうときにはdef spark_session()
はconftest.pyに移して、scope="session"
と変更すれば、複数のスクリプトを回すときでも全体で1回だけ初期化することになるはずです。
2つのスクリプトを同じ場所に置き、実行。
$ bash do_test.sh
================================================== test session starts ===================================================
platform linux2 -- Python 2.7.8, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/foo/pytest, inifile:
collected 2 items
test_spark.py .. [100%]
=============================================== 2 passed in 11.91 seconds ================================================
良いですね。
現状だとローカルでの実行になっていますが、YARNクラスタに投げたい時はgetOrCreate()
のところで設定できます。こうすればOK。
spark = pyspark.sql.SparkSession.builder.master("yarn").getOrCreate()
但し、これだけだと速くなりません。なぜならmaster("yarn")
だとspark.executor.cores
の初期値が1だから。
spark.executor.cores
spark.executor.memory
- etc.
を設定しましょう。
spark = pyspark.sql.SparkSession.builder \
.master("yarn") \
.config("spark.executor.cores", 5) \
.getOrCreate()
とか。
もっと言えば、yarn
とかlocal[*]
といった文字列をテスト実行時にコマンドライン引数で与えられるようにするか、@pytest.mark
を使って特定の種類のテストだけ呼べるようにするとか、考えた方がよいでしょうね。
その他
pytestを動かすのに使うPythonと、PYSPARK_DRIVER_PYTHON
やPYSPARK_PYTHON
に指定しているPythonとが異なるバージョンだと、エラーで止まってしまいます。pytestはSparkの設定に書いたPythonに対応するpip(など)で入れるようにしましょう。
参考
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
https://stackoverflow.com/questions/40975360/testing-spark-with-pytest-cannot-run-spark-in-local-mode