本記事でやること
Cloud Composer(apache-airflow==1.10.3)からDataflowPythonOperator
を使ってジョブを送った際にDataflow側のSDKのバージョンがサポート終了予定のGoogle Cloud Dataflow SDK for Python 2.5.0
になってしまうので、Python2での実行環境からApache Beam Python3.x SDK xxx
のPython3の実行環境までバージョンをあげる。
対象読者
- CloudComposer/AirFlowを触ったことのある方
実行環境
- Python3.6.3
- apache-airflow==1.10.3
- apache-beam==2.15.0
考えられる原因
Cloud CompposerからDataflowにジョブを送った際にGoogle Cloud Dataflow SDK for Python 2.5.0
で実行されてしまう考えられる原因は、airflow側のDataflowPythonOperator
の実装であると考えています。
実装を見てみる
-
DataflowPythonOperator
のexecute
関数の中でDataFlowHook
クラスをイニシャライズし、start_python_dataflow
関数を実行する。 -
DataFlowHook
のstart_python_dataflow
関数では_start_dataflow
関数のcommand_prefix
引数の一部にpython2
がハードコーディングされた状態で実行される。
class DataFlowHook(GoogleCloudBaseHook):
def start_python_dataflow(self, job_name, variables, dataflow, py_options,
append_job_name=True):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
# "python2"がハードコーディングされている
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)
この先の実装では、Dataflowにジョブを送るコマンドを作成するのですが、このコマンドのprefixはpython2
のままでありそのままDataflowのファイルを実行すしようとするのでDataflow側の実行環境がGoogle Cloud Dataflow SDK for Python 2.5.0
になってしまうのではないかと考えています。
解決策(2020/03/09時点)
以下を順に実行していきます。
1. Cloud Composerの環境にapache-beam
をインストールする
apache-beamをインストールするため、依存関係のある以下4つをインストールする。
apache-beam==2.15.0
google-api-core==1.14.3
google-apitools==0.5.28
google-cloud-core==1.0.3
インストールするためには、以下のコマンドを実行する。
(適当なディレクトリにrequirements.txt
を置いておく)
environment=your_composer_environment_name
location=your_location
gcloud composer environments update ${environment} \
--update-pypi-packages-from-file airflow/config/requirements.txt \
--location ${location}
2. DataflowPythonOperator
DataFlowHook
を継承するクラスを作成する
airflowのDataflowPythonOperator
とDataFlowHook
を継承するクラスを作成しpython3コマンドでdataflowのファイルを実行できる様にする。
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'dataflow_default_options': {
'project': YOUR_PROJECT,
'temp_location': DATAFLOW_TEMP_LOCATION.format(bucket=BUCKET),
'runner': 'DataflowRunner'
}
}
class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
self,
job_name: str,
variables: Dict,
dataflow: str,
py_options: List[str],
append_job_name: bool = True,
py_interpreter: str = "python3"
):
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['job_name'] = name
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)
class DataFlowPython3Operator(DataFlowPythonOperator):
def execute(self, context):
"""Execute the python dataflow job."""
bucket_helper = GoogleCloudBucketHelper(
self.gcp_conn_id, self.delegate_to)
self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
dataflow_options = self.dataflow_default_options.copy()
dataflow_options.update(self.options)
# Convert argument names from lowerCamelCase to snake case.
camel_to_snake = lambda name: re.sub(
r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
formatted_options = {camel_to_snake(key): dataflow_options[key]
for key in dataflow_options}
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")
with airflow.DAG(
dag_id="airflow_test_dataflow",
default_args=default_args,
schedule_interval=None) as dag:
t1 = DummyOperator(task_id="start")
t2 = DataFlowPython3Operator(
py_file=DATAFLOW_PY_FILE,
task_id="test_job",
dag=dag)
DataFlowPython3Operator
クラスのexecute
関数の中で実行しているstart_python_dataflow
の引数にpy_interpreter="python3"
を指定しておくことでpython3
コマンドでDataflowのファイルを実行できる様になります。
以下の様にApache Beam Python3.6 SDK 2.15.0
のバージョンで実行できたことを確認できればokです。
備考
airflowのDataflowPythonOperator
を使ってpython3
コマンドが実行ができる様に修正されたPRが作されairflow2.0以降にマージされています。
-
issueのjira
-
PullRrequest