LoginSignup
1

More than 5 years have passed since last update.

luigiでsparkJobを管理する

Last updated at Posted at 2017-07-20

背景

Luigiが、HadoopJobの管理で使えるから使っていたけど、sparkもLuigiで管理できないかと思って調べてみた

前提

今までこのようにJobを走らせていたとする。 test_spark.pyはSparkを実行するCode

引数でInputとOutputのDirを指定できるようにしている。

spark-submit --master yarn --deploy-mode client --executor-cores 4 --driver-memory 4g --executor-memory 4g --num-executors 20 test_spark.py s3://test/input s3://test/output

使うもの

SparkSubmitTask

SparkSubmitTask_test.py
import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask, PySparkTask


class PySparkTest(SparkSubmitTask):

    driver_memory = '4g'
    executor_memory = '4g'
    executor_cores = 4
    num_executors = 20
    deploy_mode = 'client'

    app = 'test_spark.py' # 実際にSubmitされるsparkコードを指定

    def app_options(self): #これは、spark_test.pyへの2つの引数
        return [self.input().path, self.output().path]

    def input(self):
        return S3Target('s3://test/input')

    def output(self):
        return S3Target('s3://test/output')


if __name__ == '__main__':
     luigi.configuration.LuigiConfigParser.add_config_path('client.cfg')
     luigi.run(main_task_cls=PySparkTest)

実行は、python SparkSubmitTask_test.py普通のluigiJobのSubmitと同じである。

これで、周期的にSparkを走らせることも可能になった。sparkの中身をすべて切り分けておけるので、便利である。

おまけ

もう一つ紛らわしいが、PySparkTaskというModuleもある。こちらh,SparkSubmitTaskに近いが、Sparkコード自体をmainの中に書いて、Luigiコード自体をSubmitする形でSparkJobを呼ぶ(はず)

参考

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1