この記事は、CyberAgent Developers Advent Calendar 2019 7日目です。
はじめに
こんにちは。CyberAgent の AI事業本部 アドテクDivision のインソ(@sano307)と申します。
今回は、マイクロサービス化したサービスのログを Fluentd 経由で Cloud Storage(GCS) へ永続化した全体的な流れ(要件定義・技術選定・設計・実装・テスト・経過観察)についてお話しします。
予算管理周りのアーキテクチャ
ユーザーが広告に対して特定な行動 (*1) を行う時に予算情報を格納したり予算消化判定をしたりする全対象です。
*1) Adcall → Impression → View or Click → Conversion
-
Reverse Proxy: 約 90台
-
Delivery (Java): 広告配信 API サービス(まだ kubernetes 化されていない)
- 約 100台
-
Node(adprovider): 20台 (Custom Kubernetes Engine の仕組み問題のため)
- Ad Provider (Scala): 広告選定マイクロサービス
- Pod: 20台
- Ad Provider (Scala): 広告選定マイクロサービス
-
Node(budget): 5台
- Budget API (Scala): 予算 API サービス
- Pod: 3台
- Budget Daemon (Scala): 予算判定デーモン(5秒間隔)
- Pod: 1台
- Budget API (Scala): 予算 API サービス
-
Aerospike: 非確定予算消化額 / 予算超過案件リスト(5秒間隔で更新)
-
MySQL: 確定予算消化額
実現したいこと
- 予算判定デーモンから 5秒間隔で評価された詳細な予算消化の情報を調査できるようにしたい
要件
ビジネス
① データの利用者は社員なので管理画面まで表示しなくても大丈夫
→ 基本的に他のデータも BigQuery に格納しつつ運用しているので BigQuery で問題なさそう
② データの参照頻度は予算消化のずれが発生する時だと思うのでそんなに頻繁に参照するわけではない
→ リアルタイム性は要らないので適切な間隔で BigQuery へロード
③ 今後要望によってデータが外部公開になるかもしれない
→ Cloud Storage(GCS) に永続化して置こう
技術
① ログの出力がメイン処理に影響を与えられないよう
→ 非同期処理をサポートする plugin を調査
② できる限りログの欠損は起きないよう
→ Middleware のパラメータ調整をしっかり行う
③ デプロイの時にも予算判定デモンの処理が走らないタイミングが発生しないよう
→ Pod の scheduling(replica, affinity) の調整をしっかり行う
技術選定
データの特徴(本番基準)
細かいパラメータ調整が必要な規模ではない感は若干ありますが、せっかくなのでしっかりやって見ましょう。
ログの種類 | 容量(1 cycle) | レコード(1 cycle) |
---|---|---|
Order | 約 0.1mb | 1,500 |
Campaign | 約 0.2mb | 2,000 |
Middleware
理想的には Fluent Bit の JVM plugin を自作し OSS 化するのが正しいですが、スケジュール的にちょっと余裕がなかったので Fluentd を使うようにしました。
技術 | メリット | デメリット | 妥当性 |
---|---|---|---|
Logstash | Elastic stack のメインサポート | Java runtime が必要 性能検証が必要 Fluentd よりメモリを食う |
X |
Fluent Bit | C言語で書かれているので早い Fluentd よりメモリを食わない |
JVM 向けの stable plugin が存在しない | △ |
Fluentd(採用) | 既存サービスに使われている性能検証は要らない CNCF の graduated project |
細かいパラメータの学習が必要 | 〇 |
Kubernetes
kind
apiVersion: apps/v1
kind: Deployment
metadata:
name: budget-daemon
spec:
replicas: 1
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 0
maxSurge: 1
-
Deployment(採用)
- maxSurge を 1に設定したらいつでも一個以上の Pod は動いていることを保証するのでサービスの処理に影響を与えない
-
StatefulSet
- Rolling update が走る時に Pod が一個も存在しないタイミングが約 30秒できちゃうのでサービスの処理に影響を与える
- 1 cycle が 5秒なので少なくとも 6 cycle の漏れが発生
- Rolling update が走る時に Pod が一個も存在しないタイミングが約 30秒できちゃうのでサービスの処理に影響を与える
Pod design
- 予算関連のマイクロサービスは同じ Node Pool を使っている(node の数は 5つ)
- Daemonset
- 5つの node に Fluentd が展開されることになるけど、現状ログ転送は「予算判定デーモン」のみ使う予定なので無駄なリソース利用が発生
-
Sidecar(採用)
- 「予算判定デーモン」が scheduling された node のみ Flunetd が展開されるので無駄なリソース利用は発生しない
- Daemonset
クラウドロギングの流れ
Fluentd プロセスが Pod のログを chunk (*2) 単位で読み込んで一時的に Buffer path に保存します。その後、Fluentd プロセスが Buffer path に保存されたデータを読み込んで Fluentd の共通処理 queue (*3) に入れます。Queue に入れたデータは Fluentd のパラメータで指定した基準 (*4) で GCS に転送されます。直接 BigQuery に streaming insert しなかった理由は先ほどの要件通りリアルタイム性は求められていないしデータが外部公開になる可能性があるため Cloud Storage に永続化した方が今後の maintenance 的にも良いかなと思った背景があります。(余談ですが、BigQuery の streaming insert はちょっと高いので出来るだけ使いたくないんですね)
*2) chunk はパラメータで調整できます。
*3) queue は1つです。
*4) 主に時間間隔、データーの容量、レコードの数で調整できます。
実装
ログの吐き出し
(ログの schema を選定するのは内部の話なのでスキップします)
普通に logback を利用すれば終わる話なんですが、念のため 5秒間隔で行われる main 処理に影響を与えられないよう同期方式 (RollingFileAppender) ではなく非同期方式 (AsyncAppender) を使用しました。非同期方式だと queue の挙動によって main 処理に影響を与えうる可能性があるので軽くパラメータ調整も行いました。
- queueSize:1 cycle で生成されるレコードの数をもとに 2倍を設定しました。予算判定デーモンは対象のデータ規模がある程度決まられている特徴があるためざっくり 2倍にした理由もあります。リアルタイムでデータの規模が変わるサービスであれば他の工夫が必要になるかもしれません。
- discardingThreshold:デフォルトだと log level WARN と ERROR 以外は queue の容量によって勝手に捨てられるので 0 で設定しました。
- neverBlock:queue が詰まっている時の挙動なんですが、念のため main 処理を block しないようにしました。
private[this] val statusOrderLogger = Logger("statusOrder")
private[this] val statusCampaignLogger = Logger("statusCampaign")
<appender name="ASYNC_STATUS_ORDER" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STATUS_ORDER" />
<queueSize>3072</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
</appender>
<appender name="ASYNC_STATUS_CAMPAIGN" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STATUS_CAMPAIGN" />
<queueSize>4096</queueSize>
<discardingThreshold>0</discardingThreshold>
<neverBlock>true</neverBlock>
</appender>
<logger name="statusOrder" level="INFO" additivity="false">
<appender-ref ref="ASYNC_STATUS_ORDER" />
</logger>
<logger name="statusCampaign" level="INFO" additivity="false">
<appender-ref ref="ASYNC_STATUS_CAMPAIGN" />
</logger>
- ログの出力先を Pod ごとに同じにしたら Rolling Update が行われる場合、旧・新 Pod のログが混ぜちゃうし旧 Pod のログを毎回消す operation を定義しないといけないので、Pod ごとに自分の hostname directory を生成しそこにログを出力するようにしました。
...
javaOpts: >
...
-Dlog.home=LOG_DIR/${HOSTNAME}
Dockerfile を定義
公式 Fluentd の dockerhub のサンプルを参照しました。
https://hub.docker.com/r/fluent/fluentd
FROM fluent/fluentd:v1.7-1
USER root
RUN apk --no-cache add --update --virtual .build-deps \
sudo build-base ruby-dev \
&& sudo gem install \
fluent-plugin-gcs -v "0.4.0" \
&& sudo gem sources --clear-all \
&& apk del .build-deps \
&& apk --no-cache add --update tzdata \
&& rm -rf /usr/lib/ruby/gems/*/cache/*.gem
USER fluentd
- 実行ユーザーとして root 指定
- v1.3 から base image の実行ユーザーが
fluentd
になったので plugin インストールなどをやるために指定しないといけないです。
- v1.3 から base image の実行ユーザーが
- plugin
- fluent-plugin-gcs
- GCS への転送機能を提供します。
- 既に他のサービスで使っているかつこっちの方が規模的に小さいので検証なしで採用しました。
- GCS への転送機能を提供します。
- fluent-plugin-gcs
- OS package
- tzdata
- Pod の Timezone 指定する時に利用しています。
- tzdata
k8s manifest
fluentd の container spec を sidecar container で追加
...
- name: fluentd
image: REMOTE_DOCKERFILE_PATH/budget-daemon-fluentd:{{ .Values.application.tag }}
resources:
requests:
cpu: {{ .Values.fluentd.cpuRequest }}
memory: {{ .Values.fluentd.memoryRequest }}
limits:
cpu: {{ .Values.fluentd.cpuLimit }}
memory: {{ .Values.fluentd.memoryLimit }}
env:
- name: TZ
value: "Asia/Tokyo"
volumeMounts:
- name: log
mountPath: LOG_PATH
- name: fluentd-conf
mountPath: /fluentd/etc
- name: gcp-key
mountPath: /etc/gcp
readOnly: true
volumes:
- name: log
emptyDir: {}
- name: fluentd-conf
configMap:
name: budget-daemon
items:
- key: fluentd.main.conf
path: fluent.conf
- name: gcp-key
secret:
secretName: gcp-key
...
---
apiVersion: v1
kind: ConfigMap
metadata:
name: budget-daemon
data:
fluentd.main.conf: |
{{ .Values.fluentd.main.conf | indent 4 }}
---
-
{{ .Values.???.??? }}
- 環境ごとの設定を分離し Helm template を利用して kubectl apply する時に使う manifest を生成しています。
- 近いうちに Kustomize に migration する予定です。
- 環境ごとの設定を分離し Helm template を利用して kubectl apply する時に使う manifest を生成しています。
-
{{ .Values.application.tag }}
- CI Tool(Circle CI) の build number を入れています。
-
volumes
- 最初は DirectoryOrCreate を設定しわざわざ Pod の preStop で
rm -rf
を実行していたんですが 、普通に emptyDir したら Pod が Terminate されるタイミングで一緒に消えるので Pod の lifecycle と同様にした方が分かりやすいかなと思いました。
budget-daemon.yamlvolumes: - name: log hostPath: path: LOG_PATH type: DirectoryOrCreate
budget-daemon.yamlvolumes: - name: log emptyDir: {}
- 最初は DirectoryOrCreate を設定しわざわざ Pod の preStop で
詳細な fluentd の設定を追加
...
fluentd:
tag:
cpuRequest: 200m
cpuLimit: 200m
memoryRequest: 512Mi
memoryLimit: 512Mi
main:
conf: |
<system>
workers 2
root_dir LOG_PATH
</system>
<source>
@type monitor_agent
bind 0.0.0.0
port 24220
</source>
@include /fluentd/etc/conf.d/*.conf
<match budget.daemon.**>
@id gcs
@type gcs
project cyberagent-004
keyfile /etc/gcp/credential.json
bucket BUCKET_NAME
object_key_format %{path}%{time_slice}_%{uuid_flush}_%{index}.%{file_extension}
time_slice_format %Y%m%d-%H%M
path ${tag[2]}/${tag[3]}/%Y/%m/%d/%H/
store_as gzip
<buffer tag,time>
@type file
timekey 1m
timekey_wait 5s
chunk_limit_size 1m
total_limit_size 100m
flush_at_shutdown true
flush_thread_count 2
overflow_action throw_exception
</buffer>
<format>
@type json
</format>
</match>
<worker 0>
<source>
@type tail
path "LOG_PATH/#{ENV['HOSTNAME']}/status_orders_*.log"
pos_file "LOG_PATH/#{ENV['HOSTNAME']}/status_orders.log.pos"
tag budget.daemon.status.orders
rotate_wait 5
refresh_interval 10
read_from_head true
<parse>
@type json
time_key fluent_time
keep_time_key true
</parse>
</source>
</worker>
<worker 1>
<source>
@type tail
path "LOG_PATH/#{ENV['HOSTNAME']}/status_campaigns_*.log"
pos_file "LOG_PATH/#{ENV['HOSTNAME']}/status_campaigns.log.pos"
tag budget.daemon.status.campaigns
rotate_wait 5
refresh_interval 10
read_from_head true
<parse>
@type json
time_key fluent_time
keep_time_key true
</parse>
</source>
</worker>
-
system
- とりあえず、worker はログの種類に合わせて 2台
-
source
-
@type monitor_agent
- 基本的に Pod ごとに Datadog が Daemonset の形でデプロイされていて k8s の metrics の収集をやっています。Fluentd の metrics も Datadog GUI で見たいので下記を helm chart に設定しました。
infra/kubernetes/datadog/values.yml... datadog: ... confd: ... fluentd.yaml: |- ad_identifiers: - budget-daemon init_config: instances: - monitor_agent_url: http://%%host%%:24220/api/plugins.json
-
-
@include
- 複数の fluentd configuration をインポートする時に使えます。
-
<match>
-
@type gcs
- project: GCP のプロジェクト名
- keyfile: GCP credencial file path
- bucket: GCS バケット名
- path: GCS バケット内部の path
- object_key_format: 転送後の file の名
- store_as: 転送後の file type は gzip
-
<buffer>
-
@type file
: in-memory も問題ない規模ですが、念のため in-disk 方式 - timekey: 今回 1分間隔で GCS へ転送することだとざっくり決めたので 1m
- flush_at_shutdown: Fluentd が終了される時には溜まっている chunk を一気に処理するよう true
- flush_thread_count: Chunk を処理するスレッドの数は適当に worker と同様
- overflow_action: Buffer queue がいっぱいになっている時には例外が発生 (default) するようにする
-
-
-
<worker>
-
<source>
- refresh_interval: Buffer が参照する Pod's path を refresh する間隔は 10秒
- read_from_head: こちらを false にすると、もともと Application container が Fluentd container より早く動き始めるので Fluentd container が出来上がる前に発生したログは欠損されます。約 10秒ぐらいの欠損を対処するために true
-
<parse>
- time_key: レコードごとに処理した時間の format
- keep_time_key:
<match>
側で time_key を参照できるようにする
-
テスト
-
Application container
-
CPU と Memory 確認して問題ないこと
-
1 cycle 処理時間に問題ないこと
-
ログの非同期処理の merics に問題ないこと
- マイクロサービスの Framework として Finagle を使っているんですが、非同期処理の挙動を merics 化してくれる library があったので利用しました。
-<appender name="ASYNC_STATUS_ORDER" class="ch.qos.logback.classic.AsyncAppender"> +<appender name="ASYNC_STATUS_ORDER" class="com.twitter.inject.logback.AsyncAppender">
-
-
Flunetd container
- CPU と Memory 確認して問題ないこと
- 通常に Pod のログと GCS ログの整合性が合っていること
- Fluentd の metrics に問題ないこと
-
Kubernetes
- Rolling update のタイミングで欠損されないこと
- Fluentd container が落ちて再作成された時に Pod のログと GCS ログの整合性が合っていること
- Sidecar container を強制終了する方法:
kubectl exec -it POD_ID -c fluentd -- /bin/sh -c "kill 1"
- Sidecar container を強制終了する方法:
終わりに
- 実現したいことの定義からビジネスと技術の要件定義、その後技術選定及びアーキテクチャ工夫。そして、実装及び勉強しつつ 1つずつのボトルネックを解決。テストも充実にしてからスケジュールの締め切り内(2週間程度)に安心してリリースできて嬉しかったです。もともと僕は技術が好きでアーキテクチャを考案する時に他の要素を見る視野が足りなかったんですが、今回のタスクのおかげで他の要素を考慮する視野がちょっと広くなった気がします。今後からも「技術」だけではなく「メインテナンス性・工数・ユーザー・お金」を考慮したアーキテクチャを引き続き考えて行きたいと思います。