3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Elasticsearchにカスタム時系列データを取り込む (Filebeat編)

Last updated at Posted at 2022-05-28

先日の Elasticsearchにカスタム時系列データを取り込む(Elastic Agent編) では、Elatic Agentに焦点を当てて、カスタムな時系列データの取り込み方法を解説しましたが、今回は、同じことをBeats (実際にはFilebeat) でやってみたいと思います。

Integrationsって?

ちょっと前まで、Elastic Stack と言えば、Elasticsearch、Kibana、Logstash、Beatsということになっていたのですが、最新のElastic Stackを見ると、LogstashとBeatsのところが、Integrations(日本語だと「統合機能」)という風に変わっています。でも、このIntegrationsというのは、幅広いコネクター的なもの全般を指していて、お馴染みのLogstashやBeatsも、ちゃんと現役ですので、ご安心ください。

Kibanaの Integrations > Browse integrations からIntegrationsの一覧を見ることが出来ますが、Beats onlyをチェックすると、いわゆるBeats Modulesが表示されます。
Beats_Integrations.png

Beats

Beats については、ご存知の方も多いと思いますが、軽量のデータシッパーとして様々な種類のBeatが存在します。Elastic Agent がリリースされて以来、少し影が薄くなった感があるのですが、以前のブログでも触れた通り、実際にはElastic Agentのコントロールの下、データプレーンとして動いています。また、一部のBeat設定については、まだElastic Agent経由で変更出来なかったりするので、現実にはそうしたケースにおいては、各種Beatを直接使うことも沢山あります。

Filebeat

今回は Filebeat を使って、カスタムの時系列データをElasticsearchに取り込んでみます。Filebeatは、言わずと知れた軽量のログシッパーですが、前述の Filebeat Modules によるコネクター群で様々なデータソースに対応していますし、カスタムなデータソースについても、Filebeat Processorsを使って、ETL処理を行うことも出来ます。

実際にやってみよう

さて 前回 は、Elastic Agentでデータを取得して、データのパース処理は、主にElasticsearchの Ingest Pipelines を使いました。ただし、Filebeat + Ingest Pipelineだと、ほとんど前回と同じになってしまうので、今回はエージェントサイド、つまりFilebeat Processorsを使って、Filebeatだけで同じことを実現してみたいと思います。

1. NewsAPI

前回同様、NewsAPIという、世の中のニュースのヘッドラインをJSON形式で返してくれるAPIをデータソースとして使います。登録して、API Keyを取得します。パラメータによって、対象を絞り込むことができるのですが、Japanのヘッドラインを取得するとこんな感じで返ってきます。

GET https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge

レスポンス:

{
  "status": "ok",
  "totalResults": 30,
  "articles": [
    {
        "source": {
          "id": null,
          "name": "News.local"
        },
        "author": "ローカルニュース",
        "title": "これはグッドニュースだ!",
        "description": "今日、素晴らしい出来事がありました!",
        "url": "https://news.local/hogehoge.html",
        "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
        "publishedAt": "2022-05-25T05:49:58Z",
        "content": null
    },
    {
      (中略)
    }
  ]
}

2. Inputの設定

Filebeatの設定は、基本的にはfilebeat.ymlで行います。今回はNewsAPIからのInputということになるので、HTTP JSON input を使います。NewsAPIから返されるJSONドキュメントを見ると、必要なデータは、articles配列の要素であることがわかるので、Response Split の機能を使って、配列の要素毎にイベントを生成するようにします。これで前述の例のように、30のニューストピックスが配列として返された場合は、30の独立したイベントが生成されるようになります。ここは前回と基本は同じです。Outputはとりあえず、デバッグ用にコンソールに出力すればよいので、Console output を指定して、デフォルトの Elasticsearch output はコメントアウトしておきます(Filebeatでは同時に複数のoutputは書けません)。

filebeat.yml
# ============================== Filebeat inputs ===============================
# News API Configuration

filebeat.inputs:
- type: httpjson
  interval: 1m
  request.url: https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge
  response.split:
    target: body.articles
    type: array

# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.

output.console:
  pretty: true

# ---------------------------- Elasticsearch Output ----------------------------
# output.elasticsearch:
  # Array of hosts to connect to.
#   hosts: ["localhost:9200"]

この段階で、Filebeatからの出力は以下のようになります。

{
  "@timestamp": "2022-05-26T02:38:00.173Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.2.1"
  },
  "event": {
    "created": "2022-05-26T02:38:00.173Z"
  },
  "message": "{\"author\":\"ローカルニュース\",\"content\":null,\"description\":\"今日、素晴らしい出来事がありました!\",\"publishedAt\":\"2022-05-25T05:49:58Z\",\"source\":{\"id\":null,\"name\":\"News.local\"},\"title\":\"これはグッドニュースだ!",\"url\":\"https://news.local/hogehoge.html\",\"urlToImage\":\"https://news.local/hogehoge/hogehoge.jpg\"}",
  "input": {
    "type": "httpjson"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "hogehoge.local",
    "id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
    "ip": [
      (中略)
    ],
    "mac": [
      (中略)
    ],
    "hostname": "hogehoge.local",
    "architecture": "x86_64",
    "os": {
      "family": "darwin",
      "name": "Mac OS X",
      "kernel": "19.6.0",
      "build": "19H1922",
      "type": "macos",
      "platform": "darwin",
      "version": "10.15.7"
    }
  },
  "agent": {
    "version": "8.2.1",
    "ephemeral_id": "6506efe3-93ad-4e10-b793-701005f33fdf",
    "id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
    "name": "hogehoge.local",
    "type": "filebeat"
  }
}

3. JSON文字列をデコードする

次にmeeeageフィールドのJSON文字列を、newsフィールドにデコードします。前回はIngest Pipelineで対応しましたが、今回は、Filebeatの Decode JSON fields processorを使います。processorsの定義の下に、以下のように追加します。元のmessageフィールドは不要なので、Drop fieldsで削除しておきます。

filebeat.yml
# ================================= Processors =================================
processors:
  (中略)
# Adding configurations for News API
  - decode_json_fields:
      fields: ["message"]
      target: "news"
  - drop_fields:
      fields: ["message"]

そうすると、出力は以下のようになります。

{
  "@timestamp": "2022-05-26T02:40:33.341Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.2.1"
  },
  "input": {
    "type": "httpjson"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "os": {
      "version": "10.15.7",
      "family": "darwin",
      "name": "Mac OS X",
      "kernel": "19.6.0",
      "build": "19H1922",
      "type": "macos",
      "platform": "darwin"
    },
    "id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
    "ip": [
      (中略)
    ],
    "name": "hogehoge.local",
    "mac": [
      (中略)
    ],
    "hostname": "hogehoge.local",
    "architecture": "x86_64"
  },
  "agent": {
    "ephemeral_id": "bd131899-cfa9-4f1f-b5d5-48b387c32171",
    "id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
    "name": "hogehoge.local",
    "type": "filebeat",
    "version": "8.2.1"
  },
  "news": {
    "description": "今日、素晴らしい出来事がありました!",
    "publishedAt": "2022-05-25T05:49:58Z",
    "source": {
      "id": null,
      "name": "News.local"
    },
    "title": "これはグッドニュースだ!",
    "url": "https://news.local/hogehoge.html",
    "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
    "author": "ローカルニュース",
    "content": null
  },
  "event": {
    "created": "2022-05-26T02:40:33.341Z"
  }
}

意図した通りにnewsフィールドにデコードされました。

4. null valueフィールドに対応する

さらに、前回同様source.idauthorがnullであるarticleの場合は、source.nameをセットするようにしたいと思います。FilebeatのProcessorには条件を書けるので、次のように条件を追加して、Copy fields processorを使って、フィールドをコピーするようにしてみます。

- copy_fields:
    when:
      equals:
        news.source.id: null
    fields:
      - from: news.source.name
        to: news.source.id

ところが、実行すると以下のようにエラーとなってしまいます。結論から言うと、フィールドの値がnullかどうかというチェックは出来ません。というのも、条件には integerかstringの値しか書けないからです。しかも、NewsAPIから返されるJSONでは、明確にnullとなっており、空文字""では判定できません。

$ ./filebeat
Exiting: error initializing processors: failed to initialize condition: condition attempted to set 'news.source.id' -> '<nil>' and encountered unexpected type '<nil>', only strings, ints, and booleans are allowed

困ったなと思いきや、実はFilebeat Processorには、Script processorという万能ナイフがあるのです。一言でいうと、JavaScriptが書けちゃいます。ここでは、source.idauthorそれぞれがnullだったら、一旦フィールドをDeleteし、その上でフィールドが存在していなければ、source.nameをコピーするように設定します。

filebeat.yml
# Adding configurations for News API
  - decode_json_fields:
      fields: ["message"]
      target: "news"
  - drop_fields:
      fields: ["message"]
  - script:
      lang: javascript
      source: >
        function process(event) {
            if (event.Get("news.source.id") == null) {
                event.Delete("news.source.id");
            }
            if (event.Get("news.author") == null) {
                event.Delete("news.author");
            }
            return event;
        }
  - copy_fields:
      when.not.has_fields: ["news.source.id"]
      fields:
        - from: news.source.name
          to: news.source.id
  - copy_fields:
      when.not.has_fields: ["news.author"]
      fields:
        - from: news.source.name
          to: news.author

5. @￰timestampを書き換える

仕上げに、イベントの@timestampを、処理した時間であるprocessing timeから、イベントが発生した時間であるevent timeに書き換えます。これは、Timestamp processorで処理することができます。puhlishedAtが、ニュース記事が投稿された時間なので、このフィールドを使います。layoutsで指定するフォーマットの時刻は、GOのtimeパッケージ特有のもので、この時刻でなくてはなりません。

filebeat.yml
# Adding configurations for News API
  - decode_json_fields:
      fields: ["message"]
      target: "news"
  - drop_fields:
      fields: ["message"]
  - script:
      lang: javascript
      source: >
        function process(event) {
            if (event.Get("news.source.id") == null) {
                event.Delete("news.source.id");
            }
            if (event.Get("news.author") == null) {
                event.Delete("news.author");
            }
            return event;
        }
  - copy_fields:
      when.not.has_fields: ["news.source.id"]
      fields:
        - from: news.source.name
          to: news.source.id
  - copy_fields:
      when.not.has_fields: ["news.author"]
      fields:
        - from: news.source.name
          to: news.author
  - timestamp:
      field: news.publishedAt
      layouts:
        - '2006-01-02T15:04:05Z'

コンソールへの出力は、以下のようになります。

{
  "@timestamp": "2022-05-25T05:49:58.000Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.2.1"
  },
  "agent": {
    "version": "8.2.1",
    "ephemeral_id": "52c51fef-d0f3-4655-aaec-d5a15f81cc71",
    "id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
    "name": "hogehoge.local",
    "type": "filebeat"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "news": {
    "description": "今日、素晴らしい出来事がありました!",
    "publishedAt": "2022-05-25T05:49:58Z",
    "source": {
      "id": "News.local",
      "name": "News.local"
    },
    "title": "これはグッドニュースだ!",
    "url": "https://news.local/hogehoge.html",
    "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
    "author": "ローカルニュース",
    "content": null
  },
  "event": {
    "created": "2022-05-26T02:49:11.475Z"
  },
  "input": {
    "type": "httpjson"
  },
  "host": {
    "hostname": "hogehoge.local",
    "architecture": "x86_64",
    "os": {
      "version": "10.15.7",
      "family": "darwin",
      "name": "Mac OS X",
      "kernel": "19.6.0",
      "build": "19H1922",
      "type": "macos",
      "platform": "darwin"
    },
    "id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
    "ip": [
      (中略)
    ],
    "mac": [
      (中略)
    ],
    "name": "hogehoge.local"
  }
}

6. Index Templateを修正する

ここまでは、デバッグしやすいようにコンソールに出力していただけですが、最後にElasticsearchに投入していきたいと思います。Elasticsearchにこのまま投入すると、newsフィールドはDynamic Mapingで作成されてしまうため、配下の全てのフィールドはkeywordになってしまいます。きちんとIndexのMappingを定義するために、Index Template に反映したいところです。デフォルトのFilebeatのIndex Templateは、8系ではfilebeat-8.x.xになりますが、試しにKibanaのDev Toolsで確認すると、あまりの巨大さに驚愕します。

GET _index_template/filebeat-8.2.1

レスポンス:
filebeat-8.2.1_template.png

優に3万ラインを超えてます。Beats系では、このように1つのIndex Templateで対象のIndex(この場合、filebeat-8.x.x)の全てのMappingを管理する形になるので、Beats Modulesの拡大によるフィールドの増加に伴って、徐々に管理性が低くなっていました。そこで、Elastic Agentでは、Dataset毎に出力するData streamを分けて、それぞれがIndex Templateを持つようになりました。ちなみに、Beatsの8系では、従来のIndexへの出力から、Elastic Agentと同様に Data stream への出力に変わっています。

さて、やりたいことは、以下のnewsフィールドのMappingを追加したいだけなのですが、そのために先ほどのfilebeat-8.x.x Index Templateを直接修正するのは気が引けますし、修正したらバージョンアップ時のメンテナンスも面倒です。

{
  "news" : {
    "properties" : {
      "author" : {
        "ignore_above" : 1024,
        "type" : "keyword",
        "fields" : {
          "text" : {
            "type" : "text"
          }
        }
      },
      "content" : {
        "ignore_above" : 1024,
        "type" : "keyword",
        "fields" : {
          "text" : {
            "type" : "text"
          }
        }
      },
      "description" : {
        "ignore_above" : 1024,
        "type" : "keyword",
        "fields" : {
          "text" : {
            "type" : "text"
          }
        }
      },
      "publishedAt" : {
        "type" : "date"
      },
      "source" : {
        "properties" : {
          "id" : {
            "type" : "keyword",
            "ignore_above" : 1024
          },
          "name" : {
            "ignore_above" : 1024,
            "type" : "keyword",
            "fields" : {
              "text" : {
                "type" : "text"
              }
            }
          }
        }
      },
      "title" : {
        "ignore_above" : 1024,
        "type" : "keyword",
        "fields" : {
          "text" : {
            "type" : "text"
          }
        }
      },
      "url" : {
        "type" : "keyword",
        "ignore_above" : 1024
      },
      "urlToImage" : {
        "type" : "keyword",
        "ignore_above" : 1024
      }
    }
  }
}

こんな時に使える設定が、setup.template.append_fieldsです。詳細は公式ドキュメントを参照頂ければと思いますが、setup.template.overwriteと合わせ技で、filebeat.yml上でIndex Templateに追加したいフィールドを定義することが出来ます(頭をJSONからYAMLに切り替える必要がありますが...)。

filebeat.yml
# Adding news field
setup.template.overwrite: true
setup.template.append_fields:
- name: news.author
  type: keyword
  ignore_above: 1024
  multi_fields:
    - type: text
      norms: true
      name: text
- name: news.content
  type: keyword
  ignore_above: 1024
  multi_fields:
    - type: text
      norms: true
      name: text
- name: news.description
  type: keyword
  ignore_above: 1024
  multi_fields:
    - type: text
      norms: true
      name: text
- name: news.publishedAt
  type: date
- name: news.source.id
  type: keyword
  ignore_above: 1024
- name: news.source.name
  type: keyword
  ignore_above: 1024
  multi_fields:
    - type: text
      norms: true
      name: text
- name: news.title
  type: keyword
  ignore_above: 1024
  multi_fields:
    - type: text
      norms: true
      name: text
  - name: news.url
  type: keyword
  ignore_above: 1024
  - name: news.urlToImage
  type: keyword
  ignore_above: 1024

Multi-fieldsも書けますし、filebeat.ymlで一元管理も出来るのでシンプルです。注意点としては、textフィールドの時に、normsを明示的に指定しないとnorms: falseで作成されることです。Elasticsearchにおいては、normsはデフォルトtrueなのですが...

あとは、今回のようにfilebeatのプロセスが1つの場合は特に問題になりませんが、複数の場合、全てのfilebeatプロセスがIndex Templateを上書きするので、差分があったりすると後勝ちになりますので、注意が必要です。

7. Elasticsearchに投入する

いよいよ、Elasticsearchへの投入です。filebeat.ymlで、outputを変更します。Elastic Cloudを使っている場合は、次のようにcloud.idcloud.authを書いてあげるだけでOKです。

filebeat.yml
# =============================== Elastic Cloud ================================

# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).

# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="

# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
cloud.auth: "elastic:YOUR_PASSWORD"

オンプレでElasticsearchクラスタを立てている場合は、公式ドキュメントの Elasticsearch output を参考にしてみて下さい。

最終的には、こんな感じでIndexされます。

{
  "_index": ".ds-filebeat-8.2.1-2022.05.27-000001",
  "_id": "OOwpBIEBCJsKRn8hWsLn",
  "_version": 1,
  "_score": 1,
  "_source": {
    "@timestamp": "2022-05-25T05:49:58Z.000Z",
    "input": {
      "type": "httpjson"
    },
    "host": {
      "name": "hogehoge.local",
      "architecture": "x86_64",
      "os": {
        "version": "10.15.7",
        "family": "darwin",
        "name": "Mac OS X",
        "kernel": "19.6.0",
        "build": "19H1922",
        "type": "macos",
        "platform": "darwin"
      },
      "id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
      "ip": [
        (中略)
      ],
      "mac": [
        (中略)
      ],
      "hostname": "hogehoge.local"
    },
    "agent": {
      "id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
      "name": "hogehoge.local",
      "type": "filebeat",
      "version": "8.2.1",
      "ephemeral_id": "752dedb3-fd8c-4772-8ae2-38f8c3d61b92"
    },
    "ecs": {
      "version": "8.0.0"
    },
    "news": {
      "description": "今日、素晴らしい出来事がありました!",
      "publishedAt": "2022-05-25T05:49:58Z",
      "source": {
        "id": "News.local",
        "name": "News.local"
      },
      "title": "これはグッドニュースだ!",
      "url": "https://news.local/hogehoge.html",
      "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
      "author": "ローカルニュース",
      "content": null
    },
    "event": {
      "created": "2022-05-27T06:17:02.453Z"
    }
  },
}

まとめ

今回は、前回 との比較の意味で、Filebeatだけで頑張ってみました。

やりたいこと Elastic Agent編 Filebeat編
HTTP APIでJSONを取り込む Custom HTTPJSON Input (Elastic Agent) HTTP JSON Input (Filebeat)
JSON文字列をデコード JSON processor (Ingest Pipeline) Decode JSON fields processor (Filebeat)
null-valueフィールド対応 Set processor (Ingest Pipeline) Script processor (Filebeat)
@￰timestamp書き換え Date processor (Ingest Pipeline) Timestamp processor (Filebeat)

Filebeat Processos だけでも、それなりのことが出来ることはご理解いただけたかと思いますが、現実の選択肢としては、エージェントサイドだけに処理を寄せるというのはレアケースかもしれません。こうしたパース処理をどこで実行するかは、言うまでもなくユースケースに依存するので、実際にはパフォーマンスのテストなどを通じて、最適な方法を探ることになります。Elasticでは 様々な選択肢 があるので、色々と試してみてください。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?