Sparkで動くアプリケーションをPythonで書いたので、pytestでテストしたい!
大規模データでもテストしたいので、YARNクラスタにも投げたい!
ある意味 pytestに入門してみたメモ の続編です。
pytestプラグインとかあるみたいですが
今回は手の内が分かったうえで自分でいろいろやりたかったので、前回の復習も兼ねて自前で作ってみることにします。
spark-submitコマンドを使わずにSparkにアクセス
Spark向けに書いたスクリプトを実行する時、普通はspark-submitコマンドを使うと思いますが、今回はpytest経由で実行したいので、spark-submitを使わずに普通のPythonからpysparkモジュールを呼びたいわけです。
そのためには、本来spark-submitがやっている諸々の設定を自分でやればいいはず。spark-submitの処理を追いかけ、設定部分を引っこ抜いてシェルスクリプトを作成します。
# !/bin/bash
export PYTHONHASHSEED=0
export SPARK_HOME=/usr/local/spark2
export PYTHONPATH=`echo ${SPARK_HOME}/python/lib/py4j-*-src.zip`:${SPARK_HOME}/python:${PYTHONPATH}
SPARK_ENV=${SPARK_HOME}/conf/spark-env.sh
if [ -f ${SPARK_ENV} ]
then
set -a
source ${SPARK_ENV}
set +a
fi
pytest
SPARK_HOMEの値は各自の環境に合わせて変えてください。
ちなみに set -a を実行すると、変数に代入された場合に自動的にexportされるようになります(Sparkのbin/load-spark-env.shで実際に使われています)。pytestの中でSparkのアプリが起動するときに使われる値ですので、エクスポートしなければ設定が反映されません。用事が終わったらset +aで自動エクスポートを解除します。
そしてテスト用のスクリプトとして、こういうのを用意してみました。
# -*- coding: utf-8 -*-
import pyspark
import pytest
@pytest.fixture(scope="module", autouse=True)
def spark_session():
spark = pyspark.sql.SparkSession.builder.getOrCreate()
yield spark
spark.stop()
@pytest.mark.parametrize(
"data, sum_tobe", [
([(1, 2, 3), (4, 5, 6)], (5, 7, 9)),
([(10, 20), (40, 50), (100, 200)], (150, 270))
]
)
def test_dataframe(spark_session, data, sum_tobe):
df = spark_session.createDataFrame(data)
sum_asis = df.groupBy().sum().head()
assert sum_asis == sum_tobe
spark_sessionは、このスクリプトに書いてあるテスト全体を通して1回だけ作る設定にしています。テストが増えても安心ですね。
実際にもっとテストが増えるとすれば、複数のスクリプトにまたがって書かれることになるでしょう。そういうときにはdef spark_session()はconftest.pyに移して、scope="session"と変更すれば、複数のスクリプトを回すときでも全体で1回だけ初期化することになるはずです。
2つのスクリプトを同じ場所に置き、実行。
$ bash do_test.sh
================================================== test session starts ===================================================
platform linux2 -- Python 2.7.8, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/foo/pytest, inifile:
collected 2 items
test_spark.py .. [100%]
=============================================== 2 passed in 11.91 seconds ================================================
良いですね。
現状だとローカルでの実行になっていますが、YARNクラスタに投げたい時はgetOrCreate()のところで設定できます。こうすればOK。
spark = pyspark.sql.SparkSession.builder.master("yarn").getOrCreate()
但し、これだけだと速くなりません。なぜならmaster("yarn")だとspark.executor.coresの初期値が1だから。
spark.executor.coresspark.executor.memory- etc.
を設定しましょう。
spark = pyspark.sql.SparkSession.builder \
.master("yarn") \
.config("spark.executor.cores", 5) \
.getOrCreate()
とか。
もっと言えば、yarnとかlocal[*]といった文字列をテスト実行時にコマンドライン引数で与えられるようにするか、@pytest.markを使って特定の種類のテストだけ呼べるようにするとか、考えた方がよいでしょうね。
その他
pytestを動かすのに使うPythonと、PYSPARK_DRIVER_PYTHONやPYSPARK_PYTHONに指定しているPythonとが異なるバージョンだと、エラーで止まってしまいます。pytestはSparkの設定に書いたPythonに対応するpip(など)で入れるようにしましょう。
参考
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
https://stackoverflow.com/questions/40975360/testing-spark-with-pytest-cannot-run-spark-in-local-mode