0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Fluentd で、ネストされた特定のハッシュの中身だけを転送する

Posted at

概要

Fluentd で渡ってくるイベント(JSON)内の、
ネストされた特定のハッシュの中身だけを転送する方法。

例:
こういうイベント(データ)がある場合、
{"data": {"name": "hanzo", "skill": "Python"}, "memo": "kesu"}

data フィールド(項目)の中身(ハッシュ)だけを転送したいケース。
{"name": "hanzo", "skill": "Python"}

用語

Fluentd の詳細な説明は省きます。

  • Fluentd
    • ログ収集で使うツール。
    • 収集方法を定義ファイルに記載する必要があり、
      大きく source(入力)=> filter(整形) => match(出力)に分かれる。
  • イベント
    • Fluentd で処理されるデータ1行のこと。
  • フィールド
    • イベント内の項目のこと。
    • 例:{"name": "hanzo", "skill": "Python"} => nameskill がフィールド
  • ハッシュ
    • key, value で構成されたフィールド。
    • 例:{"data": {"name": "hanzo", "skill": "Python"}} => data フィールドの中身がハッシュ

考え方

特定のフィールドのハッシュを展開するだけなら、
parser filter plugin で実現できます。

ただし、この plugin を使うためには一つ制約があります。
公式ドキュメントによると、

The parser filter plugin "parses" string field in event records 
※訳:parser filter plugin は、イベントレコード内の文字列フィールドをパースする

つまり、展開する対象(パース対象)が 文字列 である必要があります。

通常、Fluentd で JSON 形式のデータを取得すると、
全て hash に変換されてしまうため、
そのまま parser filter plugin にかけても抽出されません。
 ※ハッシュの部分をあえて文字列で入れればできますが。。

なので事前に、取りだしたいハッシュだけを、
JSON 文字列に変換する必要があります。

例:
こういうハッシュを
{"data": {"name": "hanzo", "skill": "Python"}}
こんな感じで文字列にする。
{"data": "{\"name\": \"hanzo\", \"skill\": \"Python\"}"}
 ※展開したいフィールド(例なら data)のハッシュだけを文字列にする

filter の書き方

source, match の記載は省略する。

その1:ハッシュを展開する(他のフィールドは捨てる)

ハッシュを展開して転送する。

# 文字列変換
<filter sometag>
  @type record_transformer
  # JSON 文字列に変換したフィールドだけを残す
  renew_record true
  enable_ruby true
  <record>
    # ハッシュ(data の中身)を JSON 文字列に変換する
    obj ${record["data"].to_json}
  </record>
</filter>

# parser で JSON 文字列を展開する
<filter sometag>
  @type parser
  key_name obj
  <parse>
    @type json
  </parse>
</filter>

実行例

# input data:
{"data": {"name": "hanzo", "skill": "Python"}, "memo": "kesu"}
# output data:
{"name":"hanzo","skill":"Python"}

その2:ハッシュを展開し、かつ他のフィールドも残す

展開したハッシュと、その他のフィールドを合わせて転送する。

# 文字列変換
<filter sometag>
  @type record_transformer
  renew_record true
  enable_ruby true
  <record>
    # ハッシュ(data の中身)を JSON 文字列に変換する
    obj ${record["data"].to_json}
    # 他に必要なフィールドがあれば追加する
    memo ${record["memo"]}
  </record>
</filter>

# parser で JSON 文字列 と 必要なフィールド を残す
<filter sometag>
  @type parser
  key_name obj
  # フィールドを全部残す
  reserve_data true
  # 展開前のフィールドは消す
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>

実行例

# input data:
{"data": {"name": "hanzo", "skill": "Python"}, "memo": "nokosu", "hoge": "kesu"}
# output data:
{"memo":"nokosu","name":"hanzo","skill":"Python"}

その3:複数のハッシュを展開する

複数のハッシュを展開し、それらをマージして転送する。

# 文字列変換
<filter sometag>
  @type record_transformer
  renew_record true
  enable_ruby true
  <record>
    # 1. ハッシュ同士をマージする
    # 2. マージしたハッシュを JSON 文字列に変換する
    obj ${tmp = record["data"].merge(record["data2"]); tmp.to_json}
  </record>
</filter>

# parser で JSON 文字列を展開する(ここは1回目と同じ処理)
<filter sometag>
  @type parser
  key_name obj
  <parse>
    @type json
  </parse>
</filter>

実行例

# input data:
{"data": {"name": "hanzo", "skill": "Python"}, "data2": {"office": "Tokyo", "skill": "Python3"}}
# output data:
{"name":"hanzo","skill":"Python3","office":"Tokyo"}
  => skill は二つのハッシュに共通するため、マージした方で上書きされる

お試しコード

source => filter => match の全部載せ。
とりあえず挙動を試したい場合。

td-agent.conf
<source>
  @type tail
  path /var/tmp/aaa
  pos_file /var/log/td-agent/aaa.pos
  tag sometag
  <parse>
    @type json
  </parse>
</source>

# ハッシュを文字列に変換する
<filter sometag>
  @type record_transformer
  renew_record true
  enable_ruby true
  <record>
    obj ${record["data"].to_json}
  </record>
</filter>

# parser で JSON 文字列を展開する
<filter sometag>
  @type parser
  key_name obj
  <parse>
    @type json
  </parse>
</filter>

<match sometag>
  @type file
  path /var/tmp/bbb
</match>

実行例

# /var/tmp/aaa:  =>以下の内容でファイルを作る
{"data": {"name": "hanzo", "skill": "Python"}, "memo": "kesu"}
# /var/tmp/bbb:
{"name":"hanzo","skill":"Python"}

まとめ

今回の記事を書いた切っ掛けは、
既存アプリケーションが吐いている JSON データを、
Fluentd 経由でとあるログ分析基盤に入れようとしたのが始まりです。

ネストされた項目だけを入れたくて、色々検討しました。

  • 入力元から直す!
    開発側に依頼してログを変えてもらう。
     =>改修のインパクトがデカすぎる :ng:
  • 出力先で何とかする!
    ログ分析基盤にはそのまま入れて、検索時にうまく調整する。
     =>クエリが複雑になる or パースする処理に時間がかかる :ng:

検討した結果、こんな感じの板挟みにあったので、
最終的に中間地点の Fluentd(ログ収集)で色々調べてみたところ、
これといった記事が見つからず、
公式のドキュメントを凝視して、今回の方法にたどり着きました。

時間が無いときは、つい結果だけを探してしまいますが、
詰まったときは原点に戻り、公式ドキュメントに目を通すことも大事 だと感じました。

同じような悩みを抱えた方のお役に立てれば光栄です。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?