tl:dr
- 2021年2月現在、puckel/docker-airflow は事実上のdeprecated
- すぐに apache/airflow に移行できない人も多いだろうからとりあえず動くようにして延命させたい
- Dockerfileに
pip install sqlalchemy==1.3.15
を足す
- Dockerfileに
はじめに
2021年2月現在、新規に Airflow をDocker上で運用するなら、公式が提供している apache/airflow のお世話になるのがよいかと思います。
しかし、数年前から運用している方の中には puckel氏の puckel/docker-airflow を カスタマイズして使っている方も多そうです 。
残念ながらpuckel/docker-airflowの更新は2020年2月を最後に止まっています。
DockerHubからイメージをpullして動かす場合には問題ないのですが、前述のようにDockerfileをカスタマイズして使いたい場合、2020年2月時点では 提供されているDockerfileからイメージをビルドしてdocker-composeしても動作しません。
リポジトリーでも多数のissues、PRsも放置されており、 事実上のdeprecatedといえます。
べき論でいえば、puckel/docker-airflowの使用は止め、即刻apache/airflowに移行すべきでしょう。
しかし業務で使っている場合など、世の中そう簡単に移行できるとは限りません。
この記事では、puckel/docker-airflowを動くようにし、とりあえずの延命をはかる方法を説明します。
環境
- OS: macOS Big Sur 11.2
- Docker: Docker Desktop 3.1.0/Docker 20.10.2
- docker-compose version 1.27.4, build 40524192
- puckel/docker-airflow: 1.10.9
環境構築手順
自分でビルドしたDockerイメージを使ってAirflowを立ち上げます。
最初にDockerイメージのビルドです。READMEに書いてあるコマンドとほぼ一緒ですが、タグを指定しています。
docker build --rm --build-arg AIRFLOW_DEPS="datadog,dask" --build-arg PYTHON_DEPS="flask_oauthlib>=0.9" -t puckel/docker-airflow:1.10.9 .
つぎにdocker-composeします。
docker-composeファイルは docker-compose-CeleryExecutor.yml
とdocker-compose-LocalExecutor.yml
の2種類がありますが、エラーが出るという点ではどちらも変わりません。
今回はCeleryExecutorの方を使います。
これもREADMEに記載されているコマンドそのままです。
docker-compose -f docker-compose-CeleryExecutor.yml up -d
問題:Airflowのwebserverコンテナーが起動できない
上述の操作を行うと、一見動いているように見えますが、 http://localhost:8080
にアクセスしても反応が返ってきません。
docker ps
を数秒おきに実行してみると、webserverコンテナーが終了と再起動を繰り返していることが確認できます。
docker run puckel/docker-airflow:latest webserver
で直接webserverコンテナーを立ち上げてみましょう。
以下のエラーによって起動が失敗していることがわかります。
DB: sqlite:////usr/local/airflow/airflow.db
[2021-02-07 13:58:14,562] {{db.py:368}} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/usr/local/lib/python3.7/site-packages/alembic/ddl/sqlite.py:44: UserWarning: Skipping unsupported ALTER for creation of implicit constraintPlease refer to the batch mode feature which allows for SQLite migrations using a copy-and-move strategy.
"Skipping unsupported ALTER for "
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
Revision ID: d38e04c12aa2
Revises: 6e96a59344a4
Create Date: 2019-08-01 14:39:35.616417
INFO [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
Revision ID: b3b105409875
Revises: d38e04c12aa2
Create Date: 2019-09-28 23:20:01.744775
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
Done.
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2021-02-07 13:58:16,659] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2021-02-07 13:58:16,665] {{scheduler_job.py:1344}} INFO - Starting the scheduler
[2021-02-07 13:58:16,665] {{scheduler_job.py:1352}} INFO - Running execute loop for -1 seconds
[2021-02-07 13:58:16,666] {{scheduler_job.py:1353}} INFO - Processing each file at most -1 times
[2021-02-07 13:58:16,667] {{scheduler_job.py:1356}} INFO - Searching for files in /usr/local/airflow/dags
[2021-02-07 13:58:16,668] {{scheduler_job.py:1358}} INFO - There are 0 files in /usr/local/airflow/dags
[2021-02-07 13:58:16,669] {{scheduler_job.py:1409}} INFO - Resetting orphaned tasks for active dag runs
[2021-02-07 13:58:16,690] {{dag_processing.py:556}} INFO - Launched DagFileProcessorManager with pid: 26
[2021-02-07 13:58:16,695] {{settings.py:54}} INFO - Configured default timezone <Timezone [UTC]>
[2021-02-07 13:58:16,709] {{dag_processing.py:758}} WARNING - Because we cannot use more than 1 thread (max_threads = 2) when using sqlite. So we set parallelism to 1.
[2021-02-07 13:58:16,778] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2021-02-07 13:58:16,779] {{dagbag.py:403}} INFO - Filling up the DagBag from /usr/local/airflow/dags
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 37, in <module>
args.func(args)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 75, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 900, in webserver
app = cached_app_rbac(None) if settings.RBAC else cached_app(None)
File "/usr/local/lib/python3.7/site-packages/airflow/www/app.py", line 233, in cached_app
app = create_app(config, testing)
File "/usr/local/lib/python3.7/site-packages/airflow/www/app.py", line 103, in create_app
models.Chart, Session, name="Charts", category="Data Profiling"))
File "/usr/local/lib/python3.7/site-packages/flask_admin/contrib/sqla/view.py", line 330, in __init__
menu_icon_value=menu_icon_value)
File "/usr/local/lib/python3.7/site-packages/flask_admin/model/base.py", line 818, in __init__
self._refresh_cache()
File "/usr/local/lib/python3.7/site-packages/flask_admin/model/base.py", line 913, in _refresh_cache
self._search_supported = self.init_search()
File "/usr/local/lib/python3.7/site-packages/flask_admin/contrib/sqla/view.py", line 581, in init_search
if tools.is_hybrid_property(self.model, name):
File "/usr/local/lib/python3.7/site-packages/flask_admin/contrib/sqla/tools.py", line 209, in is_hybrid_property
return last_name in get_hybrid_properties(last_model)
File "/usr/local/lib/python3.7/site-packages/flask_admin/contrib/sqla/tools.py", line 190, in get_hybrid_properties
for key, prop in inspect(model).all_orm_descriptors.items()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/inspection.py", line 73, in inspect
"available for object of type %s" % type_
sqlalchemy.exc.NoInspectionAvailable: No inspection system is available for object of type <class 'method'>
なるほど、どうやらPythonの、それもSQLAlchemyにまつわるエラーのようですね。
原因:SQLAlchemyのバージョンに互換性がない
Airflowは一時期まで依存パッケージの管理が適当だったようです。
特に、SQLAlchemyのバージョンはAirflowの動く・動かないが激しいらしく、今回のエラーも2021年2月現在の最新の版である1.3.23をインストールしてしまい、Airflowとの互換性がないために上述のエラーが発生しています。
解決方法:SQLAlchemyのバージョンを1.3.15にする
こちらのissue によれば、1.3.15なら動作するようです。
Dockerfile を修正しましょう。
(わかりやすいように行番号を付与しています)
...
61: && pip install pyasn1 \
+ && pip install sqlalchemy==1.3.15 \
62: && pip install apache-airflow[crypto,celery,postgres,hive,jdbc,mysql,ssh${AIRFLOW_DEPS:+,}${AIRFLOW_DEPS}]==${AIRFLOW_VERSION} \
63: && pip install 'redis==3.2' \
...
Airflowパッケージのインストールの前にSQLAlchemyパッケージをバージョンピニングしてインストールする行を追加するだけです。
pipの仕組みを考えるとこれはベストな修正の仕方ではない(後続のpipでsqlalchemyがバージョンアップされてしまう可能性がある)のですが、今回はとりあえずの延命が目的なのでこれでOKとしておきましょう。
docker-compose -f docker-compose-CeleryExecutor.yml down
docker rmi puckel/docker-airflow:1.10.9
以上のコマンドでコンテナとイメージを削除した後、もう一度 環境構築手順
を実行してみてください。
今度はうまく動作すると思います。