先日の Elasticsearchにカスタム時系列データを取り込む(Elastic Agent編) では、Elatic Agentに焦点を当てて、カスタムな時系列データの取り込み方法を解説しましたが、今回は、同じことをBeats (実際にはFilebeat) でやってみたいと思います。
Integrationsって?
ちょっと前まで、Elastic Stack と言えば、Elasticsearch、Kibana、Logstash、Beatsということになっていたのですが、最新のElastic Stackを見ると、LogstashとBeatsのところが、Integrations(日本語だと「統合機能」)という風に変わっています。でも、このIntegrationsというのは、幅広いコネクター的なもの全般を指していて、お馴染みのLogstashやBeatsも、ちゃんと現役ですので、ご安心ください。
Kibanaの Integrations > Browse integrations からIntegrationsの一覧を見ることが出来ますが、Beats only
をチェックすると、いわゆるBeats Modulesが表示されます。
Beats
Beats については、ご存知の方も多いと思いますが、軽量のデータシッパーとして様々な種類のBeatが存在します。Elastic Agent がリリースされて以来、少し影が薄くなった感があるのですが、以前のブログでも触れた通り、実際にはElastic Agentのコントロールの下、データプレーンとして動いています。また、一部のBeat設定については、まだElastic Agent経由で変更出来なかったりするので、現実にはそうしたケースにおいては、各種Beatを直接使うことも沢山あります。
Filebeat
今回は Filebeat を使って、カスタムの時系列データをElasticsearchに取り込んでみます。Filebeatは、言わずと知れた軽量のログシッパーですが、前述の Filebeat Modules によるコネクター群で様々なデータソースに対応していますし、カスタムなデータソースについても、Filebeat Processorsを使って、ETL処理を行うことも出来ます。
実際にやってみよう
さて 前回 は、Elastic Agentでデータを取得して、データのパース処理は、主にElasticsearchの Ingest Pipelines を使いました。ただし、Filebeat + Ingest Pipelineだと、ほとんど前回と同じになってしまうので、今回はエージェントサイド、つまりFilebeat Processorsを使って、Filebeatだけで同じことを実現してみたいと思います。
1. NewsAPI
前回同様、NewsAPIという、世の中のニュースのヘッドラインをJSON形式で返してくれるAPIをデータソースとして使います。登録して、API Keyを取得します。パラメータによって、対象を絞り込むことができるのですが、Japanのヘッドラインを取得するとこんな感じで返ってきます。
GET https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge
レスポンス:
{
"status": "ok",
"totalResults": 30,
"articles": [
{
"source": {
"id": null,
"name": "News.local"
},
"author": "ローカルニュース",
"title": "これはグッドニュースだ!",
"description": "今日、素晴らしい出来事がありました!",
"url": "https://news.local/hogehoge.html",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"publishedAt": "2022-05-25T05:49:58Z",
"content": null
},
{
(中略)
}
]
}
2. Inputの設定
Filebeatの設定は、基本的にはfilebeat.yml
で行います。今回はNewsAPIからのInputということになるので、HTTP JSON input を使います。NewsAPIから返されるJSONドキュメントを見ると、必要なデータは、articles
配列の要素であることがわかるので、Response Split の機能を使って、配列の要素毎にイベントを生成するようにします。これで前述の例のように、30のニューストピックスが配列として返された場合は、30の独立したイベントが生成されるようになります。ここは前回と基本は同じです。Outputはとりあえず、デバッグ用にコンソールに出力すればよいので、Console output を指定して、デフォルトの Elasticsearch output はコメントアウトしておきます(Filebeatでは同時に複数のoutputは書けません)。
# ============================== Filebeat inputs ===============================
# News API Configuration
filebeat.inputs:
- type: httpjson
interval: 1m
request.url: https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge
response.split:
target: body.articles
type: array
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
output.console:
pretty: true
# ---------------------------- Elasticsearch Output ----------------------------
# output.elasticsearch:
# Array of hosts to connect to.
# hosts: ["localhost:9200"]
この段階で、Filebeatからの出力は以下のようになります。
{
"@timestamp": "2022-05-26T02:38:00.173Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.2.1"
},
"event": {
"created": "2022-05-26T02:38:00.173Z"
},
"message": "{\"author\":\"ローカルニュース\",\"content\":null,\"description\":\"今日、素晴らしい出来事がありました!\",\"publishedAt\":\"2022-05-25T05:49:58Z\",\"source\":{\"id\":null,\"name\":\"News.local\"},\"title\":\"これはグッドニュースだ!",\"url\":\"https://news.local/hogehoge.html\",\"urlToImage\":\"https://news.local/hogehoge/hogehoge.jpg\"}",
"input": {
"type": "httpjson"
},
"ecs": {
"version": "8.0.0"
},
"host": {
"name": "hogehoge.local",
"id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
"ip": [
(中略)
],
"mac": [
(中略)
],
"hostname": "hogehoge.local",
"architecture": "x86_64",
"os": {
"family": "darwin",
"name": "Mac OS X",
"kernel": "19.6.0",
"build": "19H1922",
"type": "macos",
"platform": "darwin",
"version": "10.15.7"
}
},
"agent": {
"version": "8.2.1",
"ephemeral_id": "6506efe3-93ad-4e10-b793-701005f33fdf",
"id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
"name": "hogehoge.local",
"type": "filebeat"
}
}
3. JSON文字列をデコードする
次にmeeeage
フィールドのJSON文字列を、news
フィールドにデコードします。前回はIngest Pipelineで対応しましたが、今回は、Filebeatの Decode JSON fields processorを使います。processors
の定義の下に、以下のように追加します。元のmessage
フィールドは不要なので、Drop fieldsで削除しておきます。
# ================================= Processors =================================
processors:
(中略)
# Adding configurations for News API
- decode_json_fields:
fields: ["message"]
target: "news"
- drop_fields:
fields: ["message"]
そうすると、出力は以下のようになります。
{
"@timestamp": "2022-05-26T02:40:33.341Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.2.1"
},
"input": {
"type": "httpjson"
},
"ecs": {
"version": "8.0.0"
},
"host": {
"os": {
"version": "10.15.7",
"family": "darwin",
"name": "Mac OS X",
"kernel": "19.6.0",
"build": "19H1922",
"type": "macos",
"platform": "darwin"
},
"id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
"ip": [
(中略)
],
"name": "hogehoge.local",
"mac": [
(中略)
],
"hostname": "hogehoge.local",
"architecture": "x86_64"
},
"agent": {
"ephemeral_id": "bd131899-cfa9-4f1f-b5d5-48b387c32171",
"id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
"name": "hogehoge.local",
"type": "filebeat",
"version": "8.2.1"
},
"news": {
"description": "今日、素晴らしい出来事がありました!",
"publishedAt": "2022-05-25T05:49:58Z",
"source": {
"id": null,
"name": "News.local"
},
"title": "これはグッドニュースだ!",
"url": "https://news.local/hogehoge.html",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"author": "ローカルニュース",
"content": null
},
"event": {
"created": "2022-05-26T02:40:33.341Z"
}
}
意図した通りにnews
フィールドにデコードされました。
4. null valueフィールドに対応する
さらに、前回同様source.id
やauthor
がnullであるarticle
の場合は、source.name
をセットするようにしたいと思います。FilebeatのProcessorには条件を書けるので、次のように条件を追加して、Copy fields processorを使って、フィールドをコピーするようにしてみます。
- copy_fields:
when:
equals:
news.source.id: null
fields:
- from: news.source.name
to: news.source.id
ところが、実行すると以下のようにエラーとなってしまいます。結論から言うと、フィールドの値がnull
かどうかというチェックは出来ません。というのも、条件には integerかstringの値しか書けないからです。しかも、NewsAPIから返されるJSONでは、明確にnull
となっており、空文字""
では判定できません。
$ ./filebeat
Exiting: error initializing processors: failed to initialize condition: condition attempted to set 'news.source.id' -> '<nil>' and encountered unexpected type '<nil>', only strings, ints, and booleans are allowed
困ったなと思いきや、実はFilebeat Processorには、Script processorという万能ナイフがあるのです。一言でいうと、JavaScriptが書けちゃいます。ここでは、source.id
とauthor
それぞれがnull
だったら、一旦フィールドをDeleteし、その上でフィールドが存在していなければ、source.name
をコピーするように設定します。
# Adding configurations for News API
- decode_json_fields:
fields: ["message"]
target: "news"
- drop_fields:
fields: ["message"]
- script:
lang: javascript
source: >
function process(event) {
if (event.Get("news.source.id") == null) {
event.Delete("news.source.id");
}
if (event.Get("news.author") == null) {
event.Delete("news.author");
}
return event;
}
- copy_fields:
when.not.has_fields: ["news.source.id"]
fields:
- from: news.source.name
to: news.source.id
- copy_fields:
when.not.has_fields: ["news.author"]
fields:
- from: news.source.name
to: news.author
5. @timestampを書き換える
仕上げに、イベントの@timestamp
を、処理した時間であるprocessing timeから、イベントが発生した時間であるevent timeに書き換えます。これは、Timestamp processorで処理することができます。puhlishedAt
が、ニュース記事が投稿された時間なので、このフィールドを使います。layouts
で指定するフォーマットの時刻は、GOのtimeパッケージ特有のもので、この時刻でなくてはなりません。
# Adding configurations for News API
- decode_json_fields:
fields: ["message"]
target: "news"
- drop_fields:
fields: ["message"]
- script:
lang: javascript
source: >
function process(event) {
if (event.Get("news.source.id") == null) {
event.Delete("news.source.id");
}
if (event.Get("news.author") == null) {
event.Delete("news.author");
}
return event;
}
- copy_fields:
when.not.has_fields: ["news.source.id"]
fields:
- from: news.source.name
to: news.source.id
- copy_fields:
when.not.has_fields: ["news.author"]
fields:
- from: news.source.name
to: news.author
- timestamp:
field: news.publishedAt
layouts:
- '2006-01-02T15:04:05Z'
コンソールへの出力は、以下のようになります。
{
"@timestamp": "2022-05-25T05:49:58.000Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.2.1"
},
"agent": {
"version": "8.2.1",
"ephemeral_id": "52c51fef-d0f3-4655-aaec-d5a15f81cc71",
"id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
"name": "hogehoge.local",
"type": "filebeat"
},
"ecs": {
"version": "8.0.0"
},
"news": {
"description": "今日、素晴らしい出来事がありました!",
"publishedAt": "2022-05-25T05:49:58Z",
"source": {
"id": "News.local",
"name": "News.local"
},
"title": "これはグッドニュースだ!",
"url": "https://news.local/hogehoge.html",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"author": "ローカルニュース",
"content": null
},
"event": {
"created": "2022-05-26T02:49:11.475Z"
},
"input": {
"type": "httpjson"
},
"host": {
"hostname": "hogehoge.local",
"architecture": "x86_64",
"os": {
"version": "10.15.7",
"family": "darwin",
"name": "Mac OS X",
"kernel": "19.6.0",
"build": "19H1922",
"type": "macos",
"platform": "darwin"
},
"id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
"ip": [
(中略)
],
"mac": [
(中略)
],
"name": "hogehoge.local"
}
}
6. Index Templateを修正する
ここまでは、デバッグしやすいようにコンソールに出力していただけですが、最後にElasticsearchに投入していきたいと思います。Elasticsearchにこのまま投入すると、news
フィールドはDynamic Mapingで作成されてしまうため、配下の全てのフィールドはkeyword
になってしまいます。きちんとIndexのMappingを定義するために、Index Template に反映したいところです。デフォルトのFilebeatのIndex Templateは、8系ではfilebeat-8.x.x
になりますが、試しにKibanaのDev Toolsで確認すると、あまりの巨大さに驚愕します。
GET _index_template/filebeat-8.2.1
優に3万ラインを超えてます。Beats系では、このように1つのIndex Templateで対象のIndex(この場合、filebeat-8.x.x)の全てのMappingを管理する形になるので、Beats Modulesの拡大によるフィールドの増加に伴って、徐々に管理性が低くなっていました。そこで、Elastic Agentでは、Dataset毎に出力するData streamを分けて、それぞれがIndex Templateを持つようになりました。ちなみに、Beatsの8系では、従来のIndexへの出力から、Elastic Agentと同様に Data stream への出力に変わっています。
さて、やりたいことは、以下のnews
フィールドのMappingを追加したいだけなのですが、そのために先ほどのfilebeat-8.x.x
Index Templateを直接修正するのは気が引けますし、修正したらバージョンアップ時のメンテナンスも面倒です。
{
"news" : {
"properties" : {
"author" : {
"ignore_above" : 1024,
"type" : "keyword",
"fields" : {
"text" : {
"type" : "text"
}
}
},
"content" : {
"ignore_above" : 1024,
"type" : "keyword",
"fields" : {
"text" : {
"type" : "text"
}
}
},
"description" : {
"ignore_above" : 1024,
"type" : "keyword",
"fields" : {
"text" : {
"type" : "text"
}
}
},
"publishedAt" : {
"type" : "date"
},
"source" : {
"properties" : {
"id" : {
"type" : "keyword",
"ignore_above" : 1024
},
"name" : {
"ignore_above" : 1024,
"type" : "keyword",
"fields" : {
"text" : {
"type" : "text"
}
}
}
}
},
"title" : {
"ignore_above" : 1024,
"type" : "keyword",
"fields" : {
"text" : {
"type" : "text"
}
}
},
"url" : {
"type" : "keyword",
"ignore_above" : 1024
},
"urlToImage" : {
"type" : "keyword",
"ignore_above" : 1024
}
}
}
}
こんな時に使える設定が、setup.template.append_fields
です。詳細は公式ドキュメントを参照頂ければと思いますが、setup.template.overwrite
と合わせ技で、filebeat.yml
上でIndex Templateに追加したいフィールドを定義することが出来ます(頭をJSONからYAMLに切り替える必要がありますが...)。
# Adding news field
setup.template.overwrite: true
setup.template.append_fields:
- name: news.author
type: keyword
ignore_above: 1024
multi_fields:
- type: text
norms: true
name: text
- name: news.content
type: keyword
ignore_above: 1024
multi_fields:
- type: text
norms: true
name: text
- name: news.description
type: keyword
ignore_above: 1024
multi_fields:
- type: text
norms: true
name: text
- name: news.publishedAt
type: date
- name: news.source.id
type: keyword
ignore_above: 1024
- name: news.source.name
type: keyword
ignore_above: 1024
multi_fields:
- type: text
norms: true
name: text
- name: news.title
type: keyword
ignore_above: 1024
multi_fields:
- type: text
norms: true
name: text
- name: news.url
type: keyword
ignore_above: 1024
- name: news.urlToImage
type: keyword
ignore_above: 1024
Multi-fieldsも書けますし、filebeat.yml
で一元管理も出来るのでシンプルです。注意点としては、text
フィールドの時に、norms
を明示的に指定しないとnorms: false
で作成されることです。Elasticsearchにおいては、norms
はデフォルトtrue
なのですが...
あとは、今回のようにfilebeatのプロセスが1つの場合は特に問題になりませんが、複数の場合、全てのfilebeatプロセスがIndex Templateを上書きするので、差分があったりすると後勝ちになりますので、注意が必要です。
7. Elasticsearchに投入する
いよいよ、Elasticsearchへの投入です。filebeat.yml
で、outputを変更します。Elastic Cloudを使っている場合は、次のようにcloud.id
とcloud.auth
を書いてあげるだけでOKです。
# =============================== Elastic Cloud ================================
# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).
# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
cloud.auth: "elastic:YOUR_PASSWORD"
オンプレでElasticsearchクラスタを立てている場合は、公式ドキュメントの Elasticsearch output を参考にしてみて下さい。
最終的には、こんな感じでIndexされます。
{
"_index": ".ds-filebeat-8.2.1-2022.05.27-000001",
"_id": "OOwpBIEBCJsKRn8hWsLn",
"_version": 1,
"_score": 1,
"_source": {
"@timestamp": "2022-05-25T05:49:58Z.000Z",
"input": {
"type": "httpjson"
},
"host": {
"name": "hogehoge.local",
"architecture": "x86_64",
"os": {
"version": "10.15.7",
"family": "darwin",
"name": "Mac OS X",
"kernel": "19.6.0",
"build": "19H1922",
"type": "macos",
"platform": "darwin"
},
"id": "22570515-B9D2-5E7A-BD39-608FA0BB373C",
"ip": [
(中略)
],
"mac": [
(中略)
],
"hostname": "hogehoge.local"
},
"agent": {
"id": "2000b9bc-a219-438f-bdda-4e6d64febaca",
"name": "hogehoge.local",
"type": "filebeat",
"version": "8.2.1",
"ephemeral_id": "752dedb3-fd8c-4772-8ae2-38f8c3d61b92"
},
"ecs": {
"version": "8.0.0"
},
"news": {
"description": "今日、素晴らしい出来事がありました!",
"publishedAt": "2022-05-25T05:49:58Z",
"source": {
"id": "News.local",
"name": "News.local"
},
"title": "これはグッドニュースだ!",
"url": "https://news.local/hogehoge.html",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"author": "ローカルニュース",
"content": null
},
"event": {
"created": "2022-05-27T06:17:02.453Z"
}
},
}
まとめ
今回は、前回 との比較の意味で、Filebeatだけで頑張ってみました。
やりたいこと | Elastic Agent編 | Filebeat編 |
---|---|---|
HTTP APIでJSONを取り込む | Custom HTTPJSON Input (Elastic Agent) | HTTP JSON Input (Filebeat) |
JSON文字列をデコード | JSON processor (Ingest Pipeline) | Decode JSON fields processor (Filebeat) |
null-valueフィールド対応 | Set processor (Ingest Pipeline) | Script processor (Filebeat) |
@timestamp書き換え | Date processor (Ingest Pipeline) | Timestamp processor (Filebeat) |
Filebeat Processos だけでも、それなりのことが出来ることはご理解いただけたかと思いますが、現実の選択肢としては、エージェントサイドだけに処理を寄せるというのはレアケースかもしれません。こうしたパース処理をどこで実行するかは、言うまでもなくユースケースに依存するので、実際にはパフォーマンスのテストなどを通じて、最適な方法を探ることになります。Elasticでは 様々な選択肢 があるので、色々と試してみてください。