個人的な勉強でAirflowを実装したので、その時に気づいたことを書きます1。
同じような問題ではまった人が減れば幸いです。
前提
- LocalExecutorを使う
- MySQLコンテナーとAirflowコンテナー
- AirflowからRedshiftにアクセス
Dockerイメージ
python:3.7.7ベースでairflow==1.10.10をインストール
- Dockerfileを自分で書いてみるため
- ついでに
entrypoint.sh
も
- ついでに
- 学習目的なので、puckel/docker-airflowは遠慮したい
- めちゃくちゃ参考にはする
- 公式Dockerイメージのmasterやlatestはバージョンが2.0で開発版2
AIRFLOW_EXTRAS
airflowを拡張するためのプラグインで、MySQLなどのDB系からGCPへのアクセスなどがある。
公式のDockerfileでは以下の通り、
ARG AIRFLOW_EXTRAS="async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,ssh,statsd,virtualenv"
crypto
はFERNET_KEY
の生成に必要なので事実上必須。
バックエンドのDBにMySQL、Redshiftへの接続にpsycopg2を使うので、これらに関連するものも必要。
entrypoint.sh
ドキュメントやpuckelもしているように、ここでFERNET_KEY
を生成して、コネクションを暗号化できるようにする。airflow.cfg
にベタ書きするより安全。
: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"
nc (netcat) コマンドによるDB立ち上げの確認
docker-compose.yml
でdepends_on: mysql
としても、コンテナが立ち上がるのを待つだけで、DBが立ち上がるのは確認しません。puckelはentrypoint.sh
内でnc
コマンドを使い、DBへのコネクションが成立するかを確認しています。こういうのはとても参考になります。
wait_for_port() {
local name="$1" host="$2" port="$3"
local j=0
while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
j=$((j+1))
if [ $j -ge $TRY_LOOP ]; then
echo >&2 "$(date) - $host:$port still not reachable, giving up"
exit 1
fi
echo "$(date) - waiting for $name... $j/$TRY_LOOP"
sleep 5
done
}
DBの設定
MySQLの環境設定
次の環境変数を設定する (参考)。これらはDB側の設定だが、同じユーザー・パスワードを使ってAirflowからアクセスする。
- MYSQL_ROOT_PASSWORD
- MYSQL_DATABASE
- MYSQL_USER
- MYSQL_PASSWORD
my.cnf
Airflowのデータベースバックエンドの説明にあるように、MySQLを使う場合は、explicit_defaults_for_timestamp=1
の設定が必要。
この他、マルチバイト文字を扱う場合の設定も追加する。
[mysqld]
character-set-server=utf8mb4
explicit_defaults_for_timestamp=1
[client]
default-character-set=utf8mb4
DBへのアクセス
AIRFLOW__CORE__SQL_ALCHEMY_CONN
デフォルトはsqliteを使うが、DBやドライバに合わせて変更する。書き方はSqlAlchemyのドキュメントを参考に、
- MySQL + mysqlclient
mysql+mysqldb://user:password@host:port/db
- PostgreSQL + psycopg2
postgresql+psycopg2://user:password@host:port/db
ホストはdocker-compose.yml
のcontainer_name
で指定したもの、ポートは基本的に、MySQLなら3306、PostgreSQLなら5432。ユーザー名やDB名は上で設定したもの。
環境変数の利用
Airflowの各種設定をどこに書くか
ドキュメントにも記載がある通り、複数の場所に設定を書くことができて、環境変数が優先される。AWSの認証などは環境変数を使いたい。DBへのアクセスなどスタティックにしたいものはairflow.cfg
でいいかもしれない。プロダクションレベルでは、きちんとメンバーでルールを設けるべき。
- set as an environment variable
- set as a command environment variable
- set in airflow.cfg
- command in airflow.cfg
- Airflow’s built in defaults
上のものほど優先する。
環境変数をどこで定義するか
これも結構色々ある。こちらは下のものほど優先されるはず。
-
Dockerfile
: 変更が少ないもの、デフォルトっぽく使う -
entrypoint.sh
: 同上 -
docker-compose.yml
: 他のコンテナに合わせて変更できる。より柔軟。 -
.env
ファイル:docker-compose.yml
でenv_file
として指定すると、コンテナ起動時に読まれる。Gitに残したくない認証系はこちらに書く。
DBの設定とSqlAlchemyの設定はdocker-compose.yml
内で
同じ設定を使うため、記述する場所が同じな方が、管理・保守しやすいと考えられる。
version: "3.7"
services:
mysql:
image: mysql:5.7
container_name: mysql
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_USER=airflow
- MYSQL_PASSWORD=airflow
- MYSQL_DATABASE=airflow
volumes:
- ./mysql.cnf:/etc/mysql/conf.d/mysql.cnf:ro
ports:
- "3306:3306"
airflow:
build: .
container_name: airflow
depends_on:
- mysql
environment:
- AIRFLOW_HOME=/opt/airflow
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mysql:3306/airflow
- MYSQL_PORT=3306
- MYSQL_HOST=mysql
# 略
AWSやRedshiftの接続は.env
ファイルで
Redshiftはともかく、AWSのアクセスキー・シークレットキーは機密性が高いので、docker-compose.yml
やentrypoint.sh
には書きたくない。airflow.cfg
なら一考の余地があるが、実際には開発チームと相談かも。
とりあえず、GUIでぽちぽち入力するのは、モダンじゃない。
書く際には、ドキュメントを参考にして以下のように書く。
Conn Id | Conn Type | Login | Password | Host | Port | Schema | Environment Variable |
---|---|---|---|---|---|---|---|
redshift_conn_id | postgres | awsuser | password | your-cluster-host | 5439 | dev | AIRFLOW_CONN_REDSHIFT_CONN_ID=postgres://awsuser:password@your-cluster-host:5439/dev |
aws_conn_id | aws | your-access-key | your-secret-key | AIRFLOW_CONN_AWS_CONN_ID=aws://your-access-key:your-secret-key@ |
IDが小文字でも、環境変数名では大文字になる。
AWSのキーではホストがなくとも最後に@をつける必要がある。ないとエラーを起こす。また、キー内にコロンやスラッシュが含まれると、うまくパースできないので、キーを生成し直す方がいい。
逆に、GUIで入力したコネクションのURI形式を知りたかったら、以下のように出力すればいい。
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('postgres_conn_id')
print(f"AIRFLOW_CONN_{conn.conn_id.upper()}='{conn.get_uri()}'")
AirflowのKey-Valueを環境変数で設定
コネクションと同様に、キーバリューも設定することができる。方法としては、
- GUIで設定する
-
.py
コードで設定する。 - 環境変数で設定する
コードの場合は、
from airflow.models import Variable
Variable.set(key="foo", value="bar")
環境変数の場合は
Key | Value | Environment Variable |
---|---|---|
foo | bar | AIRFLOW_VAR_FOO=bar |
キーが小文字でも、環境変数名は大文字になる。
終わり
一応リポジトリをご紹介します。
-
一応説明するとUdacityのData Engineerです。Cassandra, Redshift, Spark, Airflowを触りました。5ヶ月かかると言われていましたが、3ヶ月で終わったので、まあmonthlyの契約をする方が良さそうです。あと、定期的に50%オフになるので、それを狙って登録するのがオススメです。
じゃないと高杉↩ -
記事作成中に、
apache/airflow:1.10.10
を触ったら、こちらは割とすんなりいけそうでした。docker run -it --name test -p 8080 -d apache/airflow:1.10.10 ""
と実行すれば、bash
を開いた状態で起動するため、docker exec test airflow initdb
など柔軟に操作できます。 ↩