4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Elasticsearchにカスタム時系列データを取り込む (Logstash編)

Last updated at Posted at 2022-07-25

これまで、Elasticsearchにカスタム時系列データを取り込む(Elastic Agent編) 、及び Elasticsearchにカスタム時系列データを取り込む(Filebeat編) で、Elastic AgentとFilebeatを使ったカスタムな時系列データの取り込みについて解説してきましたが、ここまで来たらということで、今回は、Logstashについて解説してみたいと思います。

Logstash

言わずと知れた Logstash ですが、古くは ELK StackLはLogstashに由来しています。
Logstashは、サーバーサイドの汎用のETLツールで、様々なデータの取り込みで使っている方も多いと思います。Logstashの特徴として、豊富なInput Pluginだけではなく、Outputの方もElasticsearchに限らず、S3やKafkaといった様々なデータストアに出力できるところが、汎用と言われている所以です。Integrations(日本語だと「統合機能」)が出てきたことで、存在感が薄れてる?かもしれませんが、バッチリ現役ですので、ご安心ください。8系では、Elastic Common Schema(ECS) の互換性モードがデフォルトとなり、ECSへの対応も進んでいます。

実際にやってみよう

過去のブログでは、次のような方法で実現してきました。今回は、同じことをLogstashのPluginを使って実現してみたいと思います。

やりたいこと Elastic Agent編 Filebeat編
HTTP APIでJSONを取り込む Custom HTTPJSON Input HTTP JSON Input
JSON文字列をデコード JSON processor Decode JSON fields processor
null-valueフィールド対応 Set processor Script processor
@￰timestamp書き換え Date processor Timestamp processor

1.NewsAPI

これまでと同様に、NewsAPI という、世の中のニュースのヘッドラインをJSON形式で返してくれるAPIをデータソースとして使います。登録して、API Keyを取得します。パラメータによって、対象を絞り込むことができるのですが、Japanのヘッドラインを取得するとこんな感じで返ってきます。

GET https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge

レスポンス:

{
  "status": "ok",
  "totalResults": 30,
  "articles": [
    {
        "source": {
          "id": null,
          "name": "News.local"
        },
        "author": "ローカルニュース",
        "title": "これはグッドニュースだ!",
        "description": "今日、素晴らしい出来事がありました!",
        "url": "https://news.local/hogehoge.html",
        "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
        "publishedAt": "2022-07-24T04:55:57Z",
        "content": null
    },
    {
      (中略)
    }
  ]
}

2.Inputの設定

Logstashの処理は、一般的にはPipelineと言われ、inputs → filters → outputs の3ステージで構成され、それぞれのステージでPluginの設定を行います。今回は、設定ファイルをlogstash-news.confとして作成します。先ずはInputの設定になりますが、httpを介してNewsAPIからデータを所得するので、Http_poller input plugin を使います。取得したデータは、newsフィールドに格納しておきます。Outputはとりあえず、デバッグ用にコンソールに出力すればよいので、Stdout output plugin を設定しておきます。

input {
  http_poller {
    urls => {
      news => "https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge"
    }
    request_timeout => 60
    # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
    schedule => { "every" => "1m" }
    target => "news"
    ecs_compatibility => "v1"
    ssl_verification_mode => "none"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

この段階での、Logstashからの出力は以下のようになります。

{
      "@version" => "1",
          "news" => {
            "@version" => "1",
               "event" => {
            "original" => "{\"status\":\"ok\",\"totalResults\":30,\"articles\":[(中略)]}"
        },
            "articles" => [
            [ 0] {
                "description" => "今日、素晴らしい出来事がありました!",
                     "author" => "ローカルニュース",
                        "url" => "https://news.local/hogehoge.html",
                    "content" => nil,
                     "source" => {
                      "id" => nil,
                    "name" => "News.local"
                },
                      "title" => "これはグッドニュースだ!",
                 "urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
                "publishedAt" => "2022-07-24T04:55:57Z"
            },
             (中略)
        ],
        "totalResults" => 30,
              "status" => "ok",
          "@timestamp" => 2022-07-24T07:09:51.828241Z
    },
    "@timestamp" => 2022-07-24T07:09:51.835379Z
}

3.配列をイベントに分割する

現状ですと、news.event.originalに取得したデータがJSON文字列として格納され、さらにarticles配列の要素にそれぞれの記事がデコードされていることがわかりますが、これは、Http_poller input pluginのデフォルトのcodecがjsonになっているからで、Json codec plugin のおかげで暗黙的にデコードされているからです。いずれにせよ1つのイベントとして出力されてしまっていて、これは以前のブログの、Elastic AgentやFilebeatの時と少し異なる挙動ですね。配列要素を、それぞれ独立したイベントとして出力するようにしてあげる必要があります。ここで、Split filter plugin の出番です。以下の構成では、対象となるnews.articles配列をnewsフィールドにそれぞれ分割すると同時に、元々のJSON文字列が格納されているnews.eventフィールドは不要となるので、削除するようにしています。ちなみにネストされたフィールドへのアクセスは、[top-level field][nested field]という書き方になるので、少し注意が必要です。この辺りの書き方は、公式ドキュメントを参照してみてください。

(前略)

filter {
  split {
    field => "[news][articles]"
    target => "news"
    remove_field => "[news][event]"
  }
}

(後略)

そうすると、配列要素がそれぞれのイベントとして出力されるようになります。

 {
    "@timestamp" => 2022-07-24T07:40:02.507870Z,
          "news" => {
        "description" => "今日、素晴らしい出来事がありました!",
         "urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
        "publishedAt" => "2022-07-24T04:55:57Z",
                "url" => "https://news.local/hogehoge.html",
             "source" => {
              "id" => nil,
            "name" => "News.local"
        },
             "author" => nil,
              "title" => "これはグッドニュースだ!",
            "content" => nil
    },
      "@version" => "1"
}

4.null valueフィールドに対応する

さらに、前回同様source.idauthorがnullであるarticleの場合は、source.nameをセットするようにしたいと思います。ここでは、Mutate filter plugin を使います。Logstashの場合、条件文は簡単に書けるので、違和感はないでしょう。

(前略)

filter {
  split {
    field => "[news][articles]"
    target => "news"
    remove_field => "[news][event]"
  }

  if ![news][author] {
    mutate {
      copy => { "[news][source][name]" => "[news][author]" }
    }
  }

  if ![news][source][id] {
    mutate {
      copy => { "[news][source][name]" => "[news][source][id]" }
    }
  }
}

(後略)

5.@timestampを書き換える

仕上げに、イベントの@timestampを、処理した時間であるprocessing timeから、イベントが発生した時間であるevent timeに書き換えます。これは、Date filter plugin で処理することができます。puhlishedAtが、ニュース記事が投稿された時間なので、このフィールドを使います。publishedAtの形式は、ISO8601形式でサポートされている形式なので、formatsISO8601と指定しておきます。

(前略)

filter {
  split {
    field => "[news][articles]"
    target => "news"
    remove_field => "[news][event]"
  }

  if ![news][author] {
    mutate {
      copy => { "[news][source][name]" => "[news][author]" }
    }
  }

  if ![news][source][id] {
    mutate {
      copy => { "[news][source][name]" => "[news][source][id]" }
    }
  }

  date {
    match => [ "[news][publishedAt]", "ISO8601" ]
  }
}

(後略)

最終的に、コンソールへの出力は以下のようになります。

{
   "@timestamp" => 2022-07-24T04:55:57Z,
         "news" => {
       "description" => "今日、素晴らしい出来事がありました!",
        "urlToImage" => "https://news.local/hogehoge/hogehoge.jpg",
       "publishedAt" => "2022-07-24T04:55:57Z",
               "url" => "https://news.local/hogehoge.html",
            "source" => {
             "id" => "News.local",
           "name" => "News.local"
       },
            "author" => "News.local",
             "title" => "これはグッドニュースだ!",
           "content" => nil
   },
     "@version" => "1"
}

6.Elasticsearchに投入する

さて、とりあえずいい感じになってきたので、Elasticsearchに投入してみましょう。ElasticsearchへのIndexは、Elasticsearch output plugin を使います。Logstashでも、7.13から Data stream へ出力することができるようになったので、Elastic AgentやFilebeatの時と同様に、Data streamに出力してみましょう。宛先も、Elastic Cloudを使っている場合は、次のようにcloud.idcloud.authを書いてあげるだけでOKです。

(前略)

output {
  elasticsearch {
    cloud_id => "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
    cloud_auth => "elastic:YOUR_PASSWORD"
    data_stream => "true"
  }
}

そうすると、デフォルトの場合、logs-generic-defaultというData streamにIndexされることになります。Index Template もデフォルトのlogsというテンプレートが適用されており、例によってnewsフィールドはDynamic Mappingで、配下の全てのフィールドがkeywordになってしまっています。

GET logs-generic-default/_mapping

レスポンス:

{
  ".ds-logs-generic-default-2022.07.23-000001": {
    "mappings": {
      "_data_stream_timestamp": {
        "enabled": true
      },
      "dynamic_templates": [
        {
          "match_ip": {
            "match": "ip",
            "match_mapping_type": "string",
            "mapping": {
              "type": "ip"
            }
          }
        },
        {
          "match_message": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "match_only_text"
            }
          }
        },
        {
          "strings_as_keyword": {
            "match_mapping_type": "string",
            "mapping": {
              "ignore_above": 1024,
              "type": "keyword"
            }
          }
        }
      ],
      "date_detection": false,
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "keyword",
          "ignore_above": 1024
        },
        "data_stream": {
          "properties": {
            "dataset": {
              "type": "constant_keyword",
              "value": "generic"
            },
            "namespace": {
              "type": "constant_keyword",
              "value": "default"
            },
            "type": {
              "type": "constant_keyword",
              "value": "logs"
            }
          }
        },
        "ecs": {
          "properties": {
            "version": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        },
        "host": {
          "type": "object"
        },
        "news": {
          "properties": {
            "author": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "content": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "description": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "publishedAt": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "source": {
              "properties": {
                "id": {
                  "type": "keyword",
                  "ignore_above": 1024
                },
                "name": {
                  "type": "keyword",
                  "ignore_above": 1024
                }
              }
            },
            "title": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "url": {
              "type": "keyword",
              "ignore_above": 1024
            },
            "urlToImage": {
              "type": "keyword",
              "ignore_above": 1024
            }
          }
        }
      }
    }
  }
}

logsテンプレート

GET _index_template/logs/

レスポンス:

{
  "index_templates": [
    {
      "name": "logs",
      "index_template": {
        "index_patterns": [
          "logs-*-*"
        ],
        "composed_of": [
          "logs-mappings",
          "data-streams-mappings",
          "logs-settings"
        ],
        "priority": 100,
        "version": 1,
        "_meta": {
          "description": "default logs template installed by x-pack",
          "managed": true
        },
        "data_stream": {
          "hidden": false,
          "allow_custom_routing": false
        },
        "allow_auto_create": true
      }
    }
  ]
}

7.Index Templateを作成する

ここでは、デフォルトのlogs-generic-default Data streamの設定を生かしつつ、newsフィールドのMappingを追加することにしたいところです。まず、logs-logstash.news-qaというData streamを使うことにするとして、Kibanaの Stack Management > Index Management > Index Templates から、logs Index Templateを、logs-logstash.newsという名前でクローンします。その時、Index patternsには、logs-logstash.news-*と指定します。

clone-template-logs1.png

さらに、カスタマイズ部分の Component Template として、logs-logsash.news@customを作成します。ILM Policy index.lifecycle.nameには、ビルトインのlogsを設定しています。以下は、APIの入力ですが、もちろんKibanaのUIから作成してもOKです。

PUT _component_template/logs-logstash.news@custom
{
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "logs"
        },
        "codec": "best_compression",
        "query": {
          "default_field": [
            "*"
          ]
        }
      }
    },
    "mappings": {
      "dynamic_templates": [],
      "properties": {
        "news": {
          "type": "object",
          "properties": {
            "publishedAt": {
              "type": "date"
            },
            "author": {
              "ignore_above": 1024,
              "type": "keyword",
              "fields": {
                "text": {
                  "type": "text"
                }
              }
            },
            "urlToImage": {
              "ignore_above": 1024,
              "type": "keyword"
            },
            "description": {
              "ignore_above": 1024,
              "type": "keyword",
              "fields": {
                "text": {
                  "type": "text"
                }
              }
            },
            "source": {
              "type": "object",
              "properties": {
                "name": {
                  "ignore_above": 1024,
                  "type": "keyword",
                  "fields": {
                    "text": {
                      "type": "text"
                    }
                  }
                },
                "id": {
                  "ignore_above": 1024,
                  "type": "keyword"
                }
              }
            },
            "title": {
              "ignore_above": 1024,
              "type": "keyword",
              "fields": {
                "text": {
                  "type": "text"
                }
              }
            },
            "content": {
              "ignore_above": 1024,
              "type": "keyword",
              "fields": {
                "text": {
                  "type": "text"
                }
              }
            },
            "url": {
              "ignore_above": 1024,
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}

そして、logs-logstash.news Index Templateで、logs-logsash.news@customを含むようにします。logs-mappingsdata-streams-mappingsは残しておきます。

clone-template-logs2.png

logs-logstash.news Index Templateは、以下のようになります。

GET _index_template/logs-logstash.news/

レスポンス:

{
  "index_templates": [
    {
      "name": "logs-logstash.news",
      "index_template": {
        "index_patterns": [
          "logs-logstash.news-*"
        ],
        "composed_of": [
          "logs-mappings",
          "data-streams-mappings",
          "logs-logstash.news@custom"
        ],
        "priority": 249,
        "version": 1,
        "_meta": {
          "description": "logstash news template",
          "managed": false
        },
        "data_stream": {
          "hidden": false,
          "allow_custom_routing": false
        }
      }
    }
  ]
}

最後に、logs-logstash.news-qa Data streamに出力するように、Logstashの設定ファイルのoutputに反映します。最終的なlogstash-news.confは以下のようになります。

input {
  http_poller {
    urls => {
      news => "https://newsapi.org/v2/top-headlines?country=jp&apiKey=hogehoge"
    }
    request_timeout => 60
    # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
    schedule => { "every" => "1m" }
    target => "news"
    ecs_compatibility => "v1"
    ssl_verification_mode => "none"
  }
}

filter {
  split {
    field => "[news][articles]"
    target => "news"
    remove_field => "[news][event]"
  }

  if ![news][author] {
    mutate {
      copy => { "[news][source][name]" => "[news][author]" }
    }
  }

  if ![news][source][id] {
    mutate {
      copy => { "[news][source][name]" => "[news][source][id]" }
    }
  }

  date {
    match => [ "[news][publishedAt]", "ISO8601" ]
  }
}

output {
  elasticsearch {
    cloud_id => "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
    cloud_auth => "elastic:YOUR_PASSWORD"
    data_stream => "true"
    data_stream_type => "logs"
    data_stream_dataset => "logstash.news"
    data_stream_namespace => "qa"
  }
}

最終的には、以下のようにIndexされます。

{
  "_index": ".ds-logs-logstash.news-qa-2022.07.23-000001",
  "_id": "tWYYK4IBDg7ocqbwfqrs",
  "_score": 1,
  "_source": {
    "@timestamp": "2022-07-24T04:55:57Z",
    "news": {
     "author": "News.local",
     "content": null,
     "publishedAt": "2022-07-24T04:55:57Z",
     "description": "今日、素晴らしい出来事がありました!",
     "urlToImage": "https://news.local/hogehoge/hogehoge.jpg",
     "url": "https://news.local/hogehoge.html",
     "title": "これはグッドニュースだ!",
     "source": {
       "id": "News.local",
       "name": "News.local"
     }
    },
    "@version": "1",
    "data_stream": {
     "type": "logs",
     "dataset": "logstash.news",
     "namespace": "qa"
    }
  }
}

まとめ

さて、これまで、Elastic Agent編、Filebeat編、そしてLogstash編と、同じ目的をそれぞれ別の方法で実現する方法を見てきました。色々と方法があって少しややこしいと感じるかもしれませんが、逆にやりたいことに応じて柔軟に実現できる方法があると思って、前向きに捉えたいところです。

やりたいこと Elastic Agent編 Filebeat編 Logstash編
HTTP APIでJSONを取り込む Custom HTTPJSON Input HTTP JSON Input Http_poller input plugin & Split filter plugin
JSON文字列をデコード JSON processor Decode JSON fields processor Json codec plugin
null-valueフィールド対応 Set processor Script processor Mutate filter plugin
@￰timestamp書き換え Date processor Timestamp processor Date filter plugin

時系列データのData stream周りのネーミングルールなどが少しややこしいのですが、理解してしまえば、LogstashでもElastic AgentやBeats系と併せて統合的に管理が可能になるので、公式ドキュメントを参考にしながら、是非試してみてください。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?