0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AirflowやCloud Composerでpackageを定義して使いたい

Last updated at Posted at 2020-08-18

背景

AirflowやCloud Composerで自分で定義したpackageを使いたい場合。例えば、slack通知を行うクラスの定義など。

Airflowのやり方

例としてslack通知を実装する(参考: Airflowのタスク/DAGが失敗した時にSlackで通知する仕組み)。
まずは、Airflowでは以下のように定義すればいい。

dir構成


ProjectRoot
.
├── dags
│   ├── slack_test.py
│   ├── packages # 任意のdir名
│   │   ├── __init__.py
│   │   └── slack.py

slack.py


import json
import requests


class Slack:
    def noti(context=""):
        Slack.post('Failed: {}.{}.{}'.format(
            context['dag'], context['task'], context['execution_date'])
        )

    def post(text):
        url = "https://hooks.slack.com/services/__/__/__"
        requests.post(url, data=json.dumps({
            'username': 'Airflow',
            'channel': 'your_channel',
            'attachments': [{
                'title': text,
                "color": 'danger'
            }]
        }))

__init__.py

from packages.slack import Slack

slack_test.py


from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from packages import Slack


def test():
    Slack.post('slack test')
    return 'success'


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 8, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": Slack.noti, # Dagが失敗したら通知が飛ぶ
}

dag = DAG("slack_test", default_args=default_args, schedule_interval=timedelta(1))
t1 = PythonOperator(task_id='print_package', python_callable=test, dag=dag)

じゃ、Cloud Composerの場合は?

ローカルの Python ライブラリをインストールする

ここにあるように、ディレクトリ名を dependencies と定義する必要がある。
ディレクトリ構成は以下のようになる。


ProjectRoot
.
├── dags
│   ├── slack_test.py
│   ├── dependecies # これ
│   │   ├── __init__.py
│   │   └── slack.py
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?