3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ログを Fluentd (Sidecar) 経由で GCS へ転送した全体的な流れ

Last updated at Posted at 2019-12-07

この記事は、CyberAgent Developers Advent Calendar 2019 7日目です。

はじめに

こんにちは。CyberAgent の AI事業本部 アドテクDivision のインソ(@sano307)と申します。

今回は、マイクロサービス化したサービスのログを Fluentd 経由で Cloud Storage(GCS) へ永続化した全体的な流れ(要件定義・技術選定・設計・実装・テスト・経過観察)についてお話しします。

予算管理周りのアーキテクチャ

ユーザーが広告に対して特定な行動 (*1) を行う時に予算情報を格納したり予算消化判定をしたりする全対象です。

*1) Adcall → Impression → View or Click → Conversion

スクリーンショット 2019-12-07 14.05.55.png
  • Reverse Proxy: 約 90台

  • Delivery (Java): 広告配信 API サービス(まだ kubernetes 化されていない)

    • 約 100台
  • Node(adprovider): 20台 (Custom Kubernetes Engine の仕組み問題のため)

    • Ad Provider (Scala): 広告選定マイクロサービス
      • Pod: 20台
  • Node(budget): 5台

    • Budget API (Scala): 予算 API サービス
      • Pod: 3台
    • Budget Daemon (Scala): 予算判定デーモン(5秒間隔)
      • Pod: 1台
  • Aerospike: 非確定予算消化額 / 予算超過案件リスト(5秒間隔で更新)

  • MySQL: 確定予算消化額

実現したいこと

  • 予算判定デーモンから 5秒間隔で評価された詳細な予算消化の情報を調査できるようにしたい

要件

ビジネス

① データの利用者は社員なので管理画面まで表示しなくても大丈夫
 → 基本的に他のデータも BigQuery に格納しつつ運用しているので BigQuery で問題なさそう
② データの参照頻度は予算消化のずれが発生する時だと思うのでそんなに頻繁に参照するわけではない
 → リアルタイム性は要らないので適切な間隔で BigQuery へロード
③ 今後要望によってデータが外部公開になるかもしれない
 → Cloud Storage(GCS) に永続化して置こう

技術

① ログの出力がメイン処理に影響を与えられないよう
 → 非同期処理をサポートする plugin を調査
② できる限りログの欠損は起きないよう
 → Middleware のパラメータ調整をしっかり行う
③ デプロイの時にも予算判定デモンの処理が走らないタイミングが発生しないよう
 → Pod の scheduling(replica, affinity) の調整をしっかり行う

技術選定

データの特徴(本番基準)

細かいパラメータ調整が必要な規模ではない感は若干ありますが、せっかくなのでしっかりやって見ましょう。

ログの種類 容量(1 cycle) レコード(1 cycle)
Order 約 0.1mb 1,500
Campaign 約 0.2mb 2,000

Middleware

理想的には Fluent Bit の JVM plugin を自作し OSS 化するのが正しいですが、スケジュール的にちょっと余裕がなかったので Fluentd を使うようにしました。

技術 メリット デメリット 妥当性
Logstash Elastic stack のメインサポート Java runtime が必要
性能検証が必要
Fluentd よりメモリを食う
X
Fluent Bit C言語で書かれているので早い
Fluentd よりメモリを食わない
JVM 向けの stable plugin が存在しない
Fluentd(採用) 既存サービスに使われている性能検証は要らない
CNCF の graduated project
細かいパラメータの学習が必要

Kubernetes

kind

budget-daemon.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: budget-daemon
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  • Deployment(採用)

    • maxSurge を 1に設定したらいつでも一個以上の Pod は動いていることを保証するのでサービスの処理に影響を与えない
  • StatefulSet

    • Rolling update が走る時に Pod が一個も存在しないタイミングが約 30秒できちゃうのでサービスの処理に影響を与える
      • 1 cycle が 5秒なので少なくとも 6 cycle の漏れが発生

Pod design

  • 予算関連のマイクロサービスは同じ Node Pool を使っている(node の数は 5つ)
    • Daemonset
      • 5つの node に Fluentd が展開されることになるけど、現状ログ転送は「予算判定デーモン」のみ使う予定なので無駄なリソース利用が発生
    • Sidecar(採用)
      • 「予算判定デーモン」が scheduling された node のみ Flunetd が展開されるので無駄なリソース利用は発生しない

クラウドロギングの流れ

Fluentd プロセスが Pod のログを chunk (*2) 単位で読み込んで一時的に Buffer path に保存します。その後、Fluentd プロセスが Buffer path に保存されたデータを読み込んで Fluentd の共通処理 queue (*3) に入れます。Queue に入れたデータは Fluentd のパラメータで指定した基準 (*4) で GCS に転送されます。直接 BigQuery に streaming insert しなかった理由は先ほどの要件通りリアルタイム性は求められていないしデータが外部公開になる可能性があるため Cloud Storage に永続化した方が今後の maintenance 的にも良いかなと思った背景があります。(余談ですが、BigQuery の streaming insert はちょっと高いので出来るだけ使いたくないんですね)

*2) chunk はパラメータで調整できます。
*3) queue は1つです。
*4) 主に時間間隔、データーの容量、レコードの数で調整できます。

スクリーンショット 2019-12-07 14.06.13.png

実装

ログの吐き出し

(ログの schema を選定するのは内部の話なのでスキップします)

普通に logback を利用すれば終わる話なんですが、念のため 5秒間隔で行われる main 処理に影響を与えられないよう同期方式 (RollingFileAppender) ではなく非同期方式 (AsyncAppender) を使用しました。非同期方式だと queue の挙動によって main 処理に影響を与えうる可能性があるので軽くパラメータ調整も行いました。

  • queueSize:1 cycle で生成されるレコードの数をもとに 2倍を設定しました。予算判定デーモンは対象のデータ規模がある程度決まられている特徴があるためざっくり 2倍にした理由もあります。リアルタイムでデータの規模が変わるサービスであれば他の工夫が必要になるかもしれません。
  • discardingThreshold:デフォルトだと log level WARN と ERROR 以外は queue の容量によって勝手に捨てられるので 0 で設定しました。
  • neverBlock:queue が詰まっている時の挙動なんですが、念のため main 処理を block しないようにしました。
BudgetStatusUpdater.scala
private[this] val statusOrderLogger = Logger("statusOrder")
private[this] val statusCampaignLogger = Logger("statusCampaign")
logback.xml
    <appender name="ASYNC_STATUS_ORDER" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="STATUS_ORDER" />
        <queueSize>3072</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <neverBlock>true</neverBlock>
    </appender>

    <appender name="ASYNC_STATUS_CAMPAIGN" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="STATUS_CAMPAIGN" />
        <queueSize>4096</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <neverBlock>true</neverBlock>
    </appender>

    <logger name="statusOrder" level="INFO" additivity="false">
        <appender-ref ref="ASYNC_STATUS_ORDER" />
    </logger>

    <logger name="statusCampaign" level="INFO" additivity="false">
        <appender-ref ref="ASYNC_STATUS_CAMPAIGN" />
    </logger>
  • ログの出力先を Pod ごとに同じにしたら Rolling Update が行われる場合、旧・新 Pod のログが混ぜちゃうし旧 Pod のログを毎回消す operation を定義しないといけないので、Pod ごとに自分の hostname directory を生成しそこにログを出力するようにしました。
daemon/charts/prod.yaml
...
  javaOpts: >
    ...
    -Dlog.home=LOG_DIR/${HOSTNAME}

Dockerfile を定義

公式 Fluentd の dockerhub のサンプルを参照しました。
https://hub.docker.com/r/fluent/fluentd

FROM fluent/fluentd:v1.7-1

USER root

RUN apk --no-cache add --update --virtual .build-deps \
        sudo build-base ruby-dev \
  && sudo gem install \
         fluent-plugin-gcs -v "0.4.0" \
  && sudo gem sources --clear-all \
  && apk del .build-deps \
  && apk --no-cache add --update tzdata \
  && rm -rf /usr/lib/ruby/gems/*/cache/*.gem

USER fluentd
  • 実行ユーザーとして root 指定
  • plugin
    • fluent-plugin-gcs
      • GCS への転送機能を提供します。
        • 既に他のサービスで使っているかつこっちの方が規模的に小さいので検証なしで採用しました。
  • OS package
    • tzdata
      • Pod の Timezone 指定する時に利用しています。

k8s manifest

fluentd の container spec を sidecar container で追加

budget-daemon.yaml
...
        - name: fluentd
          image: REMOTE_DOCKERFILE_PATH/budget-daemon-fluentd:{{ .Values.application.tag }}
          resources:
            requests:
              cpu: {{ .Values.fluentd.cpuRequest }}
              memory: {{ .Values.fluentd.memoryRequest }}
            limits:
              cpu: {{ .Values.fluentd.cpuLimit }}
              memory: {{ .Values.fluentd.memoryLimit }}
          env:
            - name: TZ
              value: "Asia/Tokyo"
          volumeMounts:
            - name: log
              mountPath: LOG_PATH
            - name: fluentd-conf
              mountPath: /fluentd/etc
            - name: gcp-key
              mountPath: /etc/gcp
              readOnly: true
      volumes:
        - name: log
          emptyDir: {}
        - name: fluentd-conf
          configMap:
            name: budget-daemon
            items:
              - key: fluentd.main.conf
                path: fluent.conf
        - name: gcp-key
          secret:
            secretName: gcp-key
...
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: budget-daemon
data:
  fluentd.main.conf: |
{{ .Values.fluentd.main.conf | indent 4 }}
---
  • {{ .Values.???.??? }}

    • 環境ごとの設定を分離し Helm template を利用して kubectl apply する時に使う manifest を生成しています。
      • 近いうちに Kustomize に migration する予定です。
  • {{ .Values.application.tag }}

    • CI Tool(Circle CI) の build number を入れています。
  • volumes

    • 最初は DirectoryOrCreate を設定しわざわざ Pod の preStop で rm -rf を実行していたんですが 、普通に emptyDir したら Pod が Terminate されるタイミングで一緒に消えるので Pod の lifecycle と同様にした方が分かりやすいかなと思いました。
    budget-daemon.yaml
    volumes:
    - name: log
      hostPath:
        path: LOG_PATH
        type: DirectoryOrCreate
    
    budget-daemon.yaml
    volumes:
    - name: log
      emptyDir: {}
    

詳細な fluentd の設定を追加

daemon/charts/prod.yaml
...
fluentd:
  tag:
  cpuRequest: 200m
  cpuLimit: 200m
  memoryRequest: 512Mi
  memoryLimit: 512Mi
  main:
    conf: |
      <system>
        workers 2
        root_dir LOG_PATH
      </system>
      <source>
        @type monitor_agent
        bind 0.0.0.0
        port 24220
      </source>

      @include /fluentd/etc/conf.d/*.conf

      <match budget.daemon.**>
        @id gcs
        @type gcs
        project cyberagent-004
        keyfile /etc/gcp/credential.json
        bucket BUCKET_NAME
        object_key_format %{path}%{time_slice}_%{uuid_flush}_%{index}.%{file_extension}
        time_slice_format %Y%m%d-%H%M
        path ${tag[2]}/${tag[3]}/%Y/%m/%d/%H/
        store_as gzip
        <buffer tag,time>
          @type file
          timekey 1m
          timekey_wait 5s
          chunk_limit_size 1m
          total_limit_size 100m
          flush_at_shutdown true
          flush_thread_count 2
          overflow_action throw_exception
        </buffer>
        <format>
          @type json
        </format>
      </match>
      <worker 0>
        <source>
          @type tail
          path "LOG_PATH/#{ENV['HOSTNAME']}/status_orders_*.log"
          pos_file "LOG_PATH/#{ENV['HOSTNAME']}/status_orders.log.pos"
          tag budget.daemon.status.orders
          rotate_wait 5
          refresh_interval 10
          read_from_head true
          <parse>
            @type json
            time_key fluent_time
            keep_time_key true
          </parse>
        </source>
      </worker>
      <worker 1>
        <source>
          @type tail
          path "LOG_PATH/#{ENV['HOSTNAME']}/status_campaigns_*.log"
          pos_file "LOG_PATH/#{ENV['HOSTNAME']}/status_campaigns.log.pos"
          tag budget.daemon.status.campaigns
          rotate_wait 5
          refresh_interval 10
          read_from_head true
          <parse>
            @type json
            time_key fluent_time
            keep_time_key true
          </parse>
        </source>
      </worker>
  • system

    • とりあえず、worker はログの種類に合わせて 2台
  • source

    • @type monitor_agent
      • 基本的に Pod ごとに Datadog が Daemonset の形でデプロイされていて k8s の metrics の収集をやっています。Fluentd の metrics も Datadog GUI で見たいので下記を helm chart に設定しました。
    infra/kubernetes/datadog/values.yml
    ...
    datadog:
    ...
      confd:
      ...
        fluentd.yaml: |-
          ad_identifiers:
            - budget-daemon
          init_config:
          instances:
            - monitor_agent_url: http://%%host%%:24220/api/plugins.json
    
  • @include

    • 複数の fluentd configuration をインポートする時に使えます。
  • <match>

    • @type gcs
      • project: GCP のプロジェクト名
      • keyfile: GCP credencial file path
      • bucket: GCS バケット名
      • path: GCS バケット内部の path
      • object_key_format: 転送後の file の名
      • store_as: 転送後の file type は gzip
    • <buffer>
      • @type file: in-memory も問題ない規模ですが、念のため in-disk 方式
      • timekey: 今回 1分間隔で GCS へ転送することだとざっくり決めたので 1m
      • flush_at_shutdown: Fluentd が終了される時には溜まっている chunk を一気に処理するよう true
      • flush_thread_count: Chunk を処理するスレッドの数は適当に worker と同様
      • overflow_action: Buffer queue がいっぱいになっている時には例外が発生 (default) するようにする
  • <worker>

    • <source>
      • refresh_interval: Buffer が参照する Pod's path を refresh する間隔は 10秒
      • read_from_head: こちらを false にすると、もともと Application container が Fluentd container より早く動き始めるので Fluentd container が出来上がる前に発生したログは欠損されます。約 10秒ぐらいの欠損を対処するために true
      • <parse>
        • time_key: レコードごとに処理した時間の format
        • keep_time_key: <match> 側で time_key を参照できるようにする

テスト

  • Application container

    • CPU と Memory 確認して問題ないこと

    • 1 cycle 処理時間に問題ないこと

    • ログの非同期処理の merics に問題ないこと

      -<appender name="ASYNC_STATUS_ORDER" class="ch.qos.logback.classic.AsyncAppender">
      +<appender name="ASYNC_STATUS_ORDER" class="com.twitter.inject.logback.AsyncAppender">
      
  • Flunetd container

    • CPU と Memory 確認して問題ないこと
    • 通常に Pod のログと GCS ログの整合性が合っていること
    • Fluentd の metrics に問題ないこと
  • Kubernetes

    • Rolling update のタイミングで欠損されないこと
    • Fluentd container が落ちて再作成された時に Pod のログと GCS ログの整合性が合っていること
      • Sidecar container を強制終了する方法: kubectl exec -it POD_ID -c fluentd -- /bin/sh -c "kill 1"

終わりに

  • 実現したいことの定義からビジネスと技術の要件定義、その後技術選定及びアーキテクチャ工夫。そして、実装及び勉強しつつ 1つずつのボトルネックを解決。テストも充実にしてからスケジュールの締め切り内(2週間程度)に安心してリリースできて嬉しかったです。もともと僕は技術が好きでアーキテクチャを考案する時に他の要素を見る視野が足りなかったんですが、今回のタスクのおかげで他の要素を考慮する視野がちょっと広くなった気がします。今後からも「技術」だけではなく「メインテナンス性・工数・ユーザー・お金」を考慮したアーキテクチャを引き続き考えて行きたいと思います。
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?