7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

GCP Cloud ComposerからDataflowにジョブを送る際にpython3で実行できる様にする方法

Posted at

本記事でやること

Cloud Composer(apache-airflow==1.10.3)からDataflowPythonOperatorを使ってジョブを送った際にDataflow側のSDKのバージョンがサポート終了予定のGoogle Cloud Dataflow SDK for Python 2.5.0になってしまうので、Python2での実行環境からApache Beam Python3.x SDK xxxのPython3の実行環境までバージョンをあげる。

スクリーンショット 2020-03-04 12.19.23.png

対象読者

  • CloudComposer/AirFlowを触ったことのある方

実行環境

  • Python3.6.3
  • apache-airflow==1.10.3
  • apache-beam==2.15.0

考えられる原因

Cloud CompposerからDataflowにジョブを送った際にGoogle Cloud Dataflow SDK for Python 2.5.0で実行されてしまう考えられる原因は、airflow側のDataflowPythonOperatorの実装であると考えています。

実装を見てみる

class DataFlowHook(GoogleCloudBaseHook):

    def start_python_dataflow(self, job_name, variables, dataflow, py_options,
                              append_job_name=True):
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]
        # "python2"がハードコーディングされている
        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)

この先の実装では、Dataflowにジョブを送るコマンドを作成するのですが、このコマンドのprefixはpython2のままでありそのままDataflowのファイルを実行すしようとするのでDataflow側の実行環境がGoogle Cloud Dataflow SDK for Python 2.5.0になってしまうのではないかと考えています。

解決策(2020/03/09時点)

以下を順に実行していきます。

1. Cloud Composerの環境にapache-beamをインストールする

apache-beamをインストールするため、依存関係のある以下4つをインストールする。

apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3

インストールするためには、以下のコマンドを実行する。
(適当なディレクトリにrequirements.txtを置いておく)

environment=your_composer_environment_name
location=your_location

gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}

2. DataflowPythonOperator DataFlowHookを継承するクラスを作成する

airflowのDataflowPythonOperatorDataFlowHookを継承するクラスを作成しpython3コマンドでdataflowのファイルを実行できる様にする。

参考リンク
https://stackoverflow.com/questions/58545759/no-module-named-airfow-gcp-how-to-run-dataflow-job-that-uses-python3-beam-2-15/58631655#58631655

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dataflow_default_options': {
        'project': YOUR_PROJECT,
        'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
        'runner': 'DataflowRunner'
    }
}


class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        self,
        job_name: str,
        variables: Dict,
        dataflow: str,
        py_options: List[str],
        append_job_name: bool = True,
        py_interpreter: str = "python3"
    ):

        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['job_name'] = name

        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)


class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")


with airflow.DAG(
        dag_id="airflow_test_dataflow",
        default_args=default_args,
        schedule_interval=None) as dag:

    t1 = DummyOperator(task_id="start")
    t2 = DataFlowPython3Operator(
        py_file=DATAFLOW_PY_FILE,
        task_id="test_job",
        dag=dag)

DataFlowPython3Operatorクラスのexecute関数の中で実行しているstart_python_dataflowの引数にpy_interpreter="python3"を指定しておくことでpython3コマンドでDataflowのファイルを実行できる様になります。

以下の様にApache Beam Python3.6 SDK 2.15.0のバージョンで実行できたことを確認できればokです。

スクリーンショット 2020-03-05 11.51.34.png スクリーンショット 2020-03-05 12.26.27.png

備考

airflowのDataflowPythonOperatorを使ってpython3コマンドが実行ができる様に修正されたPRが作されairflow2.0以降にマージされています。

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?