Edited at

Logstashのhttp_poller inputプラグインでWEB上のあらゆるデータを取得する(ついでにKibanaで可視化する)


はじめに

この記事はNTTテクノクロス Advent Calendar 2018の6日目の記事です。

NTTテクノクロスの山下 城司です。

普段はElastic Stackを採用するPJの技術支援などを担当しています。

皆さんは「WEB上の情報を集めて可視化をしたいが、そもそもデータを集めるのが大変だ。」とお困りではないでしょうか。

今回はLogstashのhttp_poller inputプラグインを使用して、WEB上のデータを簡単に収集する方法を紹介します。


Logstashとは

LogstashはElastic Stackを構成するプロダクトの1つでデータの入力、変換、出力を担当するものです。

これを使ってログ情報やメトリックなどのデータをElasticsearchに格納し、Kibanaで可視化を行うのがElastic Stackのユースケースの1つです。


環境用意

以下の環境で実行しています。


  • Windows 10 Pro

  • Docker for Windows version 2.0.0.0-win81 (29211)


    • Engine: 18.09.0

    • Compose: 1.23.2



  • Elasticsearch 6.5.1

  • Kibana 6.5.1

  • Logstash 6.5.1

また、これから紹介する手順を実施済みの環境が立ち上がるdocker-composeをGitHubに公開しています。

https://github.com/j-yama/logstash-http-poller-input-plugin-example

動作確認をしたい方はこちらをご利用ください。


実例

ここからは幾つかの実例を通してhttp_poller inputプラグインの使い方を紹介していきます。


実例1: JJWDが提供する気象情報を取得する

今回紹介するhttp_poller inputプラグインはHTTP/HTTPSプロトコルで疎通できるエンドポイントから一定間隔でデータを取得し続ける機能を提供するものです。

特にJSONで出力される場合は本当に簡単にデータを取得できます。

実例1ではJJWDさんから取得できるJSON形式の気象情報を幾つか取得してみます。


データの形式確認とコンフィグ作成方針

「1時間降水量」のエンドポイントにアクセスして、取得できるデータの形式を確認してみます。

ターミナル環境がない方はブラウザから http://jjwd.info/data/amedas-rain-1h-recent.json にアクセスしてください。

curl http://jjwd.info/data/amedas-rain-1h-recent.json

JSONのROOTが配列になっており、その下に各地方の気象情報が入っています。

「最高気温」や「最低気温」など他の気象情報JSONも同様の構造であるようです。


Logstashコンフィグ

このデータを取得するLogstashのコンフィグは以下のようになります。

# 適当なディレクトリに`jjwd.logstash.conf`を作成します。

vim /etc/logstash/pipelines/jjwd.logstash.conf

input {

http_poller {
urls => {
rain1h => "http://jjwd.info/data/amedas-rain-1h-recent.json"
maxtemperature => "http://jjwd.info/data/amedas-max-temperature-recent.json"
mintemperature => "http://jjwd.info/data/amedas-min-temperature-recent.json"
}
schedule => { every => "1h"}
codec => "json"
}
}

output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "jjwd-%{+YYYY.MM.dd}"
}
}

2~10行目がhttp_poller inputプラグインの設定項目です。

主に最低限設定が必要な項目は以下の3つです。

設定名
意味
備考

urls
データの取得先となるURL。
複数指定することも可能。それぞれ取得した結果はマージされず、独立して取得される。今回は「1時間降水量」「最高気温」「最低気温」の3種類を取得する。

schedule
データを取得する日時。
今回は「1時間ごと」のように一定間隔で実行するeveryを使用しているが、特定の時間に実行するat、cron形式で記述できるcronなどもフォーマットとして使用可能。

codec
取得するデータの形式。指定することで自動でパース、構造化までを行う。
指定できるcodecの一覧はこちら

今回のように、http_poller inputプラグインの設定の中でcodec => "json"と指定しておくと、自動で取得したデータをJSONとしてパースしてくれます。1

これだけの設定で定期的にHTTPエンドポイントからJSONデータを取得し、Elasticsearchにドキュメントとして格納することができます。


実行と投入結果の確認

では実際にこのコンフィグを使ってデータを取得してみましょう。Logstashを実行するには次のコマンドを実行します。事前にElasticsearchの起動を忘れないようにしておきましょう。

# 先ほど作成したコンフィグを指定してLogstashを実行する。

# Logstashのインストール先は環境によって読み替えること。
/usr/share/logstash/bin/logstash -f /etc/logstash/pipelines/jjwd.logstash.conf

実行後、Elasticsearchに正常にデータが格納できているか確認してみます。

# Elasticsearchに気象情報データが格納できたかを確認する。

# エンドポイントやインデックス名の日付は環境によって読み替えること。
curl localhost:9200/jjwd-2018.12.02/_search?size=2&pretty

次のように具体値の入ったJSONが出力されれば成功です。

{

"took": 37,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 5527103,
"max_score": 1,
"hits": [
{
"_index": "jjwd-2018.12.02",
"_type": "doc",
"_id": "KK91bWcBTralP9DgA65p",
"_score": 1,
"_source": {
"maxTempMRmonth": "09",
"maxTemptimeQ": "4",
"maxTempQ": "4",
"maxTempDevAvg": "+1.3",
"maxTempDevYest": "-1.8",
"maxTempExt": "",
"hour": "15",
"maxTempMRday": "06",
"maxTempExtU10": "",
"maxTempRyear": "2010",
"point": "納沙布(ノサップ)",
"maxTempYRday": "23",
"maxTempYRQ": "4",
"maxTempSseason": "1",
"maxTempYRyear": "2018",
"maxTempYRmonth": "08",
"maxTemp": 19.5,
"maxTempR": "32.6",
"maxTempMRQ": "8",
"maxTempRQ": "8",
"month": "09",
"@version": "1",
"stn": "18281",
"maxTempSmonth": "9",
"maxTempRmonth": "08",
"min": "00",
"startYear": "1977",
"year": "2018",
"maxTempmin": "14",
"maxTempIsYR": "0",
"maxTemphour": "13",
"maxTempMR": "29.7",
"maxTempMRyear": "2010",
"@timestamp": "2018-12-02T05:47:24.005Z",
"pref": "北海道 根室地方",
"intlStn": "",
"day": "21",
"maxTempYR": "26.9",
"maxTempRday": "07"
}
},
{
"_index": "jjwd-2018.12.02",
"_type": "doc",
"_id": "Ka91bWcBTralP9DgA65p",
"_score": 1,
"_source": {
"minTempMRyear": "2013",
"minTemptimeQ": "4",
"minTempSseason": "",
"minTempExtU10": "",
"hour": "02",
"minTempYR": "9.5",
"minTempYRday": "19",
"minTempRQ": "8",
"minTempRyear": "2018",
"point": "古殿(フルドノ)",
"minTempYRyear": "2018",
"minTemp": 12.1,
"@version": "1",
"month": "09",
"minTempMRmonth": "09",
"stn": "36766",
"minTempMRday": "28",
"minTempDevYest": "",
"minTempQ": "4",
"minTempYRQ": "4",
"minTempDevAvg": "",
"minTempmin": "27",
"min": "00",
"minTempYRmonth": "08",
"startYear": "2011",
"minTempMRQ": "8",
"minTempMR": "6.0",
"year": "2018",
"minTempSmonth": "",
"minTempRmonth": "01",
"@timestamp": "2018-12-02T05:47:24.014Z",
"minTempExt": "",
"minTemphour": "00",
"pref": "福島県",
"minTempRday": "31",
"intlStn": "",
"day": "21",
"minTempR": "-13.6",
"minTempIsSR": "0"
}
}
]
}
}


可視化例

あとは、このデータを元にしてKibanaで可視化をすることが可能です。詳細は割愛しますが、以下の手順を経ると最高気温と最低気温を地図上に可視化することも可能です。


  1. Logstashのfilterでprefフィールドが"北海道..."で始まる値だった場合は"北海道"に置換するようにする2


  2. jjwd-*のindex patternを作成する


    • 時系列フィールドは@timestampを指定する



  3. Region Mapで最高気温を表示するVisualizeを作成する


    • MetricsのValueは以下を設定する


      • Aggregation: Max

      • Field: maxTemp



    • Bucketsのshape fieldは以下の設定する


      • Aggregation: Terms

      • Field: pref.keyword

      • Size: 47



    • OptionsのLayer Settingsは以下を設定する


      • Vector map: Japan Prefectures

      • Join field: Prefecture name(Japanese)





  4. Region Mapで最低気温を表示するVisualizeを作成する


    • MetricsのValueは以下を設定する


      • Aggregation: Min

      • Field: minTemp



    • 残りは最高気温と同様



  5. 作成したVisualizeを並べたDashboardを作成する

これは2018年12月2日のデータですが、こうして見てみると


  • 関東・中部地方は最高気温が最も低く、日中は北海道・北陸地方よりも寒い

  • 北海道・北陸地方は最低気温が低いが最高気温は比較的高いため、1日の寒暖差が激しい

  • 西日本は最高気温、最低気温とも日本全体で見ると温暖な傾向

などが読み取れて興味深いのではないでしょうか。


実例2: NASAのEONET APIを叩いて全世界の自然現象データを取得する

次に、もう少しfilterを活用する例としてNASAが提供するEONET APIのEvents APIから全世界の自然現象データを取得してみます。

これは、火山噴火や地震、台風といった自然現象の発生について、いつ、どこで、何が起こったかを情報提供するAPIです。

なお、実例1と重複するため、実例2以降は実行方法や投入結果の確認は記載しません。


データの形式確認とコンフィグ作成方針

エンドポイントにアクセスして、取得できるデータの形式を確認してみます。

ターミナル環境がない方はブラウザから https://eonet.sci.gsfc.nasa.gov/api/v2.1/events にアクセスしてください。

curl https://eonet.sci.gsfc.nasa.gov/api/v2.1/events

着目したい点としてはevents.geometries.coordinatesに緯度経度情報が入っています。これを使えば地図上に、いつどの自然現象が起こったかを可視化できそうです。

ただし、Kibanaで可視化するためには可視化対象が1つ1つ独立したドキュメントになっている必要があります。今回であれば、緯度経度情報をそれぞれ独立したドキュメントにします。events.geometriesの単位で分割してみると良さそうです。分割にはSplit Filterプラグインを使います。

また、何度もポーリングした際にデータが重複しないように分割後のIDが一意になるように留意する必要もありそうです。events.idevents.geometries.dateを組み合わせれば一意になりそうです。


Logstashコンフィグ

以上の事を考えてLogstashのコンフィグを記述すると次のようになります。

input {

http_poller {
urls => {
nasa => "https://eonet.sci.gsfc.nasa.gov/api/v2.1/events"
}
schedule => { every => "10s"}
codec => "json"
}
}

filter {
split { field => "events"}
split { field => "[events][geometries]"}
}

output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "nasa-%{+YYYY.MM.dd}"
document_id => "%{[events][id]}-%{[events][geometries][date]}"
}
}

実例1と異なるのはfilter内のsplitで分割している点とElasticsearchに格納する際にドキュメントIDを明示的に指定している点です。順に解説していきます。


split filterプラグインによる配列フィールドの分割

まず、split filterプラグインですが、分割したい配列のフィールド名を指定すると、その配列を別々のドキュメントに分割してくれます。

例えば、元のJSONが

{

"fieldA": "a",
"children": [
{
"fieldB": "b",
"grandchild": [
{
"fieldC": "c"
},
{
"fieldC": "cc"
}
]
},
{
"fieldB": "bb",
"grandchild": [
{
"fieldC": "ccc"
}
]
}
]
}

という形式だった場合、

filter{

split { field => "children"}
}

のように、Logstashのコンフィグで設定すると 親の値を保ったまま 以下の2つのJSONに分割されます。

{

"fieldA": "a",
"children": {
"fieldB": "b",
"grandchild": [
{
"fieldC": "c"
},
{
"fieldC": "cc"
}
]
}
}

{

"fieldA": "a",
"children": {
"fieldB": "bb",
"grandchild": [
{
"fieldC": "ccc"
}
]
}
}

split filterプラグインの基本はこれです。更にgrandchildで分割したい場合は、その設定もコンフィグに追加します。入れ子のフィールドを指定する場合はフィールド名を[]で囲む必要があることに注意します。

filter{

split { field => "children"}
split { field => "[children][grandchild]"}
}

この設定で分割すると、元のJSONは次の3つに分割されます。

{

"fieldA": "a",
"children": {
"fieldB": "b",
"grandchild": {
"fieldC": "c"
}
}
}

{

"fieldA": "a",
"children": {
"fieldB": "b",
"grandchild": {
"fieldC": "cc"
}
}
}

{

"fieldA": "a",
"children": {
"fieldB": "bb",
"grandchild": {
"fieldC": "ccc"
}
}
}

今回のコンフィグもこの要領でevents[events][geometries]の両方を設定しています。


ドキュメントIDの指定

実例1では、1時間ごとに更新されるデータだったため、1時間ごとにポーリングする設定にすれば、特にデータの重複投入を考える必要がありませんでした。

しかし、実例2は更新間隔が不明であるため、データごとに一意になるIDを付与しなければ、重複して投入されてしまう恐れがあります。

Elasticsearch outputプラグインでは、コンフィグで明示的にドキュメントIDを指定しない場合、その都度IDが自動生成されてしまうためです。

ドキュメントIDを明示的に指定するには以下のように記述します。

output {

elasticsearch {
document_id => "%{[一意にデータを引き当てられるフィールド名]}"
}
}

フィールドの値を参照する場合は%{}で囲った中にフィールド名を記述します。

今回のデータの場合、events.idevents.geometries.dateを組み合わせることで、一意にデータを引き当てられそうでした。

そこで、以下のように記述します。

output {

elasticsearch {
document_id => "%{[events][id]}-%{[events][geometries][date]}"
}
}

このように記述すると、例えばevents.idEONET_4045で、events.geometries.date2018-12-02T00:00:00Zだった場合、ドキュメントIDはEONET_4045-2018-12-02T00:00:00Zになります。

これで同じデータの場合はIDが同一になり、重複投入を防ぐことができます。


可視化例

このデータを元にしてKibanaで自然現象が発生した座標を地図上に可視化をすることが可能です。手順を要約したものを以下に示します。


  1. index templateでnasa-*events.geometries.coordinatesのtypeがgeo_pointになるようにする


  2. nasa-*のindex patternを作成する


    • 時系列フィールドは@timestampを指定する



  3. Coordinate Mapで最高気温を表示するVisualizeを作成する


    • BucketsのGeo Coordinatesは以下を設定する


      • Aggregation: Geohash

      • Field: events.geometries.coordinates





自然現象を可視化してみたCoordinate Mapは以下の図になります。

特に南極大陸付近に多くの自然現象が発生していることが分かります。これは何でしょうか?Visualizeのfilter機能を使って探ってみます。それぞれのドキュメントにはevents.categories.titleに分類が登録されています。これを見れば自然現象の種類が分かりそうです。

分類がSea and Lake Iceのデータだけを表示してみました。南極大陸の周りに発生していた自然現象は海氷であることが分かります。

分類がSevere Stormsのデータのみを表示してみました。主に日本の南側で多く発生していることが分かります。

日本国内で発生した自然現象はVolcanoesの2件のみのようです。これらはいつ何が起こったことを表しているのでしょうか?

Discoverで具体的なデータを確認してみます。

確認してみると、以下であることが分かります。


  • 地図上部の自然現象


    • 発生時期: 2017年3月25日

    • 場所: 姶良火山(桜島南岳山頂火口)



  • 地図下部の自然現象


    • 発生時期: 2018年10月21日

    • 口永良部島火山(新岳火口)



当然ですが、実際にこの時期のニュースなどを調べると上記に関連した情報が見つかります。確かな情報であるようです。

おそらく、EONET APIには一定の基準を超えた影響度の高い自然現象のみが登録される仕組みになっていると思うのですが、どういう基準で登録されるのか気になりますね。


実例3: ElasticのブログRSSフィードから最新の記事情報を取得する

さて、ここまでではJSON形式のデータを取得してみましたが、形式はJSONである必要はありません。取得したデータに対して、どのような形式であったとしてもfilter pluginで自由にパース・構造化・加工できるのがLogstashの魅力です。

最後にfilterをより活用する実例としてElasticさんのブログRSSフィードから最新の記事情報を取得してみたいと思います。3


データの形式確認とコンフィグ作成方針

エンドポイントにアクセスして、取得できるデータの形式を確認してみます。

ターミナル環境がない方はブラウザから https://www.elastic.co/blog/feed にアクセスしてください。

curl https://www.elastic.co/blog/feed

まず、各タグの中に存在する<![CDATA[ ]]>はXMLのCDATAセクションを宣言する記述ですが、我々としては特に興味がないため、除去したいですよね。LogstashでXMLのパースを実行するにはXml Filterプラグインを使用するのですが、幸いな事にそれを使用すれば自動でパースの際に<![CDATA[ ]]>を除去してくれます。便利。

また、パース後のデータとしては<item>ごとに異なるドキュメントとして格納すると1記事1ドキュメントになって管理しやすいです。

実例2と同様にsplit filterプラグインを使って分割します。

また、記事を格納している<description>の中ではHTMLタグが直接書き込まれており、記事情報としてはノイズがある感じです。こちらもタグを除去できると望ましそうです。これにはMutate Filterプラグインのgsubを使います。

最後に、実例2と同様に何度もポーリングした際にデータが重複しないように分割後のIDが一意になるように留意する必要があります。今回は記事のURLを表すchannel.item.linkが一意になる値として使えそうです。


Logstashコンフィグ

以上の事を考えてLogstashのコンフィグを記述すると次のようになります。

input {

http_poller {
urls => {
elasticrss => "https://www.elastic.co/blog/feed"
}
schedule => { every => "10s" }
codec => "plain"
}
}

filter {
xml {
source => "message"
target => "doc"
remove_field => "message"
}
split { field => "[doc][channel]" }
split { field => "[doc][channel][item]" }
mutate {
gsub => ["[doc][channel][item][description]", "<.*?>", ""]
}
}

output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "elastic-rss-%{+YYYY.MM.dd}"
document_id => "%{[doc][channel][item][link]}"
}
}

ポイントはfilter内での処理です。


  1. まず、Xml Filterプラグインで取得したデータをXMLとしてパースします。inputプラグインで取得したテキストはmessageというフィールドに格納されます。sourcemessageを指定しているのはそのためです。また、パース後にはmessageフィールドの生テキストは不要となるため、remove_fieldで削除します。

  2. 次に、Split Filterプラグインで分割したいフィールドを配列から別々のドキュメントになるよう分割します。

  3. 最後にMutate Filterプラグインで記事内のHTMLタグを除去します。


可視化例

このデータを元にしてElasticさんのブログの記事直近20件で、どのような単語がよく使われているかを可視化してみます。手順を要約したものを以下に示します。


  1. index templateでindex名elastic-rss-*のフィールドdoc.channel.item.descriptionのfielddataがtrueになるようにする


  2. elastic-rss-*のindex patternを作成する


    • 時系列フィールドは@timestampを指定する



  3. Tag Cloudで記事内でどのような単語がよくつかわれているかを表示するVisualizeを作成する


    • BucketsのTagsは以下を設定する


      • Aggregation: Terms

      • Field: doc.channel.item.description

      • Size: 20

      • Exclude(Advanced): .|..|...|....|.....





記事に使われている単語でタグクラウドを表示してみました。

もう少し別の方法で一般的な単語は除去した方がよさそうですが、プロダクトに関連した単語であるelasticsearchkibanadashboardsecurityなど、Elasticさんのブログらしい結果になっています。

また、直近では6.5.0や7.0.0-alphaのリリースに関連した記事が多かったため、releaseversionという単語も大きく表示されています。


おわりに

このようにLogstashのhttp_poller inputプラグインを使いこなせばWEB上のあらゆるエンドポイントを叩いてデータを取得することができます。また、Elasticsearchに出力することで、そのままKibanaで可視化を行えるのがとても良いです。

ちなみに、今回は最終的にElasticsearchに投入するコンフィグ例ばかりでしたが、File outputプラグインを使うことで、jsonファイルやその他のフォーマットとして出力することも可能です。

皆さんも是非、ログを転送するだけでなく、便利にデータを取得するツールとしてもLogstashを使ってみてください!





  1. また、ROOTが配列の場合は、配列配下のオブジェクトをそれぞれ別ドキュメントとなるよう分割します。 



  2. ここが少し説明しづらい点ですが、KibanaのRegion Mapで地図上への可視化を行う場合、県名が入ったフィールドが必要になります。しかし今回の取得データでは北海道の県名情報として「北海道 根室地方」「北海道 釧路地方」など地名まで含まれた値が入っているため、それらを「北海道」に変換するようにします。 



  3. RSSフィードの取得といえばLogstashにはrss input pluginがありますが、対応しているフォーマットが限定されており、使える場面が少ないです。そこで、個人的にRSSフィードから記事のデータを吸い上げたいときにはhttp_poller input pluginを使うことが多いです。そんな時があるかは別として、ですが。)