この記事は NTTテクノクロス Advent Calendar 2018の11日目の記事です 。
こんにちは、NTTテクノクロスの畠中です。
IoTデータ分析チームに所属し、現在は農業ベンチャーである株式会社デザミスと連携して牛の行動モニタリングサービスU-motion®の開発に携わっています。
今日の記事では、運用監視機能として障害の発生状況を可視化する仕組みをEFKスタック(Elasticsearch, fluentd, Kibana)で構築した内容についてご紹介します。
はじめに
U-motion®のサービス概要はこちらの動画をご覧ください。
概要としては牛にセンサータグを装着し、行動を常時モニタリングすることで発情や疾病など生産性に影響を与える事象をアラートとして検出し、農家の生産性向上に貢献していくサービスとなります。
常時モニタリングするためにシステムが安定して稼働することが非常に重要となります。しかしながら、台風や地震など自然災害による停電や機器故障など、予期せぬ障害が発生することがあります。
運用監視の担当者としては、障害要因を分析するために、どの機器で障害が起きているのか、いつから起きているのか、といった情報を明らかにしたいという要望があります。そこで、各機器のデータ受信状況をEFKスタックに投入し、時系列で可視化する仕組みを構築しました。
方法
前提条件
システム構成の概要は抽象化すると以下のとおりです。
センサータグは牛に装着し、レシーバは牛舎に設置されています。
センサータグから発信されたデータはレシーバを経由してクラウド(AWS)上のS3に蓄積されています。S3に格納されるオブジェクトは、受信レシーバの「ID」と「受信時刻」をオブジェクト名に含む形で格納されています。
EFKスタックはEC2上の監視サーバに構築します。導入ソフトウェアとバージョンは以下のとおりです。(インストール手順は割愛しますが、公式サイト等に従いインストールします。)
・fluentd(td-agent) 1.1.0
・fluent-plugin-elasticsearch 1.10.0
・Elasticsearch 6.4.1
・Kibana 6.4.1
・Anaconda 2.4.1
・boto3 1.7.28
・fluent-logger 0.9.1
以降、監視データの一例として、指定時刻にレシーバからデータが受信できているか死活監視を行うケースについて紹介します。
Elasticsearchのテンプレート設定
Elasticsearchに投入するデータ形式をテンプレートで指定します。今回はS3上のオブジェクト名に含まれる「ID」と「受信時刻」を対象とします。(テンプレート内では”receiverId”と”datetime”で指定しています。)
{
"cow_receiver" : {
"order" : 0,
"index_patterns" : [
"cow_receiver-*"
],
"mappings" : {
"receiver" : {
"properties" : {
"datetime" : {
"type" : "date"
},
"receiverId" : {
"type" : "keyword"
},
"cowshedId" : {
"type" : "keyword"
}
}
}
},
"aliases" : { }
}
}
fluentdの設定
td-agent.confに出力設定を追加します。タグ名は「cow.receiver」としています。
<match cow.receiver>
@type elasticsearch
host xx.xx.xx.xx # Elasticsearchのホスト名を指定
logstash_format true
logstash_prefix cow_receiver
type_name receiver
time_key datetime
time_key_exclude_timestamp true
flush_interval 5s
</match>
データ投入処理の実装
pythonでの実装例を紹介します。実装の流れとしては、s3から指定時刻のレシーバIDの一覧を取得を行い、fluentdにデータを通知します。s3へのアクセスはboto3を、fluentdへの通知はfluent-loggerライブラリを利用しています。サンプルコードは以下の通りです。
import boto3
from fluent import sender
target_datetime = "XXX" # 指定時刻。引数等で指定する
bucket = "XXX" # s3のバケット名
fd_host = "XXX" # fluentdのホスト名
fd_port = 24224 # fluentdのポート番号
receiver_list = []
# get_prefix()はS3のオブジェクト命名規則に依存
prefix = get_prefix(target_datetime)
marker = ""
# s3データ取得処理
client = boto3.client('s3')
while True:
res = client.list_objects(Bucket=bucket, Prefix=prefix, Marker=marker, Delimiter='/')
contents = res.get('Contents')
if contents:
for content in contents:
# get_receiver_id()はS3のオブジェクト命名規則に依存
receiver_id = get_receiver_id(filename)
if receiver_id not in receiver_list:
receiver_list.append(receiver_id)
if not 'IsTruncated' in res or not res['IsTruncated']:
# 全データ取得済みなら抜ける
break
# データ投入処理
fluentd = sender.FluentSender('cow', host=fd_host, port=fd_port)
for receiver_id in receiver_list:
send_data = {
"datetime": str(target_datetime.strftime("%Y-%m-%dT%H:%M:%S+09:00")),
"receiverId": receiver_id
}
fluentd.emit("receiver", send_data)
あとはkibana側で分析・監視したい観点で可視化設定を行います。
結果
レシーバの受信状況監視
上記はある牧場の全レシーバ6台の1日の受信状況を可視化したものです。
この日は午前2時台に停電があり、6時台にブレーカをあげて復旧した例となります。
(6番のレシーバだけちょっと時間がかかっていますが、最終的に復旧しました。)
時系列のデータが蓄積されることで、障害要因の推定が容易になります。また、データが取れなくなったタイミングをアラートとして検知し、メール等で通知共有するといった応用を行うことで、営業担当からのフォローを迅速に対応するといったことも可能となります。
参考:牛の行動分析
ここまで説明した仕組みは、データ投入処理を変更することで様々なデータに適用可能です。
上記グラフは、ある酪農家での1日の牛群行動パターンを可視化したものです。
通常、酪農では日に2〜3回ミルキングパーラーで搾乳を行い、搾乳後の牛は牛舎に戻り餌を食べます。濃い緑の「採食」に注目すると、この例では2回採食行動が多くなる時間帯(8時〜、18時〜)があり、搾乳時間と合致します。また、牛がリラックスしている夜間は反芻時間、横臥時間が増える、といった傾向も見て取れます。
まとめ
IoT機器の障害監視をkibanaで可視化した事例として、U-motion®での適用事例について紹介しました。EFKスタックで構築するメリットとしては以下が挙げられると思います。
・OSSの組み合わせなので無料1、かつ環境構築が容易
・データの形式に合わせて柔軟な設計が可能
障害監視やデータの可視化を検討している皆様の参考になれば幸いです。
-
Elastic Stack自体は機械学習やセキュリティ機能が充実している有料オプションも存在します。 ↩