これまで、Elasticsearchにカスタム時系列データを取り込む(Elastic Agent編) 、及び Elasticsearchにカスタム時系列データを取り込む(Filebeat編) で、Elastic AgentとFilebeatを使ったカスタムな時系列データの取り込みについて解説してきましたが、ここまで来たらということで、今回は、Logstashについて解説してみたいと思います。
Logstash
言わずと知れた Logstash ですが、古くは ELK Stack のLはLogstashに由来しています。
Logstashは、サーバーサイドの汎用のETLツールで、様々なデータの取り込みで使っている方も多いと思います。Logstashの特徴として、豊富なInput Pluginだけではなく、Outputの方もElasticsearchに限らず、S3やKafkaといった様々なデータストアに出力できるところが、汎用と言われている所以です。Integrations(日本語だと「統合機能」)が出てきたことで、存在感が薄れてる?かもしれませんが、バッチリ現役ですので、ご安心ください。8系では、Elastic Common Schema(ECS) の互換性モードがデフォルトとなり、ECSへの対応も進んでいます。
実際にやってみよう
過去のブログでは、次のような方法で実現してきました。今回は、同じことをLogstashのPluginを使って実現してみたいと思います。
やりたいこと | Elastic Agent編 | Filebeat編 |
---|---|---|
HTTP APIでJSONを取り込む | Custom HTTPJSON Input | HTTP JSON Input |
JSON文字列をデコード | JSON processor | Decode JSON fields processor |
null-valueフィールド対応 | Set processor | Script processor |
@timestamp書き換え | Date processor | Timestamp processor |
1.NewsAPI
これまでと同様に、NewsAPI という、世の中のニュースのヘッドラインをJSON形式で返してくれるAPIをデータソースとして使います。登録して、API Keyを取得します。パラメータによって、対象を絞り込むことができるのですが、Japanのヘッドラインを取得するとこんな感じで返ってきます。
GET https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge
レスポンス:
{
"status": "ok",
"totalResults": 30,
"articles": [
{
"source": {
"id": null,
"name": "News.local"
},
"author": "ローカルニュース",
"title": "これはグッドニュースだ!",
"description": "今日、素晴らしい出来事がありました!",
"url": "https://news.local/hogehoge.html",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"publishedAt": "2022-07-24T04:55:57Z",
"content": null
},
{
(中略)
}
]
}
2.Inputの設定
Logstashの処理は、一般的にはPipelineと言われ、inputs → filters → outputs の3ステージで構成され、それぞれのステージでPluginの設定を行います。今回は、設定ファイルをlogstash-news.conf
として作成します。先ずはInputの設定になりますが、httpを介してNewsAPIからデータを所得するので、Http_poller input plugin を使います。取得したデータは、news
フィールドに格納しておきます。Outputはとりあえず、デバッグ用にコンソールに出力すればよいので、Stdout output plugin を設定しておきます。
input {
http_poller {
urls => {
news => "https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge"
}
request_timeout => 60
# Supports "cron", "every", "at" and "in" schedules by rufus scheduler
schedule => { "every" => "1m" }
target => "news"
ecs_compatibility => "v1"
ssl_verification_mode => "none"
}
}
output {
stdout {
codec => rubydebug
}
}
この段階での、Logstashからの出力は以下のようになります。
{
"@version" => "1",
"news" => {
"@version" => "1",
"event" => {
"original" => "{\"status\":\"ok\",\"totalResults\":30,\"articles\":[(中略)]}"
},
"articles" => [
[ 0] {
"description" => "今日、素晴らしい出来事がありました!",
"author" => "ローカルニュース",
"url" => "https://news.local/hogehoge.html",
"content" => nil,
"source" => {
"id" => nil,
"name" => "News.local"
},
"title" => "これはグッドニュースだ!",
"urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
"publishedAt" => "2022-07-24T04:55:57Z"
},
(中略)
],
"totalResults" => 30,
"status" => "ok",
"@timestamp" => 2022-07-24T07:09:51.828241Z
},
"@timestamp" => 2022-07-24T07:09:51.835379Z
}
3.配列をイベントに分割する
現状ですと、news.event.original
に取得したデータがJSON文字列として格納され、さらにarticles
配列の要素にそれぞれの記事がデコードされていることがわかりますが、これは、Http_poller input pluginのデフォルトのcodecがjson
になっているからで、Json codec plugin のおかげで暗黙的にデコードされているからです。いずれにせよ1つのイベントとして出力されてしまっていて、これは以前のブログの、Elastic AgentやFilebeatの時と少し異なる挙動ですね。配列要素を、それぞれ独立したイベントとして出力するようにしてあげる必要があります。ここで、Split filter plugin の出番です。以下の構成では、対象となるnews.articles
配列をnews
フィールドにそれぞれ分割すると同時に、元々のJSON文字列が格納されているnews.event
フィールドは不要となるので、削除するようにしています。ちなみにネストされたフィールドへのアクセスは、[top-level field][nested field]
という書き方になるので、少し注意が必要です。この辺りの書き方は、公式ドキュメントを参照してみてください。
(前略)
filter {
split {
field => "[news][articles]"
target => "news"
remove_field => "[news][event]"
}
}
(後略)
そうすると、配列要素がそれぞれのイベントとして出力されるようになります。
{
"@timestamp" => 2022-07-24T07:40:02.507870Z,
"news" => {
"description" => "今日、素晴らしい出来事がありました!",
"urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
"publishedAt" => "2022-07-24T04:55:57Z",
"url" => "https://news.local/hogehoge.html",
"source" => {
"id" => nil,
"name" => "News.local"
},
"author" => nil,
"title" => "これはグッドニュースだ!",
"content" => nil
},
"@version" => "1"
}
4.null valueフィールドに対応する
さらに、前回同様source.id
やauthor
がnullであるarticle
の場合は、source.name
をセットするようにしたいと思います。ここでは、Mutate filter plugin を使います。Logstashの場合、条件文は簡単に書けるので、違和感はないでしょう。
(前略)
filter {
split {
field => "[news][articles]"
target => "news"
remove_field => "[news][event]"
}
if ![news][author] {
mutate {
copy => { "[news][source][name]" => "[news][author]" }
}
}
if ![news][source][id] {
mutate {
copy => { "[news][source][name]" => "[news][source][id]" }
}
}
}
(後略)
5.@timestampを書き換える
仕上げに、イベントの@timestamp
を、処理した時間であるprocessing timeから、イベントが発生した時間であるevent timeに書き換えます。これは、Date filter plugin で処理することができます。puhlishedAt
が、ニュース記事が投稿された時間なので、このフィールドを使います。publishedAt
の形式は、ISO8601形式でサポートされている形式なので、formats
はISO8601
と指定しておきます。
(前略)
filter {
split {
field => "[news][articles]"
target => "news"
remove_field => "[news][event]"
}
if ![news][author] {
mutate {
copy => { "[news][source][name]" => "[news][author]" }
}
}
if ![news][source][id] {
mutate {
copy => { "[news][source][name]" => "[news][source][id]" }
}
}
date {
match => [ "[news][publishedAt]", "ISO8601" ]
}
}
(後略)
最終的に、コンソールへの出力は以下のようになります。
{
"@timestamp" => 2022-07-24T04:55:57Z,
"news" => {
"description" => "今日、素晴らしい出来事がありました!",
"urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
"publishedAt" => "2022-07-24T04:55:57Z",
"url" => "https://news.local/hogehoge.html",
"source" => {
"id" => "News.local",
"name" => "News.local"
},
"author" => "News.local",
"title" => "これはグッドニュースだ!",
"content" => nil
},
"@version" => "1"
}
6.Elasticsearchに投入する
さて、とりあえずいい感じになってきたので、Elasticsearchに投入してみましょう。ElasticsearchへのIndexは、Elasticsearch output plugin を使います。Logstashでも、7.13から Data stream へ出力することができるようになったので、Elastic AgentやFilebeatの時と同様に、Data streamに出力してみましょう。宛先も、Elastic Cloudを使っている場合は、次のようにcloud.id
とcloud.auth
を書いてあげるだけでOKです。
(前略)
output {
elasticsearch {
cloud_id => "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
cloud_auth => "elastic:YOUR_PASSWORD"
data_stream => "true"
}
}
そうすると、デフォルトの場合、logs-generic-default
というData streamにIndexされることになります。Index Template もデフォルトのlogs
というテンプレートが適用されており、例によってnews
フィールドはDynamic Mappingで、配下の全てのフィールドがkeyword
になってしまっています。
GET logs-generic-default/_mapping
レスポンス:
{
".ds-logs-generic-default-2022.07.23-000001": {
"mappings": {
"_data_stream_timestamp": {
"enabled": true
},
"dynamic_templates": [
{
"match_ip": {
"match": "ip",
"match_mapping_type": "string",
"mapping": {
"type": "ip"
}
}
},
{
"match_message": {
"match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "match_only_text"
}
}
},
{
"strings_as_keyword": {
"match_mapping_type": "string",
"mapping": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
],
"date_detection": false,
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword",
"ignore_above": 1024
},
"data_stream": {
"properties": {
"dataset": {
"type": "constant_keyword",
"value": "generic"
},
"namespace": {
"type": "constant_keyword",
"value": "default"
},
"type": {
"type": "constant_keyword",
"value": "logs"
}
}
},
"ecs": {
"properties": {
"version": {
"type": "keyword",
"ignore_above": 1024
}
}
},
"host": {
"type": "object"
},
"news": {
"properties": {
"author": {
"type": "keyword",
"ignore_above": 1024
},
"content": {
"type": "keyword",
"ignore_above": 1024
},
"description": {
"type": "keyword",
"ignore_above": 1024
},
"publishedAt": {
"type": "keyword",
"ignore_above": 1024
},
"source": {
"properties": {
"id": {
"type": "keyword",
"ignore_above": 1024
},
"name": {
"type": "keyword",
"ignore_above": 1024
}
}
},
"title": {
"type": "keyword",
"ignore_above": 1024
},
"url": {
"type": "keyword",
"ignore_above": 1024
},
"urlToImage": {
"type": "keyword",
"ignore_above": 1024
}
}
}
}
}
}
}
logsテンプレート
GET _index_template/logs/
レスポンス:
{
"index_templates": [
{
"name": "logs",
"index_template": {
"index_patterns": [
"logs-*-*"
],
"composed_of": [
"logs-mappings",
"data-streams-mappings",
"logs-settings"
],
"priority": 100,
"version": 1,
"_meta": {
"description": "default logs template installed by x-pack",
"managed": true
},
"data_stream": {
"hidden": false,
"allow_custom_routing": false
},
"allow_auto_create": true
}
}
]
}
7.Index Templateを作成する
ここでは、デフォルトのlogs-generic-default
Data streamの設定を生かしつつ、news
フィールドのMappingを追加することにしたいところです。まず、logs-logstash.news-qa
というData streamを使うことにするとして、Kibanaの Stack Management > Index Management > Index Templates から、logs
Index Templateを、logs-logstash.news
という名前でクローンします。その時、Index patternsには、logs-logstash.news-*
と指定します。
さらに、カスタマイズ部分の Component Template として、logs-logsash.news@custom
を作成します。ILM Policy index.lifecycle.name
には、ビルトインのlogs
を設定しています。以下は、APIの入力ですが、もちろんKibanaのUIから作成してもOKです。
PUT _component_template/logs-logstash.news@custom
{
"template": {
"settings": {
"index": {
"lifecycle": {
"name": "logs"
},
"codec": "best_compression",
"query": {
"default_field": [
"*"
]
}
}
},
"mappings": {
"dynamic_templates": [],
"properties": {
"news": {
"type": "object",
"properties": {
"publishedAt": {
"type": "date"
},
"author": {
"ignore_above": 1024,
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"urlToImage": {
"ignore_above": 1024,
"type": "keyword"
},
"description": {
"ignore_above": 1024,
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"source": {
"type": "object",
"properties": {
"name": {
"ignore_above": 1024,
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"id": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"title": {
"ignore_above": 1024,
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"content": {
"ignore_above": 1024,
"type": "keyword",
"fields": {
"text": {
"type": "text"
}
}
},
"url": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
}
}
}
そして、logs-logstash.news
Index Templateで、logs-logsash.news@custom
を含むようにします。logs-mappings
とdata-streams-mappings
は残しておきます。
logs-logstash.news
Index Templateは、以下のようになります。
GET _index_template/logs-logstash.news/
レスポンス:
{
"index_templates": [
{
"name": "logs-logstash.news",
"index_template": {
"index_patterns": [
"logs-logstash.news-*"
],
"composed_of": [
"logs-mappings",
"data-streams-mappings",
"logs-logstash.news@custom"
],
"priority": 249,
"version": 1,
"_meta": {
"description": "logstash news template",
"managed": false
},
"data_stream": {
"hidden": false,
"allow_custom_routing": false
}
}
}
]
}
最後に、logs-logstash.news-qa
Data streamに出力するように、Logstashの設定ファイルのoutput
に反映します。最終的なlogstash-news.conf
は以下のようになります。
input {
http_poller {
urls => {
news => "https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge"
}
request_timeout => 60
# Supports "cron", "every", "at" and "in" schedules by rufus scheduler
schedule => { "every" => "1m" }
target => "news"
ecs_compatibility => "v1"
ssl_verification_mode => "none"
}
}
filter {
split {
field => "[news][articles]"
target => "news"
remove_field => "[news][event]"
}
if ![news][author] {
mutate {
copy => { "[news][source][name]" => "[news][author]" }
}
}
if ![news][source][id] {
mutate {
copy => { "[news][source][name]" => "[news][source][id]" }
}
}
date {
match => [ "[news][publishedAt]", "ISO8601" ]
}
}
output {
elasticsearch {
cloud_id => "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
cloud_auth => "elastic:YOUR_PASSWORD"
data_stream => "true"
data_stream_type => "logs"
data_stream_dataset => "logstash.news"
data_stream_namespace => "qa"
}
}
最終的には、以下のようにIndexされます。
{
"_index": ".ds-logs-logstash.news-qa-2022.07.23-000001",
"_id": "tWYYK4IBDg7ocqbwfqrs",
"_score": 1,
"_source": {
"@timestamp": "2022-07-24T04:55:57Z",
"news": {
"author": "News.local",
"content": null,
"publishedAt": "2022-07-24T04:55:57Z",
"description": "今日、素晴らしい出来事がありました!",
"urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
"url": "https://news.local/hogehoge.html",
"title": "これはグッドニュースだ!",
"source": {
"id": "News.local",
"name": "News.local"
}
},
"@version": "1",
"data_stream": {
"type": "logs",
"dataset": "logstash.news",
"namespace": "qa"
}
}
}
まとめ
さて、これまで、Elastic Agent編、Filebeat編、そしてLogstash編と、同じ目的をそれぞれ別の方法で実現する方法を見てきました。色々と方法があって少しややこしいと感じるかもしれませんが、逆にやりたいことに応じて柔軟に実現できる方法があると思って、前向きに捉えたいところです。
やりたいこと | Elastic Agent編 | Filebeat編 | Logstash編 |
---|---|---|---|
HTTP APIでJSONを取り込む | Custom HTTPJSON Input | HTTP JSON Input | Http_poller input plugin & Split filter plugin |
JSON文字列をデコード | JSON processor | Decode JSON fields processor | Json codec plugin |
null-valueフィールド対応 | Set processor | Script processor | Mutate filter plugin |
@timestamp書き換え | Date processor | Timestamp processor | Date filter plugin |
時系列データのData stream周りのネーミングルールなどが少しややこしいのですが、理解してしまえば、LogstashでもElastic AgentやBeats系と併せて統合的に管理が可能になるので、公式ドキュメントを参考にしながら、是非試してみてください。