Help us understand the problem. What is going on with this article?

Airflowをdocker-composeで実装した時に気をつけたこと

個人的な勉強でAirflowを実装したので、その時に気づいたことを書きます1
同じような問題ではまった人が減れば幸いです。

前提

  • LocalExecutorを使う
  • MySQLコンテナーとAirflowコンテナー
  • AirflowからRedshiftにアクセス

Dockerイメージ

python:3.7.7ベースでairflow==1.10.10をインストール

  • Dockerfileを自分で書いてみるため
    • ついでにentrypoint.sh
  • 学習目的なので、puckel/docker-airflowは遠慮したい
    • めちゃくちゃ参考にはする
  • 公式Dockerイメージのmasterやlatestはバージョンが2.0で開発版2

AIRFLOW_EXTRAS

airflowを拡張するためのプラグインで、MySQLなどのDB系からGCPへのアクセスなどがある。
公式のDockerfileでは以下の通り、

ARG AIRFLOW_EXTRAS="async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,ssh,statsd,virtualenv"

cryptoFERNET_KEYの生成に必要なので事実上必須。
バックエンドのDBにMySQL、Redshiftへの接続にpsycopg2を使うので、これらに関連するものも必要。

entrypoint.sh

ドキュメントやpuckelもしているように、ここでFERNET_KEYを生成して、コネクションを暗号化できるようにする。airflow.cfgにベタ書きするより安全。

: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"

nc (netcat) コマンドによるDB立ち上げの確認

docker-compose.ymldepends_on: mysqlとしても、コンテナが立ち上がるのを待つだけで、DBが立ち上がるのは確認しません。puckelはentrypoint.sh内でncコマンドを使い、DBへのコネクションが成立するかを確認しています。こういうのはとても参考になります。

wait_for_port() {
  local name="$1" host="$2" port="$3"
  local j=0
  while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
    j=$((j+1))
    if [ $j -ge $TRY_LOOP ]; then
      echo >&2 "$(date) - $host:$port still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for $name... $j/$TRY_LOOP"
    sleep 5
  done
}

DBの設定

MySQLの環境設定

次の環境変数を設定する (参考)。これらはDB側の設定だが、同じユーザー・パスワードを使ってAirflowからアクセスする。
- MYSQL_ROOT_PASSWORD
- MYSQL_DATABASE
- MYSQL_USER
- MYSQL_PASSWORD

my.cnf

Airflowのデータベースバックエンドの説明にあるように、MySQLを使う場合は、explicit_defaults_for_timestamp=1の設定が必要。
この他、マルチバイト文字を扱う場合の設定も追加する。

[mysqld]
character-set-server=utf8mb4
explicit_defaults_for_timestamp=1

[client]
default-character-set=utf8mb4

DBへのアクセス

AIRFLOW__CORE__SQL_ALCHEMY_CONN

デフォルトはsqliteを使うが、DBやドライバに合わせて変更する。書き方はSqlAlchemyのドキュメントを参考に、

  • MySQL + mysqlclient
    • mysql+mysqldb://user:password@host:port/db
  • PostgreSQL + psycopg2
    • postgresql+psycopg2://user:password@host:port/db

ホストはdocker-compose.ymlcontainer_nameで指定したもの、ポートは基本的に、MySQLなら3306、PostgreSQLなら5432。ユーザー名やDB名は上で設定したもの。

環境変数の利用

Airflowの各種設定をどこに書くか

ドキュメントにも記載がある通り、複数の場所に設定を書くことができて、環境変数が優先される。AWSの認証などは環境変数を使いたい。DBへのアクセスなどスタティックにしたいものはairflow.cfgでいいかもしれない。プロダクションレベルでは、きちんとメンバーでルールを設けるべき。

  1. set as an environment variable
  2. set as a command environment variable
  3. set in airflow.cfg
  4. command in airflow.cfg
  5. Airflow’s built in defaults

上のものほど優先する。

環境変数をどこで定義するか

これも結構色々ある。こちらは下のものほど優先されるはず。

  1. Dockerfile: 変更が少ないもの、デフォルトっぽく使う
  2. entrypoint.sh: 同上
  3. docker-compose.yml: 他のコンテナに合わせて変更できる。より柔軟。
  4. .envファイル:docker-compose.ymlenv_fileとして指定すると、コンテナ起動時に読まれる。Gitに残したくない認証系はこちらに書く。

DBの設定とSqlAlchemyの設定はdocker-compose.yml内で

同じ設定を使うため、記述する場所が同じな方が、管理・保守しやすいと考えられる。

version: "3.7"
services:
    mysql:
        image: mysql:5.7
        container_name: mysql
        environment:
            - MYSQL_ROOT_PASSWORD=password
            - MYSQL_USER=airflow
            - MYSQL_PASSWORD=airflow
            - MYSQL_DATABASE=airflow
        volumes:
            - ./mysql.cnf:/etc/mysql/conf.d/mysql.cnf:ro
        ports:
            - "3306:3306"
    airflow:
        build: .
        container_name: airflow
        depends_on:
            - mysql
        environment:
            - AIRFLOW_HOME=/opt/airflow
            - AIRFLOW__CORE__LOAD_EXAMPLES=False
            - AIRFLOW__CORE__EXECUTOR=LocalExecutor
            - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mysql:3306/airflow
            - MYSQL_PORT=3306
            - MYSQL_HOST=mysql
# 略

AWSやRedshiftの接続は.envファイルで

Redshiftはともかく、AWSのアクセスキー・シークレットキーは機密性が高いので、docker-compose.ymlentrypoint.shには書きたくない。airflow.cfgなら一考の余地があるが、実際には開発チームと相談かも。

とりあえず、GUIでぽちぽち入力するのは、モダンじゃない。

書く際には、ドキュメントを参考にして以下のように書く。

Conn Id Conn Type Login Password Host Port Schema Environment Variable
redshift_conn_id postgres awsuser password your-cluster-host 5439 dev AIRFLOW_CONN_REDSHIFT_CONN_ID=postgres://awsuser:password@your-cluster-host:5439/dev
aws_conn_id aws your-access-key your-secret-key AIRFLOW_CONN_AWS_CONN_ID=aws://your-access-key:your-secret-key@

IDが小文字でも、環境変数名では大文字になる。

AWSのキーではホストがなくとも最後に@をつける必要がある。ないとエラーを起こす。また、キー内にコロンやスラッシュが含まれると、うまくパースできないので、キーを生成し直す方がいい。

逆に、GUIで入力したコネクションのURI形式を知りたかったら、以下のように出力すればいい。

from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection('postgres_conn_id')
print(f"AIRFLOW_CONN_{conn.conn_id.upper()}='{conn.get_uri()}'")

AirflowのKey-Valueを環境変数で設定

コネクションと同様に、キーバリューも設定することができる。方法としては、

  1. GUIで設定する
  2. .pyコードで設定する。
  3. 環境変数で設定する

コードの場合は、

from airflow.models import Variable
Variable.set(key="foo", value="bar")

環境変数の場合は

Key Value Environment Variable
foo bar AIRFLOW_VAR_FOO=bar

キーが小文字でも、環境変数名は大文字になる。

終わり

一応リポジトリをご紹介します。


  1. 一応説明するとUdacityのData Engineerです。Cassandra, Redshift, Spark, Airflowを触りました。5ヶ月かかると言われていましたが、3ヶ月で終わったので、まあmonthlyの契約をする方が良さそうです。あと、定期的に50%オフになるので、それを狙って登録するのがオススメです。じゃないと高杉 

  2. 記事作成中に、apache/airflow:1.10.10を触ったら、こちらは割とすんなりいけそうでした。docker run -it --name test -p 8080 -d apache/airflow:1.10.10 ""と実行すれば、bashを開いた状態で起動するため、docker exec test airflow initdb など柔軟に操作できます。 

brainpad
ブレインパッドは、2004年の創業以来、データによるビジネス創造と経営改善に向き合ってきたデータ活用・分析企業です。
http://www.brainpad.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした