概要
Airflow のリモートロギング機能を活用すると、タスクログの送受信先として S3 や GCS のようなオブジェクトストレージの他、フルテキスト検索が可能な Elasticsearch も使えます。しかし、Elasticsearch にログを転送するには Fluentd や Logstash のようなログ収集ツールも必要になり、設定の理解や動作確認に時間がかかります。
この記事で、公式 docker-compose.yaml をベースに Airflow、Fluentd、Elasticsearch を使用した開発環境を作り、各コンポーネントの繋ぎ方や潜んでいる落とし穴を解説していきます。
まず、全体の流れを把握しましょう。
上の図を要約すると:
- Airflow がタスクログを
stdout
に書き込む - Docker のロギングドライバーが Fluentd にログを転送する
- Fluentd が Elasticsearch にログを転送する
- Airflow が Elasticsearch からログを取得する
迷いやすいところは、ログがどうやって Airflow から Elasticsearch に渡るかです。Elasticsearch からログを直接取得することはできますが、Elasticsearch にログを直接転送することはできません。タスクログを Elasticsearch に転送するには、ロギング・ドライバーと Fluentd のようなログ収集ツールを間に挟める必要があります。
それでは、各コンポーネントの設定方法を見ていきましょう。
コンポーネント | バージョン |
---|---|
os | macOS 12.2.1 |
airflow | 2.2.4 |
elasticsearch | 8.0.1 |
fluentd | 1.14.5 |
docker | 20.10.12 |
docker compose | 2.2.3 |
Elasticsearch
まず、動作確認用の Elasticsearch 環境を用意します。以下を docker-compose.yaml に追加すると、1ノードの Elasticsearch クラスターが立ち上がります。設定項目について詳しく知りたい方は 公式ドキュメント が参考になるかと思います。なお、動作確認用なのでセキュリティーを緩和した設定になっています。
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.0.1
volumes:
- elasticsearch-volume:/usr/share/elasticsearch/data
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false" # 注意:本番向けではない
healthcheck:
test:
[
"CMD",
"curl",
"--fail",
"localhost:9200/_cluster/health?wait_for_status=yellow&timeout=10s"
]
interval: 10s
restart: always
volumes:
elasticsearch-volume:
Airflow と Elasticsearch の連携
Elasticsearch と連携するために、Airflow のタスクログの出力先を stdout
に、読み込み先を Elasticsearch に変える必要があります。Elasticsearch のプロバイダーパッケージに ElasticsearchTaskHandler があるので、以下の4つの設定で デフォルト task ハンドラー と入れ替えます。
項目 | 設定 | 説明 |
---|---|---|
logging.remote_logging | true | リモートロギング機能を有効にする |
elasticsearch.write_stdout | true | タスクログの出力先を stdout にする |
elasticsearch.host | elasticsearch:9200 | 読み込み先のホスト名を指定する |
elasticsearch.json_format | true | 書き込み/読み込み形式を JSON にする |
elasticsearch.json_format は false のままだと、連携がうまく行かないことがある
以下のように docker-compose.yaml で設定できます。
---
x-airflow-common:
&airflow-common
image: apache/airflow:2.2.4
environment:
&airflow-common-env
AIRFLOW__LOGGING__REMOTE_LOGGING: "true"
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "true"
AIRFLOW__ELASTICSEARCH__HOST: "elasticsearch:9200"
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "true"
Airflow と Fluentd の連携
Airflow のタスクログの出力先を stdout に向けた後に、ロギング・ドライバー 経由で、次のセクションで作る Fluentd サービスに転送する必要があります。タスクログは Airflow のワーカーが出力するものなので、docker-compose.yaml の airflow-worker
サービスの logging
設定を変えます。
services:
airflow-worker:
# 省略
logging:
driver: "fluentd"
options:
fluentd-address: localhost:24224
tag: airflow.worker
depends_on:
# 省略
fluentd:
condition: service_healthy
ドライバーを fluentd
に設定し、fluentd のホスト名を指定しました。一見分かりやすく見えますが、fluentd-address
で localhost
を指定しているところに気づいたでしょうか?理由は、ロギング・ドライバーが サービスコンテキストの外から Fluentd に対してデータを送るためです。つまり fluentd-address
で指定する値は、Docker ホストから見た Fluentd サービスのアドレスというわけです。
Fluentd 側からタスクログを特定できるように、tag
を airflow.worker
に設定します。
Fluentd と Elasticsearch の連携
Fluentd から Elasticsearch にログを転送するために、ログ収集/転送用の Fluentd サービスが必要です。このセクションで Fluentd のコンテナイメージの作成、設定ファイルの書き方と docker-compose.yaml のサービス定義方法を順番に説明します。
Fluentd のコンテナイメージを作成する
公式コンテナイメージ を継承して elasticsearch と prometheus プラグインをインストールします。今回は prometheus でメトリックを収集するつもりはなく、ヘルスチェックエンドポイントとして使用したいと思います。
FROM fluent/fluentd:v1.14.5-debian-1.0
USER root
RUN apt-get -y update \
&& apt-get -y install gcc make curl \
&& gem install fluent-plugin-elasticsearch:5.2.0 fluent-plugin-prometheus:2.0.2 --no-document
USER fluent
Fluentd の設定ファイルを書く
ロギング・ドライバーから届いたタスクログをパースして elasticsearch に転送するための設定は fluent.conf
で行います。
ファイル名は fluent.conf にする必要がある
├── docker-compose.yaml
└── fluentd
├── Dockerfile
└── conf
└── fluent.conf
中身は以下の通りです。
# (1)
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# (2)
<filter airflow.worker>
@type parser
key_name log
reserve_data true
remove_key_name_field true
<parse>
@type json
</parse>
</filter>
# (3)
<match airflow.worker>
@type elasticsearch
host elasticsearch
port 9200
logstash_format true
logstash_prefix fluentd.${tag}
logstash_dateformat %Y%m%d
include_tag_key true
tag_key @log_name
flush_interval 1s
</match>
# (4)
<source>
@type prometheus
bind 0.0.0.0
port 24231
metrics_path /metrics
</source>
上記設定内容を補足すると:
-
source
ディレクティブとforward
入力プラグインで、ロギング・ドライバーからのデータを受けるための TCP エンドポイントを提供します。 -
filter
ディレクティブで、airflow.worker
のログを JSON としてパースします。 -
match
ディレクティブとelasticsearch
出力プラグインでairflow.worker
のログを elasticsearch に転送します。 -
prometheus
入力プラグインで、Prometheus サーバーがスクレイピングするための Metric HTTP エンドポイントを提供します。
Fluentd のサービスを定義する
最後に、Fluentd のコンテナイメージと設定ファイルを利用するサービスを docker-compose.yaml で定義します。
services:
fluentd:
build: ./fluentd
volumes:
- ./fluentd/conf:/fluentd/etc
ports:
- 24224:24224
restart: always
healthcheck:
test: [ "CMD", "curl", "--fail", "localhost:24231/metrics" ]
interval: 10s
start_period: 30s
環境を立ち上げる
docker compose up
で環境を立ち上げて、2分ほど待ってから http://localhost:8080
を開くと、Airflow のログイン画面が表示されます。
docker-compose.yaml のデフォルト設定だと、コンテナがヘルスチェックをクリアできない可能性があります。scheduler | triggerer | webserver | worker | flower サービスのヘルスチェック interval と timeout をそれぞれ 30s に設定すると回避できます。
example_bash_operator
を実行して、Graph 画面から成功したいずれかのタスクログを開くと、Elasticsearch から取得されたログが以下のように表示されるはずです。
終わりに
この記事で Airflow、Fluentd と Elasticsearch の連携方法と各コンポーネントのセットアップ手順を解説しました。docker compose の開発環境を前提とした説明でしたが、AWS Elastic Container Service (ECS) や Kubernetes のようなコンテナ実行プラットフォームでも役に立つナレッジになるので、ぜひ活用していただけたらと思います。