Spark
Luigi

luigiでsparkJobを管理する

More than 1 year has passed since last update.

背景

Luigiが、HadoopJobの管理で使えるから使っていたけど、sparkもLuigiで管理できないかと思って調べてみた

前提

今までこのようにJobを走らせていたとする。 test_spark.pyはSparkを実行するCode

引数でInputとOutputのDirを指定できるようにしている。

spark-submit --master yarn --deploy-mode client --executor-cores 4 --driver-memory 4g --executor-memory 4g --num-executors 20 test_spark.py s3://test/input s3://test/output

使うもの

SparkSubmitTask

SparkSubmitTask_test.py
import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask, PySparkTask


class PySparkTest(SparkSubmitTask):

    driver_memory = '4g'
    executor_memory = '4g'
    executor_cores = 4
    num_executors = 20
    deploy_mode = 'client'

    app = 'test_spark.py' # 実際にSubmitされるsparkコードを指定

    def app_options(self): #これは、spark_test.pyへの2つの引数
        return [self.input().path, self.output().path]

    def input(self):
        return S3Target('s3://test/input')

    def output(self):
        return S3Target('s3://test/output')


if __name__ == '__main__':
     luigi.configuration.LuigiConfigParser.add_config_path('client.cfg')
     luigi.run(main_task_cls=PySparkTest)

実行は、python SparkSubmitTask_test.py普通のluigiJobのSubmitと同じである。

これで、周期的にSparkを走らせることも可能になった。sparkの中身をすべて切り分けておけるので、便利である。

おまけ

もう一つ紛らわしいが、PySparkTaskというModuleもある。こちらh,SparkSubmitTaskに近いが、Sparkコード自体をmainの中に書いて、Luigiコード自体をSubmitする形でSparkJobを呼ぶ(はず)

参考

https://github.com/spotify/luigi/blob/master/examples/pyspark_wc.py