LoginSignup
1
4

More than 1 year has passed since last update.

【徹底解説】Airflow Fluentd Elasticsearch Docker の連携方法

Last updated at Posted at 2022-03-06

概要

Airflow のリモートロギング機能を活用すると、タスクログの送受信先として S3 や GCS のようなオブジェクトストレージの他、フルテキスト検索が可能な Elasticsearch も使えます。しかし、Elasticsearch にログを転送するには FluentdLogstash のようなログ収集ツールも必要になり、設定の理解や動作確認に時間がかかります。

この記事で、公式 docker-compose.yaml をベースに Airflow、Fluentd、Elasticsearch を使用した開発環境を作り、各コンポーネントの繋ぎ方や潜んでいる落とし穴を解説していきます。

まず、全体の流れを把握しましょう。

airflow_elasticsearch.png

上の図を要約すると:

  1. Airflow がタスクログを stdout に書き込む
  2. Docker のロギングドライバーが Fluentd にログを転送する
  3. Fluentd が Elasticsearch にログを転送する
  4. Airflow が Elasticsearch からログを取得する

迷いやすいところは、ログがどうやって Airflow から Elasticsearch に渡るかです。Elasticsearch からログを直接取得することはできますが、Elasticsearch にログを直接転送することはできません。タスクログを Elasticsearch に転送するには、ロギング・ドライバーと Fluentd のようなログ収集ツールを間に挟める必要があります。

それでは、各コンポーネントの設定方法を見ていきましょう。

コンポーネント バージョン
os macOS 12.2.1
airflow 2.2.4
elasticsearch 8.0.1
fluentd 1.14.5
docker 20.10.12
docker compose 2.2.3

Elasticsearch

まず、動作確認用の Elasticsearch 環境を用意します。以下を docker-compose.yaml に追加すると、1ノードの Elasticsearch クラスターが立ち上がります。設定項目について詳しく知りたい方は 公式ドキュメント が参考になるかと思います。なお、動作確認用なのでセキュリティーを緩和した設定になっています。

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.0.1
    volumes:
      - elasticsearch-volume:/usr/share/elasticsearch/data
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false" # 注意:本番向けではない
    healthcheck:
      test:
        [
          "CMD",
          "curl",
          "--fail",
          "localhost:9200/_cluster/health?wait_for_status=yellow&timeout=10s"
        ]
      interval: 10s
    restart: always
volumes:
  elasticsearch-volume:

Airflow と Elasticsearch の連携

Elasticsearch と連携するために、Airflow のタスクログの出力先を stdout に、読み込み先を Elasticsearch に変える必要があります。Elasticsearch のプロバイダーパッケージに ElasticsearchTaskHandler があるので、以下の4つの設定で デフォルト task ハンドラー と入れ替えます。

項目 設定 説明
logging.remote_logging true リモートロギング機能を有効にする
elasticsearch.write_stdout true タスクログの出力先を stdout にする
elasticsearch.host elasticsearch:9200 読み込み先のホスト名を指定する
elasticsearch.json_format true 書き込み/読み込み形式を JSON にする

elasticsearch.json_format は false のままだと、連携がうまく行かないことがある

以下のように docker-compose.yaml で設定できます。

---
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.2.4
  environment:
    &airflow-common-env
    AIRFLOW__LOGGING__REMOTE_LOGGING: "true"
    AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "true"
    AIRFLOW__ELASTICSEARCH__HOST: "elasticsearch:9200"
    AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "true"

Airflow と Fluentd の連携

Airflow のタスクログの出力先を stdout に向けた後に、ロギング・ドライバー 経由で、次のセクションで作る Fluentd サービスに転送する必要があります。タスクログは Airflow のワーカーが出力するものなので、docker-compose.yaml の airflow-worker サービスの logging 設定を変えます。

services:
  airflow-worker:
    # 省略
    logging:
      driver: "fluentd"
      options:
        fluentd-address: localhost:24224
        tag: airflow.worker
    depends_on:
      # 省略
      fluentd:
        condition: service_healthy

ドライバーを fluentd に設定し、fluentd のホスト名を指定しました。一見分かりやすく見えますが、fluentd-addresslocalhost を指定しているところに気づいたでしょうか?理由は、ロギング・ドライバーが サービスコンテキストの外から Fluentd に対してデータを送るためです。つまり fluentd-address で指定する値は、Docker ホストから見た Fluentd サービスのアドレスというわけです。

Fluentd 側からタスクログを特定できるように、tagairflow.worker に設定します。

Fluentd と Elasticsearch の連携

Fluentd から Elasticsearch にログを転送するために、ログ収集/転送用の Fluentd サービスが必要です。このセクションで Fluentd のコンテナイメージの作成、設定ファイルの書き方と docker-compose.yaml のサービス定義方法を順番に説明します。

Fluentd のコンテナイメージを作成する

公式コンテナイメージ を継承して elasticsearch と prometheus プラグインをインストールします。今回は prometheus でメトリックを収集するつもりはなく、ヘルスチェックエンドポイントとして使用したいと思います。

FROM fluent/fluentd:v1.14.5-debian-1.0
USER root
RUN apt-get -y update \
    && apt-get -y install gcc make curl \
    && gem install fluent-plugin-elasticsearch:5.2.0 fluent-plugin-prometheus:2.0.2 --no-document
USER fluent

Fluentd の設定ファイルを書く

ロギング・ドライバーから届いたタスクログをパースして elasticsearch に転送するための設定は fluent.conf で行います。

ファイル名は fluent.conf にする必要がある

├── docker-compose.yaml
└── fluentd
    ├── Dockerfile
    └── conf
        └── fluent.conf

中身は以下の通りです。

# (1)
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

# (2)
<filter airflow.worker>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>

# (3)
<match airflow.worker>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix fluentd.${tag}
  logstash_dateformat %Y%m%d
  include_tag_key true
  tag_key @log_name
  flush_interval 1s
</match>

# (4)
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

上記設定内容を補足すると:

  1. source ディレクティブと forward 入力プラグインで、ロギング・ドライバーからのデータを受けるための TCP エンドポイントを提供します。
  2. filter ディレクティブで、airflow.worker のログを JSON としてパースします
  3. match ディレクティブと elasticsearch 出力プラグインで airflow.worker のログを elasticsearch に転送します。
  4. prometheus 入力プラグインで、Prometheus サーバーがスクレイピングするための Metric HTTP エンドポイントを提供します。

Fluentd のサービスを定義する

最後に、Fluentd のコンテナイメージと設定ファイルを利用するサービスを docker-compose.yaml で定義します。

services:
  fluentd:
    build: ./fluentd
    volumes:
      - ./fluentd/conf:/fluentd/etc
    ports:
      - 24224:24224
    restart: always
    healthcheck:
      test: [ "CMD", "curl", "--fail", "localhost:24231/metrics" ]
      interval: 10s
      start_period: 30s

環境を立ち上げる

docker compose up で環境を立ち上げて、2分ほど待ってから http://localhost:8080 を開くと、Airflow のログイン画面が表示されます。

docker-compose.yaml のデフォルト設定だと、コンテナがヘルスチェックをクリアできない可能性があります。scheduler | triggerer | webserver | worker | flower サービスのヘルスチェック interval と timeout をそれぞれ 30s に設定すると回避できます。

airflow_login.png
example_bash_operator を実行して、Graph 画面から成功したいずれかのタスクログを開くと、Elasticsearch から取得されたログが以下のように表示されるはずです。
airflow_task_log.png

終わりに

この記事で Airflow、Fluentd と Elasticsearch の連携方法と各コンポーネントのセットアップ手順を解説しました。docker compose の開発環境を前提とした説明でしたが、AWS Elastic Container Service (ECS) や Kubernetes のようなコンテナ実行プラットフォームでも役に立つナレッジになるので、ぜひ活用していただけたらと思います。

1
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
4