背景
Luigiが、HadoopJobの管理で使えるから使っていたけど、sparkもLuigiで管理できないかと思って調べてみた
前提
今までこのようにJobを走らせていたとする。 test_spark.pyはSparkを実行するCode
引数でInputとOutputのDirを指定できるようにしている。
spark-submit --master yarn --deploy-mode client --executor-cores 4 --driver-memory 4g --executor-memory 4g --num-executors 20 test_spark.py s3://test/input s3://test/output
使うもの
SparkSubmitTask
SparkSubmitTask_test.py
import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
class PySparkTest(SparkSubmitTask):
driver_memory = '4g'
executor_memory = '4g'
executor_cores = 4
num_executors = 20
deploy_mode = 'client'
app = 'test_spark.py' # 実際にSubmitされるsparkコードを指定
def app_options(self): #これは、spark_test.pyへの2つの引数
return [self.input().path, self.output().path]
def input(self):
return S3Target('s3://test/input')
def output(self):
return S3Target('s3://test/output')
if __name__ == '__main__':
luigi.configuration.LuigiConfigParser.add_config_path('client.cfg')
luigi.run(main_task_cls=PySparkTest)
実行は、python SparkSubmitTask_test.py
普通のluigiJobのSubmitと同じである。
これで、周期的にSparkを走らせることも可能になった。sparkの中身をすべて切り分けておけるので、便利である。
おまけ
もう一つ紛らわしいが、PySparkTask
というModuleもある。こちらh,SparkSubmitTaskに近いが、Sparkコード自体をmainの中に書いて、Luigiコード自体をSubmitする形でSparkJobを呼ぶ(はず)