LoginSignup
1
2

More than 5 years have passed since last update.

Metricbeatで取得したデータをInfluxDBに突っ込む

Last updated at Posted at 2016-11-01

はじめに

タイトルの通りのことを試してみた。

InfluxDBは、データ投入時にInfluxDB Line Protocolか、もしくはネストされていないJsonをサポートしている。しかし、Metricbeatで出力されるデータはネストされているため、Fluentdを使ってネストされてないJson形式に整形してInfluxDBに入れる、ということをやってみた。

Metricbeatはv5.0.0を、InfluxDBはv1.0.0を使った。

対象データソース

metricbeat.yml内の、cpu、load、diskio、filesystem、memory、networkの項目をデータソースとして使ってみた。

/etc/metricbeat/metricbeat.yml
#------------------------------- System Module -------------------------------
- module: system
  metricsets:
    # CPU stats
    - cpu

    # System Load stats
    - load

    # Per CPU core stats
    #- core

    # IO stats
    - diskio

    # Per filesystem stats
    - filesystem

    # File system summary stats
    #- fsstat

    # Memory stats
    - memory

    # Network stats
    - network

    # Per process stats
    #- process

出力されるデータは、以下のようなJsonの形式で出力される。このJsonは、loadモジュールの例である。

{
    "@timestamp": "2016-05-23T08:05:34.853Z",
    "beat": {
        "hostname": "host.example.com",
        "name": "host.example.com"
    },
    "metricset": {
        "host": "localhost",
        "module": "system",
        "name": "load",
        "rtt": 115
    },
    "system": {
        "load": {
            "1": 1.09,
            "15": 0.49,
            "5": 0.65,
            "norm": {
                "1": 0.545,
                "15": 0.245,
                "5": 0.325
            }
        }
    },
    "type": "metricsets"
}

出典: https://www.elastic.co/guide/en/beats/metricbeat/master/metricbeat-metricset-system-load.html

データの中継

Metricbeatのデータは、一旦Kafkaに転送する。FluentdをKafka Consumerとして利用する。Fluentdで、データをConsumeしつつ、整形して、InfluxDBに突っ込む。

今回使ったKafkaのバージョンは、v0.10.0である。よって、metricbeat.yml内では、使用しているKafkaのバージョンを記載しておく必要がある。

/etc/metricbeat/metricbeat.yml
  # Kafka version metricbeat is assumed to run against. Defaults to the oldest
  # supported stable version (currently version 0.8.2.0)
  version: 0.10.0

データの整形

Fluentdは、KafkaからのConsumeとデータの整形、及びInfluxDBへの転送に使った。Fluentdのconfは以下のような設定にする。

<source>
  @type kafka_group
  brokers kafkabroker001:9092,kafkabroker002:9092
  consumer_group consumer_group_001
  topics metrics-topic
  format json
</source>

<match metrics-topic>
  @type record_reformer
  tag ${record['metricset']['name']}
  enable_ruby true
  auto_typecast true
</match>

<match cpu>
  @type record_reformer
  tag beat_cpu
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    user_pct          ${record['system']['cpu']['user']['pct'].to_f}
    system_pct        ${record['system']['cpu']['system']['pct'].to_f}
    nice_pct          ${record['system']['cpu']['nice']['pct'].to_f}
    steal_pct         ${record['system']['cpu']['steal']['pct'].to_f}
    idle_pct          ${record['system']['cpu']['idle']['pct'].to_f}
    iowait_pct        ${record['system']['cpu']['iowait']['pct'].to_f}
    irq_pct           ${record['system']['cpu']['irq']['pct'].to_f}
    softirq_pct       ${record['system']['cpu']['softirq']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match memory>
  @type record_reformer
  tag beat_memory
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    actual_free       ${record['system']['memory']['actual']['free'].to_f}
    actual_used_bytes ${record['system']['memory']['actual']['used']['bytes'].to_f}
    actual_used_pct   ${record['system']['memory']['actual']['used']['pct'].to_f}
    free              ${record['system']['memory']['free'].to_f}
    swap_free         ${record['system']['memory']['swap']['free'].to_f}
    swap_total        ${record['system']['memory']['swap']['total'].to_f}
    swap_used_bytes   ${record['system']['memory']['swap']['used']['bytes'].to_f}
    swap_used_pct     ${record['system']['memory']['swap']['used']['pct'].to_f}
    total             ${record['system']['memory']['total'].to_f}
    used_bytes        ${record['system']['memory']['used']['bytes'].to_f}
    used_pct          ${record['system']['memory']['used']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match load>
  @type record_reformer
  tag beat_load
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    load_1            ${record['system']['load']['1'].to_f}
    load_15           ${record['system']['load']['15'].to_f}
    load_5            ${record['system']['load']['5'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match diskio>
  @type record_reformer
  tag beat_diskio
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    io_time           ${record['system']['diskio']['io']['time'].to_f}
    name              ${record['system']['diskio']['name'].to_f}
    read_bytes        ${record['system']['diskio']['read']['bytes'].to_f}
    read_count        ${record['system']['diskio']['read']['count'].to_f}
    read_time         ${record['system']['diskio']['read']['time'].to_f}
    write_bytes       ${record['system']['diskio']['write']['bytes'].to_f}
    write_count       ${record['system']['diskio']['write']['count'].to_f}
    write_time        ${record['system']['diskio']['write']['time'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match filesystem>
  @type record_reformer
  tag beat_filesystem
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    available         ${record['system']['filesystem']['available'].to_f}
    device_name       ${record['system']['filesystem']['device_name'].to_f}
    files             ${record['system']['filesystem']['files'].to_f}
    free              ${record['system']['filesystem']['free'].to_f}
    free_files        ${record['system']['filesystem']['free_files'].to_f}
    mount_point       ${record['system']['filesystem']['mount_point'].to_f}
    total             ${record['system']['filesystem']['total'].to_f}
    used_bytes        ${record['system']['filesystem']['used']['bytes'].to_f}
    used_pct          ${record['system']['filesystem']['used']['pct'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match network>
  @type record_reformer
  tag beat_network
  enable_ruby true
  auto_typecast true
  renew_record true
  <record>
    host              ${record['beat']['hostname']}
    name              ${record['system']['network']['name'].to_f}
    in_bytes          ${record['system']['network']['in']['bytes'].to_f}
    in_dropped        ${record['system']['network']['in']['dropped'].to_f}
    in_errors         ${record['system']['network']['in']['errors'].to_f}
    in_packets        ${record['system']['network']['in']['packets'].to_f}
    out_bytes         ${record['system']['network']['out']['bytes'].to_f}
    out_dropped       ${record['system']['network']['out']['dropped'].to_f}
    out_errors        ${record['system']['network']['out']['errors'].to_f}
    out_packets       ${record['system']['network']['out']['packets'].to_f}
    time              ${Time.parse(record['@timestamp']).to_i}
  </record>
</match>

<match beat_cpu beat_memory beat_load beat_diskio beat_filesystem beat_network>
  @type influxdb
  host influxdb001
  port 8086
  dbname metrics_db
  user kafka
  password kafka
  use_ssl false
  verify_ssl false
  tag_keys ["host"]
  time_precision s
  flush_interval 10s
</match>

ちょっとハマった点としては、record_reformerの中でtag名を付け替えてあげないとFluentdが無限ループに陥る、という点である。

ダメな例
<match cpu>
  @type record_reformer
  tag cpu
tag付け替え
<match cpu>
  @type record_reformer
  tag beat_cpu

InfluxDBには、beat_XXXという名前でMeasurementが作成される。

おわりに

Metricbeat v5がリリースされたので、ふと思い立ってやってみた次第である。それにしても、ネストされたJsonを整形するもっといい方法はないものだろうか...

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2